Merge pull request #819 from robin900/handle-repartitions-by-typname

During repartitions use partitionColumnType as ::regtype so that UDTs work
pull/832/head
Marco Slot 2016-10-03 19:50:45 +02:00 committed by GitHub
commit 3d1e2c1d3a
7 changed files with 433 additions and 8 deletions

View File

@ -4151,6 +4151,7 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
ListCell *filterTaskCell = NULL; ListCell *filterTaskCell = NULL;
Var *partitionColumn = mapMergeJob->partitionColumn; Var *partitionColumn = mapMergeJob->partitionColumn;
Oid partitionColumnType = partitionColumn->vartype; Oid partitionColumnType = partitionColumn->vartype;
char *partitionColumnTypeFullName = format_type_be_qualified(partitionColumnType);
int32 partitionColumnTypeMod = partitionColumn->vartypmod; int32 partitionColumnTypeMod = partitionColumn->vartypmod;
char *partitionColumnName = NULL; char *partitionColumnName = NULL;
@ -4194,7 +4195,7 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
appendStringInfo(mapQueryString, RANGE_PARTITION_COMMAND, jobId, taskId, appendStringInfo(mapQueryString, RANGE_PARTITION_COMMAND, jobId, taskId,
filterQueryEscapedText, partitionColumnName, filterQueryEscapedText, partitionColumnName,
partitionColumnType, splitPointString->data); partitionColumnTypeFullName, splitPointString->data);
} }
else else
{ {
@ -4202,7 +4203,7 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId, appendStringInfo(mapQueryString, HASH_PARTITION_COMMAND, jobId, taskId,
filterQueryEscapedText, partitionColumnName, filterQueryEscapedText, partitionColumnName,
partitionColumnType, partitionCount); partitionColumnTypeFullName, partitionCount);
} }
/* convert filter query task into map task */ /* convert filter query task into map task */

View File

@ -39,9 +39,9 @@
#define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \ #define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \
(" UINT64_FORMAT ", %u, %u, %u, '%s', %u)" (" UINT64_FORMAT ", %u, %u, %u, '%s', %u)"
#define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \ #define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \
(" UINT64_FORMAT ", %d, %s, '%s', %d, %s)" (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %s)"
#define HASH_PARTITION_COMMAND "SELECT worker_hash_partition_table \ #define HASH_PARTITION_COMMAND "SELECT worker_hash_partition_table \
(" UINT64_FORMAT ", %d, %s, '%s', %d, %d)" (" UINT64_FORMAT ", %d, %s, '%s', '%s'::regtype, %d)"
#define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \ #define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \
(" UINT64_FORMAT ", %d, '%s', '%s')" (" UINT64_FORMAT ", %d, '%s', '%s')"
#define MERGE_FILES_AND_RUN_QUERY_COMMAND \ #define MERGE_FILES_AND_RUN_QUERY_COMMAND \

View File

@ -0,0 +1,213 @@
--
-- MULTI_REPARTITION_UDT
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 535000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 535000;
-- START type creation
CREATE TYPE test_udt AS (i integer, i2 integer);
-- ... as well as a function to use as its comparator...
CREATE FUNCTION equal_test_udt_function(test_udt, test_udt) RETURNS boolean
AS 'select $1.i = $2.i AND $1.i2 = $2.i2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- ... use that function to create a custom equality operator...
CREATE OPERATOR = (
LEFTARG = test_udt,
RIGHTARG = test_udt,
PROCEDURE = equal_test_udt_function,
COMMUTATOR = =,
HASHES
);
-- ... and create a custom operator family for hash indexes...
CREATE OPERATOR FAMILY tudt_op_fam USING hash;
-- ... create a test HASH function. Though it is a poor hash function,
-- it is acceptable for our tests
CREATE FUNCTION test_udt_hash(test_udt) RETURNS int
AS 'SELECT hashtext( ($1.i + $1.i2)::text);'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- We need to define two different operator classes for the composite types
-- One uses BTREE the other uses HASH
CREATE OPERATOR CLASS tudt_op_fam_clas3
DEFAULT FOR TYPE test_udt USING BTREE AS
OPERATOR 3 = (test_udt, test_udt);
CREATE OPERATOR CLASS tudt_op_fam_class
DEFAULT FOR TYPE test_udt USING HASH AS
OPERATOR 1 = (test_udt, test_udt),
FUNCTION 1 test_udt_hash(test_udt);
-- END type creation
CREATE TABLE repartition_udt (
pk integer not null,
udtcol test_udt,
txtcol text
);
CREATE TABLE repartition_udt_other (
pk integer not null,
udtcol test_udt,
txtcol text
);
-- Connect directly to a worker, create and drop the type, then
-- proceed with type creation as above; thus the OIDs will be different.
-- so that the OID is off.
\c - - - :worker_1_port
CREATE TYPE test_udt AS (i integer, i2 integer);
DROP TYPE test_udt CASCADE;
-- START type creation
CREATE TYPE test_udt AS (i integer, i2 integer);
-- ... as well as a function to use as its comparator...
CREATE FUNCTION equal_test_udt_function(test_udt, test_udt) RETURNS boolean
AS 'select $1.i = $2.i AND $1.i2 = $2.i2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- ... use that function to create a custom equality operator...
CREATE OPERATOR = (
LEFTARG = test_udt,
RIGHTARG = test_udt,
PROCEDURE = equal_test_udt_function,
COMMUTATOR = =,
HASHES
);
-- ... and create a custom operator family for hash indexes...
CREATE OPERATOR FAMILY tudt_op_fam USING hash;
-- ... create a test HASH function. Though it is a poor hash function,
-- it is acceptable for our tests
CREATE FUNCTION test_udt_hash(test_udt) RETURNS int
AS 'SELECT hashtext( ($1.i + $1.i2)::text);'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- We need to define two different operator classes for the composite types
-- One uses BTREE the other uses HASH
CREATE OPERATOR CLASS tudt_op_fam_clas3
DEFAULT FOR TYPE test_udt USING BTREE AS
OPERATOR 3 = (test_udt, test_udt);
CREATE OPERATOR CLASS tudt_op_fam_class
DEFAULT FOR TYPE test_udt USING HASH AS
OPERATOR 1 = (test_udt, test_udt),
FUNCTION 1 test_udt_hash(test_udt);
-- END type creation
\c - - - :worker_2_port
-- START type creation
CREATE TYPE test_udt AS (i integer, i2 integer);
-- ... as well as a function to use as its comparator...
CREATE FUNCTION equal_test_udt_function(test_udt, test_udt) RETURNS boolean
AS 'select $1.i = $2.i AND $1.i2 = $2.i2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- ... use that function to create a custom equality operator...
CREATE OPERATOR = (
LEFTARG = test_udt,
RIGHTARG = test_udt,
PROCEDURE = equal_test_udt_function,
COMMUTATOR = =,
HASHES
);
-- ... and create a custom operator family for hash indexes...
CREATE OPERATOR FAMILY tudt_op_fam USING hash;
-- ... create a test HASH function. Though it is a poor hash function,
-- it is acceptable for our tests
CREATE FUNCTION test_udt_hash(test_udt) RETURNS int
AS 'SELECT hashtext( ($1.i + $1.i2)::text);'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- We need to define two different operator classes for the composite types
-- One uses BTREE the other uses HASH
CREATE OPERATOR CLASS tudt_op_fam_clas3
DEFAULT FOR TYPE test_udt USING BTREE AS
OPERATOR 3 = (test_udt, test_udt);
CREATE OPERATOR CLASS tudt_op_fam_class
DEFAULT FOR TYPE test_udt USING HASH AS
OPERATOR 1 = (test_udt, test_udt),
FUNCTION 1 test_udt_hash(test_udt);
-- END type creation
-- Connect to master
\c - - - :master_port
-- Distribute and populate the two tables.
SELECT master_create_distributed_table('repartition_udt', 'pk', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('repartition_udt', 3, 1);
master_create_worker_shards
-----------------------------
(1 row)
SELECT master_create_distributed_table('repartition_udt_other', 'pk', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('repartition_udt_other', 5, 1);
master_create_worker_shards
-----------------------------
(1 row)
INSERT INTO repartition_udt values (1, '(1,1)'::test_udt, 'foo');
INSERT INTO repartition_udt values (2, '(1,2)'::test_udt, 'foo');
INSERT INTO repartition_udt values (3, '(1,3)'::test_udt, 'foo');
INSERT INTO repartition_udt values (4, '(2,1)'::test_udt, 'foo');
INSERT INTO repartition_udt values (5, '(2,2)'::test_udt, 'foo');
INSERT INTO repartition_udt values (6, '(2,3)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (7, '(1,1)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (8, '(1,2)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (9, '(1,3)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (10, '(2,1)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (11, '(2,2)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (12, '(2,3)'::test_udt, 'foo');
SET client_min_messages = LOG;
-- Query that should result in a repartition join on int column, and be empty.
SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.pk = repartition_udt_other.pk
WHERE repartition_udt.pk > 1;
pk | udtcol | txtcol | pk | udtcol | txtcol
----+--------+--------+----+--------+--------
(0 rows)
-- Query that should result in a repartition join on UDT column.
SET citus.large_table_shard_count = 1;
SET citus.task_executor_type = 'task-tracker';
SET citus.log_multi_join_order = true;
EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol
WHERE repartition_udt.pk > 1;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
QUERY PLAN
-------------------------------------------------------------------------
Distributed Query into pg_merge_job_535003
Executor: Task-Tracker
Task Count: 4
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 3
Merge Task Count: 4
-> MapMergeJob
Map Task Count: 5
Merge Task Count: 4
Master Query
-> Seq Scan on pg_merge_job_535003 (cost=0.00..0.00 rows=0 width=0)
(12 rows)
SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol
WHERE repartition_udt.pk > 1
ORDER BY repartition_udt.pk;
LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ]
pk | udtcol | txtcol | pk | udtcol | txtcol
----+--------+--------+----+--------+--------
2 | (1,2) | foo | 8 | (1,2) | foo
3 | (1,3) | foo | 9 | (1,3) | foo
4 | (2,1) | foo | 10 | (2,1) | foo
5 | (2,2) | foo | 11 | (2,2) | foo
6 | (2,3) | foo | 12 | (2,3) | foo
(5 rows)

View File

@ -7,7 +7,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1130000;
\set TaskId 101103 \set TaskId 101103
\set Partition_Column l_orderkey \set Partition_Column l_orderkey
\set Partition_Column_Text '\'l_orderkey\'' \set Partition_Column_Text '\'l_orderkey\''
\set Partition_Column_Type 20 \set Partition_Column_Type '\'int8\''
\set Partition_Count 4 \set Partition_Count 4
\set Select_Query_Text '\'SELECT * FROM lineitem\'' \set Select_Query_Text '\'SELECT * FROM lineitem\''
\set Select_All 'SELECT *' \set Select_All 'SELECT *'
@ -24,7 +24,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1130000;
\set Table_Part_03 lineitem_hash_part_03 \set Table_Part_03 lineitem_hash_part_03
-- Run select query, and apply hash partitioning on query results -- Run select query, and apply hash partitioning on query results
SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text, SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
:Partition_Column_Text, :Partition_Column_Type, :Partition_Column_Text, :Partition_Column_Type::regtype,
:Partition_Count); :Partition_Count);
worker_hash_partition_table worker_hash_partition_table
----------------------------- -----------------------------

View File

@ -128,6 +128,7 @@ test: multi_simple_queries
test: multi_utilities test: multi_utilities
test: multi_create_insert_proxy test: multi_create_insert_proxy
test: multi_data_types test: multi_data_types
test: multi_repartition_udt
test: multi_repartitioned_subquery_udf test: multi_repartitioned_subquery_udf
test: multi_modifying_xacts test: multi_modifying_xacts

View File

@ -0,0 +1,210 @@
--
-- MULTI_REPARTITION_UDT
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 535000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 535000;
-- START type creation
CREATE TYPE test_udt AS (i integer, i2 integer);
-- ... as well as a function to use as its comparator...
CREATE FUNCTION equal_test_udt_function(test_udt, test_udt) RETURNS boolean
AS 'select $1.i = $2.i AND $1.i2 = $2.i2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- ... use that function to create a custom equality operator...
CREATE OPERATOR = (
LEFTARG = test_udt,
RIGHTARG = test_udt,
PROCEDURE = equal_test_udt_function,
COMMUTATOR = =,
HASHES
);
-- ... and create a custom operator family for hash indexes...
CREATE OPERATOR FAMILY tudt_op_fam USING hash;
-- ... create a test HASH function. Though it is a poor hash function,
-- it is acceptable for our tests
CREATE FUNCTION test_udt_hash(test_udt) RETURNS int
AS 'SELECT hashtext( ($1.i + $1.i2)::text);'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- We need to define two different operator classes for the composite types
-- One uses BTREE the other uses HASH
CREATE OPERATOR CLASS tudt_op_fam_clas3
DEFAULT FOR TYPE test_udt USING BTREE AS
OPERATOR 3 = (test_udt, test_udt);
CREATE OPERATOR CLASS tudt_op_fam_class
DEFAULT FOR TYPE test_udt USING HASH AS
OPERATOR 1 = (test_udt, test_udt),
FUNCTION 1 test_udt_hash(test_udt);
-- END type creation
CREATE TABLE repartition_udt (
pk integer not null,
udtcol test_udt,
txtcol text
);
CREATE TABLE repartition_udt_other (
pk integer not null,
udtcol test_udt,
txtcol text
);
-- Connect directly to a worker, create and drop the type, then
-- proceed with type creation as above; thus the OIDs will be different.
-- so that the OID is off.
\c - - - :worker_1_port
CREATE TYPE test_udt AS (i integer, i2 integer);
DROP TYPE test_udt CASCADE;
-- START type creation
CREATE TYPE test_udt AS (i integer, i2 integer);
-- ... as well as a function to use as its comparator...
CREATE FUNCTION equal_test_udt_function(test_udt, test_udt) RETURNS boolean
AS 'select $1.i = $2.i AND $1.i2 = $2.i2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- ... use that function to create a custom equality operator...
CREATE OPERATOR = (
LEFTARG = test_udt,
RIGHTARG = test_udt,
PROCEDURE = equal_test_udt_function,
COMMUTATOR = =,
HASHES
);
-- ... and create a custom operator family for hash indexes...
CREATE OPERATOR FAMILY tudt_op_fam USING hash;
-- ... create a test HASH function. Though it is a poor hash function,
-- it is acceptable for our tests
CREATE FUNCTION test_udt_hash(test_udt) RETURNS int
AS 'SELECT hashtext( ($1.i + $1.i2)::text);'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- We need to define two different operator classes for the composite types
-- One uses BTREE the other uses HASH
CREATE OPERATOR CLASS tudt_op_fam_clas3
DEFAULT FOR TYPE test_udt USING BTREE AS
OPERATOR 3 = (test_udt, test_udt);
CREATE OPERATOR CLASS tudt_op_fam_class
DEFAULT FOR TYPE test_udt USING HASH AS
OPERATOR 1 = (test_udt, test_udt),
FUNCTION 1 test_udt_hash(test_udt);
-- END type creation
\c - - - :worker_2_port
-- START type creation
CREATE TYPE test_udt AS (i integer, i2 integer);
-- ... as well as a function to use as its comparator...
CREATE FUNCTION equal_test_udt_function(test_udt, test_udt) RETURNS boolean
AS 'select $1.i = $2.i AND $1.i2 = $2.i2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- ... use that function to create a custom equality operator...
CREATE OPERATOR = (
LEFTARG = test_udt,
RIGHTARG = test_udt,
PROCEDURE = equal_test_udt_function,
COMMUTATOR = =,
HASHES
);
-- ... and create a custom operator family for hash indexes...
CREATE OPERATOR FAMILY tudt_op_fam USING hash;
-- ... create a test HASH function. Though it is a poor hash function,
-- it is acceptable for our tests
CREATE FUNCTION test_udt_hash(test_udt) RETURNS int
AS 'SELECT hashtext( ($1.i + $1.i2)::text);'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
-- We need to define two different operator classes for the composite types
-- One uses BTREE the other uses HASH
CREATE OPERATOR CLASS tudt_op_fam_clas3
DEFAULT FOR TYPE test_udt USING BTREE AS
OPERATOR 3 = (test_udt, test_udt);
CREATE OPERATOR CLASS tudt_op_fam_class
DEFAULT FOR TYPE test_udt USING HASH AS
OPERATOR 1 = (test_udt, test_udt),
FUNCTION 1 test_udt_hash(test_udt);
-- END type creation
-- Connect to master
\c - - - :master_port
-- Distribute and populate the two tables.
SELECT master_create_distributed_table('repartition_udt', 'pk', 'hash');
SELECT master_create_worker_shards('repartition_udt', 3, 1);
SELECT master_create_distributed_table('repartition_udt_other', 'pk', 'hash');
SELECT master_create_worker_shards('repartition_udt_other', 5, 1);
INSERT INTO repartition_udt values (1, '(1,1)'::test_udt, 'foo');
INSERT INTO repartition_udt values (2, '(1,2)'::test_udt, 'foo');
INSERT INTO repartition_udt values (3, '(1,3)'::test_udt, 'foo');
INSERT INTO repartition_udt values (4, '(2,1)'::test_udt, 'foo');
INSERT INTO repartition_udt values (5, '(2,2)'::test_udt, 'foo');
INSERT INTO repartition_udt values (6, '(2,3)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (7, '(1,1)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (8, '(1,2)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (9, '(1,3)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (10, '(2,1)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (11, '(2,2)'::test_udt, 'foo');
INSERT INTO repartition_udt_other values (12, '(2,3)'::test_udt, 'foo');
SET client_min_messages = LOG;
-- Query that should result in a repartition join on int column, and be empty.
SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.pk = repartition_udt_other.pk
WHERE repartition_udt.pk > 1;
-- Query that should result in a repartition join on UDT column.
SET citus.large_table_shard_count = 1;
SET citus.task_executor_type = 'task-tracker';
SET citus.log_multi_join_order = true;
EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol
WHERE repartition_udt.pk > 1;
SELECT * FROM repartition_udt JOIN repartition_udt_other
ON repartition_udt.udtcol = repartition_udt_other.udtcol
WHERE repartition_udt.pk > 1
ORDER BY repartition_udt.pk;

View File

@ -11,7 +11,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1130000;
\set TaskId 101103 \set TaskId 101103
\set Partition_Column l_orderkey \set Partition_Column l_orderkey
\set Partition_Column_Text '\'l_orderkey\'' \set Partition_Column_Text '\'l_orderkey\''
\set Partition_Column_Type 20 \set Partition_Column_Type '\'int8\''
\set Partition_Count 4 \set Partition_Count 4
\set Select_Query_Text '\'SELECT * FROM lineitem\'' \set Select_Query_Text '\'SELECT * FROM lineitem\''
@ -34,7 +34,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1130000;
-- Run select query, and apply hash partitioning on query results -- Run select query, and apply hash partitioning on query results
SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text, SELECT worker_hash_partition_table(:JobId, :TaskId, :Select_Query_Text,
:Partition_Column_Text, :Partition_Column_Type, :Partition_Column_Text, :Partition_Column_Type::regtype,
:Partition_Count); :Partition_Count);
COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00000'; COPY :Table_Part_00 FROM 'base/pgsql_job_cache/job_201010/task_101103/p_00000';