Status: Closed
Resolution: Fixed
This issue covers the feature that MCS lacks to pass query #2 of the TPC-H test suite.
Here is some preliminary information.
A JOIN in MCS can run in one of two modes:
- centralized at UM
- distributed at PP
The rest of this description concerns distributed processing at PP. The current JOIN processing model distinguishes two JOIN sides: the small-side table and the large-side table (these roles do not necessarily correlate with the tables' cardinalities). Small-side tables are broadcast to all PPs, where each PP builds hash maps from them; the large side is then processed row by row, probing the small-side hash maps.
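A minimal Python sketch of the broadcast model described above, assuming rows are plain dicts with table-qualified column names. The function names (build_hash_map, probe, distributed_join) are hypothetical and do not correspond to MCS classes; each inner list of large_partitions stands in for the part of the large side that one PP scans.

```python
from collections import defaultdict

def build_hash_map(small_rows, key):
    """Build a hash map over the small side: join-key value -> matching rows."""
    hash_map = defaultdict(list)
    for row in small_rows:
        hash_map[row[key]].append(row)
    return hash_map

def probe(large_rows, large_key, hash_map):
    """Stream the large side row by row and emit a joined row for every match."""
    for row in large_rows:
        # .get() does not insert missing keys into the defaultdict
        for match in hash_map.get(row[large_key], ()):
            yield {**row, **match}

def distributed_join(small_rows, large_partitions, small_key, large_key):
    """Each inner list of large_partitions stands in for the rows one PP scans."""
    result = []
    for partition in large_partitions:
        # Broadcast: every PP receives the whole small side and builds its own map.
        hash_map = build_hash_map(small_rows, small_key)
        result.extend(probe(partition, large_key, hash_map))
    return result

dim1 = [{"dim1.c1": 1, "dim1.c3": 10}, {"dim1.c1": 2, "dim1.c3": 20}]
f_parts = [[{"f.c1": 1}], [{"f.c1": 2}, {"f.c1": 3}]]  # f split across two PPs
print(distributed_join(dim1, f_parts, "dim1.c1", "f.c1"))
# two joined rows; f.c1 = 3 has no small-side match
```

Because every PP holds a full copy of the small side, no large-side row ever has to move between PPs to find its match.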
Consider the query
SELECT c1, c2, c3 FROM f, dim1, dim2 WHERE
f.c1 = dim1.c1 AND dim1.c2 = dim2.c2 AND dim1.c3 = (SELECT max(dim1.c3) FROM dim1, dim2 WHERE f.c1 = dim1.c1 AND dim1.c2 = dim2.c2);
The last predicate in this query's WHERE clause compares dim1.c3 with the maximum dim1.c3 produced under the same equi-JOIN conditions, f.c1 = dim1.c1 AND dim1.c2 = dim2.c2. The query therefore returns the equi-JOIN result restricted to records whose dim1.c3 is the maximum for each value of f.c1 = dim1.c1.
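To pin down these semantics, here is a naive reference evaluation in Python that re-runs the correlated subquery for every row passing the join conditions. Rows are dicts, and the projection is assumed to be f.c1, dim1.c2, dim1.c3 because the original SELECT list is unqualified; csq_max and reference_query are hypothetical helper names.

```python
def csq_max(f_c1, dim1, dim2):
    """Correlated subquery: max(dim1.c3) over dim1 JOIN dim2 where dim1.c1 = f_c1."""
    values = [d1["c3"] for d1 in dim1 for d2 in dim2
              if d1["c1"] == f_c1 and d1["c2"] == d2["c2"]]
    return max(values) if values else None  # max() over zero rows is NULL in SQL

def reference_query(f, dim1, dim2):
    """Naive evaluation: re-run the subquery for every row of the outer join."""
    out = []
    for fr in f:
        for d1 in dim1:
            for d2 in dim2:
                # csq_max is only evaluated for rows that pass both join conditions
                if (fr["c1"] == d1["c1"] and d1["c2"] == d2["c2"]
                        and d1["c3"] == csq_max(fr["c1"], dim1, dim2)):
                    out.append((fr["c1"], d1["c2"], d1["c3"]))
    return out

f = [{"c1": 1}, {"c1": 2}, {"c1": 3}]
dim1 = [{"c1": 1, "c2": "a", "c3": 5},
        {"c1": 1, "c2": "b", "c3": 7},
        {"c1": 2, "c2": "a", "c3": 3}]
dim2 = [{"c2": "a"}, {"c2": "b"}]
print(reference_query(f, dim1, dim2))  # [(1, 'b', 7), (2, 'a', 3)]
```

This per-row re-evaluation is correct but repeats the same aggregation for every row with the same f.c1, which is the cost the GROUP BY approach aims to avoid.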
Consider a more generic variant where max(dim1.c3) can be any expression over a single aggregate function of one of the JOINed tables, e.g. min(dim1.c3)+1 or (IF avg(dim1.c3) = 5 THEN X ELSE Y). Assume the conditions of the correlated subquery (CSQ) are fully correlated, i.e. they are identical to the conditions one level above the CSQ, namely WHERE f.c1 = dim1.c1 AND dim1.c2 = dim2.c2. To find the correlated maximum of dim1.c3 for a given f.c1 = dim1.c1, MCS needs the results of a GROUP BY operator.
Consider the properties of this GROUP BY in the context of the MCS code. The GROUP BY works on the dim1 rows that satisfy the query's simple filters (there are none in this example). Note that every PP has the full dim1 available at this point, because all small-side tables are broadcast to all PPs. The key column of this GROUP BY is dim1.c1 and the aggregation result is max(dim1.c3). In MCS terms, this means constructing a UMRowAggregation whose input RowGroup holds dim1.c1 and dim1.c3; its output RowGroup has a single aggregate column, max(dim1.c3). This RowAggregation can be constructed on the fly, in parallel with the PP building the hash map for the top-level JOIN.
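The build step described above can be sketched in Python as follows, assuming the broadcast dim1 is a list of dicts. build_group_by is a hypothetical stand-in for UMRowAggregation, not its API, and the agg parameter mirrors the generic variant (max, min, avg, ...). The two builds read the same filtered input and are submitted concurrently.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def build_join_map(rows, key):
    """Hash map for the top-level JOIN: key value -> small-side rows."""
    join_map = defaultdict(list)
    for row in rows:
        join_map[row[key]].append(row)
    return dict(join_map)

def build_group_by(rows, key, col, agg):
    """GROUP BY key computing agg(col); a hypothetical stand-in for UMRowAggregation."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[col])
    return {k: agg(vals) for k, vals in groups.items()}

def prepare_small_side(dim1, agg=max, simple_filter=lambda row: True):
    """Apply simple filters, then build the join map and the aggregation concurrently."""
    rows = [row for row in dim1 if simple_filter(row)]
    with ThreadPoolExecutor(max_workers=2) as pool:
        join_future = pool.submit(build_join_map, rows, "c1")
        agg_future = pool.submit(build_group_by, rows, "c1", "c3", agg)
        return join_future.result(), agg_future.result()

dim1 = [{"c1": 1, "c2": "a", "c3": 5},
        {"c1": 1, "c2": "b", "c3": 7},
        {"c1": 2, "c2": "a", "c3": 3}]
join_map, max_c3 = prepare_small_side(dim1)
print(max_c3)  # {1: 7, 2: 3}
_, avg_c3 = prepare_small_side(dim1, agg=lambda v: sum(v) / len(v))
print(avg_c3)  # {1: 6.0, 2: 3.0}
```

Python threads are constrained by the GIL, so this sketch only illustrates the structure (two independent builds over the same broadcast input), not the speedup a native implementation would get.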
With such a UMRowAggregation instance available, a PP traversing the large-side table can use each row's f.c1 as the GROUP BY key to look up the aggregate in the UMRowAggregation, and then apply the expression to that result.
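A minimal Python sketch of the probe step, assuming dict rows and hypothetical helper names (group_by stands in for UMRowAggregation). As in the description above, the aggregate is computed over dim1 alone; with the sample data every dim1 row has a dim2 partner, so the result agrees with a naive evaluation of the query. The agg and expr parameters model the generic variant, e.g. min(dim1.c3)+2.

```python
from collections import defaultdict

def group_by(rows, key, col, agg):
    """Per-key aggregate over the small side (stand-in for UMRowAggregation)."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[col])
    return {k: agg(vals) for k, vals in groups.items()}

def join_with_csq(f, dim1, dim2, agg=max, expr=lambda v: v):
    """Stream f (large side); keep joined rows where dim1.c3 == expr(agg(dim1.c3))."""
    dim1_map = defaultdict(list)
    for d1 in dim1:
        dim1_map[d1["c1"]].append(d1)
    dim2_map = defaultdict(list)
    for d2 in dim2:
        dim2_map[d2["c2"]].append(d2)
    aggregated = group_by(dim1, "c1", "c3", agg)

    out = []
    for fr in f:
        agg_value = aggregated.get(fr["c1"])  # one lookup per large-side row
        if agg_value is None:
            continue  # no group: the subquery yields NULL and the predicate fails
        target = expr(agg_value)
        for d1 in dim1_map.get(fr["c1"], ()):
            for _ in dim2_map.get(d1["c2"], ()):  # one output row per dim2 match
                if d1["c3"] == target:
                    out.append((fr["c1"], d1["c2"], d1["c3"]))
    return out

f = [{"c1": 1}, {"c1": 2}, {"c1": 3}]
dim1 = [{"c1": 1, "c2": "a", "c3": 5},
        {"c1": 1, "c2": "b", "c3": 7},
        {"c1": 2, "c2": "a", "c3": 3}]
dim2 = [{"c2": "a"}, {"c2": "b"}]
print(join_with_csq(f, dim1, dim2))  # [(1, 'b', 7), (2, 'a', 3)]
print(join_with_csq(f, dim1, dim2, agg=min, expr=lambda v: v + 2))  # [(1, 'b', 7)]
```

The aggregate is computed once per key instead of once per probed row, and the expression is applied to the looked-up value just before the comparison.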
The comments on this issue describe state-of-the-art implementations in open-source databases.
Here is the Postgres 14 query plan for the original TPC-H q2 without indices.
Hash Join (cost=68.38..217.00 rows=1 width=220)
  Hash Cond: ((part.p_partkey = partsupp.ps_partkey) AND ((SubPlan 1) = partsupp.ps_supplycost))
  -> Seq Scan on part (cost=0.00..16.15 rows=1 width=33)
        Filter: (((p_type)::text ~~ '%STEEL'::text) AND (p_size = 5))
  -> Hash (cost=67.78..67.78 rows=40 width=195)
        -> Hash Join (cost=49.88..67.78 rows=40 width=195)
              Hash Cond: (partsupp.ps_suppkey = supplier.s_suppkey)
              -> Seq Scan on partsupp (cost=0.00..15.00 rows=500 width=12)
              -> Hash (cost=49.67..49.67 rows=16 width=191)
                    -> Hash Join (cost=33.73..49.67 rows=16 width=191)
                          Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                          -> Seq Scan on supplier (cost=0.00..14.20 rows=420 width=166)
                          -> Hash (cost=33.63..33.63 rows=8 width=33)
                                -> Hash Join (cost=16.54..33.63 rows=8 width=33)
                                      Hash Cond: (nation.n_regionkey = region.r_regionkey)
                                      -> Seq Scan on nation (cost=0.00..15.10 rows=510 width=37)
                                      -> Hash (cost=16.50..16.50 rows=3 width=4)
                                            -> Seq Scan on region (cost=0.00..16.50 rows=3 width=4)
                                                  Filter: (r_name = 'ASIA'::bpchar)
  SubPlan 1
    -> Aggregate (cost=66.21..66.22 rows=1 width=4)
          -> Nested Loop (cost=32.14..66.21 rows=1 width=4)
                Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                -> Hash Join (cost=32.14..49.25 rows=10 width=8)
                      Hash Cond: (nation_1.n_nationkey = supplier_1.s_nationkey)
                      -> Seq Scan on nation nation_1 (cost=0.00..15.10 rows=510 width=8)
                      -> Hash (cost=32.09..32.09 rows=4 width=8)
                            -> Hash Join (cost=16.27..32.09 rows=4 width=8)
                                  Hash Cond: (supplier_1.s_suppkey = partsupp_1.ps_suppkey)
                                  -> Seq Scan on supplier supplier_1 (cost=0.00..14.20 rows=420 width=8)
                                  -> Hash (cost=16.25..16.25 rows=2 width=8)
                                        -> Seq Scan on partsupp partsupp_1 (cost=0.00..16.25 rows=2 width=8)
                                              Filter: (part.p_partkey = ps_partkey)
                -> Materialize (cost=0.00..16.52 rows=3 width=4)
                      -> Seq Scan on region region_1 (cost=0.00..16.50 rows=3 width=4)
                            Filter: (r_name = 'ASIA'::bpchar)
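As the plan shows, SubPlan 1 computes an aggregate on top of a nested loop join, which in turn sits on top of several hash joins. SubPlan 1 is correlated with the outer query through part.p_partkey (see the Filter on partsupp_1), so it is evaluated for each outer part row. It is worth mentioning that the plan will certainly change once data is ingested.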