[MDEV-27717] Parallel execution on partitions in scans where multiple partitions are needed Created: 2020-04-23  Updated: 2024-02-01

Status: Open
Project: MariaDB Server
Component/s: Partitioning
Fix Version/s: None

Type: New Feature Priority: Critical
Reporter: Max Mether Assignee: Alexey Botchkov
Resolution: Unresolved Votes: 0
Labels: None

Issue Links:
Blocks
blocks MDEV-28596 Replace Spider's parallel search feat... Open
is blocked by MDEV-6897 Making Items reentrant Open
PartOf
is part of MDEV-22162 Make partitions more flexible to use Open
Relates
relates to MDEV-16610 Torn reads from spider, with XA and i... Open
relates to MDEV-6096 Ideas about parallel query execution Open

 Description   

Currently, when a table uses partitioning with multiple partitions, only the needed partitions are scanned in queries that have WHERE clauses on the partitioning key. For other queries, however, all partitions need to be scanned, and furthermore the scans are done sequentially.

This task is to launch scans on different partitions in parallel instead of sequentially whenever more than one partition needs to be scanned (similarly to what the SPIDER engine does). This would allow the execution to use multiple cores (when available) and provide much faster overall execution times.



 Comments   
Comment by Oleksandr Byelkin [ 2020-08-25 ]

All tasks that involve parallel execution require the ability to clone structures or make them reentrant.

So this should include as a prerequisite either reentrant Items (MDEV-6897) or a (currently non-existing) MDEV about cloning arbitrary structures (at least Items).

Cloning is simpler, especially if we cheat and do it via the parser (and we already have bugs where it is needed), but reentrant Items are the more promising direction: they can solve more problems, especially on newer architectures such as ARM.

Comment by Max Mether [ 2020-09-04 ]

sanja Considering that you can already emulate this with Spider (create a Spider table where each partition just points to a local table that holds that partition), it shouldn't be that hard to do for partitioning... Can we not just do the same thing as with Spider? Or is there some kind of fundamental difference?

For example, if we had a machine with 16 cores, we could create an InnoDB table and then create 16 views, with each view seeing a subset of the data. Finally, we would create an additional Spider table that uses the 16 views as the partitions. Then we would have parallel scans on the same table (as SPIDER scans different partitions in parallel) without having done any development. Would it not be possible to do the same inside partitioning?

Comment by Oleksandr Byelkin [ 2020-09-04 ]

With Spider it looks like a dirty hack which, if we bring it in, we will not know how to get rid of in the future. In a sense (IMHO) it would do more harm than good. But that is my personal opinion.

Comment by Max Mether [ 2020-09-04 ]

I agree that we should not do exactly what Spider does but should have a "cleaner" solution. However, considering the example above, it seems like this shouldn't need reentrant Items; you just need to treat each partition as a table to some extent. I'm just saying that there should be an easier way to achieve this than requiring the full reentrant Items feature.

Right now partitions are scanned in sequence, but why is that? How and why would it change with re-entrant items?

Comment by Marko Mäkelä [ 2020-11-12 ]

Parallel execution of reads requires the ability to clone a pre-existing read view, so that each reader will see the same version of the data. This needs a new handler interface. It could also require a slight change of semantics: I do not think that we can create the read view lazily when reading the first row from whichever partition happens to be read first in any of the concurrent threads; instead, we must create and clone the read view upfront, before starting execution, as if START TRANSACTION WITH CONSISTENT SNAPSHOT had been invoked.
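
The upfront-snapshot requirement described above can be illustrated with a small, purely hypothetical Python sketch (not MariaDB code; the partitioned data and worker logic are made up). The snapshot is taken once before any worker starts, so every thread reads the same version of the data, analogous to START TRANSACTION WITH CONSISTENT SNAPSHOT:

```python
import copy
import threading

# Made-up partitioned table: partition name -> rows.
table = {"p0": [1, 2], "p1": [3, 4]}

# Create the read view once, upfront, before launching any scan thread.
# A lazy per-thread snapshot could see different versions of the data.
snapshot = copy.deepcopy(table)

results = {}

def scan(partition):
    # Each worker reads through the shared snapshot, so a concurrent
    # writer modifying `table` after this point cannot cause torn reads
    # across partitions.
    results[partition] = sum(snapshot[partition])

threads = [threading.Thread(target=scan, args=(p,)) for p in snapshot]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

A real implementation would clone the engine's MVCC read view through a new handlerton call rather than deep-copying data, but the ordering constraint (snapshot first, workers second) is the same.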

We would need a new handlerton function for cloning a read view. For Spider, the read views are broken until MDEV-16610 has been fixed. In Spider, the read view creation is more complicated, because the view will have to be cloned across servers, and for that to be feasible, each server must share a sequence for generating transaction start and commit identifiers.

Comment by Alexey Botchkov [ 2022-02-07 ]

As far as I can see, parallel execution on partitions can't work for engines where the pushdown condition checks are not reentrant. (These are usually done in the handler_index_cond_check/handler_rowid_filter_check callbacks.)

Then there is the problem that some engines modify THD data, which would have to be protected in this case.

So it seems too difficult a task to do in the general case.
Though having that possibility would be beneficial to the Spider engine. It currently implements
similar parallelism internally, but that is not natural at the handler level and makes the code too complicated.
I'd limit parallel execution to engines that are capable of it and adapt Spider to that form of execution.

Comment by Larry Adams [ 2023-09-12 ]

The last time I tested Spider, even though a table was broken into many pieces, the query execution through the various Spider tables was still serial: one table on one server, followed by another table on another server, and finally combining all the results. So, though there was a claim of parallelization, the queries did not actually occur in parallel; they simply leveraged different servers for different pieces.

I prefer the map-reduce approach as it is the simplest way to decompose the queries into parts and act on them. Say we have queries like the two below (a partitioned example and a UNION example):

SELECT cluster, COUNT(*) AS jobs, MAX(cpuTime) AS maxCPU, SUM(memory) AS memory, AVG(memory) AS avgMemory
FROM partitioned_table
WHERE last_updated BETWEEN ? AND ? -- partitioned by RANGE on a timestamp. Yes yes, just wishful thinking
GROUP BY cluster

or

SELECT cluster, SUM(jobs) AS jobs, MAX(maxCPU) AS maxCPU, SUM(memory) AS memory, SUM(total_memory)/SUM(jobs) AS avgMemory
FROM (
SELECT cluster, COUNT(*) AS jobs, MAX(cpuTime) AS maxCPU, SUM(memory) AS memory, SUM(memory) AS total_memory
FROM unpartitioned_tableA
WHERE last_updated BETWEEN ? AND ?
GROUP BY cluster
UNION ALL
SELECT cluster, COUNT(*) AS jobs, MAX(cpuTime) AS maxCPU, SUM(memory) AS memory, SUM(memory) AS total_memory
FROM unpartitioned_tableB
WHERE last_updated BETWEEN ? AND ?
GROUP BY cluster
) AS rs
GROUP BY cluster;

You should notice how I achieved the same outcome in the UNION case by simply abstracting the AVG() into its parts in the union sub-queries. This same abstraction can be done with other consolidation functions, and it also works if you build a UNION using the PARTITION keyword in the resulting UNION queries. Consolidation functions like SUM(), MIN(), MAX() and AVG() are easy. Others, not so easy.
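
The AVG() abstraction above can be checked with a quick sketch (Python rather than SQL, with made-up memory values): the per-branch partial aggregates (jobs, total_memory) reduce to exactly the same average as computing AVG(memory) over all rows at once.

```python
# Made-up memory values standing in for the two UNION branches.
table_a = [10, 20, 30]   # rows of unpartitioned_tableA
table_b = [5, 15]        # rows of unpartitioned_tableB

# Map step: each branch emits (jobs, total_memory), i.e. COUNT(*) and
# SUM(memory), as in the UNION sub-queries above.
partials = [(len(t), sum(t)) for t in (table_a, table_b)]

# Reduce step: avgMemory = SUM(total_memory) / SUM(jobs).
jobs = sum(j for j, _ in partials)
total = sum(m for _, m in partials)
avg_memory = total / jobs

# The decomposed average matches AVG() computed directly over all rows.
assert avg_memory == sum(table_a + table_b) / len(table_a + table_b)
```

The same trick does not work for truly holistic aggregates (e.g. exact medians or percentiles), which is presumably what "Others, not so easy" refers to.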

Each of these cases can be run in a map reduce fashion:

For the partitioned case:
1) Determine the partitions affected by the range query (on a timestamp, again just wishful thinking)
2) Use the appropriate abstraction functions to change the form of the query using whatever logic is available to abstract
3) Launch X parallel queries and store the results in an internal temporary table by PARTITION.
4) Reduce the set from the temporary table, again using the abstracted functions.
5) Present the results to the user
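
The partitioned steps above can be sketched in miniature (a hypothetical Python illustration; the partition data, cluster names, and aggregate shapes are all made up). Step 1 is simulated by selecting partition names, step 3 runs one worker per partition, and step 4 reduces the partial aggregates:

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Made-up partitions: name -> rows of (cluster, memory).
partitions = {
    "p2023": [("clusterA", 4), ("clusterB", 2)],
    "p2024": [("clusterA", 6)],
    "p2025": [("clusterB", 1)],
}

# Step 1: partitions matched by the (wishful) timestamp range query.
needed = ["p2023", "p2024"]

def map_scan(name):
    # Step 3: one worker scans one partition, producing the abstracted
    # partial aggregates: cluster -> [jobs, memory].
    agg = defaultdict(lambda: [0, 0])
    for cluster, mem in partitions[name]:
        agg[cluster][0] += 1    # COUNT(*)
        agg[cluster][1] += mem  # SUM(memory)
    return agg

with ThreadPoolExecutor(max_workers=len(needed)) as pool:
    partials = list(pool.map(map_scan, needed))  # the "temporary table"

# Step 4: reduce the per-partition partials into the final result.
final = defaultdict(lambda: [0, 0])
for part in partials:
    for cluster, (jobs, mem) in part.items():
        final[cluster][0] += jobs
        final[cluster][1] += mem
```

The number of worker threads here falls out of the partition count, but as noted further below it would normally be capped by configuration.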

For the union case
1) Determine any abstractions that are not already defined as in my case above to allow the unions to be independent of one another
2) Move the WHERE and GROUP BY clauses under each specific UNION
3) Launch X parallel queries and store the results in an internal temporary table by UNION
4) Reduce the set from the temporary table, again using the abstracted functions.
5) Present the results to the user
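
The UNION steps can be sketched the same way (again a hypothetical Python illustration with made-up rows of (cluster, last_updated, memory)): each branch carries its own WHERE filter and GROUP BY (step 2), the branches run in parallel (step 3), and the reduce step recombines the abstracted partials (step 4):

```python
from concurrent.futures import ThreadPoolExecutor

# Made-up rows for the two UNION branches: (cluster, last_updated, memory).
table_a = [("c1", 10, 100), ("c1", 99, 7)]
table_b = [("c1", 20, 50), ("c2", 30, 25)]

lo, hi = 0, 50  # WHERE last_updated BETWEEN lo AND hi

def branch(rows):
    # One UNION branch: apply the pushed-down WHERE and GROUP BY,
    # emitting cluster -> (jobs, total_memory) partial aggregates.
    out = {}
    for cluster, ts, mem in rows:
        if lo <= ts <= hi:
            jobs, total = out.get(cluster, (0, 0))
            out[cluster] = (jobs + 1, total + mem)
    return out

# Step 3: run the branches in parallel; `partials` plays the role of
# the internal temporary table keyed by UNION branch.
with ThreadPoolExecutor(max_workers=2) as pool:
    partials = list(pool.map(branch, [table_a, table_b]))

# Step 4: reduce with SUM(jobs) and SUM(total_memory); avgMemory would
# then be total / jobs per cluster, as in the outer query above.
final = {}
for p in partials:
    for cluster, (jobs, total) in p.items():
        j, t = final.get(cluster, (0, 0))
        final[cluster] = (j + jobs, t + total)
```
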

In each of these cases, you can gain an X times speedup by simply using the consolidation function abstractions from a library of common abstractions and launching X threads. It's almost linear acceleration (not the kind I learned back in my Electrical Engineering days).

In a single-server case, you can simply embed this map-reduce algorithm in the query logic, searching queries for opportunities to parallelize, and then control the number of threads by configuration.

In a MaxScale case, you could do this before the queries are sent to the various servers and handle the parallelization right in MaxScale. Again, the number of threads could be handled by configuration.

I was planning on writing this in Cacti for certain table cases like for log searches or network flow analysis since it did not exist in either MySQL or MariaDB natively, but it keeps getting pushed down my priority list as there is so much more to do in Cacti. I continue to have hopes that this can be done internally by my preferred tool... MariaDB

Be well.

Larry

Generated at Thu Feb 08 09:55:02 UTC 2024 using Jira 8.20.16#820016-sha1:9d11dbea5f4be3d4cc21f03a88dd11d8c8687422.