[MCOL-5193] Scale up complex queries that tend to be processed by a single central UM node. Created: 2022-08-15  Updated: 2022-10-25

Status: Open
Project: MariaDB ColumnStore
Component/s: None
Affects Version/s: 22.08.1
Fix Version/s: Icebox

Type: New Feature
Priority: Major
Reporter: Roman
Assignee: Roman
Resolution: Unresolved
Votes: 0
Labels: None


 Description   

Complex queries with nested sub-queries tend to be processed on a single node, except for the initial BPS or PM-side hash join steps. This reduces the value of a multi-node cluster because the UM node (the node where the CSEP was translated into a JobList) processes most of the data. Here is an example of a complex query that processes all of the data at the UM.

query 1
 SELECT i, t FROM
(SELECT s1.i,s1.t FROM 
	(SELECT i, t FROM cs1 GROUP BY i,t) s1 
INNER JOIN
	(SELECT i,t FROM cs1 GROUP BY i,t) s2
ON s1.i=s2.i AND s1.t=s2.t)
s3;
 
*************************** 1. row ***************************
calgettrace(0): 
Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows 
BPS  PM   cs1   3000     (i,t)             1   2   0   0.008   0    
TAS  UM   -     -        -                 -   -   -   0.007   0    
TNS  UM   -     -        -                 -   -   -   0.000   0    
BPS  PM   cs1   3000     (i,t)             0   2   0   0.008   0    
TAS  UM   -     -        -                 -   -   -   0.007   0    
TNS  UM   -     -        -                 -   -   -   0.000   0    
HJS  UM   s1-s2 -        -                 -   -   -   -----   -    
TNS  UM   -     -        -                 -   -   -   0.000   0    
TNS  UM   -     -        -                 -   -   -   0.000   0   

The suggestion here is to leverage the fact that a GROUP BY operator stream tends to produce a smaller data set than the original data set. Thus the original query becomes

SELECT i, t FROM
(SELECT s1.i,s1.t FROM
	(SELECT i, t FROM (SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() GROUP BY i,t) s01 GROUP BY i,t
	) s1
INNER JOIN
	(SELECT i, t FROM (SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() GROUP BY i,t) s02 GROUP BY i,t
	) s2 
ON s1.i=s2.i AND s1.t=s2.t)
s3;

where s01 and s02 are run at every node in the cluster, each producing a partial GROUP BY operator stream (see [1] for the semantics of the idblocalpm and idbpm functions). Given 3 nodes in the cluster, we have the s01@1, s01@2 and s01@3 streams. The resulting s1 stream is equal to s1 from the original query; however, the RAM and CPU consumption is distributed across the 3 nodes (if the resulting s1 stream is smaller than the original). Such distribution reduces execution time and uses the cluster resources more efficiently.
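
The two-phase aggregation described above can be illustrated outside the database. The following is a minimal Python sketch, not ColumnStore code: the three-node layout, the sample rows, and the helper names `local_group_by` and `final_group_by` are assumptions made for illustration. It shows that grouping each node's local rows first and then grouping the union of the partial streams gives the same result as grouping all rows in one place.

```python
# Illustrative model only: each "node" holds a slice of cs1 as (i, t) rows.
# Node names and data are hypothetical.
node_rows = {
    "pm1": [(1, "a"), (1, "a"), (2, "b")],
    "pm2": [(2, "b"), (3, "c"), (3, "c")],
    "pm3": [(1, "a"), (4, "d")],
}


def local_group_by(rows):
    """Partial GROUP BY i, t over one node's local rows (an s01@N stream)."""
    return set(rows)


def final_group_by(partial_streams):
    """Final GROUP BY i, t over the union of all partial streams (s1)."""
    result = set()
    for stream in partial_streams:
        result |= stream
    return result


# Distributed plan: group locally, then group the partial results.
partials = [local_group_by(rows) for rows in node_rows.values()]
distributed = final_group_by(partials)

# Single-node plan: group every row in one place.
all_rows = [row for rows in node_rows.values() for row in rows]
centralized = set(all_rows)

# Number of rows the final step has to consume in each plan.
rows_into_final_distributed = sum(len(p) for p in partials)
rows_into_final_centralized = len(all_rows)
```

With this toy data the final step consumes 6 rows instead of 8; the saving grows with the number of duplicate (i, t) pairs stored on each node.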

There is another scenario that benefits from a similar distributed UM processing, namely ORDER BY. Consider the query:

query 2
SELECT i, t FROM cs1 ORDER BY t;
 
*************************** 1. row ***************************
calgettrace(0): 
Desc Mode Table TableOID ReferencedColumns PIO LIO PBE Elapsed Rows 
BPS  PM   cs1   3000     (i,t)             1   1   0   0.004   0    
TNS  UM   -     -        -                 -   -   -   0.004   0    

Sorting takes place at the UM that translates the CSEP into a JobList. If the data were sorted locally on each node, the UM could use a merge sort to merge the sorted sub-streams from the nodes of the cluster.
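
The merge of locally sorted sub-streams can be sketched as follows. This is a minimal Python illustration under assumptions, not the ColumnStore implementation: the per-node data and the helper names `local_sort` and `merge_sorted_streams` are hypothetical, and the standard library's `heapq.merge` stands in for the merge phase.

```python
import heapq
from operator import itemgetter

# Hypothetical per-node slices of cs1 as (i, t) rows.
node_rows = {
    "pm1": [(1, 30), (2, 10), (3, 50)],
    "pm2": [(4, 20), (5, 60)],
    "pm3": [(6, 40), (7, 5)],
}

by_t = itemgetter(1)  # sort key for ORDER BY t


def local_sort(rows):
    """ORDER BY t executed on one node over its local rows only."""
    return sorted(rows, key=by_t)


def merge_sorted_streams(streams):
    """K-way merge of already sorted sub-streams into one sorted stream."""
    return list(heapq.merge(*streams, key=by_t))


sorted_streams = [local_sort(rows) for rows in node_rows.values()]
merged = merge_sorted_streams(sorted_streams)

# Reference result: sort everything in one place.
all_rows = [row for rows in node_rows.values() for row in rows]
```

The merge phase only compares the heads of the sub-streams, so the expensive sorting work stays on the nodes that own the data.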

There are two approaches to implementing such distributed UM processing with local PM data.

  • Rewrite the queries using the optimizer/rewriter. The original query 1 will become [2]. The ORDER BY query is harder to rewrite, though.
  • Introduce a new JobList step, namely a Distributed Subquery Step (DSS).

Here is the algorithm that the JobList translator will use.
The JobList translator detects whether a certain sub-tree of the CSEP can be replaced with a DSS. If there is one, the DSS replaces that sub-query of the original CSEP. The DSS has its own CSEP that is distributed across all UMs with a local-only execution flag (each UM uses only its local PM data for processing).

  • DSS distributes its CSEP to all UMs
  • DSS collects all the input streams
  • DSS processes the collected input streams with the additional JobList steps, e.g. aggregation
  • DSS outputs its stream to the output DL.
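
The four steps listed above can be modelled with a small Python sketch. Everything in it is hypothetical and only mirrors the shape of the proposal: `NODE_DATA`, `run_local_plan`, `distributed_subquery_step` and the two aggregation helpers are illustrative names, not ColumnStore APIs, and a thread pool stands in for sending the CSEP to remote UMs.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical model: rows stored locally on each node.
NODE_DATA = {
    "um1": [(1, "a"), (1, "a"), (2, "b")],
    "um2": [(2, "b"), (3, "c")],
    "um3": [(3, "c"), (4, "d"), (4, "d")],
}


def run_local_plan(node, plan):
    """Stand-in for a UM running the DSS plan with the local-only flag set:
    the plan sees only the rows stored on that node."""
    return plan(NODE_DATA[node])


def distributed_subquery_step(nodes, local_plan, final_step):
    # Step 1: distribute the plan to all nodes.
    with ThreadPoolExecutor(max_workers=len(nodes)) as pool:
        futures = [pool.submit(run_local_plan, n, local_plan) for n in nodes]
        # Step 2: collect all the input streams.
        streams = [f.result() for f in futures]
    # Step 3: process the collected streams with an additional step.
    # Step 4: hand the result to the caller as the step's output stream.
    return final_step(streams)


def group_by_all_columns(rows):
    """Local partial aggregation: GROUP BY over every column."""
    return sorted(set(rows))


def final_aggregation(streams):
    """Final aggregation over the concatenated partial streams."""
    return sorted({row for stream in streams for row in stream})


output = distributed_subquery_step(
    list(NODE_DATA), group_by_all_columns, final_aggregation
)
```

Passing a sort as `local_plan` and a k-way merge as `final_step` would model the ORDER BY case with the same skeleton.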

1. https://mariadb.com/kb/en/columnstore-information-functions/
2.

SELECT i, t FROM
(SELECT s1.i, s1.t FROM
	(SELECT i, t FROM
		(SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm1' GROUP BY i, t
		UNION ALL
		SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm2' GROUP BY i, t
		UNION ALL
		SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm3' GROUP BY i, t
		) s01 GROUP BY i, t
	) s1
INNER JOIN
	(SELECT i, t FROM
		(SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm1' GROUP BY i, t
		UNION ALL
		SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm2' GROUP BY i, t
		UNION ALL
		SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 'pm3' GROUP BY i, t
		) s02 GROUP BY i, t
	) s2
ON s1.i = s2.i AND s1.t = s2.t)
s3;

