MariaDB Server / MDEV-27717

Parallel execution on partitions in scans where multiple partitions are needed

Details

    • Type: New Feature
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Component: Partitioning

    Description

      Currently, when a table uses partitioning with multiple partitions, only the needed partitions are scanned in queries that have WHERE clauses on the partitioning key. For other queries, however, all partitions need to be scanned, and furthermore the scans are done sequentially.

      This task is to launch scans on different partitions in parallel instead of sequentially whenever more than one partition needs to be scanned (similarly to what the SPIDER engine does). This would allow the execution to use multiple cores (when available) and provide much faster overall execution times.
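
      As a minimal illustration (hypothetical table and partition names), the first EXPLAIN below restricts the partitioning key, so the scan is pruned to a single partition; the second query cannot be pruned, so today every partition is scanned one after another:

      CREATE TABLE metrics (
        created   DATE NOT NULL,
        device_id INT NOT NULL,
        value     DOUBLE
      ) PARTITION BY RANGE (TO_DAYS(created)) (
        PARTITION p2021 VALUES LESS THAN (TO_DAYS('2022-01-01')),
        PARTITION p2022 VALUES LESS THAN (TO_DAYS('2023-01-01')),
        PARTITION pmax  VALUES LESS THAN MAXVALUE
      );

      -- Pruned: only partition p2022 needs to be scanned.
      EXPLAIN PARTITIONS
      SELECT SUM(value) FROM metrics
      WHERE created BETWEEN '2022-01-01' AND '2022-06-30';

      -- Not prunable: all partitions are scanned, currently sequentially.
      EXPLAIN PARTITIONS
      SELECT SUM(value) FROM metrics WHERE device_id = 42;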

          Activity

            sanja Oleksandr Byelkin added a comment - - edited

            All tasks that involve parallel execution imply the ability to clone structures or make them reentrant.

            So it should include as a prerequisite reentrant Items (MDEV-6897) or a not-yet-existing MDEV about cloning any structure (at least Items).

            Cloning is simpler, especially if we cheat and do it via the parser (and we already have bugs where it is needed), but reentrant Items is a much more promising direction (it can solve more problems, especially for new architectures such as ARM).

            maxmether Max Mether added a comment - - edited

            sanja Considering that you can already emulate this with Spider (create a Spider table where each partition just points to a local table that holds that partition), it shouldn't be that hard to do for partitioning... Can we not just do the same thing as with Spider? Or is there some kind of fundamental difference?

            For example, if we had a machine with 16 cores, we could create an InnoDB table and then create 16 views, with each view seeing a subset of the data. Finally, we would create an additional Spider table that uses the 16 views as its partitions. Then we would have parallel scans on the same table (since Spider scans different partitions in parallel) without having done any development. Would it not be possible to do the same inside partitioning?
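
            A rough sketch of that emulation, scaled down to two views instead of 16. All object names and credentials below are made up, and whether the two partition scans actually run concurrently depends on Spider's background-search settings (e.g. spider_bgs_mode):

            CREATE TABLE base_t (id INT PRIMARY KEY, payload VARCHAR(100)) ENGINE=InnoDB;

            CREATE VIEW base_t_v0 AS SELECT * FROM base_t WHERE MOD(id, 2) = 0;
            CREATE VIEW base_t_v1 AS SELECT * FROM base_t WHERE MOD(id, 2) = 1;

            -- A server object pointing back at this same instance, so Spider can reach the views.
            CREATE SERVER srv_local FOREIGN DATA WRAPPER mysql
            OPTIONS (HOST '127.0.0.1', DATABASE 'test', USER 'spider_user', PASSWORD 'secret', PORT 3306);

            -- A Spider table whose partitions are the local views.
            CREATE TABLE spider_t (id INT, payload VARCHAR(100)) ENGINE=SPIDER
            PARTITION BY LIST (MOD(id, 2)) (
              PARTITION p0 VALUES IN (0) COMMENT = 'wrapper "mysql", srv "srv_local", table "base_t_v0"',
              PARTITION p1 VALUES IN (1) COMMENT = 'wrapper "mysql", srv "srv_local", table "base_t_v1"'
            );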

            sanja Oleksandr Byelkin added a comment - - edited

            With Spider it looks like a dirty hack which, if we bring it in, we will not know how to get rid of in the future. In a sense (IMHO) it would do more harm than good. But that is my personal opinion.

            maxmether Max Mether added a comment - - edited

            I agree that we should not do exactly what Spider does but have a "cleaner" solution. However, considering the example above, it seems like it shouldn't need re-entrant Items; you just need to treat each partition as a table to some extent. I'm just saying that there should be an easier way to achieve this than needing the full re-entrant Items feature.

            Right now partitions are scanned in sequence, but why is that? How and why would it change with re-entrant items?

            marko Marko Mäkelä added a comment - - edited

            Parallel execution of reads requires the ability to clone a pre-existing read view, so that each reader will see the same version of the data. This needs a new handler interface. It could also require a slight change of semantics: I do not think that we can create the read view lazily when the first row happens to be read from whichever partition one of the concurrent threads reaches first; instead we must create and clone the read view upfront, before starting execution, as if START TRANSACTION WITH CONSISTENT SNAPSHOT had been invoked.

            We would need a new handlerton function for cloning a read view. For Spider, the read views are broken until MDEV-16610 has been fixed. In Spider, the read view creation is more complicated, because the view will have to be cloned across servers, and for that to be feasible, each server must share a sequence for generating transaction start and commit identifiers.


            holyfoot Alexey Botchkov added a comment -

            As far as I can see, parallel execution on partitions can't work for engines in general, as the pushdown condition checks are not reentrant. (These are usually done in the handler_index_cond_check/handler_rowid_filter_check callbacks.)

            Then the problem is that some engines modify THD data, which would have to be protected in this case.

            So it seems too difficult a task to do in the general case.
            Still, having that possibility would be beneficial to the Spider engine. It currently implements similar
            parallelism internally, but that is not natural at the handler level and makes the code too complicated.
            I'd limit the parallel execution to engines that are capable of it and adapt Spider to that form of execution.

            TheWitness Larry Adams added a comment -

            The last time I tested Spider, even though a table was broken into many pieces, the query execution through the various Spider tables was still serial: one table on one server, followed by another table on another server, and then finally combining all the results. So, although there was a claim of parallelization, the queries did not actually occur in parallel; they simply leveraged different servers for different pieces.

            I prefer the map-reduce approach, as it is the simplest way to decompose the queries into parts and to act on them. Say we have queries like the two below (a partitioned and a union example):

            SELECT cluster, COUNT(*) AS jobs, MAX(cpuTime) AS maxCPU, SUM(memory) AS memory, AVG(memory) AS avgMemory
            FROM partitioned_table
            WHERE last_updated BETWEEN ? AND ? -- partitioned by RANGE on a timestamp. Yes yes, just wishful thinking
            GROUP BY cluster

            or

            SELECT cluster, SUM(jobs) AS jobs, MAX(maxCPU) AS maxCPU, SUM(memory) AS memory, SUM(total_memory)/SUM(jobs) AS avgMemory
            FROM (
            SELECT cluster, COUNT(*) AS jobs, MAX(cpuTime) AS maxCPU, SUM(memory) AS memory, SUM(memory) AS total_memory
            FROM unpartitioned_tableA
            WHERE last_updated BETWEEN ? AND ?
            GROUP BY cluster
            UNION ALL
            SELECT cluster, COUNT(*) AS jobs, MAX(cpuTime) AS maxCPU, SUM(memory) AS memory, SUM(memory) AS total_memory
            FROM unpartitioned_tableB
            WHERE last_updated BETWEEN ? AND ?
            GROUP BY cluster
            ) AS rs
            GROUP BY cluster;

            You should notice how I achieved the same outcome in the UNION case by simply abstracting the AVG() into its parts in the union sub-queries. The same abstraction can be done with other consolidation functions, and it also works if you build a UNION using the PARTITION keyword on the resulting UNION queries. Consolidation functions like SUM(), MIN(), MAX() and AVG() are easy. Others, not so easy.

            Each of these cases can be run in a map reduce fashion:

            For the partitioned case:
            1) Determine the partitions affected by the range query (on a timestamp, again just wishful thinking)
            2) Use the appropriate abstraction functions to change the form of the query using whatever logic is available to abstract
            3) Launch X parallel queries and store the results in an internal temporary table by PARTITION.
            4) Reduce the set from the temporary table, again using the abstracted functions (see the sketch after this list).
            5) Present the results to the user
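
            A rough sketch of steps 2) to 4) for the partitioned case, reusing the first query above. The partition names and the explicit PARTITION (...) selection are hypothetical; in a real implementation the per-partition pieces would be generated and launched in parallel rather than written out by hand:

            -- Map: one aggregate per affected partition (each could run in its own thread).
            CREATE TEMPORARY TABLE map_results AS
            SELECT cluster, COUNT(*) AS jobs, MAX(cpuTime) AS maxCPU,
                   SUM(memory) AS memory, SUM(memory) AS total_memory
            FROM partitioned_table PARTITION (p2022_01)
            WHERE last_updated BETWEEN ? AND ?
            GROUP BY cluster;

            INSERT INTO map_results
            SELECT cluster, COUNT(*), MAX(cpuTime), SUM(memory), SUM(memory)
            FROM partitioned_table PARTITION (p2022_02)
            WHERE last_updated BETWEEN ? AND ?
            GROUP BY cluster;

            -- Reduce: re-aggregate using the decomposed consolidation functions.
            SELECT cluster, SUM(jobs) AS jobs, MAX(maxCPU) AS maxCPU,
                   SUM(memory) AS memory, SUM(total_memory)/SUM(jobs) AS avgMemory
            FROM map_results
            GROUP BY cluster;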

            For the union case
            1) Determine any abstractions that are not already defined (as in my case above) so that the unions are independent of one another
            2) Move the WHERE and GROUP BY clauses under each specific UNION
            3) Launch X parallel queries and store the results in an internal temporary table by UNION
            4) Reduce the set from the temporary table, again using the abstracted functions.
            5) Present the results to the user

            In each of these cases, you can gain an X times speedup by simply using the consolidation function abstractions from a library of common abstractions and launching X threads. It's almost linear acceleration (not the kind I learned back in my Electrical Engineering days).

            In the single-server case, you can simply have this map-reduce algorithm buried in the query logic, searching it for opportunities to parallelize, and then control the number of threads by configuration.

            In a MaxScale case, you could do this before the queries are sent to the various servers and handle the parallelization right in MaxScale. Again, the number of threads could be handled by configuration.

            I was planning on writing this in Cacti for certain table cases, like log searches or network flow analysis, since it does not exist natively in either MySQL or MariaDB, but it keeps getting pushed down my priority list as there is so much more to do in Cacti. I continue to have hopes that this can be done internally by my preferred tool... MariaDB

            Be well.

            Larry


            People

              Assignee: holyfoot Alexey Botchkov
              Reporter: maxmether Max Mether
              Votes: 0
              Watchers: 11

