diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 41d86517b..03e3e7019 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -594,6 +594,25 @@ NodeDeleteCommand(uint32 nodeId) } +/* + * ColocationIdUpdateCommand creates the SQL command to change the colocationId + * of the table with the given name to the given colocationId in pg_dist_partition + * table. + */ +char * +ColocationIdUpdateCommand(Oid relationId, uint32 colocationId) +{ + StringInfo command = makeStringInfo(); + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + appendStringInfo(command, "UPDATE pg_dist_partition " + "SET colocationid = %d " + "WHERE logicalrelid = %s::regclass", + colocationId, quote_literal_cstr(qualifiedRelationName)); + + return command->data; +} + + /* * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id * of a worker and returns the command in a string. diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 02ac8285c..4a2035dcd 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -22,11 +22,13 @@ #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_logical_planner.h" #include "distributed/pg_dist_colocation.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" +#include "distributed/worker_transaction.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" @@ -617,6 +619,7 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId) HeapTuple heapTuple = NULL; TupleDesc tupleDescriptor = NULL; SysScanDesc scanDescriptor = NULL; + bool shouldSyncMetadata = false; bool indexOK = true; int scanKeyCount = 1; ScanKeyData scanKey[scanKeyCount]; @@ -660,6 +663,15 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId) systable_endscan(scanDescriptor); heap_close(pgDistPartition, NoLock); + + shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId); + if (shouldSyncMetadata) + { + char *updateColocationIdCommand = ColocationIdUpdateCommand(distributedRelationId, + colocationId); + + SendCommandToWorkers(WORKERS_WITH_METADATA, updateColocationIdCommand); + } } diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 07492162b..936c7b34e 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -29,6 +29,7 @@ extern char * TableOwnerResetCommand(Oid distributedRelationId); extern char * NodeListInsertCommand(List *workerNodeList); extern List * ShardListInsertCommand(List *shardIntervalList); extern char * NodeDeleteCommand(uint32 nodeId); +extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); #define DELETE_ALL_NODES "TRUNCATE pg_dist_node" diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 4ec98dc1b..da9a3799c 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -737,6 +737,94 @@ Foreign-key constraints: Referenced by: TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) +-- Check that mark_tables_colocated call propagates the changes to the workers +\c - - - :master_port +SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000; +SET citus.shard_count TO 7; +SET citus.shard_replication_factor TO 1; +CREATE TABLE mx_colocation_test_1 (a int); +SELECT create_distributed_table('mx_colocation_test_1', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE mx_colocation_test_2 (a int); +SELECT create_distributed_table('mx_colocation_test_2', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +-- Check the colocation IDs of the created tables +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass +ORDER BY logicalrelid; + logicalrelid | colocationid +----------------------+-------------- + mx_colocation_test_1 | 10000 + mx_colocation_test_2 | 10000 +(2 rows) + + +-- Reset the colocation IDs of the test tables +DELETE FROM + pg_dist_colocation +WHERE EXISTS ( + SELECT 1 + FROM pg_dist_partition + WHERE + colocationid = pg_dist_partition.colocationid + AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass); +UPDATE + pg_dist_partition +SET + colocationid = 0 +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; +-- Mark tables colocated and see the changes on the master and the worker +SELECT mark_tables_colocated('mx_colocation_test_1', ARRAY['mx_colocation_test_2']); + mark_tables_colocated +----------------------- + +(1 row) + +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; + logicalrelid | colocationid +----------------------+-------------- + mx_colocation_test_1 | 10001 + mx_colocation_test_2 | 10001 +(2 rows) + +\c - - - :worker_1_port +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; + logicalrelid | colocationid +----------------------+-------------- + mx_colocation_test_1 | 10001 + mx_colocation_test_2 | 10001 +(2 rows) + +\c - - - :master_port +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; -- Cleanup \c - - - :worker_1_port DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index b9988bf30..25d8706c4 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -298,6 +298,67 @@ NOT VALID; \c - - - :worker_1_port \d mx_test_schema_1.mx_table_1 +-- Check that mark_tables_colocated call propagates the changes to the workers +\c - - - :master_port +SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000; +SET citus.shard_count TO 7; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE mx_colocation_test_1 (a int); +SELECT create_distributed_table('mx_colocation_test_1', 'a'); + +CREATE TABLE mx_colocation_test_2 (a int); +SELECT create_distributed_table('mx_colocation_test_2', 'a'); + +-- Check the colocation IDs of the created tables +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass +ORDER BY logicalrelid; + +-- Reset the colocation IDs of the test tables +DELETE FROM + pg_dist_colocation +WHERE EXISTS ( + SELECT 1 + FROM pg_dist_partition + WHERE + colocationid = pg_dist_partition.colocationid + AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass); +UPDATE + pg_dist_partition +SET + colocationid = 0 +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; + +-- Mark tables colocated and see the changes on the master and the worker +SELECT mark_tables_colocated('mx_colocation_test_1', ARRAY['mx_colocation_test_2']); +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; +\c - - - :worker_1_port +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass + OR logicalrelid = 'mx_colocation_test_2'::regclass; + +\c - - - :master_port +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; + -- Cleanup \c - - - :worker_1_port