  1. MariaDB Server
  2. MDEV-39491

ParallelQuery: split the table/index into chunks to read: Study


Details

    Description

      Suppose we want to do a full table/index scan using N parallel workers.

      The idea is to split the index into N approximately equal portions, and then give each portion to a separate worker.

      This should be a storage-engine call. It is the reverse of what records_in_range() does.
      records_in_range() gets a key (i.e. a range endpoint) and produces a fraction: "this endpoint is at 0.12345 fraction of the table".
      Here, the idea would be to do the reverse: given a fraction=0.123456, dive into the table and position a read cursor at the specified fraction of the index.
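To make the two directions concrete, here is a minimal sketch on a toy "index" that is just a sorted array. The names `ToyIndex`, `fraction_before_key()` and `position_at_fraction()` are hypothetical and exist only for this illustration. A real engine would descend the B-tree and estimate positions from page contents rather than index into an array.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Toy stand-in for an index: all keys, in sorted order (hypothetical type).
using ToyIndex = std::vector<int>;

// Forward direction, similar in spirit to records_in_range():
// given a key, return the fraction of keys that sort strictly before it.
double fraction_before_key(const ToyIndex &idx, int key) {
  assert(!idx.empty());
  std::size_t pos =
      std::lower_bound(idx.begin(), idx.end(), key) - idx.begin();
  return static_cast<double>(pos) / idx.size();
}

// Reverse direction (the proposed call): given a fraction in [0, 1],
// return the position where a read cursor should start.
// The result equals idx.size() (past the end) when fraction is 1.0.
std::size_t position_at_fraction(const ToyIndex &idx, double fraction) {
  assert(fraction >= 0.0 && fraction <= 1.0);
  return static_cast<std::size_t>(fraction * idx.size());
}
```

With eight keys 10, 20, ..., 80, `position_at_fraction(idx, 0.25)` yields position 2 (key 30), and `fraction_before_key(idx, 30)` maps that key back to 0.25.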

      According to Monty, MySQL has the code to do this?

      Example of how to do this: Split the range

      Suppose we want to do a parallel full table scan with N=10 workers.
      In InnoDB, full table scan is a full index scan on a [hidden] primary key.

      The primary worker requests innodb to provide boundary points for 10%, 20% ... 90% of the table.
      It gets:

      range_endpoints= {  
        pk=val1,   -- this record is located after 10% of the table records
        pk=val2,     -- this record is located after 20% of the table records 
        ...
      }
      

      Then,

      • worker1 will be requested to scan the range: -inf < pk < val1
      • worker2 will be requested to scan the range: val1 <= pk < val2 (note the left endpoint is inclusive, the right is non-inclusive)
      • ...

      worker2 can position at its start location and then scan the rows it needs to scan by doing this:

      h->index_init(clustered_pk);
      h->index_read_map(val1);  // position at 10% of the records, the start of worker2's range
      then it will call
      h->index_next();
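
The endpoint selection and the half-open worker ranges can be sketched as follows. This is only an illustration over a sorted array of primary key values. `WorkerRange`, `pick_endpoints()`, `make_ranges()` and `scan_range()` are hypothetical names, not part of any existing handler API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// One worker's range: lower <= pk < upper (hypothetical type).
struct WorkerRange {
  std::optional<int> lower;  // inclusive; empty means -inf
  std::optional<int> upper;  // exclusive; empty means +inf
};

// Pick the keys located after 1/N, 2/N, ... (N-1)/N of the records.
std::vector<int> pick_endpoints(const std::vector<int> &sorted_pks,
                                std::size_t n_workers) {
  assert(n_workers >= 1);
  std::vector<int> endpoints;
  for (std::size_t i = 1; i < n_workers; i++) {
    std::size_t pos = i * sorted_pks.size() / n_workers;
    if (pos < sorted_pks.size()) endpoints.push_back(sorted_pks[pos]);
  }
  return endpoints;
}

// Turn the endpoints into disjoint, touching ranges covering all keys.
std::vector<WorkerRange> make_ranges(const std::vector<int> &endpoints) {
  std::vector<WorkerRange> ranges;
  std::optional<int> prev;  // starts as -inf
  for (int e : endpoints) {
    ranges.push_back({prev, e});
    prev = e;
  }
  ranges.push_back({prev, std::nullopt});  // the last range runs to +inf
  return ranges;
}

// What one worker does: position at the lower bound, then read forward
// until the upper bound is reached.
std::vector<int> scan_range(const std::vector<int> &sorted_pks,
                            const WorkerRange &r) {
  auto it = r.lower ? std::lower_bound(sorted_pks.begin(), sorted_pks.end(),
                                       *r.lower)
                    : sorted_pks.begin();
  std::vector<int> rows;
  for (; it != sorted_pks.end() && (!r.upper || *it < *r.upper); ++it)
    rows.push_back(*it);
  return rows;
}
```

Because the left endpoint is inclusive and the right one is not, concatenating the output of all workers returns every row exactly once, even if two endpoints happen to be equal (that worker simply gets an empty range).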
      

      Parallel read code in MySQL

      Storage Engine interface (unused)

      This is present but not used for anything in the public code.
      It seems it is used to do parallel data transfers from InnoDB to somewhere else (called RAPID at one location)

      Methods in ha_innobase, ha_innopart:

      • parallel_scan_init()
      • parallel_scan()
      • parallel_scan_end()

      The implementation uses Parallel_reader_adapter (see below).
      parallel_scan() takes callback functions as arguments and passes them to the adapter.

      Parallel_reader_adapter

      It is in

      storage/innobase/row/row0pread-adapter.cc
      storage/innobase/include/row0pread-adapter.h
      

      This is used ONLY by ha_innobase::parallel_scan* methods. So, there is no publicly-available user of this.

      It uses Parallel_reader m_parallel_reader to do actual reads.
      Then, it converts the obtained records into MySQL format.
      It buffers them in per-thread buffers of ADAPTER_SEND_BUFFER_SIZE=2MB each.
      Then, it passes them to the callback it got from parallel_scan_XXX functions.
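
The buffer-then-callback flow described above can be sketched roughly like this. It is not the MySQL code: `ToyBatchAdapter` is a hypothetical single-threaded class, the "conversion" is a placeholder string prefix, and the buffer limit is a constructor argument rather than ADAPTER_SEND_BUFFER_SIZE. The real adapter keeps one such buffer per thread.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, single-threaded model of the adapter's batching.
class ToyBatchAdapter {
 public:
  using LoadFn = std::function<void(const std::vector<std::string> &)>;
  using EndFn = std::function<void()>;

  ToyBatchAdapter(std::size_t buffer_bytes, LoadFn load_fn, EndFn end_fn)
      : m_buffer_bytes(buffer_bytes),
        m_load_fn(std::move(load_fn)),
        m_end_fn(std::move(end_fn)) {}

  // Convert one engine row and buffer it; flush first if it would not fit.
  void process_row(const std::string &engine_row) {
    std::string converted = "row:" + engine_row;  // placeholder conversion
    if (!m_batch.empty() && m_used + converted.size() > m_buffer_bytes)
      send_batch();
    m_used += converted.size();
    m_batch.push_back(std::move(converted));
  }

  // Flush whatever has not been sent yet, then signal the end of the scan.
  void end() {
    if (!m_batch.empty()) send_batch();
    m_end_fn();
  }

 private:
  // Hand the buffered rows to the user's callback and reset the buffer.
  void send_batch() {
    m_load_fn(m_batch);
    m_batch.clear();
    m_used = 0;
  }

  std::size_t m_buffer_bytes;
  LoadFn m_load_fn;
  EndFn m_end_fn;
  std::vector<std::string> m_batch;
  std::size_t m_used = 0;
};
```

The caller sees rows only in batches: with a 10-byte buffer and three 5-byte converted rows, the load callback fires once with two rows when the third row arrives, and once more with the remaining row from `end()`.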

      Some functions:

      Parallel_reader_adapter::process_rows()
       - Calls this->send_batch()
       - converts row from InnoDB to MySQL 
      

      Parallel_reader_adapter::send_batch()  calls  m_load_fn() to pass records to the user.
      

      Parallel_reader_adapter::end()
      - Calls send_batch() to flush what is not sent yet.
      - Calls m_end_fn callback.
      

      Parallel_reader

      This does parallel reads. The code is here:

      storage/innobase/include/row0pread.h
      storage/innobase/row/row0pread.cc
      

      Users:

      • Parallel_reader_adapter, see above.
      • row_mysql_parallel_select_count_star() - COUNT(*) computation
      • parallel_check_table() - CHECK TABLE.
      • Histogram_sampler - this is for ha_innobase::sample_init,sample_next,sample_end
      • ddl/ddl0par-scan.cc, Parallel_cursor - scans the index in parallel and inserts rows it sees into a secondary index.

      How Parallel_reader creates chunks to scan

      This is different from "Split the range" proposal described above.

      In short: they select "sub-trees" in the BTREE. This is not based on the desired fraction (e.g. give worker #1 10% of the table). It is based on the BTREE shape (e.g. worker #1 will get the first child page of the root page and all its subtrees).

      Splitting into chunks (they call them "shards") is done in the following steps:
      1. The user thread creates shards for every subtree (equivalent to a range of keys) in the root node and puts the ranges into the task queue. The first `innodb_parallel_read_threads` shards are not marked, while subsequent are marked as "needs further splitting".

      2. When a worker thread fetches a task from the queue, first it checks the flag "needs further splitting". If it's not set, then the range is processed by the worker as is.

      3. If the flag is set, the worker doesn't process the range directly but instead it runs the same N-way split of the current subtree as was done for the root node and pushes the new sub-ranges into the task queue.
      This avoids worker underutilization (for example: 5 root subtrees, 4 workers; after the first 4 subtrees are processed, the one remaining would be processed by a single worker while the others sit idle).

      The maximum depth of the B-tree lookup is 2 (root node and one level below the root).
      The maximum number of pages read during the splitting is 1 for root node + (number of root node children - `innodb_parallel_read_threads`).
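
The queue-based splitting described in the steps above can be modeled with a small single-threaded simulation. Everything here is hypothetical (`ToyPage`, `Shard`, `split_page()`, `run_scan()`); it only mirrors the description: the first `n_threads` root subtrees are queued as-is, later ones are flagged for one more level of splitting, and nothing is split below that level.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Toy B-tree page (hypothetical): child page numbers, or a row count if leaf.
struct ToyPage {
  std::vector<std::size_t> children;  // indexes into ToyTree; empty for a leaf
  int rows;                           // rows stored in a leaf page
};
using ToyTree = std::vector<ToyPage>;  // page 0 is the root

struct Shard {
  std::size_t page;  // root page of the subtree to scan
  bool needs_split;  // "needs further splitting" flag
};

// "Scanning" a shard: count all rows in the subtree.
int count_rows(const ToyTree &t, std::size_t page) {
  const ToyPage &p = t[page];
  if (p.children.empty()) return p.rows;
  int total = 0;
  for (std::size_t c : p.children) total += count_rows(t, c);
  return total;
}

// Queue one shard per child of `page`. Children past the first n_threads
// are flagged for further splitting, if that is still allowed.
void split_page(const ToyTree &t, std::size_t page, std::size_t n_threads,
                bool allow_further_split, std::deque<Shard> &queue) {
  std::size_t i = 0;
  for (std::size_t c : t[page].children) {
    bool mark =
        allow_further_split && i >= n_threads && !t[c].children.empty();
    queue.push_back({c, mark});
    i++;
  }
}

// Drain the queue the way a worker would; returns the total rows scanned.
int run_scan(const ToyTree &t, std::size_t n_threads) {
  std::deque<Shard> queue;
  if (t[0].children.empty())
    queue.push_back({0, false});  // the whole table is a single leaf page
  else
    split_page(t, 0, n_threads, true, queue);
  int total = 0;
  while (!queue.empty()) {
    Shard s = queue.front();
    queue.pop_front();
    if (s.needs_split)
      split_page(t, s.page, n_threads, false, queue);  // depth limit of 2
    else
      total += count_rows(t, s.page);
  }
  return total;
}
```

In a real multi-threaded setting the queue would be shared and synchronized. The point of the sketch is only that late shards are broken into smaller pieces, so several workers can share the tail of the scan.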

      (https://www.alibabacloud.com/blog/an-introduction-to-mysql-parallel-query-and-ddl_601920)

      Milestone 0.1: Do full table scan in chunks, in one thread

      When we see that we are about to do a full scan on the first table in the join and @@parallel_workers=N with N != 0 and N != 1, then

      // handler_scan_range is an opaque structure where the storage engine describes what range in the index should be scanned.
      std::vector<handler_scan_range> scan_bounds;
      // This creates a list of disjoint and touching ranges for N workers.
      // The list may have N elements, or many more than that. If there are more elements than N,
      // they will be given to workers in any (e.g. round-robin) way.
      h->initialize_for_n_workers(&scan_bounds, thd->variables.parallel_workers);
      

      then, the SQL layer will do this:

      h->rnd_init(scan=true);
      h->position_scan_at(scan_bounds.elem(0));
      while (h->rnd_next() != HA_ERR_END_OF_FILE) { process record; }
      h->position_scan_at(scan_bounds.elem(1));
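
A self-contained mock of this single-threaded loop is sketched below. `ToyHandler` is hypothetical and keeps its rows in a vector; `initialize_for_n_workers()` and `position_scan_at()` follow the names proposed above but are not existing handler methods, and `TOY_END_OF_FILE` stands in for HA_ERR_END_OF_FILE. Here the opaque range is simply a pair of array positions.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

constexpr int TOY_END_OF_FILE = 1;  // stand-in for HA_ERR_END_OF_FILE

// Opaque to the SQL layer; in this mock it is a half-open position range.
struct toy_scan_range {
  std::size_t begin;
  std::size_t end;
};

class ToyHandler {
 public:
  explicit ToyHandler(std::vector<int> rows) : m_rows(std::move(rows)) {}

  // Produce disjoint, touching ranges that together cover the whole table.
  void initialize_for_n_workers(std::vector<toy_scan_range> *bounds,
                                std::size_t n) const {
    assert(n >= 1);
    bounds->clear();
    for (std::size_t i = 0; i < n; i++)
      bounds->push_back(
          {i * m_rows.size() / n, (i + 1) * m_rows.size() / n});
  }

  // Restrict the scan to one range and move the cursor to its start.
  void position_scan_at(const toy_scan_range &r) {
    m_pos = r.begin;
    m_end = r.end;
  }

  // Return the next row of the current range, or TOY_END_OF_FILE.
  int rnd_next(int *row) {
    if (m_pos >= m_end) return TOY_END_OF_FILE;
    *row = m_rows[m_pos++];
    return 0;
  }

 private:
  std::vector<int> m_rows;
  std::size_t m_pos = 0;
  std::size_t m_end = 0;
};

// Milestone 0.1 in miniature: one thread walks all chunks in order.
std::vector<int> scan_in_chunks(ToyHandler &h, std::size_t n_workers) {
  std::vector<toy_scan_range> bounds;
  h.initialize_for_n_workers(&bounds, n_workers);
  std::vector<int> out;
  for (const toy_scan_range &r : bounds) {
    h.position_scan_at(r);
    int row;
    while (h.rnd_next(&row) != TOY_END_OF_FILE) out.push_back(row);
  }
  return out;
}
```

Scanning chunk by chunk in one thread should return exactly the rows of a plain full scan, in the same order, which makes this milestone easy to check before real workers are introduced.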
      


            People

              oleg.smirnov Oleg Smirnov
              psergei Sergei Petrunia