Details
Type: New Feature
Status: Open
Priority: Major
Resolution: Unresolved
22.08.1
Description
Complex queries with nested subqueries tend to be processed on a single node, except for the initial BPS (BatchPrimitiveStep) and PM-side hash join steps. This diminishes the value of a multi-node cluster, because the UM node (the node where the CSEP was translated into a JobList) processes most of the data. Here is an example of a complex query that processes all of its data at the UM:
Query 1:

SELECT i, t FROM
  (SELECT s1.i, s1.t FROM
    (SELECT i, t FROM cs1 GROUP BY i, t) s1
    INNER JOIN
    (SELECT i, t FROM cs1 GROUP BY i, t) s2
    ON s1.i = s2.i AND s1.t = s2.t)
  s3;
*************************** 1. row ***************************
calgettrace(0):
Desc  Mode  Table  TableOID  ReferencedColumns  PIO  LIO  PBE  Elapsed  Rows
BPS   PM    cs1    3000      (i,t)              1    2    0    0.008    0
TAS   UM    -      -         -                  -    -    -    0.007    0
TNS   UM    -      -         -                  -    -    -    0.000    0
BPS   PM    cs1    3000      (i,t)              0    2    0    0.008    0
TAS   UM    -      -         -                  -    -    -    0.007    0
TNS   UM    -      -         -                  -    -    -    0.000    0
HJS   UM    s1-s2  -         -                  -    -    -    -----    -
TNS   UM    -      -         -                  -    -    -    0.000    0
TNS   UM    -      -         -                  -    -    -    0.000    0
The suggestion is to leverage the fact that a GROUP BY operator tends to produce a smaller data set than its input. The original query then becomes:
SELECT i, t FROM
  (SELECT s1.i, s1.t FROM
    (SELECT i, t FROM
      (SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() GROUP BY i, t) s01
     GROUP BY i, t) s1
    INNER JOIN
    (SELECT i, t FROM
      (SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() GROUP BY i, t) s02
     GROUP BY i, t) s2
    ON s1.i = s2.i AND s1.t = s2.t)
  s3;
Here s01 and s02 run on every node in the cluster, each producing a partial GROUP BY stream (see [1] for the semantics of the idbpm() and idblocalpm() functions). Given 3 nodes in the cluster, we get the streams s01@1, s01@2 and s01@3. The resulting s1 stream equals s1 from the original query, but the RAM and CPU consumption is distributed across the 3 nodes (as long as the partial GROUP BY streams are smaller than the original data). Such distribution reduces query time and uses the cluster resources more efficiently.
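The equivalence claimed above can be checked with a small Python sketch. It models a plain GROUP BY i, t (no aggregates) as deduplication and uses hypothetical per-PM row lists standing in for what the idbpm(cs1.i) = idblocalpm() filter selects on each node. It illustrates the idea only and is not ColumnStore code.

```python
from itertools import chain

# Hypothetical rows of cs1 as (i, t) tuples, already split by the PM that
# stores them (what idbpm(cs1.i) = idblocalpm() selects on each node).
ROWS_BY_PM = {
    1: [(1, "a"), (2, "b"), (1, "a")],
    2: [(1, "a"), (3, "c")],
    3: [(2, "b"), (2, "b"), (4, "d")],
}


def group_by(rows):
    """GROUP BY i, t with no aggregates: keep one row per distinct (i, t)."""
    return sorted(set(rows))


def single_node_group_by(rows_by_pm):
    # Original plan: ship every row to the UM and group there.
    return group_by(chain.from_iterable(rows_by_pm.values()))


def two_phase_group_by(rows_by_pm):
    # Phase 1: each node groups only its local rows (s01@1, s01@2, s01@3).
    partials = {pm: group_by(rows) for pm, rows in rows_by_pm.items()}
    # Count how many rows the partial streams send to the UM.
    shipped = sum(len(p) for p in partials.values())
    # Phase 2: the UM re-groups the smaller partial streams.
    return group_by(chain.from_iterable(partials.values())), shipped


if __name__ == "__main__":
    result, shipped = two_phase_group_by(ROWS_BY_PM)
    assert result == single_node_group_by(ROWS_BY_PM)
    total = sum(len(r) for r in ROWS_BY_PM.values())
    print(f"rows shipped to UM: {shipped} instead of {total}")  # 6 instead of 8
```

Re-grouping is safe here because a GROUP BY without aggregates behaves like DISTINCT, and the DISTINCT of a union of DISTINCT sets equals the DISTINCT of the whole input.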
ORDER BY is another scenario that benefits from similar distributed UM processing. Consider the query:
Query 2:

SELECT i, t FROM cs1 ORDER BY t;

*************************** 1. row ***************************
calgettrace(0):
Desc  Mode  Table  TableOID  ReferencedColumns  PIO  LIO  PBE  Elapsed  Rows
BPS   PM    cs1    3000      (i,t)              1    1    0    0.004    0
TNS   UM    -      -         -                  -    -    -    0.004    0
Sorting takes place at the UM that translates the CSEP into a JobList. If each node sorted its local data first, the UM would only have to merge the sorted sub-streams from the cluster nodes, as in the merge phase of a merge sort.
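A minimal Python sketch of that merge step, assuming each node returns its local rows already sorted by t (the LOCAL_SORTED data is hypothetical). heapq.merge performs a lazy k-way merge, so the merging side only keeps the current head of each sub-stream instead of re-sorting the full result.

```python
import heapq
from operator import itemgetter

# Hypothetical local result of "SELECT i, t FROM cs1 ORDER BY t" on each node;
# every node sorts only the rows stored on its own PM.
LOCAL_SORTED = {
    1: sorted([(5, "b"), (1, "e"), (7, "a")], key=itemgetter(1)),
    2: sorted([(2, "d"), (9, "c")], key=itemgetter(1)),
    3: sorted([(4, "f"), (3, "a")], key=itemgetter(1)),
}


def merge_sorted_streams(streams, key):
    """K-way merge of already sorted streams.

    heapq.merge is lazy: its heap holds only the current head of each stream,
    and ties keep the order in which the streams were passed in.
    """
    return heapq.merge(*streams, key=key)


if __name__ == "__main__":
    merged = list(merge_sorted_streams(LOCAL_SORTED.values(), key=itemgetter(1)))
    print(merged)
```

In a real cluster the inputs would be network streams rather than in-memory lists, which is where the lazy merge pays off.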
There are two ways to implement such distributed UM processing over local PM data:
- Rewrite the queries using the optimizer/rewriter. Query 1 would become the query listed in [2]. The ORDER BY query is harder to rewrite, though.
- Introduce a new JobList step, the Distributed Subquery Step (DSS).
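For the rewriter approach, a rule could generate the [2]-style query for any number of PMs. The sketch below is hypothetical: the function name and parameters are illustrative, it only handles a plain GROUP BY subquery, and it assumes idblocalpm() returns a numeric PM id and that PMs are numbered 1..N.

```python
def rewrite_group_by_subquery(table, cols, pm_count, partial_alias):
    """Hypothetical rewriter rule for "SELECT cols FROM table GROUP BY cols".

    Emits one partial GROUP BY branch per PM (combined with UNION ALL) and
    wraps them in a final GROUP BY, mirroring the s01/s02 subqueries in [2].
    """
    col_list = ", ".join(cols)
    branches = [
        # idbpm() on the first column selects rows stored on the local PM.
        f"SELECT {col_list} FROM {table} "
        f"WHERE idbpm({table}.{cols[0]}) = idblocalpm() AND idblocalpm() = {pm} "
        f"GROUP BY {col_list}"
        for pm in range(1, pm_count + 1)  # assumes PMs are numbered 1..pm_count
    ]
    partial = "\n   UNION ALL\n   ".join(branches)
    return f"(SELECT {col_list} FROM\n  ({partial}\n  ) {partial_alias} GROUP BY {col_list})"


if __name__ == "__main__":
    print(rewrite_group_by_subquery("cs1", ["i", "t"], 3, "s01"))
```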
Here is the algorithm the JobList translator would use.
The JobList translator detects whether a sub-tree of the CSEP can be replaced with a DSS. If so, a DSS replaces that subquery of the original CSEP. The DSS has its own CSEP, which is distributed to all UMs with a local-only execution flag (each UM processes only its local PM data).
- DSS distributes its CSEP to all UMs
- DSS collects all the input streams
- DSS processes the collected input streams with the additional JobList steps, e.g. aggregation
- DSS outputs its stream to the output DL.
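The DSS steps above can be sketched in Python as follows. Everything here is hypothetical: UM, DistributedSubqueryStep, run_local_only and the list used as the output DL are stand-ins, and the CSEP is modelled as a plain function applied to a node's local rows. The post-processing step is the final GROUP BY from query 1.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from itertools import chain
from typing import Callable, Iterable, List, Tuple

Row = Tuple[int, str]
Plan = Callable[[Iterable[Row]], List[Row]]  # stand-in for a CSEP / JobList step


@dataclass
class UM:
    """Hypothetical UM node; local_rows stands for the data on its local PM."""
    name: str
    local_rows: List[Row]

    def run_local_only(self, csep: Plan) -> List[Row]:
        # Execute the DSS's CSEP with the local-only flag: local PM data only.
        return csep(self.local_rows)


@dataclass
class DistributedSubqueryStep:
    csep: Plan                                            # sub-plan sent to every UM
    post_steps: List[Plan] = field(default_factory=list)  # e.g. final aggregation

    def run(self, ums: List[UM], output_dl: List[Row]) -> None:
        # 1-2. Distribute the CSEP to all UMs and collect their streams.
        with ThreadPoolExecutor() as pool:
            streams = list(pool.map(lambda um: um.run_local_only(self.csep), ums))
        # 3. Process the collected streams with additional JobList steps.
        rows: List[Row] = list(chain.from_iterable(streams))
        for step in self.post_steps:
            rows = step(rows)
        # 4. Write the result to the output DL.
        output_dl.extend(rows)


def group_by(rows: Iterable[Row]) -> List[Row]:
    """GROUP BY i, t without aggregates."""
    return sorted(set(rows))


if __name__ == "__main__":
    ums = [UM("um1", [(1, "a"), (1, "a")]), UM("um2", [(1, "a"), (2, "b")])]
    dss = DistributedSubqueryStep(csep=group_by, post_steps=[group_by])
    out: List[Row] = []
    dss.run(ums, out)
    print(out)  # [(1, 'a'), (2, 'b')]
```

Keeping the per-node plan and the post-processing steps as separate fields mirrors the split in the list above: the same CSEP runs everywhere, while only the collecting UM runs the extra steps.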
1. https://mariadb.com/kb/en/columnstore-information-functions/
2.
SELECT i, t FROM
  (SELECT s1.i, s1.t FROM
    (SELECT i, t FROM
      (SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 1 GROUP BY i, t
       UNION ALL
       SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 2 GROUP BY i, t
       UNION ALL
       SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 3 GROUP BY i, t
      ) s01 GROUP BY i, t
    ) s1
    INNER JOIN
    (SELECT i, t FROM
      (SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 1 GROUP BY i, t
       UNION ALL
       SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 2 GROUP BY i, t
       UNION ALL
       SELECT i, t FROM cs1 WHERE idbpm(cs1.i) = idblocalpm() AND idblocalpm() = 3 GROUP BY i, t
      ) s02 GROUP BY i, t
    ) s2
    ON s1.i = s2.i AND s1.t = s2.t)
  s3;