diff --git a/contrib/pax_storage/expected/cbdb_parallel.out b/contrib/pax_storage/expected/cbdb_parallel.out index db583090026..ec6ceba7e3c 100644 --- a/contrib/pax_storage/expected/cbdb_parallel.out +++ b/contrib/pax_storage/expected/cbdb_parallel.out @@ -41,13 +41,29 @@ set gp_appendonly_insert_files = 4; begin; set local enable_parallel = on; create table test_131_ao1(x int, y int) using ao_row with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_ao2(x int, y int) using ao_row with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_ao3(x int, y int) using ao_row with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_ao4(x int, y int) using ao_row with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
create table test_131_aoco1(x int, y int) using ao_column with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_aoco2(x int, y int) using ao_column with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_aoco3(x int, y int) using ao_column with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table test_131_aoco4(x int, y int) using ao_column with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. select relname, reloptions from pg_catalog.pg_class where relname like 'test_131_ao%'; relname | reloptions ----------------+---------------------- @@ -155,8 +171,14 @@ explain(locus, costs off) select count(*) from test_131_aoco3, test_131_aoco4 wh abort; create table ao1(x int, y int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. 
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table ao2(x int, y int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table aocs1(x int, y int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'x' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. begin; -- encourage use of parallel plans set local min_parallel_table_scan_size = 0; @@ -367,6 +389,8 @@ abort; begin; set local max_parallel_workers_per_gather = 2; create table t1(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
create table rt1(a int, b int) with(parallel_workers=2) distributed replicated; create table rt2(a int, b int) distributed replicated; create table rt3(a int, b int) distributed replicated; @@ -599,6 +623,8 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 @@ -606,8 +632,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 6 | 7 | 5 | 6 | 6 | 7 7 | 8 | 6 | 7 | 7 | 8 10 | 11 | 9 | 10 | 10 | 11 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 (19 rows) -- parallel hash join @@ -650,13 +674,6 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt2 on select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- - 5 | 6 | 5 | 5 | 5 | 6 - 6 | 7 | 5 | 6 | 6 | 7 - 6 | 7 | 6 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 9 | 10 | 9 | 9 | 9 | 10 - 10 | 11 | 9 | 10 | 10 | 11 - 10 | 11 | 10 | 10 | 10 | 11 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 2 | 3 | 3 | 4 3 | 4 | 3 | 3 | 3 | 4 @@ -669,6 +686,13 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 9 | 10 | 8 | 9 | 9 | 10 1 | 2 | 1 | 1 | 1 | 2 2 | 3 | 1 | 2 | 2 | 3 + 5 | 6 | 5 | 5 | 5 | 6 + 6 | 7 | 5 | 6 | 6 | 7 + 6 | 7 | 6 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 9 | 10 | 9 | 9 | 9 | 10 + 10 | 11 | 9 | 10 | 10 | 11 + 10 | 11 | 10 | 10 | 10 | 11 (19 rows) -- @@ -702,6 +726,8 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt3 on select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -712,8 +738,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 
9 | 9 | 10 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 @@ -779,6 +803,8 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; (19 rows) create table t2(a int, b int) with(parallel_workers=0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table rt4(a int, b int) with(parallel_workers=2) distributed replicated; insert into t2 select i, i+1 from generate_series(1, 10) i; insert into rt4 select i, i+1 from generate_series(1, 10000) i; @@ -788,16 +814,16 @@ set local enable_parallel = off; select * from rt4 join t2 using(b); b | a | a ----+----+---- - 2 | 1 | 1 - 6 | 5 | 5 - 7 | 6 | 6 - 10 | 9 | 9 - 11 | 10 | 10 3 | 2 | 2 4 | 3 | 3 5 | 4 | 4 8 | 7 | 7 9 | 8 | 8 + 2 | 1 | 1 + 6 | 5 | 5 + 7 | 6 | 6 + 10 | 9 | 9 + 11 | 10 | 10 (10 rows) set local enable_parallel = on; @@ -828,19 +854,21 @@ explain(locus, costs off) select * from rt4 join t2 using(b); select * from rt4 join t2 using(b); b | a | a ----+----+---- - 2 | 1 | 1 + 6 | 5 | 5 + 7 | 6 | 6 + 10 | 9 | 9 + 11 | 10 | 10 3 | 2 | 2 4 | 3 | 3 5 | 4 | 4 8 | 7 | 7 9 | 8 | 8 - 6 | 5 | 5 - 7 | 6 | 6 - 10 | 9 | 9 - 11 | 10 | 10 + 2 | 1 | 1 (10 rows) create table t3(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
insert into t3 select i, i+1 from generate_series(1, 9000) i; analyze t3; set local enable_parallel = off; @@ -919,10 +947,10 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 3 | 4 | 4 1 | 2 | 2 + 2 | 3 | 3 4 | 5 | 5 + 3 | 4 | 4 5 | 6 | 6 (5 rows) @@ -931,11 +959,11 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 3 | 4 | 4 - 1 | 2 | 2 - 4 | 5 | 5 5 | 6 | 6 + 4 | 5 | 5 + 1 | 2 | 2 + 2 | 3 | 3 (5 rows) abort; @@ -976,11 +1004,11 @@ explain(locus, costs off) select * from t_replica_workers_2 right join t_random_ select * from t_replica_workers_2 right join t_random_workers_2 using(a); a | b | b ---+---+--- - 5 | 6 | 6 1 | 2 | 2 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 + 5 | 6 | 6 (5 rows) -- non parallel results @@ -1028,14 +1056,14 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker Locus: Strewn Parallel Workers: 2 Optimizer: Postgres query optimizer -(16 rows) +(15 rows) select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 1 | 2 | 2 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1045,9 +1073,9 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 1 | 2 | 2 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1059,7 +1087,11 @@ abort; -- begin; create table t1(a int, b int) with(parallel_workers=3); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
create table t2(b int, a int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 10) i; insert into t2 select i, i+1 from generate_series(1, 5) i; analyze t1; @@ -1071,17 +1103,17 @@ explain(costs off) select * from t1 right join t2 on t1.b = t2.a; QUERY PLAN ------------------------------------------------------------------ Gather Motion 9:1 (slice1; segments: 9) - -> Parallel Hash Left Join - Hash Cond: (t2.a = t1.b) - -> Redistribute Motion 6:9 (slice2; segments: 6) - Hash Key: t2.a + -> Parallel Hash Right Join + Hash Cond: (t1.b = t2.a) + -> Redistribute Motion 9:9 (slice2; segments: 9) + Hash Key: t1.b Hash Module: 3 - -> Parallel Seq Scan on t2 + -> Parallel Seq Scan on t1 -> Parallel Hash - -> Redistribute Motion 9:9 (slice3; segments: 9) - Hash Key: t1.b + -> Redistribute Motion 6:9 (slice3; segments: 6) + Hash Key: t2.a Hash Module: 3 - -> Parallel Seq Scan on t1 + -> Parallel Seq Scan on t2 Optimizer: Postgres query optimizer (13 rows) @@ -1091,7 +1123,11 @@ abort; -- begin; create table t1(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2(a int, b int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. 
Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i%10, i from generate_series(1, 5) i; insert into t1 values (100000); insert into t2 select i%10, i from generate_series(1, 100000) i; @@ -1100,34 +1136,34 @@ analyze t2; set local enable_parallel = on; -- parallel hash join with shared table, SinglQE as outer partial path. explain(locus, costs off) select * from (select count(*) as a from t2) t2 left join t1 on t1.a = t2.a; - QUERY PLAN ------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------ Gather Motion 6:1 (slice1; segments: 6) Locus: Entry - -> Parallel Hash Left Join - Locus: Hashed + -> Parallel Hash Right Join + Locus: HashedWorkers Parallel Workers: 2 - Hash Cond: ((count(*)) = t1.a) - -> Redistribute Motion 1:6 (slice2; segments: 1) - Locus: Hashed + Hash Cond: (t1.a = (count(*))) + -> Parallel Seq Scan on t1 + Locus: HashedWorkers Parallel Workers: 2 - Hash Key: (count(*)) - Hash Module: 3 - -> Finalize Aggregate - Locus: SingleQE - -> Gather Motion 6:1 (slice3; segments: 6) - Locus: SingleQE - -> Partial Aggregate - Locus: HashedWorkers - Parallel Workers: 2 - -> Parallel Seq Scan on t2 - Locus: HashedWorkers - Parallel Workers: 2 -> Parallel Hash Locus: Hashed - -> Parallel Seq Scan on t1 - Locus: HashedWorkers + -> Redistribute Motion 1:6 (slice2; segments: 1) + Locus: Hashed Parallel Workers: 2 + Hash Key: (count(*)) + Hash Module: 3 + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 6:1 (slice3; segments: 6) + Locus: SingleQE + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on t2 + Locus: HashedWorkers + Parallel Workers: 2 Optimizer: Postgres query optimizer (27 rows) @@ -1323,12 +1359,18 @@ begin; create table rt1(a int, b int) distributed replicated; create table rt2(a int, b int) with (parallel_workers = 0) distributed replicated; create table 
t1(a int, b int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2(a int, b int) with (parallel_workers = 0); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 10000) i; insert into t2 select i, i+1 from generate_series(1, 10000) i; insert into rt1 select i, i+1 from generate_series(1, 10000) i; insert into rt2 select i, i+1 from generate_series(1, 10000) i; CREATE TABLE sq1 AS SELECT a, b FROM t1 WHERE gp_segment_id = 0; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
set local optimizer=off; set local enable_parallel=on; set local min_parallel_table_scan_size to 0; @@ -1385,7 +1427,7 @@ explain (locus, costs off) select * from rt1 union all select * from t1; -> Result Locus: Strewn Parallel Workers: 2 - One-Time Filter: (gp_execution_segment() = 1) + One-Time Filter: (gp_execution_segment() = 0) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 2 @@ -1409,7 +1451,7 @@ explain (locus, costs off) select * from rt1 union all select * from t2; -> Result Locus: Strewn Parallel Workers: 2 - One-Time Filter: (gp_execution_segment() = 1) + One-Time Filter: (gp_execution_segment() = 0) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 2 @@ -1482,6 +1524,8 @@ abort; -- begin; create table t1(c1 int, c2 int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 100000) i; analyze t1; set local optimizer = off; @@ -1549,6 +1593,8 @@ abort; -- begin; create table t1(c1 int, c2 int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 100000) i; analyze t1; set local optimizer = off; @@ -1768,6 +1814,8 @@ set local optimizer = off; set local enable_parallel = on; -- ao table create table ao (a INT, b INT) using ao_row; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. 
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into ao select i as a, i as b from generate_series(1, 100) AS i; alter table ao set (parallel_workers = 2); explain(costs off) select count(*) from ao; @@ -1789,6 +1837,8 @@ select count(*) from ao; alter table ao reset (parallel_workers); -- aocs table create table aocs (a INT, b INT) using ao_column; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into aocs select i as a, i as b from generate_series(1, 100) AS i; alter table aocs set (parallel_workers = 2); explain(costs off) select count(*) from aocs; @@ -1862,9 +1912,14 @@ select * from abort; begin; create table pagg_tab (a int, b int, c text, d int) partition by list(c); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
create table pagg_tab_p1 partition of pagg_tab for values in ('0000', '0001', '0002', '0003', '0004'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p2 partition of pagg_tab for values in ('0005', '0006', '0007', '0008'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p3 partition of pagg_tab for values in ('0009', '0010', '0011'); +NOTICE: table has parent, setting distribution columns to match parent table insert into pagg_tab select i % 20, i % 30, to_char(i % 12, 'FM0000'), i % 30 from generate_series(0, 2999) i; analyze pagg_tab; set local enable_parallel to off; @@ -1939,7 +1994,11 @@ abort; -- begin; create table t1(a int, b int) with(parallel_workers=3); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table t2(b int, a int) with(parallel_workers=2); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t1 select i, i+1 from generate_series(1, 10) i; insert into t2 select i, i+1 from generate_series(1, 5) i; analyze t1; @@ -2329,6 +2388,8 @@ abort; -- prepare, execute locus is null begin; create table t1(c1 int, c2 int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'c1' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
analyze t1; prepare t1_count(integer) as select count(*) from t1; explain(locus, costs off) execute t1_count(1); diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c index 9e3697a3b03..e9d7dac9895 100644 --- a/src/backend/cdb/cdbpath.c +++ b/src/backend/cdb/cdbpath.c @@ -3112,8 +3112,9 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, case JOIN_UNIQUE_INNER: case JOIN_RIGHT: case JOIN_FULL: - /* Join types are not supported in parallel yet. */ - goto fail; + outer.ok_to_replicate = false; + inner.ok_to_replicate = false; + break; case JOIN_DEDUP_SEMI: if (!enable_parallel_dedup_semi_join) goto fail; diff --git a/src/backend/cdb/cdbpathlocus.c b/src/backend/cdb/cdbpathlocus.c index 29930085429..dddae1aa64c 100644 --- a/src/backend/cdb/cdbpathlocus.c +++ b/src/backend/cdb/cdbpathlocus.c @@ -119,6 +119,11 @@ cdbpathlocus_equal(CdbPathLocus a, CdbPathLocus b) list_length(a.distkey) != list_length(b.distkey)) return false; + /* + * CBDB_PARALLEL: What if both a and b are HashedOJ with parallel workers > 0 ? + * Are they equal in practice? 
+ */ + if ((CdbPathLocus_IsHashed(a) || CdbPathLocus_IsHashedOJ(a)) && (CdbPathLocus_IsHashed(b) || CdbPathLocus_IsHashedOJ(b))) return cdbpath_distkey_equal(a.distkey, b.distkey); @@ -544,7 +549,7 @@ cdbpathlocus_from_subquery(struct PlannerInfo *root, else { Assert(CdbPathLocus_IsHashedOJ(subpath->locus)); - CdbPathLocus_MakeHashedOJ(&locus, distkeys, numsegments); + CdbPathLocus_MakeHashedOJ(&locus, distkeys, numsegments, subpath->locus.parallel_workers); } } else @@ -711,7 +716,7 @@ cdbpathlocus_pull_above_projection(struct PlannerInfo *root, CdbPathLocus_MakeHashedWorkers(&newlocus, newdistkeys, numsegments, locus.parallel_workers); } else - CdbPathLocus_MakeHashedOJ(&newlocus, newdistkeys, numsegments); + CdbPathLocus_MakeHashedOJ(&newlocus, newdistkeys, numsegments, locus.parallel_workers); return newlocus; } else @@ -880,7 +885,7 @@ cdbpathlocus_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b) newdistkeys = lappend(newdistkeys, newdistkey); } - CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments); + CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments, 0 /* Both are 0 parallel here*/); } Assert(cdbpathlocus_is_valid(resultlocus)); return resultlocus; @@ -1236,8 +1241,14 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo Assert(cdbpathlocus_is_valid(a)); Assert(cdbpathlocus_is_valid(b)); - /* Do both input rels have same locus? */ - if (cdbpathlocus_equal(a, b)) + /* + * Do both input rels have same locus? + * CBDB_PARALLEL: for FULL JOIN, it could be different even if both + * are the same locus, because the NULL values could be on any segments + * after join. + */ + + if (jointype != JOIN_FULL && cdbpathlocus_equal(a, b)) return a; /* @@ -1412,8 +1423,9 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo * If inner is hashed workers, and outer is hashed. Join locus will be hashed. * If outer is hashed workers, and inner is hashed. Join locus will be hashed workers. 
* Seems we should just return outer locus anyway. + * Things changed since we have parallel full join now. */ - if (parallel_aware) + if (parallel_aware && jointype != JOIN_FULL) return a; numsegments = CdbPathLocus_NumSegments(a); @@ -1469,7 +1481,9 @@ cdbpathlocus_parallel_join(JoinType jointype, CdbPathLocus a, CdbPathLocus b, bo newdistkeys = lappend(newdistkeys, newdistkey); } - CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments); + Assert(CdbPathLocus_NumParallelWorkers(a) == CdbPathLocus_NumParallelWorkers(b)); + + CdbPathLocus_MakeHashedOJ(&resultlocus, newdistkeys, numsegments, CdbPathLocus_NumParallelWorkers(a)); } Assert(cdbpathlocus_is_valid(resultlocus)); return resultlocus; diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 62d3c2da790..c084f7e7c78 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -2004,6 +2004,7 @@ ExecParallelHashTableInsert(HashJoinTable hashtable, /* Store the hash value in the HashJoinTuple header. */ hashTuple->hashvalue = hashvalue; memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len); + HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); /* Push it onto the front of the bucket's list */ ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], @@ -2388,6 +2389,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate) hjstate->hj_CurTuple = NULL; } +/* + * Decide if this process is allowed to run the unmatched scan. If so, the + * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned. + * Otherwise the batch is detached and false is returned. 
+ */ +bool +ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + int curbatch = hashtable->curbatch; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING); + + /* + * It would not be deadlock-free to wait on the batch barrier, because it + * is in PHJ_BATCH_PROBING phase, and thus processes attached to it have + * already emitted tuples. Therefore, we'll hold a wait-free election: + * only one process can continue to the next phase, and all others detach + * from this batch. They can still do any work on other batches, if there + * are any. + */ + if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier)) + { + /* This process considers the batch to be done. */ + hashtable->batches[hashtable->curbatch].done = true; + + /* Make sure any temporary files are closed. */ + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); + sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); + + /* + * Track largest batch we've seen, which would normally happen in + * ExecHashTableDetachBatch(). + */ + hashtable->spacePeak = + Max(hashtable->spacePeak, + batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); + hashtable->curbatch = -1; + return false; + } + + /* Now we are alone with this batch. */ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN); + Assert(BarrierParticipants(&batch->batch_barrier) == 1); + + /* + * Has another process decided to give up early and command all processes + * to skip the unmatched scan? + */ + if (batch->skip_unmatched) + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableDetachBatch(hashtable); + return false; + } + + /* Now prepare the process local state, just as for non-parallel join. 
*/ + ExecPrepHashTableForUnmatched(hjstate); + + return true; +} + /* * ExecScanHashTableForUnmatched * scan the hash table for unmatched inner tuples @@ -2462,6 +2526,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext) return false; } +/* + * ExecParallelScanHashTableForUnmatched + * scan the hash table for unmatched inner tuples, in parallel join + * + * On success, the inner tuple is stored into hjstate->hj_CurTuple and + * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot + * for the latter. + */ +bool +ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, + ExprContext *econtext) +{ + HashJoinTable hashtable = hjstate->hj_HashTable; + HashJoinTuple hashTuple = hjstate->hj_CurTuple; + + for (;;) + { + /* + * hj_CurTuple is the address of the tuple last returned from the + * current bucket, or NULL if it's time to start scanning a new + * bucket. + */ + if (hashTuple != NULL) + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + else if (hjstate->hj_CurBucketNo < hashtable->nbuckets) + hashTuple = ExecParallelHashFirstTuple(hashtable, + hjstate->hj_CurBucketNo++); + else + break; /* finished all buckets */ + + while (hashTuple != NULL) + { + if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple))) + { + TupleTableSlot *inntuple; + + /* insert hashtable's tuple into exec slot */ + inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple), + hjstate->hj_HashTupleSlot, + false); /* do not pfree */ + econtext->ecxt_innertuple = inntuple; + + /* + * Reset temp memory each time; although this function doesn't + * do any qual eval, the caller will, so let's keep it + * parallel to ExecScanHashBucket. 
+ */ + ResetExprContext(econtext); + + hjstate->hj_CurTuple = hashTuple; + return true; + } + + hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); + } + + /* allow this loop to be cancellable */ + CHECK_FOR_INTERRUPTS(); + } + + /* + * no more unmatched tuples + */ + return false; +} + /* * ExecHashTableReset * @@ -3793,6 +3923,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) accessor->shared = shared; accessor->preallocated = 0; accessor->done = false; + accessor->outer_eof = false; accessor->inner_tuples = sts_attach(ParallelHashJoinBatchInner(shared), hashtable->hjstate->worker_id, @@ -3838,25 +3969,63 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) { int curbatch = hashtable->curbatch; ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + bool attached = true; /* Make sure any temporary files are closed. */ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); - /* Detach from the batch we were last working on. */ + /* After attaching we always get at least to PHJ_BATCH_PROBING. */ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING || + BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN); + + /* + * If we're abandoning the PHJ_BATCH_PROBING phase early without having + * reached the end of it, it means the plan doesn't want any more + * tuples, and it is happy to abandon any tuples buffered in this + * process's subplans. For correctness, we can't allow any process to + * execute the PHJ_BATCH_SCAN phase, because we will never have the + * complete set of match bits. Therefore we skip emitting unmatched + * tuples in all backends (if this is a full/right join), as if those + * tuples were all due to be emitted by this process and it has + * abandoned them too. 
+ */ /* * CBDB_PARALLEL: Parallel Hash Left Anti Semi (Not-In) Join(parallel-aware) * If phs_lasj_has_null is true, that means we have found null when building hash table, * there were no batches to detach. */ - if (!hashtable->parallel_state->phs_lasj_has_null && BarrierArriveAndDetach(&batch->batch_barrier)) + if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING && + !hashtable->parallel_state->phs_lasj_has_null && /* CBDB_PARALLEL */ + !hashtable->batches[curbatch].outer_eof) + { + /* + * This flag may be written to by multiple backends during + * PHJ_BATCH_PROBING phase, but will only be read in PHJ_BATCH_SCAN + * phase so requires no extra locking. + */ + batch->skip_unmatched = true; + } + + /* + * Even if we aren't doing a full/right outer join, we'll step through + * the PHJ_BATCH_SCAN phase just to maintain the invariant that + * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free. + */ + if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBING && + !hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */) + attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier); + if (attached && !hashtable->parallel_state->phs_lasj_has_null /* CBDB_PARALLEL */ && + BarrierArriveAndDetach(&batch->batch_barrier)) { /* - * Technically we shouldn't access the barrier because we're no - * longer attached, but since there is no way it's moving after - * this point it seems safe to make the following assertion. + * We are no longer attached to the batch barrier, but we're the + * process that was chosen to free resources and it's safe to + * assert the current phase. The ParallelHashJoinBatch can't go + * away underneath us while we are attached to the build barrier, + * making this access safe. */ - Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE); /* Free shared chunks and buckets. 
 */ while (DsaPointerIsValid(batch->chunks)) diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 9ec70f16e31..a28e6a14cdb 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -80,11 +80,12 @@ * aren't enough to go around. For each batch there is a separate barrier * with the following phases: * - * PHJ_BATCH_ELECTING -- initial state - * PHJ_BATCH_ALLOCATING -- one allocates buckets - * PHJ_BATCH_LOADING -- all load the hash table from disk - * PHJ_BATCH_PROBING -- all probe - * PHJ_BATCH_DONE -- end + * PHJ_BATCH_ELECTING -- initial state + * PHJ_BATCH_ALLOCATING* -- one allocates buckets + * PHJ_BATCH_LOADING -- all load the hash table from disk + * PHJ_BATCH_PROBING -- all probe + * PHJ_BATCH_SCAN* -- one does full/right unmatched scan + * PHJ_BATCH_FREE* -- one frees memory * * Batch 0 is a special case, because it starts out in phase * PHJ_BATCH_PROBING; populating batch 0's hash table is done during @@ -97,11 +98,17 @@ * * To avoid deadlocks, we never wait for any barrier unless it is known that * all other backends attached to it are actively executing the node or have - * already arrived. Practically, that means that we never return a tuple - * while attached to a barrier, unless the barrier has reached its final - * state. In the slightly special case of the per-batch barrier, we return - * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use - * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. + * finished. Practically, that means that we never emit a tuple while attached + * to a barrier, unless the barrier has reached a phase that means that no + * process will wait on it again. We emit tuples while attached to the build + * barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase + * PHJ_BATCH_PROBING. 
These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_SCAN + * respectively without waiting, using BarrierArriveAndDetach() and + * BarrierArriveAndDetachExceptLast() respectively. The last to detach + * receives a different return value so that it knows that it's safe to + * clean up. Any straggler process that attaches after that phase is reached + * will see that it's too late to participate or access the relevant shared + * memory objects. * *------------------------------------------------------------------------- */ @@ -493,8 +500,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) if (HJ_FILL_INNER(node)) { /* set up to scan for unmatched inner tuples */ - ExecPrepHashTableForUnmatched(node); - node->hj_JoinState = HJ_FILL_INNER_TUPLES; + if (parallel) + { + /* + * Only one process is currently allowed to handle + * each batch's unmatched tuples, in a parallel + * join. + */ + if (ExecParallelPrepHashTableForUnmatched(node)) + node->hj_JoinState = HJ_FILL_INNER_TUPLES; + else + node->hj_JoinState = HJ_NEED_NEW_BATCH; + } + else + { + ExecPrepHashTableForUnmatched(node); + node->hj_JoinState = HJ_FILL_INNER_TUPLES; + } } else node->hj_JoinState = HJ_NEED_NEW_BATCH; @@ -605,25 +627,13 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) { node->hj_MatchedOuter = true; - if (parallel) - { - /* - * Full/right outer joins are currently not supported - * for parallel joins, so we don't need to set the - * match bit. Experiments show that it's worth - * avoiding the shared memory traffic on large - * systems. - */ - Assert(!HJ_FILL_INNER(node)); - } - else - { - /* - * This is really only needed if HJ_FILL_INNER(node), - * but we'll avoid the branch and just set it always. - */ + + /* + * This is really only needed if HJ_FILL_INNER(node), but + * we'll avoid the branch and just set it always. 
+ */ + if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple))) HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple)); - } /* In an antijoin, we never return a matched tuple */ if (node->js.jointype == JOIN_ANTI || @@ -682,7 +692,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) * so any unmatched inner tuples in the hashtable have to be * emitted before we continue to the next batch. */ - if (!ExecScanHashTableForUnmatched(node, econtext)) + if (!(parallel ? ExecParallelScanHashTableForUnmatched(node, econtext) + : ExecScanHashTableForUnmatched(node, econtext))) { /* no more unmatched tuples */ node->hj_JoinState = HJ_NEED_NEW_BATCH; @@ -1241,6 +1252,8 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, } /* End of this batch */ + hashtable->batches[curbatch].outer_eof = true; + return NULL; } @@ -1521,15 +1534,34 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * hash table stays alive until everyone's finished * probing it, but no participant is allowed to wait at * this barrier again (or else a deadlock could occur). - * All attached participants must eventually call - * BarrierArriveAndDetach() so that the final phase - * PHJ_BATCH_DONE can be reached. + * All attached participants must eventually detach from + * the barrier and one worker must advance the phase so + * that the final phase is reached. */ ExecParallelHashTableSetCurrentBatch(hashtable, batchno); sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); + return true; + case PHJ_BATCH_SCAN: + + /* + * In principle, we could help scan for unmatched tuples, + * since that phase is already underway (the thing we + * can't do under current deadlock-avoidance rules is wait + * for others to arrive at PHJ_BATCH_SCAN, because + * PHJ_BATCH_PROBING emits tuples, but in this case we just + * got here without waiting). That is not yet done. For + * now, we just detach and go around again. 
We have to + * use ExecHashTableDetachBatch() because there's a small + * chance we'll be the last to detach, and then we're + * responsible for freeing memory. + */ + ExecParallelHashTableSetCurrentBatch(hashtable, batchno); + hashtable->batches[batchno].done = true; + ExecHashTableDetachBatch(hashtable); + break; - case PHJ_BATCH_DONE: + case PHJ_BATCH_FREE: /* * Already done. Detach and go around again (if any diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index ec50c66104a..bff31340128 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -2315,15 +2315,9 @@ hash_inner_and_outer(PlannerInfo *root, * able to properly guarantee uniqueness. Similarly, we can't handle * JOIN_FULL and JOIN_RIGHT, because they can produce false null * extended rows. Also, the resulting path must not be parameterized. - * We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel - * Hash, since in that case we're back to a single hash table with a - * single set of match bits for each batch, but that will require - * figuring out a deadlock-free way to wait for the probe to finish. */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && - save_jointype != JOIN_FULL && - save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL && bms_is_empty(joinrel->lateral_relids)) { @@ -2360,9 +2354,13 @@ hash_inner_and_outer(PlannerInfo *root, * total inner path will also be parallel-safe, but if not, we'll * have to search for the cheapest safe, unparameterized inner * path. If doing JOIN_UNIQUE_INNER, we can't use any alternative - * inner path. + * inner path. If full or right join, we can't use parallelism + * (building the hash table in each backend) because no one + * process has all the match bits. 
*/ - if (cheapest_total_inner->parallel_safe) + if (save_jointype == JOIN_FULL || save_jointype == JOIN_RIGHT) + cheapest_safe_inner = NULL; + else if (cheapest_total_inner->parallel_safe) cheapest_safe_inner = cheapest_total_inner; else if (save_jointype != JOIN_UNIQUE_INNER) cheapest_safe_inner = diff --git a/src/include/cdb/cdbpathlocus.h b/src/include/cdb/cdbpathlocus.h index 0f71ba55dfb..9f5a8227e68 100644 --- a/src/include/cdb/cdbpathlocus.h +++ b/src/include/cdb/cdbpathlocus.h @@ -292,13 +292,13 @@ typedef struct CdbPathLocus _locus->parallel_workers = (parallel_workers_); \ Assert(cdbpathlocus_is_valid(*_locus)); \ } while (0) -#define CdbPathLocus_MakeHashedOJ(plocus, distkey_, numsegments_) \ +#define CdbPathLocus_MakeHashedOJ(plocus, distkey_, numsegments_, parallel_workers_) \ do { \ CdbPathLocus *_locus = (plocus); \ _locus->locustype = CdbLocusType_HashedOJ; \ _locus->numsegments = (numsegments_); \ _locus->distkey = (distkey_); \ - _locus->parallel_workers = 0; \ + _locus->parallel_workers = (parallel_workers_); \ Assert(cdbpathlocus_is_valid(*_locus)); \ } while (0) #define CdbPathLocus_MakeHashedWorkers(plocus, distkey_, numsegments_, parallel_workers_) \ diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index b1fbaacf5e9..b240d0ae555 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -195,6 +195,7 @@ typedef struct ParallelHashJoinBatch size_t ntuples; /* number of tuples loaded */ size_t old_ntuples; /* number of tuples before repartitioning */ bool space_exhausted; + bool skip_unmatched; /* whether to abandon unmatched scan */ /* * Variable-sized SharedTuplestore objects follow this struct in memory. @@ -239,7 +240,7 @@ typedef struct ParallelHashJoinBatchAccessor size_t estimated_size; /* size of partition on disk */ size_t old_ntuples; /* how many tuples before repartitioning? */ bool at_least_one_chunk; /* has this backend allocated a chunk? 
*/ - + bool outer_eof; /* has this process hit end of batch? */ bool done; /* flag to remember that a batch is done */ SharedTuplestoreAccessor *inner_tuples; SharedTuplestoreAccessor *outer_tuples; @@ -305,7 +306,8 @@ typedef struct ParallelHashJoinState #define PHJ_BATCH_ALLOCATING 1 #define PHJ_BATCH_LOADING 2 #define PHJ_BATCH_PROBING 3 -#define PHJ_BATCH_DONE 4 +#define PHJ_BATCH_SCAN 4 +#define PHJ_BATCH_FREE 5 /* The phases of batch growth while hashing, for grow_batches_barrier. */ #define PHJ_GROW_BATCHES_ELECTING 0 diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 993de4519b5..36549376ef9 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -64,9 +64,12 @@ extern bool ExecScanHashBucket(HashState *hashState, HashJoinState *hjstate, extern bool ExecParallelScanHashBucket(HashState *hashState, HashJoinState *hjstate, ExprContext *econtext); extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate); +extern bool ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate); extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext); extern void ExecHashTableReset(HashState *hashState, HashJoinTable hashtable); +extern bool ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate, + ExprContext *econtext); extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable); extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew, uint64 operatorMemKB, diff --git a/src/test/regress/expected/cbdb_parallel.out b/src/test/regress/expected/cbdb_parallel.out index 35e90eebfa1..af975de50f4 100644 --- a/src/test/regress/expected/cbdb_parallel.out +++ b/src/test/regress/expected/cbdb_parallel.out @@ -112,8 +112,8 @@ set local enable_parallel_dedup_semi_reverse_join = on; set local enable_parallel_dedup_semi_join = on; explain (costs off) select sum(foo.a) from foo where exists (select 1 from bar where foo.a = bar.b); - QUERY PLAN 
------------------------------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate @@ -1032,6 +1032,15 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt2 on select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 + 5 | 6 | 5 | 5 | 5 | 6 + 6 | 7 | 6 | 6 | 6 | 7 + 9 | 10 | 9 | 9 | 9 | 10 + 10 | 11 | 10 | 10 | 10 | 11 + 6 | 7 | 5 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 10 | 11 | 9 | 10 | 10 | 11 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -1042,15 +1051,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 5 | 6 | 5 | 5 | 5 | 6 - 6 | 7 | 6 | 6 | 6 | 7 - 9 | 10 | 9 | 9 | 9 | 10 - 10 | 11 | 10 | 10 | 10 | 11 - 6 | 7 | 5 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 10 | 11 | 9 | 10 | 10 | 11 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 (19 rows) -- parallel hash join @@ -1093,13 +1093,8 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt2 on select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- - 5 | 6 | 5 | 5 | 5 | 6 - 6 | 7 | 5 | 6 | 6 | 7 - 6 | 7 | 6 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 9 | 10 | 9 | 9 | 9 | 10 - 10 | 11 | 9 | 10 | 10 | 11 - 10 | 11 | 10 | 10 | 10 | 11 + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 2 | 3 | 3 | 4 3 | 4 | 3 | 3 | 3 | 4 @@ -1110,8 +1105,13 @@ select * from rt1 join t1 on rt1.a = t1.b join rt2 on rt2.a = t1.b; 8 | 9 | 7 | 8 | 8 | 9 8 | 9 | 8 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 + 5 | 6 | 5 | 5 | 5 | 6 + 6 | 7 | 5 | 6 | 6 | 
7 + 6 | 7 | 6 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 9 | 10 | 9 | 9 | 9 | 10 + 10 | 11 | 9 | 10 | 10 | 11 + 10 | 11 | 10 | 10 | 10 | 11 (19 rows) -- @@ -1145,6 +1145,8 @@ explain(locus, costs off) select * from rt1 join t1 on rt1.a = t1.b join rt3 on select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- + 1 | 2 | 1 | 1 | 1 | 2 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -1155,8 +1157,6 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 - 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 @@ -1201,14 +1201,11 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; a | b | a | b | a | b ----+----+----+----+----+---- 1 | 2 | 1 | 1 | 1 | 2 - 2 | 3 | 1 | 2 | 2 | 3 5 | 6 | 5 | 5 | 5 | 6 6 | 7 | 6 | 6 | 6 | 7 9 | 10 | 9 | 9 | 9 | 10 10 | 11 | 10 | 10 | 10 | 11 - 6 | 7 | 5 | 6 | 6 | 7 - 7 | 8 | 6 | 7 | 7 | 8 - 10 | 11 | 9 | 10 | 10 | 11 + 2 | 3 | 1 | 2 | 2 | 3 2 | 3 | 2 | 2 | 2 | 3 3 | 4 | 3 | 3 | 3 | 4 4 | 5 | 4 | 4 | 4 | 5 @@ -1219,6 +1216,9 @@ select * from rt1 join t1 on rt1.a = t1.b join rt3 on rt3.a = t1.b; 5 | 6 | 4 | 5 | 5 | 6 8 | 9 | 7 | 8 | 8 | 9 9 | 10 | 8 | 9 | 9 | 10 + 6 | 7 | 5 | 6 | 6 | 7 + 7 | 8 | 6 | 7 | 7 | 8 + 10 | 11 | 9 | 10 | 10 | 11 (19 rows) create table t2(a int, b int) with(parallel_workers=0); @@ -1271,12 +1271,12 @@ explain(locus, costs off) select * from rt4 join t2 using(b); select * from rt4 join t2 using(b); b | a | a ----+----+---- - 2 | 1 | 1 3 | 2 | 2 4 | 3 | 3 5 | 4 | 4 8 | 7 | 7 9 | 8 | 8 + 2 | 1 | 1 6 | 5 | 5 7 | 6 | 6 10 | 9 | 9 @@ -1362,9 +1362,9 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 3 | 4 | 4 1 | 2 | 2 + 3 | 4 
| 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1374,9 +1374,9 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_0 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 3 | 4 | 4 1 | 2 | 2 + 3 | 4 | 4 + 2 | 3 | 3 4 | 5 | 5 5 | 6 | 6 (5 rows) @@ -1419,9 +1419,9 @@ explain(locus, costs off) select * from t_replica_workers_2 right join t_random_ select * from t_replica_workers_2 right join t_random_workers_2 using(a); a | b | b ---+---+--- + 2 | 3 | 3 5 | 6 | 6 1 | 2 | 2 - 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 (5 rows) @@ -1431,11 +1431,11 @@ set local enable_parallel=false; select * from t_replica_workers_2 right join t_random_workers_2 using(a); a | b | b ---+---+--- + 5 | 6 | 6 1 | 2 | 2 - 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 - 5 | 6 | 6 + 2 | 3 | 3 (5 rows) abort; @@ -1471,13 +1471,13 @@ explain(locus, costs off) select * from t_replica_workers_2 join t_random_worker Locus: Strewn Parallel Workers: 2 Optimizer: Postgres query optimizer -(16 rows) +(15 rows) select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 1 | 2 | 2 + 2 | 3 | 3 3 | 4 | 4 4 | 5 | 5 5 | 6 | 6 @@ -1488,11 +1488,11 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_2 using(a); a | b | b ---+---+--- - 2 | 3 | 3 - 1 | 2 | 2 3 | 4 | 4 4 | 5 | 5 5 | 6 | 6 + 1 | 2 | 2 + 2 | 3 | 3 (5 rows) abort; @@ -1510,28 +1510,28 @@ analyze t1; analyze rt1; set local enable_parallel = on; explain(locus, costs off) select * from (select count(*) as a from t1) t1 left join rt1 on rt1.a = t1.a; - QUERY PLAN ------------------------------------------------------- - Parallel Hash Left Join + QUERY PLAN +------------------------------------------------------------ + Parallel Hash Right Join Locus: Entry - Hash Cond: ((count(*)) = rt1.a) - -> Finalize Aggregate + Hash Cond: (rt1.a = (count(*))) + -> Gather Motion 2:1 (slice1; segments: 2) Locus: Entry - -> Gather Motion 6:1 (slice1; segments: 6) - Locus: Entry - -> 
Partial Aggregate - Locus: HashedWorkers - Parallel Workers: 2 - -> Parallel Seq Scan on t1 - Locus: HashedWorkers - Parallel Workers: 2 + -> Parallel Seq Scan on rt1 + Locus: SegmentGeneralWorkers + Parallel Workers: 2 -> Parallel Hash Locus: Entry - -> Gather Motion 2:1 (slice2; segments: 2) + -> Finalize Aggregate Locus: Entry - -> Parallel Seq Scan on rt1 - Locus: SegmentGeneralWorkers - Parallel Workers: 2 + -> Gather Motion 6:1 (slice2; segments: 6) + Locus: Entry + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on t1 + Locus: HashedWorkers + Parallel Workers: 2 Optimizer: Postgres query optimizer (21 rows) @@ -1661,17 +1661,17 @@ explain(costs off) select * from t1 right join t2 on t1.b = t2.a; QUERY PLAN ------------------------------------------------------------------ Gather Motion 9:1 (slice1; segments: 9) - -> Parallel Hash Left Join - Hash Cond: (t2.a = t1.b) - -> Redistribute Motion 6:9 (slice2; segments: 6) - Hash Key: t2.a + -> Parallel Hash Right Join + Hash Cond: (t1.b = t2.a) + -> Redistribute Motion 9:9 (slice2; segments: 9) + Hash Key: t1.b Hash Module: 3 - -> Parallel Seq Scan on t2 + -> Parallel Seq Scan on t1 -> Parallel Hash - -> Redistribute Motion 9:9 (slice3; segments: 9) - Hash Key: t1.b + -> Redistribute Motion 6:9 (slice3; segments: 6) + Hash Key: t2.a Hash Module: 3 - -> Parallel Seq Scan on t1 + -> Parallel Seq Scan on t2 Optimizer: Postgres query optimizer (13 rows) @@ -1690,34 +1690,34 @@ analyze t2; set local enable_parallel = on; -- parallel hash join with shared table, SinglQE as outer partial path. 
explain(locus, costs off) select * from (select count(*) as a from t2) t2 left join t1 on t1.a = t2.a; - QUERY PLAN ------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------ Gather Motion 6:1 (slice1; segments: 6) Locus: Entry - -> Parallel Hash Left Join - Locus: Hashed + -> Parallel Hash Right Join + Locus: HashedWorkers Parallel Workers: 2 - Hash Cond: ((count(*)) = t1.a) - -> Redistribute Motion 1:6 (slice2; segments: 1) - Locus: Hashed + Hash Cond: (t1.a = (count(*))) + -> Parallel Seq Scan on t1 + Locus: HashedWorkers Parallel Workers: 2 - Hash Key: (count(*)) - Hash Module: 3 - -> Finalize Aggregate - Locus: SingleQE - -> Gather Motion 6:1 (slice3; segments: 6) - Locus: SingleQE - -> Partial Aggregate - Locus: HashedWorkers - Parallel Workers: 2 - -> Parallel Seq Scan on t2 - Locus: HashedWorkers - Parallel Workers: 2 -> Parallel Hash Locus: Hashed - -> Parallel Seq Scan on t1 - Locus: HashedWorkers + -> Redistribute Motion 1:6 (slice2; segments: 1) + Locus: Hashed Parallel Workers: 2 + Hash Key: (count(*)) + Hash Module: 3 + -> Finalize Aggregate + Locus: SingleQE + -> Gather Motion 6:1 (slice3; segments: 6) + Locus: SingleQE + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on t2 + Locus: HashedWorkers + Parallel Workers: 2 Optimizer: Postgres query optimizer (27 rows) @@ -1975,7 +1975,7 @@ explain (locus, costs off) select * from rt1 union all select * from t1; -> Result Locus: Strewn Parallel Workers: 3 - One-Time Filter: (gp_execution_segment() = 0) + One-Time Filter: (gp_execution_segment() = 1) -> Parallel Seq Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 3 @@ -1999,7 +1999,7 @@ explain (locus, costs off) select * from rt1 union all select * from t2; -> Result Locus: Strewn Parallel Workers: 3 - One-Time Filter: (gp_execution_segment() = 0) + One-Time Filter: (gp_execution_segment() = 1) -> Parallel Seq 
Scan on rt1 Locus: SegmentGeneralWorkers Parallel Workers: 3 @@ -2296,8 +2296,8 @@ analyze t1; analyze t2; analyze t3_null; explain(costs off) select sum(t1.c1) from t1 where c1 not in (select c2 from t2); - QUERY PLAN ------------------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 6:1 (slice1; segments: 6) -> Partial Aggregate @@ -2317,8 +2317,8 @@ select sum(t1.c1) from t1 where c1 not in (select c2 from t2); (1 row) explain(costs off) select * from t1 where c1 not in (select c2 from t3_null); - QUERY PLAN ------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Gather Motion 6:1 (slice1; segments: 6) -> Parallel Hash Left Anti Semi (Not-In) Join Hash Cond: (t1.c1 = t3_null.c2) @@ -2457,8 +2457,11 @@ abort; begin; create table pagg_tab (a int, b int, c text, d int) partition by list(c); create table pagg_tab_p1 partition of pagg_tab for values in ('0000', '0001', '0002', '0003', '0004'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p2 partition of pagg_tab for values in ('0005', '0006', '0007', '0008'); +NOTICE: table has parent, setting distribution columns to match parent table create table pagg_tab_p3 partition of pagg_tab for values in ('0009', '0010', '0011'); +NOTICE: table has parent, setting distribution columns to match parent table insert into pagg_tab select i % 20, i % 30, to_char(i % 12, 'FM0000'), i % 30 from generate_series(0, 2999) i; analyze pagg_tab; set local enable_parallel to off; @@ -2972,7 +2975,7 @@ create table t2_anti(a int, b int) with(parallel_workers=2) distributed by (b); insert into t2_anti values(generate_series(5, 10)); explain(costs off, verbose) select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = 
t2_anti.a where t2_anti.a is null; - QUERY PLAN + QUERY PLAN ------------------------------------------------------------------ Gather Motion 3:1 (slice1; segments: 3) Output: t1_anti.a, t1_anti.b @@ -3068,8 +3071,8 @@ select t1_anti.a, t1_anti.b from t1_anti left join t2_anti on t1_anti.a = t2_ant ---+--- 3 | 4 | - 1 | 2 | + 1 | (4 rows) abort; @@ -3098,7 +3101,7 @@ insert into t_distinct_0 select * from t_distinct_0; analyze t_distinct_0; explain(costs off) select distinct a from t_distinct_0; - QUERY PLAN + QUERY PLAN ------------------------------------------------------------ Gather Motion 3:1 (slice1; segments: 3) -> HashAggregate @@ -3232,8 +3235,6 @@ select distinct a, b from t_distinct_0; drop table if exists t_distinct_1; NOTICE: table "t_distinct_1" does not exist, skipping create table t_distinct_1(a int, b int) using ao_column; -NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. -HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into t_distinct_1 select * from t_distinct_0; analyze t_distinct_1; set enable_parallel = off; @@ -3520,10 +3521,7 @@ WHERE e.salary > ( -- Test https://github.com/apache/cloudberry/issues/1376 -- create table t1(a int, b int); -NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. -HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
create table t2 (like t1); -NOTICE: table doesn't have 'DISTRIBUTED BY' clause, defaulting to distribution columns from LIKE table set gp_cte_sharing = on; explain(locus, costs off) with x as (select a, count(*) as b from t1 group by a union all @@ -3571,8 +3569,184 @@ explain(locus, costs off) with x as reset gp_cte_sharing; reset enable_parallel; reset min_parallel_table_scan_size; +-- +-- Parallel Hash Full/Right Join +-- +begin; +create table pj_t1(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t2(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t3(id int, v int) with(parallel_workers=0) distributed by (id); +-- pj_t1 is 3x larger than pj_t2 so the planner hashes the smaller pj_t2 +-- and probes with pj_t1, producing a genuine Parallel Hash Right Join plan. +insert into pj_t1 select i, i from generate_series(1,30000)i; +insert into pj_t2 select i, i from generate_series(25001,35000)i; +insert into pj_t3 select i, i from generate_series(1,10000)i; +analyze pj_t1; +analyze pj_t2; +analyze pj_t3; +set local enable_parallel = on; +set local min_parallel_table_scan_size = 0; +-- 12_P_12_10: Parallel Hash Full Join: HashedWorkers FULL JOIN HashedWorkers -> HashedOJ(parallel) +explain(costs off, locus) +select count(*) from pj_t1 full join pj_t2 using (id); + QUERY PLAN +---------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 6:1 (slice1; segments: 6) + Locus: Entry + -> Partial Aggregate + Locus: HashedOJ + Parallel Workers: 2 + -> Parallel Hash Full Join + Locus: HashedOJ + Parallel Workers: 2 + Hash Cond: (pj_t1.id = pj_t2.id) + -> Parallel Seq Scan on pj_t1 + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Hash + Locus: Hashed + -> Parallel Seq Scan on pj_t2 + Locus: HashedWorkers + Parallel Workers: 2 + Optimizer: Postgres query optimizer +(20 rows) + +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; 
+select count(*) from pj_t1 full join pj_t2 using (id); + count +------- + 35000 +(1 row) + +set local enable_parallel = on; +select count(*) from pj_t1 full join pj_t2 using (id); + count +------- + 35000 +(1 row) + +-- Parallel Hash Right Join: pj_t1 (30K) is larger, so the planner hashes the smaller pj_t2 +-- (10K) as the build side and probes with pj_t1; result locus HashedWorkers(parallel) +explain(costs off, locus) +select count(*) from pj_t1 right join pj_t2 using (id); + QUERY PLAN +---------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 6:1 (slice1; segments: 6) + Locus: Entry + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Hash Right Join + Locus: HashedWorkers + Parallel Workers: 2 + Hash Cond: (pj_t1.id = pj_t2.id) + -> Parallel Seq Scan on pj_t1 + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Hash + Locus: Hashed + -> Parallel Seq Scan on pj_t2 + Locus: HashedWorkers + Parallel Workers: 2 + Optimizer: Postgres query optimizer +(20 rows) + +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; +select count(*) from pj_t1 right join pj_t2 using (id); + count +------- + 10000 +(1 row) + +set local enable_parallel = on; +select count(*) from pj_t1 right join pj_t2 using (id); + count +------- + 10000 +(1 row) + +-- Locus propagation: HashedOJ(parallel) followed by INNER JOIN with Hashed(serial) +-- The full join result (HashedOJ,parallel=2) is joined with pj_t3 (Hashed,serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj inner join pj_t3 using (id); + QUERY PLAN +--------------------------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice1; segments: 3) + Locus: Entry + -> Partial Aggregate + Locus: HashedOJ + -> Hash Join + Locus: HashedOJ + Hash Cond: (COALESCE(pj_t1.id, pj_t2.id) = pj_t3.id) + -> Hash Full Join + 
Locus: HashedOJ + Hash Cond: (pj_t1.id = pj_t2.id) + -> Seq Scan on pj_t1 + Locus: Hashed + -> Hash + Locus: Hashed + -> Seq Scan on pj_t2 + Locus: Hashed + -> Hash + Locus: Replicated + -> Broadcast Motion 3:3 (slice2; segments: 3) + Locus: Replicated + -> Seq Scan on pj_t3 + Locus: Hashed + Optimizer: Postgres query optimizer +(25 rows) + +-- Locus propagation: HashedOJ(parallel) followed by FULL JOIN with Hashed(serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj full join pj_t3 using (id); + QUERY PLAN +-------------------------------------------------------------------------- + Finalize Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice1; segments: 3) + Locus: Entry + -> Partial Aggregate + Locus: HashedOJ + -> Hash Full Join + Locus: HashedOJ + Hash Cond: (COALESCE(pj_t1.id, pj_t2.id) = pj_t3.id) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Locus: Hashed + Hash Key: COALESCE(pj_t1.id, pj_t2.id) + -> Hash Full Join + Locus: HashedOJ + Hash Cond: (pj_t1.id = pj_t2.id) + -> Seq Scan on pj_t1 + Locus: Hashed + -> Hash + Locus: Hashed + -> Seq Scan on pj_t2 + Locus: Hashed + -> Hash + Locus: Hashed + -> Seq Scan on pj_t3 + Locus: Hashed + Optimizer: Postgres query optimizer +(26 rows) + +abort; -- start_ignore drop schema test_parallel cascade; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to table t_distinct_0 +drop cascades to table t_distinct_1 +drop cascades to table departments +drop cascades to table employees +drop cascades to table t1 +drop cascades to table t2 -- end_ignore reset gp_appendonly_insert_files; reset force_parallel_mode; diff --git a/src/test/regress/expected/join_hash.out b/src/test/regress/expected/join_hash.out index 5171a7d9cf3..e5f74c18d28 100644 --- a/src/test/regress/expected/join_hash.out +++ b/src/test/regress/expected/join_hash.out @@ -10,6 +10,9 @@ set allow_system_table_mods=on; set local min_parallel_table_scan_size = 0; set local parallel_setup_cost = 0; 
set local enable_hashjoin = on; +-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full join +-- is tested separately in cbdb_parallel.sql. +set local enable_parallel = off; -- Extract bucket and batch counts from an explain analyze plan. In -- general we can't make assertions about how many batches (or -- buckets) will be required because it can vary, but we can in some @@ -58,12 +61,16 @@ $$; -- estimated size. create table simple as select generate_series(1, 60000) AS id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table simple set (parallel_workers = 2); analyze simple; -- Make a relation whose size we will under-estimate. We want stats -- to say 1000 rows, but actually there are 20,000 rows. create table bigger_than_it_looks as select generate_series(1, 60000) as id, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table bigger_than_it_looks set (autovacuum_enabled = 'false'); WARNING: autovacuum is not supported in Cloudberry alter table bigger_than_it_looks set (parallel_workers = 2); @@ -73,6 +80,8 @@ update pg_class set reltuples = 1000 where relname = 'bigger_than_it_looks'; -- kind of skew that breaks our batching scheme. We want stats to say -- 2 rows, but actually there are 20,000 rows with the same key. 
create table extremely_skewed (id int, t text); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table extremely_skewed set (autovacuum_enabled = 'false'); WARNING: autovacuum is not supported in Cloudberry alter table extremely_skewed set (parallel_workers = 2); @@ -85,6 +94,8 @@ update pg_class where relname = 'extremely_skewed'; -- Make a relation with a couple of enormous tuples. create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. alter table wide set (parallel_workers = 2); ANALYZE wide; -- The "optimal" case: the hash table fits in memory; we plan for 1 @@ -315,6 +326,13 @@ $$); t | f (1 row) +-- parallel full multi-batch hash join +select count(*) from simple r full outer join simple s using (id); + count +------- + 60000 +(1 row) + rollback to settings; -- The "bad" case: during execution we need to increase number of -- batches; in this case we plan for 1 batch, and increase at least a @@ -567,9 +585,13 @@ rollback to settings; -- Exercise rescans. We'll turn off parallel_leader_participation so -- that we can check that instrumentation comes back correctly. create table join_foo as select generate_series(1, 3) as id, 'xxxxx'::text as t; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. 
Make sure column(s) chosen are the optimal data distribution key to minimize skew. analyze join_foo; alter table join_foo set (parallel_workers = 0); create table join_bar as select generate_series(1, 20000) as id, 'xxxxx'::text as t; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. analyze join_bar; alter table join_bar set (parallel_workers = 2); -- multi-batch with rescan, parallel-oblivious @@ -816,7 +838,33 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join +savepoint settings; +set enable_parallel_hash = off; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); + QUERY PLAN +---------------------------------------------------- + Finalize Aggregate + -> Gather Motion 3:1 (slice1; segments: 3) + -> Partial Aggregate + -> Hash Full Join + Hash Cond: (r.id = s.id) + -> Seq Scan on simple r + -> Hash + -> Seq Scan on simple s + Optimizer: Postgres query optimizer +(9 rows) + +select count(*) from simple r full outer join simple s using (id); + count +------- + 60000 +(1 row) + +rollback to settings; +-- parallelism is possible with parallel-aware full hash join savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) @@ -841,7 +889,7 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- An full outer join where every record is not matched. +-- A full outer join where every record is not matched. 
-- non-parallel savepoint settings; set local max_parallel_workers_per_gather = 0; @@ -869,7 +917,35 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join +savepoint settings; +set enable_parallel_hash = off; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + QUERY PLAN +------------------------------------------------------------------------ + Finalize Aggregate + -> Gather Motion 3:1 (slice1; segments: 3) + -> Partial Aggregate + -> Hash Full Join + Hash Cond: ((0 - s.id) = r.id) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: (0 - s.id) + -> Seq Scan on simple s + -> Hash + -> Seq Scan on simple r + Optimizer: Postgres query optimizer +(11 rows) + +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + count +-------- + 120000 +(1 row) + +rollback to settings; +-- parallelism is possible with parallel-aware full hash join savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) @@ -954,7 +1030,11 @@ rollback to settings; savepoint settings; set max_parallel_workers_per_gather = 0; create table join_hash_t_small(a int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. create table join_hash_t_big(b int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'b' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. 
Make sure column(s) chosen are the optimal data distribution key to minimize skew. insert into join_hash_t_small select i%100 from generate_series(0, 3000)i; insert into join_hash_t_big select i%100000 from generate_series(1, 100000)i ; analyze join_hash_t_small; @@ -973,6 +1053,52 @@ explain (costs off) select * from join_hash_t_small, join_hash_t_big where a = b rollback to settings; rollback; +-- Hash join reuses the HOT status bit to indicate match status. This can only +-- be guaranteed to produce correct results if all the hash join tuple match +-- bits are reset before reuse. This is done upon loading them into the +-- hashtable. +begin; +SAVEPOINT settings; +-- CBDB: disable CBDB parallel; the serial full join match-bit test is what matters here. +SET enable_parallel = off; +SET enable_parallel_hash = on; +SET min_parallel_table_scan_size = 0; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +CREATE TABLE hjtest_matchbits_t1(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +CREATE TABLE hjtest_matchbits_t2(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +INSERT INTO hjtest_matchbits_t1 VALUES (1); +INSERT INTO hjtest_matchbits_t2 VALUES (2); +-- Update should create a HOT tuple. If this status bit isn't cleared, we won't +-- correctly emit the NULL-extended unmatching tuple in full hash join. 
+UPDATE hjtest_matchbits_t2 set id = 2; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id + ORDER BY t1.id; + id | id +----+---- + 1 | + | 2 +(2 rows) + +-- Test serial full hash join. +-- Resetting parallel_setup_cost should force a serial plan. +-- Just to be safe, however, set enable_parallel_hash to off, as parallel full +-- hash joins are only supported with shared hashtables. +RESET parallel_setup_cost; +SET enable_parallel_hash = off; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; + id | id +----+---- + | 2 + 1 | +(2 rows) + +ROLLBACK TO settings; +rollback; -- Verify that hash key expressions reference the correct -- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's -- need to reference Hash's outer plan (which is below HashJoin's @@ -988,7 +1114,11 @@ BEGIN; SET LOCAL enable_sort = OFF; -- avoid mergejoins SET LOCAL from_collapse_limit = 1; -- allows easy changing of join order CREATE TABLE hjtest_1 (a text, b int, id int, c bool); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. CREATE TABLE hjtest_2 (a bool, id int, b text, c int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 2, 1, false); -- matches INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 1, 2, false); -- fails id join condition INSERT INTO hjtest_1(a, b, id, c) VALUES ('text', 20, 1, false); -- fails < 50 @@ -1045,8 +1175,8 @@ WHERE SubPlan 2 -> Result Output: (hjtest_1.b * 5) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'off' Optimizer: Postgres query optimizer - Settings: enable_sort=off, from_collapse_limit=1 (38 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 @@ -1109,8 +1239,8 @@ WHERE SubPlan 3 -> Result Output: (hjtest_2.c * 5) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'off' Optimizer: Postgres query optimizer - Settings: enable_sort=off, from_collapse_limit=1 (38 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 diff --git a/src/test/regress/expected/join_hash_optimizer.out b/src/test/regress/expected/join_hash_optimizer.out index 053d0ef4898..1835bfa4f31 100644 --- a/src/test/regress/expected/join_hash_optimizer.out +++ b/src/test/regress/expected/join_hash_optimizer.out @@ -10,6 +10,9 @@ set allow_system_table_mods=on; set local min_parallel_table_scan_size = 0; set local parallel_setup_cost = 0; set local enable_hashjoin = on; +-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full join +-- is tested separately in cbdb_parallel.sql. +set local enable_parallel = off; -- Extract bucket and batch counts from an explain analyze plan. 
In -- general we can't make assertions about how many batches (or -- buckets) will be required because it can vary, but we can in some @@ -115,7 +118,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -156,7 +159,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -197,7 +200,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -241,7 +244,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -283,7 +286,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -325,7 +328,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: s.id -> Seq Scan on simple s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select count(*) from simple r join simple s using (id); @@ -344,6 +347,13 @@ $$); t | f (1 row) +-- parallel full multi-batch hash join +select count(*) from simple r full outer join simple s using (id); + count +------- + 60000 +(1 row) + rollback to settings; -- The "bad" case: during execution we need to increase number of -- batches; in this case we plan for 1 batch, and increase at least 
a @@ -356,8 +366,8 @@ set local work_mem = '128kB'; set local statement_mem = '1000kB'; -- GPDB uses statement_mem instead of work_mem explain (costs off) select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id); - QUERY PLAN ------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -367,8 +377,8 @@ explain (costs off) -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on bigger_than_it_looks s - Optimizer: Pivotal Optimizer (GPORCA) -(13 rows) + Optimizer: GPORCA +(10 rows) select count(*) FROM simple r JOIN bigger_than_it_looks s USING (id); count @@ -395,8 +405,8 @@ set local statement_mem = '1000kB'; -- GPDB uses statement_mem instead of work_m set local enable_parallel_hash = off; explain (costs off) select count(*) from simple r join bigger_than_it_looks s using (id); - QUERY PLAN ------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------- Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -406,8 +416,8 @@ explain (costs off) -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on bigger_than_it_looks s - Optimizer: Pivotal Optimizer (GPORCA) -(13 rows) + Optimizer: GPORCA +(10 rows) select count(*) from simple r join bigger_than_it_looks s using (id); count @@ -434,8 +444,8 @@ set local statement_mem = '1000kB'; -- GPDB uses statement_mem instead of work_m set local enable_parallel_hash = on; explain (costs off) select count(*) from simple r join bigger_than_it_looks s using (id); - QUERY PLAN ------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------- Finalize Aggregate -> 
Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -445,8 +455,8 @@ explain (costs off) -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on bigger_than_it_looks s - Optimizer: Pivotal Optimizer (GPORCA) -(13 rows) + Optimizer: GPORCA +(10 rows) select count(*) from simple r join bigger_than_it_looks s using (id); count @@ -490,7 +500,7 @@ HINT: For non-partitioned tables, run analyze (). For -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on extremely_skewed s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (10 rows) select count(*) from simple r join extremely_skewed s using (id); @@ -534,7 +544,7 @@ HINT: For non-partitioned tables, run analyze (). For -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on extremely_skewed s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (10 rows) select count(*) from simple r join extremely_skewed s using (id); @@ -578,7 +588,7 @@ HINT: For non-partitioned tables, run analyze (). 
For -> Hash -> Broadcast Motion 3:3 (slice2; segments: 3) -> Seq Scan on extremely_skewed s - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (10 rows) select count(*) from simple r join extremely_skewed s using (id); @@ -643,8 +653,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN ------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -662,7 +672,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -701,8 +711,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN ------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -720,7 +730,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -760,8 +770,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN 
------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -779,7 +789,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -818,8 +828,8 @@ explain (costs off) select count(*) from join_foo left join (select b1.id, b1.t from join_bar b1 join join_bar b2 using (id)) ss on join_foo.id < ss.id + 1 and join_foo.id > ss.id - 1; - QUERY PLAN ------------------------------------------------------------------------------------------------------------- + QUERY PLAN +------------------------------------------------------------------------------------------------ Finalize Aggregate -> Gather Motion 3:1 (slice1; segments: 3) -> Partial Aggregate @@ -837,7 +847,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice4; segments: 3) Hash Key: b2.id -> Seq Scan on join_bar b2 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (18 rows) select count(*) from join_foo @@ -891,8 +901,9 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join savepoint settings; +set enable_parallel_hash = off; set local max_parallel_workers_per_gather = 2; explain (costs off) select count(*) from simple r full outer join simple s using (id); @@ -920,7 +931,36 @@ select count(*) from simple r full outer join simple s using (id); (1 row) rollback to settings; --- An full outer join where every record is not matched. 
+-- parallelism is possible with parallel-aware full hash join +savepoint settings; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); + QUERY PLAN +------------------------------------------------------------------------------ + Finalize Aggregate + -> Gather Motion 3:1 (slice1; segments: 3) + -> Partial Aggregate + -> Hash Full Join + Hash Cond: (r.id = s.id) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: r.id + -> Seq Scan on simple r + -> Hash + -> Redistribute Motion 3:3 (slice3; segments: 3) + Hash Key: s.id + -> Seq Scan on simple s + Optimizer: GPORCA +(13 rows) + +select count(*) from simple r full outer join simple s using (id); + count +------- + 60000 +(1 row) + +rollback to settings; +-- A full outer join where every record is not matched. -- non-parallel savepoint settings; set local max_parallel_workers_per_gather = 0; @@ -950,7 +990,37 @@ select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); (1 row) rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join +savepoint settings; +set enable_parallel_hash = off; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + QUERY PLAN +------------------------------------------------------------------------------ + Finalize Aggregate + -> Gather Motion 3:1 (slice1; segments: 3) + -> Partial Aggregate + -> Hash Full Join + Hash Cond: (r.id = (0 - s.id)) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Hash Key: r.id + -> Seq Scan on simple r + -> Hash + -> Redistribute Motion 3:3 (slice3; segments: 3) + Hash Key: (0 - s.id) + -> Seq Scan on simple s + Optimizer: GPORCA +(13 rows) + +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); + count +-------- + 120000 +(1 
row) + +rollback to settings; +-- parallelism is possible with parallel-aware full hash join savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) @@ -1012,7 +1082,7 @@ explain (costs off) -> Redistribute Motion 3:3 (slice3; segments: 3) Hash Key: wide_1.id -> Seq Scan on wide wide_1 - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (13 rows) select length(max(s.t)) @@ -1060,11 +1130,57 @@ explain (costs off) select * from join_hash_t_small, join_hash_t_big where a = b -> Seq Scan on join_hash_t_big -> Hash -> Seq Scan on join_hash_t_small - Optimizer: Pivotal Optimizer (GPORCA) + Optimizer: GPORCA (7 rows) rollback to settings; rollback; +-- Hash join reuses the HOT status bit to indicate match status. This can only +-- be guaranteed to produce correct results if all the hash join tuple match +-- bits are reset before reuse. This is done upon loading them into the +-- hashtable. +begin; +SAVEPOINT settings; +-- CBDB: disable CBDB parallel; the serial full join match-bit test is what matters here. +SET enable_parallel = off; +SET enable_parallel_hash = on; +SET min_parallel_table_scan_size = 0; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +CREATE TABLE hjtest_matchbits_t1(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. +CREATE TABLE hjtest_matchbits_t2(id int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Apache Cloudberry data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
+INSERT INTO hjtest_matchbits_t1 VALUES (1); +INSERT INTO hjtest_matchbits_t2 VALUES (2); +-- Update should create a HOT tuple. If this status bit isn't cleared, we won't +-- correctly emit the NULL-extended unmatching tuple in full hash join. +UPDATE hjtest_matchbits_t2 set id = 2; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id + ORDER BY t1.id; + id | id +----+---- + 1 | + | 2 +(2 rows) + +-- Test serial full hash join. +-- Resetting parallel_setup_cost should force a serial plan. +-- Just to be safe, however, set enable_parallel_hash to off, as parallel full +-- hash joins are only supported with shared hashtables. +RESET parallel_setup_cost; +SET enable_parallel_hash = off; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; + id | id +----+---- + | 2 + 1 | +(2 rows) + +ROLLBACK TO settings; +rollback; -- Verify that hash key expressions reference the correct -- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's -- need to reference Hash's outer plan (which is below HashJoin's @@ -1154,9 +1270,9 @@ WHERE Filter: (((hjtest_1.b * 5)) < 50) -> Result Output: (hjtest_1.b * 5) - Settings: enable_sort = 'off', from_collapse_limit = '1' - Optimizer: Pivotal Optimizer (GPORCA) -(49 rows) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'on' + Optimizer: GPORCA +(51 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 FROM hjtest_1, hjtest_2 @@ -1231,9 +1347,9 @@ WHERE Filter: (((hjtest_1.b * 5)) < 50) -> Result Output: (hjtest_1.b * 5) - Settings: enable_sort = 'off', from_collapse_limit = '1' - Optimizer: Pivotal Optimizer (GPORCA) -(49 rows) + Settings: enable_parallel = 'on', enable_sort = 'off', from_collapse_limit = '1', optimizer = 'on' + Optimizer: GPORCA +(51 rows) SELECT hjtest_1.a a1, hjtest_2.a a2,hjtest_1.tableoid::regclass t1, hjtest_2.tableoid::regclass t2 FROM 
hjtest_2, hjtest_1 diff --git a/src/test/regress/sql/cbdb_parallel.sql b/src/test/regress/sql/cbdb_parallel.sql index f9d01dd8a00..08e7aa198f9 100644 --- a/src/test/regress/sql/cbdb_parallel.sql +++ b/src/test/regress/sql/cbdb_parallel.sql @@ -1149,6 +1149,56 @@ reset gp_cte_sharing; reset enable_parallel; reset min_parallel_table_scan_size; +-- +-- Parallel Hash Full/Right Join +-- +begin; +create table pj_t1(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t2(id int, v int) with(parallel_workers=2) distributed by (id); +create table pj_t3(id int, v int) with(parallel_workers=0) distributed by (id); + +-- pj_t1 is 3x larger than pj_t2 so the planner hashes the smaller pj_t2 +-- and probes with pj_t1, producing a genuine Parallel Hash Right Join plan. +insert into pj_t1 select i, i from generate_series(1,30000)i; +insert into pj_t2 select i, i from generate_series(25001,35000)i; +insert into pj_t3 select i, i from generate_series(1,10000)i; +analyze pj_t1; +analyze pj_t2; +analyze pj_t3; + +set local enable_parallel = on; +set local min_parallel_table_scan_size = 0; + +-- 12_P_12_10: Parallel Hash Full Join: HashedWorkers FULL JOIN HashedWorkers -> HashedOJ(parallel) +explain(costs off, locus) +select count(*) from pj_t1 full join pj_t2 using (id); +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; +select count(*) from pj_t1 full join pj_t2 using (id); +set local enable_parallel = on; +select count(*) from pj_t1 full join pj_t2 using (id); + +-- Parallel Hash Right Join: pj_t1 (30K) is larger, so the planner hashes the smaller pj_t2 +-- (10K) as the build side and probes with pj_t1; result locus HashedWorkers(parallel) +explain(costs off, locus) +select count(*) from pj_t1 right join pj_t2 using (id); +-- correctness: parallel result matches non-parallel +set local enable_parallel = off; +select count(*) from pj_t1 right join pj_t2 using (id); +set local enable_parallel = on; +select count(*) from 
pj_t1 right join pj_t2 using (id); + +-- Locus propagation: HashedOJ(parallel) followed by INNER JOIN with Hashed(serial) +-- The full join result (HashedOJ,parallel=2) is joined with pj_t3 (Hashed,serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj inner join pj_t3 using (id); + +-- Locus propagation: HashedOJ(parallel) followed by FULL JOIN with Hashed(serial) +explain(costs off, locus) +select count(*) from (pj_t1 full join pj_t2 using (id)) fj full join pj_t3 using (id); + +abort; + -- start_ignore drop schema test_parallel cascade; -- end_ignore diff --git a/src/test/regress/sql/join_hash.sql b/src/test/regress/sql/join_hash.sql index 325068e9d23..2978e155ecd 100644 --- a/src/test/regress/sql/join_hash.sql +++ b/src/test/regress/sql/join_hash.sql @@ -13,6 +13,9 @@ set allow_system_table_mods=on; set local min_parallel_table_scan_size = 0; set local parallel_setup_cost = 0; set local enable_hashjoin = on; +-- CBDB: disable CBDB parallel for these PG-originated tests; parallel full join +-- is tested separately in cbdb_parallel.sql. +set local enable_parallel = off; -- Extract bucket and batch counts from an explain analyze plan. 
In -- general we can't make assertions about how many batches (or @@ -191,6 +194,8 @@ select original > 1 as initially_multibatch, final > original as increased_batch $$ select count(*) from simple r join simple s using (id); $$); +-- parallel full multi-batch hash join +select count(*) from simple r full outer join simple s using (id); rollback to settings; -- The "bad" case: during execution we need to increase number of @@ -438,7 +443,16 @@ explain (costs off) select count(*) from simple r full outer join simple s using (id); rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join +savepoint settings; +set enable_parallel_hash = off; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s using (id); +select count(*) from simple r full outer join simple s using (id); +rollback to settings; + +-- parallelism is possible with parallel-aware full hash join savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) @@ -446,7 +460,7 @@ explain (costs off) select count(*) from simple r full outer join simple s using (id); rollback to settings; --- An full outer join where every record is not matched. +-- A full outer join where every record is not matched. 
-- non-parallel savepoint settings; @@ -456,7 +470,16 @@ explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; --- parallelism not possible with parallel-oblivious outer hash join +-- parallelism not possible with parallel-oblivious full hash join +savepoint settings; +set enable_parallel_hash = off; +set local max_parallel_workers_per_gather = 2; +explain (costs off) + select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); +select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); +rollback to settings; + +-- parallelism is possible with parallel-aware full hash join savepoint settings; set local max_parallel_workers_per_gather = 2; explain (costs off) @@ -464,6 +487,7 @@ explain (costs off) select count(*) from simple r full outer join simple s on (r.id = 0 - s.id); rollback to settings; + -- exercise special code paths for huge tuples (note use of non-strict -- expression and left join required to get the detoasted tuple into -- the hash table) @@ -518,6 +542,37 @@ rollback to settings; rollback; +-- Hash join reuses the HOT status bit to indicate match status. This can only +-- be guaranteed to produce correct results if all the hash join tuple match +-- bits are reset before reuse. This is done upon loading them into the +-- hashtable. +begin; +SAVEPOINT settings; +-- CBDB: disable CBDB parallel; the serial full join match-bit test is what matters here. +SET enable_parallel = off; +SET enable_parallel_hash = on; +SET min_parallel_table_scan_size = 0; +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0; +CREATE TABLE hjtest_matchbits_t1(id int); +CREATE TABLE hjtest_matchbits_t2(id int); +INSERT INTO hjtest_matchbits_t1 VALUES (1); +INSERT INTO hjtest_matchbits_t2 VALUES (2); +-- Update should create a HOT tuple. If this status bit isn't cleared, we won't +-- correctly emit the NULL-extended unmatching tuple in full hash join. 
+UPDATE hjtest_matchbits_t2 set id = 2; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id + ORDER BY t1.id; +-- Test serial full hash join. +-- Resetting parallel_setup_cost should force a serial plan. +-- Just to be safe, however, set enable_parallel_hash to off, as parallel full +-- hash joins are only supported with shared hashtables. +RESET parallel_setup_cost; +SET enable_parallel_hash = off; +SELECT * FROM hjtest_matchbits_t1 t1 FULL JOIN hjtest_matchbits_t2 t2 ON t1.id = t2.id; +ROLLBACK TO settings; + +rollback; -- Verify that hash key expressions reference the correct -- nodes. Hashjoin's hashkeys need to reference its outer plan, Hash's