MariaDB ColumnStore / MCOL-5193

Scale up complex queries that tend to be processed by a single central UM node.

Details

    • Type: New Feature
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 22.08.1
    • Fix Version/s: Icebox
    • Component/s: None
    • Labels: None

    Description

      Complex queries with nested subqueries tend to be processed on a single node, except for the initial BPS or PM-side hash join steps. This diminishes the value of a multi-node cluster, because the UM node (the node where the CSEP was translated into a JobList) processes most of the data. Here is an example of a complex query whose data is all processed on the UM.

      Query 1:
      SELECT i, t FROM
      (SELECT s1.i,s1.t FROM 
      	(SELECT i, t FROM cs1 GROUP BY i,t) s1 
      INNER JOIN
      	(SELECT i,t FROM cs1 GROUP BY i,t) s2
      ON s1.i=s2.i AND s1.t=s2.t)
      s3;
       
      *************************** 1. row ***************************
      calgettrace(0): 
      Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows 
      BPS  PM   cs1   3000     (i,t)             1   2   0   0.008   0    
      TAS  UM   -     -        -                 -   -   -   0.007   0    
      TNS  UM   -     -        -                 -   -   -   0.000   0    
      BPS  PM   cs1   3000     (i,t)             0   2   0   0.008   0    
      TAS  UM   -     -        -                 -   -   -   0.007   0    
      TNS  UM   -     -        -                 -   -   -   0.000   0    
      HJS  UM   s1-s2 -        -                 -   -   -   -----   -    
      TNS  UM   -     -        -                 -   -   -   0.000   0    
      TNS  UM   -     -        -                 -   -   -   0.000   0   
      

      The suggestion is to leverage the fact that a GROUP BY operator usually produces a much smaller data set than its input. The original query thus becomes:

      SELECT i, t FROM
      (SELECT s1.i,s1.t FROM
      	(SELECT i, t FROM (SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() GROUP BY i,t) s01 GROUP BY i,t
      	) s1
      INNER JOIN
      	(SELECT i, t FROM (SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() GROUP BY i,t) s02 GROUP BY i,t
      	) s2 
      ON s1.i=s2.i AND s1.t=s2.t)
      s3;
      

      where s01 and s02 run on every node in the cluster, each producing a partial GROUP BY stream (see [1] for the semantics of the idbpm() and idblocalpm() functions). Given 3 nodes in the cluster, there are three streams: s01@1, s01@2 and s01@3. The resulting s1 stream is identical to s1 in the original query, but the RAM and CPU consumption is distributed across the 3 nodes (provided the partial GROUP BY output is smaller than the original data set). This distribution reduces query time and uses the cluster resources more efficiently.
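      The two-phase GROUP BY above can be illustrated with a small C++ sketch. It is a toy model, not ColumnStore code: the table, the row placement rule (row k stored on node k % N, a hypothetical stand-in for what idbpm() reports) and all function names are assumptions. Each node deduplicates its local rows (the s01@N streams), and the UM runs a second GROUP BY over the much smaller partial results. Because the same (i, t) key can live on several nodes, the outer GROUP BY is still required.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <utility>
#include <vector>

// One row of a hypothetical table cs1(i INT, t VARCHAR).
using Row = std::pair<long long, std::string>;
// GROUP BY i,t with no aggregate functions yields the distinct (i, t) keys.
using GroupSet = std::set<Row>;

// Hypothetical placement: row k is stored on node k % nodeCount, so the same
// (i, t) key may appear on several nodes.
std::vector<std::vector<Row>> partitionByNode(const std::vector<Row>& rows, int nodeCount) {
    assert(nodeCount > 0);
    std::vector<std::vector<Row>> perNode(static_cast<std::size_t>(nodeCount));
    for (std::size_t k = 0; k < rows.size(); ++k)
        perNode[k % perNode.size()].push_back(rows[k]);
    return perNode;
}

// Phase 1: the partial GROUP BY a node runs over its local rows only (s01@N).
GroupSet localGroupBy(const std::vector<Row>& localRows) {
    return GroupSet(localRows.begin(), localRows.end());
}

// Phase 2: the outer GROUP BY on the UM, applied to the partial streams.
GroupSet mergeGroupBy(const std::vector<GroupSet>& partials) {
    GroupSet result;
    for (const GroupSet& p : partials) result.insert(p.begin(), p.end());
    return result;
}

// Two-phase evaluation of SELECT i, t FROM cs1 GROUP BY i,t.
GroupSet distributedGroupBy(const std::vector<Row>& rows, int nodeCount) {
    std::vector<GroupSet> partials;
    for (const auto& local : partitionByNode(rows, nodeCount))
        partials.push_back(localGroupBy(local));
    return mergeGroupBy(partials);
}
```

      For example, 7 rows containing 4 distinct (i, t) keys spread over 3 nodes give the same 4 keys as a single central GROUP BY, while each node only scans and deduplicates its own share of the rows.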

      ORDER BY is another scenario that benefits from similar distributed UM processing. Consider the query:

      Query 2:
      SELECT i, t FROM cs1 ORDER BY t;
       
      *************************** 1. row ***************************
      calgettrace(0): 
      Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows 
      BPS  PM   cs1   3000     (i,t)             1   1   0   0.004   0    
      TNS  UM   -     -        -                 -   -   -   0.004   0    
      

      Sorting takes place on the UM that translates the CSEP into a JobList. If each node sorted its local data instead, the UM would only need to merge the sorted sub-streams coming from the cluster nodes, as in the merge phase of a merge sort.
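      The merge on the UM is a classic k-way merge. The following C++ sketch assumes each node has already sorted its local rows and, for brevity, reduces every row to its sort key; the function name is hypothetical. A min-heap holds the current head of every stream, so merging N rows from K nodes costs O(N log K) on the UM rather than a full O(N log N) sort.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

// Merge per-node streams that were already sorted locally into one globally
// sorted stream, using a min-heap of (value, stream index, position) entries.
template <typename T>
std::vector<T> mergeSortedStreams(const std::vector<std::vector<T>>& streams) {
    using Entry = std::tuple<T, std::size_t, std::size_t>;
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;

    std::size_t total = 0;
    for (std::size_t s = 0; s < streams.size(); ++s) {
        // Precondition: every node delivers a locally sorted stream.
        assert(std::is_sorted(streams[s].begin(), streams[s].end()));
        total += streams[s].size();
        if (!streams[s].empty()) heap.emplace(streams[s].front(), s, std::size_t{0});
    }

    std::vector<T> out;
    out.reserve(total);
    while (!heap.empty()) {
        auto [value, s, pos] = heap.top();  // copy the smallest head, then pop it
        heap.pop();
        out.push_back(value);
        // Refill the heap with the next element of the same stream, if any.
        if (pos + 1 < streams[s].size()) heap.emplace(streams[s][pos + 1], s, pos + 1);
    }
    return out;
}
```

      For example, merging the streams {1, 4, 9}, {2, 3, 10}, {} and {0, 5} yields {0, 1, 2, 3, 4, 5, 9, 10}.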

      There are two ways to implement such distributed UM processing over local PM data:

      • Rewrite the queries using the optimizer/rewriter. Query 1 would become [2]. The ORDER BY query is harder to rewrite this way, though.
      • Introduce a new JobList step, the Distributed Subquery Step (DSS).

      Here is the algorithm the JobList translator will use.
      The JobList translator detects whether a certain subtree of the CSEP can be replaced with a DSS. If such a subtree exists, a DSS replaces that subquery of the original CSEP. The DSS has its own CSEP, which is distributed to all UMs with a local-only execution flag, so that each UM processes only its local PM data.

      • The DSS distributes its CSEP to all UMs.
      • The DSS collects all the input streams.
      • The DSS processes the collected input streams with additional JobList steps, e.g. aggregation.
      • The DSS outputs its stream to the output DL.
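      The four DSS steps can be sketched as follows. This is a hypothetical C++ model, not the ColumnStore JobList API: DistributedSubqueryStep, LocalExecutor and PostStep are illustrative names, each LocalExecutor stands for running the DSS's CSEP on one UM with the local-only flag set, and the output DL is modelled as a plain vector of rows.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <string>
#include <utility>
#include <vector>

// All names below are hypothetical; they only mirror the four DSS steps.
using Row = std::vector<std::string>;
using Stream = std::vector<Row>;

// Runs the DSS's own CSEP on one UM with the local-only flag set, i.e. over
// that node's local PM data only, and returns the partial stream.
using LocalExecutor = std::function<Stream()>;
// Additional JobList steps applied to the collected streams (e.g. aggregation).
using PostStep = std::function<Stream(std::vector<Stream>)>;

class DistributedSubqueryStep {
public:
    DistributedSubqueryStep(std::vector<LocalExecutor> nodes, PostStep post)
        : nodes_(std::move(nodes)), post_(std::move(post)) {
        assert(post_ != nullptr);  // a post-processing step is required
    }

    // Writes the final stream into outputDL, the step's output data list.
    void run(Stream& outputDL) const {
        // 1. Distribute the CSEP: start the local execution on every UM.
        std::vector<std::future<Stream>> pending;
        for (const LocalExecutor& exec : nodes_)
            pending.push_back(std::async(std::launch::async, exec));

        // 2. Collect all input streams (in node order).
        std::vector<Stream> inputs;
        for (auto& f : pending) inputs.push_back(f.get());

        // 3. Process the collected streams with the additional steps.
        Stream result = post_(std::move(inputs));

        // 4. Output the stream to the output DL.
        outputDL = std::move(result);
    }

private:
    std::vector<LocalExecutor> nodes_;
    PostStep post_;
};
```

      Running the local executors concurrently keeps all UMs busy in parallel, and collecting the futures in node order makes the input order deterministic, which a post step such as a merge of locally sorted streams could rely on.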

      [1] https://mariadb.com/kb/en/columnstore-information-functions/
      [2] Query 1 rewritten by the optimizer/rewriter:

      SELECT i, t FROM
      (SELECT s1.i, s1.t FROM
      	(SELECT i, t FROM
      		(SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm1' GROUP BY i,t
      		UNION ALL
      		SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm2' GROUP BY i,t
      		UNION ALL
      		SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm3' GROUP BY i,t
      		) s01 GROUP BY i,t
      	) s1
      INNER JOIN
      	(SELECT i, t FROM
      		(SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm1' GROUP BY i,t
      		UNION ALL
      		SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm2' GROUP BY i,t
      		UNION ALL
      		SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm3' GROUP BY i,t
      		) s02 GROUP BY i,t
      	) s2
      ON s1.i=s2.i AND s1.t=s2.t)
      s3;
      


          People

            Assignee: Roman (drrtuy)
            Reporter: Roman (drrtuy)
            Votes: 0
            Watchers: 1

            Dates

              Created:
              Updated:
