From 3ba61244b8e0d665d03d7c9901026fd56e0ca0c0 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 3 Mar 2022 09:50:06 +0100 Subject: [PATCH 1/2] Synchronize pg_dist_colocation metadata --- .../distributed/metadata/metadata_sync.c | 240 ++++++++++++++++++ .../distributed/metadata/node_metadata.c | 7 + .../distributed/sql/citus--10.2-4--11.0-1.sql | 2 + .../sql/downgrades/citus--11.0-1--10.2-4.sql | 2 + .../11.0-1.sql | 13 + .../latest.sql | 13 + .../11.0-1.sql | 9 + .../latest.sql | 9 + .../distributed/utils/colocation_utils.c | 35 ++- src/include/distributed/colocation_utils.h | 5 + src/include/distributed/metadata_sync.h | 7 + .../expected/multi_cluster_management.out | 6 + .../expected/multi_colocation_utils.out | 96 +++++++ src/test/regress/expected/multi_extension.out | 108 ++++---- .../regress/expected/multi_metadata_sync.out | 75 ++++-- .../expected/multi_mx_node_metadata.out | 7 + .../expected/upgrade_list_citus_objects.out | 6 +- .../regress/sql/multi_cluster_management.sql | 1 + .../regress/sql/multi_colocation_utils.sql | 32 +++ src/test/regress/sql/multi_metadata_sync.sql | 5 +- .../regress/sql/multi_mx_node_metadata.sql | 1 + 21 files changed, 598 insertions(+), 81 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_internal_add_colocation_metadata/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_internal_add_colocation_metadata/latest.sql create mode 100644 src/backend/distributed/sql/udfs/citus_internal_delete_colocation_metadata/11.0-1.sql create mode 100644 src/backend/distributed/sql/udfs/citus_internal_delete_colocation_metadata/latest.sql diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index ef7e64ec1..fe03bedb6 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -28,6 +28,7 @@ #include "catalog/indexing.h" #include "catalog/pg_am.h" #include "catalog/pg_attrdef.h" +#include "catalog/pg_collation.h" #include "catalog/pg_constraint.h" #include "catalog/pg_depend.h" #include "catalog/pg_foreign_server.h" @@ -48,12 +49,14 @@ #include "distributed/maintenanced.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/metadata_utility.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata/pg_dist_object.h" #include "distributed/multi_executor.h" #include "distributed/multi_join_order.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" +#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_node.h" #include "distributed/pg_dist_shard.h" #include "distributed/relation_access_tracking.h" @@ -124,6 +127,11 @@ static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storag static void EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId, int64 placementId, int32 shardState, int64 shardLength, int32 groupId); +static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount, + int replicationFactor, + Oid distributionColumnType, + Oid distributionColumnCollation); +static char * ColocationGroupDeleteCommand(uint32 colocationId); PG_FUNCTION_INFO_V1(start_metadata_sync_to_node); PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node); @@ -142,6 +150,8 @@ PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation); PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata); +PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata); +PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata); static bool got_SIGTERM = false; @@ -558,6 +568,7 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_PLACEMENTS); dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS); + dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_COLOCATION); Assert(superuser()); SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction( @@ -3161,3 +3172,232 @@ citus_internal_update_relation_colocation(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + + +/* + * citus_internal_add_colocation_metadata is an internal UDF to + * add a row to pg_dist_colocation. + */ +Datum +citus_internal_add_colocation_metadata(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + int colocationId = PG_GETARG_INT32(0); + int shardCount = PG_GETARG_INT32(1); + int replicationFactor = PG_GETARG_INT32(2); + Oid distributionColumnType = PG_GETARG_INT32(3); + Oid distributionColumnCollation = PG_GETARG_INT32(4); + + if (!ShouldSkipMetadataChecks()) + { + /* this UDF is not allowed allowed for executing as a separate command */ + EnsureCoordinatorInitiatedOperation(); + } + + InsertColocationGroupLocally(colocationId, shardCount, replicationFactor, + distributionColumnType, distributionColumnCollation); + + PG_RETURN_VOID(); +} + + +/* + * citus_internal_delete_colocation_metadata is an internal UDF to + * delte row from pg_dist_colocation. + */ +Datum +citus_internal_delete_colocation_metadata(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + EnsureSuperUser(); + + int colocationId = PG_GETARG_INT32(0); + + if (!ShouldSkipMetadataChecks()) + { + /* this UDF is not allowed allowed for executing as a separate command */ + EnsureCoordinatorInitiatedOperation(); + } + + DeleteColocationGroupLocally(colocationId); + + PG_RETURN_VOID(); +} + + +/* + * SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker. + */ +void +SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, int replicationFactor, + Oid distributionColumnType, Oid distributionColumnCollation) +{ + char *command = ColocationGroupCreateCommand(colocationId, shardCount, + replicationFactor, + distributionColumnType, + distributionColumnCollation); + + /* + * We require superuser for all pg_dist_colocation operations because we have + * no reasonable way of restricting access. + */ + SendCommandToWorkersWithMetadataViaSuperUser(command); +} + + +/* + * ColocationGroupCreateCommand returns a command for creating a colocation group. + */ +static char * +ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicationFactor, + Oid distributionColumnType, Oid distributionColumnCollation) +{ + StringInfo insertColocationCommand = makeStringInfo(); + + appendStringInfo(insertColocationCommand, + "SELECT pg_catalog.citus_internal_add_colocation_metadata(" + "%d, %d, %d, ", + colocationId, + shardCount, + replicationFactor); + + /* we also have pg_dist_colocation entries for reference tables */ + if (distributionColumnType != InvalidOid) + { + char *typeName = format_type_extended(distributionColumnType, -1, + FORMAT_TYPE_FORCE_QUALIFY | + FORMAT_TYPE_ALLOW_INVALID); + + /* format_type_extended returns ??? in case of an unknown type */ + if (strcmp(typeName, "???") != 0) + { + appendStringInfo(insertColocationCommand, + "%s::regtype", + quote_literal_cstr(typeName)); + } + else + { + /* pg_dist_colocation contains an invalid type ID, just insert 0 */ + appendStringInfoString(insertColocationCommand, "0"); + } + } + else + { + appendStringInfoString(insertColocationCommand, "0"); + } + + appendStringInfoString(insertColocationCommand, ", "); + + if (distributionColumnCollation != InvalidOid) + { + /* would be great if we could use regcollation, but it's not available on PG12 */ + + Datum collationIdDatum = ObjectIdGetDatum(distributionColumnCollation); + HeapTuple collationTuple = SearchSysCache1(COLLOID, collationIdDatum); + + if (HeapTupleIsValid(collationTuple)) + { + Form_pg_collation collationform = + (Form_pg_collation) GETSTRUCT(collationTuple); + char *collationName = NameStr(collationform->collname); + char *collationSchemaName = get_namespace_name(collationform->collnamespace); + + appendStringInfo(insertColocationCommand, + "(select oid from pg_collation" + " where collname = %s" + " and collnamespace = %s::regnamespace)", + quote_literal_cstr(collationName), + quote_literal_cstr(collationSchemaName)); + } + else + { + /* pg_dist_colocation contains an invalid collation, just insert 0 */ + appendStringInfoString(insertColocationCommand, "0"); + } + + ReleaseSysCache(collationTuple); + } + else + { + appendStringInfoString(insertColocationCommand, "0"); + } + + appendStringInfoString(insertColocationCommand, ")"); + + return insertColocationCommand->data; +} + + +/* + * SyncDeleteColocationGroupToNodes deletes a pg_dist_colocation record from workers. + */ +void +SyncDeleteColocationGroupToNodes(uint32 colocationId) +{ + char *command = ColocationGroupDeleteCommand(colocationId); + + /* + * We require superuser for all pg_dist_colocation operations because we have + * no reasonable way of restricting access. + */ + SendCommandToWorkersWithMetadataViaSuperUser(command); +} + + +/* + * ColocationGroupDeleteCommand returns a command for deleting a colocation group. + */ +static char * +ColocationGroupDeleteCommand(uint32 colocationId) +{ + StringInfo deleteColocationCommand = makeStringInfo(); + + appendStringInfo(deleteColocationCommand, + "SELECT pg_catalog.citus_internal_delete_colocation_metadata(%d)", + colocationId); + + return deleteColocationCommand->data; +} + + +/* + * ColocationGroupCreateCommandList returns the full list of commands for syncing + * pg_dist_colocation. + */ +List * +ColocationGroupCreateCommandList(void) +{ + List *commandList = NIL; + + Relation pgDistColocation = table_open(DistColocationRelationId(), AccessShareLock); + + bool indexOK = false; + SysScanDesc scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK, + NULL, 0, NULL); + + HeapTuple colocationTuple = systable_getnext(scanDescriptor); + + while (HeapTupleIsValid(colocationTuple)) + { + Form_pg_dist_colocation colocationForm = + (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); + + char *command = + ColocationGroupCreateCommand(colocationForm->colocationid, + colocationForm->shardcount, + colocationForm->replicationfactor, + colocationForm->distributioncolumntype, + colocationForm->distributioncolumncollation); + + commandList = lappend(commandList, command); + + colocationTuple = systable_getnext(scanDescriptor); + } + + systable_endscan(scanDescriptor); + table_close(pgDistColocation, AccessShareLock); + + return commandList; +} diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 498497e64..4d36549fd 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -653,6 +653,8 @@ PgDistTableMetadataSyncCommandList(void) DELETE_ALL_PLACEMENTS); metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS); + metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, + DELETE_ALL_COLOCATION); /* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */ foreach_ptr(cacheEntry, propagatedTableList) @@ -664,6 +666,11 @@ PgDistTableMetadataSyncCommandList(void) tableMetadataCreateCommandList); } + /* commands to insert pg_dist_colocation entries */ + List *colocationGroupSyncCommandList = ColocationGroupCreateCommandList(); + metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, + colocationGroupSyncCommandList); + /* As the last step, propagate the pg_dist_object entities */ Assert(ShouldPropagate()); List *distributedObjectSyncCommandList = DistributedObjectMetadataSyncCommandList(); diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index d836d4b72..0b0c0ca39 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -10,6 +10,8 @@ #include "udfs/citus_shard_indexes_on_worker/11.0-1.sql" #include "udfs/citus_internal_add_object_metadata/11.0-1.sql" +#include "udfs/citus_internal_add_colocation_metadata/11.0-1.sql" +#include "udfs/citus_internal_delete_colocation_metadata/11.0-1.sql" #include "udfs/citus_run_local_command/11.0-1.sql" #include "udfs/worker_drop_sequence_dependency/11.0-1.sql" #include "udfs/worker_drop_shell_table/11.0-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index f11a9d450..62b683cc0 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -50,6 +50,8 @@ DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer); DROP FUNCTION pg_catalog.citus_check_cluster_node_health (); DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer, boolean); +DROP FUNCTION pg_catalog.citus_internal_add_colocation_metadata(int, int, int, regtype, oid); +DROP FUNCTION pg_catalog.citus_internal_delete_colocation_metadata(int); DROP FUNCTION pg_catalog.citus_run_local_command(text); DROP FUNCTION pg_catalog.worker_drop_sequence_dependency(text); DROP FUNCTION pg_catalog.worker_drop_shell_table(table_name text); diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_colocation_metadata/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_internal_add_colocation_metadata/11.0-1.sql new file mode 100644 index 000000000..823f45569 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_add_colocation_metadata/11.0-1.sql @@ -0,0 +1,13 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_colocation_metadata( + colocation_id int, + shard_count int, + replication_factor int, + distribution_column_type regtype, + distribution_column_collation oid) + RETURNS void + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME'; + +COMMENT ON FUNCTION pg_catalog.citus_internal_add_colocation_metadata(int,int,int,regtype,oid) IS + 'Inserts a co-location group into pg_dist_colocation'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_colocation_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_add_colocation_metadata/latest.sql new file mode 100644 index 000000000..823f45569 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_add_colocation_metadata/latest.sql @@ -0,0 +1,13 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_colocation_metadata( + colocation_id int, + shard_count int, + replication_factor int, + distribution_column_type regtype, + distribution_column_collation oid) + RETURNS void + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME'; + +COMMENT ON FUNCTION pg_catalog.citus_internal_add_colocation_metadata(int,int,int,regtype,oid) IS + 'Inserts a co-location group into pg_dist_colocation'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_delete_colocation_metadata/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_internal_delete_colocation_metadata/11.0-1.sql new file mode 100644 index 000000000..d4c3f1be9 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_delete_colocation_metadata/11.0-1.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_colocation_metadata( + colocation_id int) + RETURNS void + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME'; + +COMMENT ON FUNCTION pg_catalog.citus_internal_delete_colocation_metadata(int) IS + 'deletes a co-location group from pg_dist_colocation'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_delete_colocation_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_delete_colocation_metadata/latest.sql new file mode 100644 index 000000000..d4c3f1be9 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_delete_colocation_metadata/latest.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_colocation_metadata( + colocation_id int) + RETURNS void + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME'; + +COMMENT ON FUNCTION pg_catalog.citus_internal_delete_colocation_metadata(int) IS + 'deletes a co-location group from pg_dist_colocation'; diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 4edc9e424..9fb616ad8 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -583,6 +583,25 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol Oid distributionColumnCollation) { uint32 colocationId = GetNextColocationId(); + + InsertColocationGroupLocally(colocationId, shardCount, replicationFactor, + distributionColumnType, distributionColumnCollation); + + SyncNewColocationGroupToNodes(colocationId, shardCount, replicationFactor, + distributionColumnType, distributionColumnCollation); + + return colocationId; +} + + +/* + * InsertColocationGroupLocally inserts a record into pg_dist_colocation. + */ +void +InsertColocationGroupLocally(uint32 colocationId, int shardCount, int replicationFactor, + Oid distributionColumnType, + Oid distributionColumnCollation) +{ Datum values[Natts_pg_dist_colocation]; bool isNulls[Natts_pg_dist_colocation]; @@ -610,8 +629,6 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol /* increment the counter so that next command can see the row */ CommandCounterIncrement(); table_close(pgDistColocation, RowExclusiveLock); - - return colocationId; } @@ -1215,10 +1232,22 @@ DeleteColocationGroupIfNoTablesBelong(uint32 colocationId) /* - * DeleteColocationGroup deletes the colocation group from pg_dist_colocation. + * DeleteColocationGroup deletes the colocation group from pg_dist_colocation + * throughout the cluster. */ static void DeleteColocationGroup(uint32 colocationId) +{ + DeleteColocationGroupLocally(colocationId); + SyncDeleteColocationGroupToNodes(colocationId); +} + + +/* + * DeleteColocationGroupLocally deletes the colocation group from pg_dist_colocation. + */ +void +DeleteColocationGroupLocally(uint32 colocationId) { int scanKeyCount = 1; ScanKeyData scanKey[1]; diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index 0ce4d8fb1..0095ac427 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -31,6 +31,10 @@ uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColum extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType, Oid distributionColumnCollation); +extern void InsertColocationGroupLocally(uint32 colocationId, int shardCount, + int replicationFactor, + Oid distributionColumnType, + Oid distributionColumnCollation); extern bool IsColocateWithNone(char *colocateWithTableName); extern uint32 GetNextColocationId(void); extern void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId); @@ -43,5 +47,6 @@ extern void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colo bool localOnly); extern void DeleteColocationGroupIfNoTablesBelong(uint32 colocationId); extern List * ColocationGroupTableList(uint32 colocationId, uint32 count); +extern void DeleteColocationGroupLocally(uint32 colocationId); #endif /* COLOCATION_UTILS_H_ */ diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 905d2cab5..30b86099a 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -38,6 +38,7 @@ extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId); extern List * NodeMetadataCreateCommands(void); extern List * DistributedObjectMetadataSyncCommandList(void); +extern List * ColocationGroupCreateCommandList(void); extern List * CitusTableMetadataCreateCommandList(Oid relationId); extern List * NodeMetadataDropCommands(void); extern char * MarkObjectsDistributedCreateCommand(List *addresses, @@ -76,12 +77,18 @@ extern void GetDependentSequencesWithRelation(Oid relationId, List **attnumList, extern List * GetDependentFunctionsWithRelation(Oid relationId); extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum); extern void SetLocalEnableMetadataSync(bool state); +extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, + int replicationFactor, + Oid distributionColumType, + Oid distributionColumnCollation); +extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); #define DELETE_ALL_NODES "DELETE FROM pg_dist_node" #define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement" #define DELETE_ALL_SHARDS "DELETE FROM pg_dist_shard" #define DELETE_ALL_DISTRIBUTED_OBJECTS "DELETE FROM citus.pg_dist_object" #define DELETE_ALL_PARTITIONS "DELETE FROM pg_dist_partition" +#define DELETE_ALL_COLOCATION "DELETE FROM pg_catalog.pg_dist_colocation" #define REMOVE_ALL_SHELL_TABLES_COMMAND \ "SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition" #define REMOVE_ALL_CITUS_TABLES_COMMAND \ diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 5a5317e74..c3698c1f0 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -101,6 +101,12 @@ SELECT * FROM rebalance_table_shards(); -- TODO: Figure out why this is necessary, rebalance_table_shards shouldn't -- insert stuff into pg_dist_colocation TRUNCATE pg_dist_colocation; +SELECT run_command_on_workers('TRUNCATE pg_dist_colocation'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"TRUNCATE TABLE") +(1 row) + ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; SELECT 1 FROM citus_activate_node('localhost', :worker_2_port); ?column? diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 815908799..dab14a3a8 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -2,6 +2,13 @@ SET citus.next_shard_id TO 1300000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 4; -- Delete orphaned entries from pg_dist_colocation DELETE FROM pg_dist_colocation where colocationid = 5 or colocationid = 6; +SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation where colocationid = 5 or colocationid = 6'); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + -- =================================================================== -- create test utility function -- =================================================================== @@ -356,6 +363,13 @@ SELECT count(*) FROM pg_dist_partition WHERE colocationid IN (4, 5); (1 row) DELETE FROM pg_dist_colocation WHERE colocationid IN (4, 5); +SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation WHERE colocationid IN (4, 5)'); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + SET citus.shard_count = 2; CREATE TABLE table1_groupA ( id int ); SELECT create_distributed_table('table1_groupA', 'id'); @@ -449,6 +463,23 @@ SELECT * FROM pg_dist_colocation 7 | 8 | 2 | 23 | 0 (4 rows) +-- check to see whether metadata is synced +SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$ +SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c + WHERE colocationid >= 1 AND colocationid < 1000 +$$); + nodeport | unnest +--------------------------------------------------------------------- + 57637 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57637 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57637 | {"shardcount": 2, "colocationid": 6, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"} + 57637 | {"shardcount": 8, "colocationid": 7, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 6, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"} + 57638 | {"shardcount": 8, "colocationid": 7, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} +(8 rows) + SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY logicalrelid; @@ -472,6 +503,16 @@ SELECT * FROM pg_dist_colocation WHERE colocationid = 4; 4 | 2 | 2 | 23 | 0 (1 row) +-- check to see whether metadata is synced +SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$ +SELECT array_agg(row_to_json(c)) FROM pg_dist_colocation c WHERE colocationid = 4 +$$); + nodeport | unnest +--------------------------------------------------------------------- + 57637 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} +(2 rows) + -- dropping all tables in a colocation group also deletes the colocation group DROP TABLE table2_groupA; SELECT * FROM pg_dist_colocation WHERE colocationid = 4; @@ -480,6 +521,16 @@ SELECT * FROM pg_dist_colocation WHERE colocationid = 4; 4 | 2 | 2 | 23 | 0 (1 row) +-- check to see whether metadata is synced +SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$ +SELECT array_agg(row_to_json(c)) FROM pg_dist_colocation c WHERE colocationid = 4 +$$); + nodeport | unnest +--------------------------------------------------------------------- + 57637 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} +(2 rows) + -- create dropped colocation group again SET citus.shard_count = 2; CREATE TABLE table1_groupE ( id int ); @@ -820,6 +871,13 @@ ORDER BY ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; DELETE FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000; +SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000'); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + UPDATE pg_dist_partition SET colocationid = 0 WHERE colocationid >= 1 AND colocationid < 1000; -- check metadata @@ -933,6 +991,25 @@ SELECT * FROM pg_dist_colocation 5 | 2 | 2 | 23 | 0 (5 rows) +-- check to see whether metadata is synced +SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$ +SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c + WHERE colocationid >= 1 AND colocationid < 1000 +$$); + nodeport | unnest +--------------------------------------------------------------------- + 57637 | {"shardcount": 2, "colocationid": 1, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57637 | {"shardcount": 2, "colocationid": 2, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"} + 57637 | {"shardcount": 8, "colocationid": 3, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57637 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57637 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 1, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 2, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"} + 57638 | {"shardcount": 8, "colocationid": 3, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} +(10 rows) + SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid, logicalrelid; @@ -1008,6 +1085,25 @@ SELECT * FROM pg_dist_colocation 5 | 2 | 2 | 23 | 0 (5 rows) +-- check to see whether metadata is synced +SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$ +SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c + WHERE colocationid >= 1 AND colocationid < 1000 +$$); + nodeport | unnest +--------------------------------------------------------------------- + 57637 | {"shardcount": 2, "colocationid": 1, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57637 | {"shardcount": 2, "colocationid": 2, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"} + 57637 | {"shardcount": 8, "colocationid": 3, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57637 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57637 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 1, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 2, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"} + 57638 | {"shardcount": 8, "colocationid": 3, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} + 57638 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"} +(10 rows) + SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid, logicalrelid; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 79d21dd3e..6d2408a21 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -428,20 +428,20 @@ SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDE ALTER EXTENSION citus UPDATE TO '9.4-2'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -469,20 +469,20 @@ SELECT * FROM multi_extension.print_extension_changes(); ALTER EXTENSION citus UPDATE TO '9.4-1'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -578,20 +578,20 @@ SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDE ALTER EXTENSION citus UPDATE TO '9.5-2'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -619,20 +619,20 @@ SELECT * FROM multi_extension.print_extension_changes(); ALTER EXTENSION citus UPDATE TO '9.5-1'; -- should see the old source code SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1; - prosrc + prosrc --------------------------------------------------------------------- - + - DECLARE + - colocated_tables regclass[]; + - BEGIN + - SELECT get_colocated_table_array(relation) INTO colocated_tables;+ - PERFORM + - master_update_shard_statistics(shardid) + - FROM + - pg_dist_shard + - WHERE + - logicalrelid = ANY (colocated_tables); + - END; + + + + DECLARE + + colocated_tables regclass[]; + + BEGIN + + SELECT get_colocated_table_array(relation) INTO colocated_tables;+ + PERFORM + + master_update_shard_statistics(shardid) + + FROM + + pg_dist_shard + + WHERE + + logicalrelid = ANY (colocated_tables); + + END; + (1 row) @@ -1013,7 +1013,9 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_check_connection_to_node(text,integer) boolean | function citus_disable_node(text,integer,boolean) void | function citus_finalize_upgrade_to_citus11(boolean) boolean + | function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid) void | function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void + | function citus_internal_delete_colocation_metadata(integer) void | function citus_internal_global_blocked_processes() SETOF record | function citus_internal_local_blocked_processes() SETOF record | function citus_run_local_command(text) void @@ -1026,7 +1028,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function worker_drop_sequence_dependency(text) void | function worker_drop_shell_table(text) void | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record -(23 rows) +(25 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index be923eee3..78cf5caaf 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -59,10 +59,11 @@ ALTER ROLE CURRENT_USER WITH PASSWORD 'dummypassword'; -- Show that, with no MX tables, activate node snapshot contains only the delete commands, -- pg_dist_node entries, pg_dist_object entries and roles. SELECT unnest(activate_node_snapshot()) order by 1; - unnest + unnest --------------------------------------------------------------------- CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres DELETE FROM citus.pg_dist_object + DELETE FROM pg_catalog.pg_dist_colocation DELETE FROM pg_dist_node DELETE FROM pg_dist_partition DELETE FROM pg_dist_placement @@ -75,6 +76,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') + SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT worker_create_or_alter_role('postgres', null, 'ALTER ROLE postgres SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 0 PASSWORD ''md5c53670dddfc3bb4b5675c7872bc2249a'' VALID UNTIL ''2052-05-05 00:00:00-07''') SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition @@ -88,7 +90,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' UPDATE pg_dist_local_group SET groupid = 1 WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; -(27 rows) +(29 rows) -- this function is dropped in Citus10, added here for tests SET citus.enable_metadata_sync TO OFF; @@ -121,7 +123,7 @@ reset citus.shard_replication_factor; UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::regclass; -- Show that the created MX table is and its sequences are included in the activate node snapshot SELECT unnest(activate_node_snapshot()) order by 1; - unnest + unnest --------------------------------------------------------------------- ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE public.user_defined_seq OWNER TO postgres @@ -130,6 +132,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass)) DELETE FROM citus.pg_dist_object + DELETE FROM pg_catalog.pg_dist_colocation DELETE FROM pg_dist_node DELETE FROM pg_dist_partition DELETE FROM pg_dist_placement @@ -143,6 +146,8 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's') + SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) + SELECT pg_catalog.citus_internal_add_colocation_metadata(2, 8, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -162,12 +167,12 @@ SELECT unnest(activate_node_snapshot()) order by 1; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(39 rows) +(42 rows) -- Show that CREATE INDEX commands are included in the activate node snapshot CREATE INDEX mx_index ON mx_test_table(col_2); SELECT unnest(activate_node_snapshot()) order by 1; - unnest + unnest --------------------------------------------------------------------- ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE public.user_defined_seq OWNER TO postgres @@ -177,6 +182,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass)) DELETE FROM citus.pg_dist_object + DELETE FROM pg_catalog.pg_dist_colocation DELETE FROM pg_dist_node DELETE FROM pg_dist_partition DELETE FROM pg_dist_placement @@ -190,6 +196,8 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's') + SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) + SELECT pg_catalog.citus_internal_add_colocation_metadata(2, 8, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -209,13 +217,13 @@ SELECT unnest(activate_node_snapshot()) order by 1; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(40 rows) +(43 rows) -- Show that schema changes are included in the activate node snapshot CREATE SCHEMA mx_testing_schema; ALTER TABLE mx_test_table SET SCHEMA mx_testing_schema; SELECT unnest(activate_node_snapshot()) order by 1; - unnest + unnest --------------------------------------------------------------------- ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE public.user_defined_seq OWNER TO postgres @@ -226,6 +234,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass)) DELETE FROM citus.pg_dist_object + DELETE FROM pg_catalog.pg_dist_colocation DELETE FROM pg_dist_node DELETE FROM pg_dist_partition DELETE FROM pg_dist_placement @@ -239,6 +248,8 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's') + SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) + SELECT pg_catalog.citus_internal_add_colocation_metadata(2, 8, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -258,7 +269,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(41 rows) +(44 rows) -- Show that append distributed tables are not included in the activate node snapshot CREATE TABLE non_mx_test_table (col_1 int, col_2 text); @@ -270,7 +281,7 @@ SELECT master_create_distributed_table('non_mx_test_table', 'col_1', 'append'); UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='non_mx_test_table'::regclass; SELECT unnest(activate_node_snapshot()) order by 1; - unnest + unnest --------------------------------------------------------------------- ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE public.user_defined_seq OWNER TO postgres @@ -281,6 +292,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass)) DELETE FROM citus.pg_dist_object + DELETE FROM pg_catalog.pg_dist_colocation DELETE FROM pg_dist_node DELETE FROM pg_dist_partition DELETE FROM pg_dist_placement @@ -294,6 +306,8 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's') + SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) + SELECT pg_catalog.citus_internal_add_colocation_metadata(2, 8, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -313,12 +327,12 @@ SELECT unnest(activate_node_snapshot()) order by 1; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(41 rows) +(44 rows) -- Show that range distributed tables are not included in the activate node snapshot UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass; SELECT unnest(activate_node_snapshot()) order by 1; - unnest + unnest --------------------------------------------------------------------- ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE public.user_defined_seq OWNER TO postgres @@ -329,6 +343,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass)) DELETE FROM citus.pg_dist_object + DELETE FROM pg_catalog.pg_dist_colocation DELETE FROM pg_dist_node DELETE FROM pg_dist_partition DELETE FROM pg_dist_placement @@ -342,6 +357,8 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's') + SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) + SELECT pg_catalog.citus_internal_add_colocation_metadata(2, 8, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -361,7 +378,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(41 rows) +(44 rows) -- Test start_metadata_sync_to_node and citus_activate_node UDFs -- Ensure that hasmetadata=false for all nodes @@ -497,11 +514,13 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE col_2 | text | col_2 (1 row) --- Check that pg_dist_colocation is not synced +-- Check that pg_dist_colocation is synced SELECT * FROM pg_dist_colocation ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype | distributioncolumncollation --------------------------------------------------------------------- -(0 rows) + 1 | 1 | -1 | 0 | 0 + 2 | 8 | 1 | 23 | 0 +(2 rows) -- Make sure that truncate trigger has been set for the MX table on worker SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; @@ -1522,6 +1541,13 @@ ORDER BY (2 rows) SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset +-- make sure we have the pg_dist_colocation record on the worker +SELECT count(*) FROM pg_dist_colocation WHERE distributioncolumntype = 0; + count +--------------------------------------------------------------------- + 1 +(1 row) + -- Check that DDL commands are propagated to reference tables on workers \c - - - :master_port ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0; @@ -1823,7 +1849,7 @@ ALTER TABLE test_table ADD COLUMN id2 int DEFAULT nextval('mx_test_sequence_1'); ALTER TABLE test_table ALTER COLUMN id2 DROP DEFAULT; ALTER TABLE test_table ALTER COLUMN id2 SET DEFAULT nextval('mx_test_sequence_1'); SELECT unnest(activate_node_snapshot()) order by 1; - unnest + unnest --------------------------------------------------------------------- ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE public.mx_test_sequence_0 OWNER TO postgres @@ -1855,6 +1881,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; CREATE TABLE public.mx_ref (col_1 integer, col_2 text) CREATE TABLE public.test_table (id integer DEFAULT worker_nextval('public.mx_test_sequence_0'::regclass), id2 integer DEFAULT worker_nextval('public.mx_test_sequence_1'::regclass)) DELETE FROM citus.pg_dist_object + DELETE FROM pg_catalog.pg_dist_colocation DELETE FROM pg_dist_node DELETE FROM pg_dist_partition DELETE FROM pg_dist_placement @@ -1873,6 +1900,10 @@ SELECT unnest(activate_node_snapshot()) order by 1; SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10005, 's') SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10003, 't') SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10005, 's') + SELECT pg_catalog.citus_internal_add_colocation_metadata(10002, 7, 1, 'integer'::regtype, 0) + SELECT pg_catalog.citus_internal_add_colocation_metadata(10003, 1, -1, 0, 0) + SELECT pg_catalog.citus_internal_add_colocation_metadata(10004, 3, 1, 'integer'::regtype, 0) + SELECT pg_catalog.citus_internal_add_colocation_metadata(10005, 4, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -1909,7 +1940,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.dist_table_1'::regclass, 1310074, 't'::"char", '-2147483648', '-1073741825'), ('public.dist_table_1'::regclass, 1310075, 't'::"char", '-1073741824', '-1'), ('public.dist_table_1'::regclass, 1310076, 't'::"char", '0', '1073741823'), ('public.dist_table_1'::regclass, 1310077, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_ref'::regclass, 1310073, 't'::"char", NULL, NULL)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.test_table'::regclass, 1310083, 't'::"char", '-2147483648', '-1073741825'), ('public.test_table'::regclass, 1310084, 't'::"char", '-1073741824', '-1'), ('public.test_table'::regclass, 1310085, 't'::"char", '0', '1073741823'), ('public.test_table'::regclass, 1310086, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(84 rows) +(89 rows) -- shouldn't work since test_table is MX ALTER TABLE test_table ADD COLUMN id3 bigserial; @@ -1928,8 +1959,8 @@ ALTER TABLE test_table ADD COLUMN id4 bigserial CHECK (id4 > 0); ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers \c - - - :worker_1_port \ds - List of relations - Schema | Name | Type | Owner + List of relations + Schema | Name | Type | Owner --------------------------------------------------------------------- public | mx_test_sequence_0 | sequence | postgres public | mx_test_sequence_1 | sequence | postgres @@ -1949,8 +1980,8 @@ DETAIL: drop cascades to default value for column id2 of table test_table drop cascades to default value for column id of table test_table \c - - - :worker_1_port \ds - List of relations - Schema | Name | Type | Owner + List of relations + Schema | Name | Type | Owner --------------------------------------------------------------------- public | mx_test_table_col_3_seq | sequence | postgres public | sequence_rollback | sequence | postgres @@ -2080,13 +2111,13 @@ NOTICE: dropping metadata on the node (localhost,57637) stop_metadata_sync_to_node --------------------------------------------------------------------- - (1 row) +(1 row) SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node --------------------------------------------------------------------- - (1 row) +(1 row) RESET citus.shard_count; RESET citus.shard_replication_factor; diff --git a/src/test/regress/expected/multi_mx_node_metadata.out b/src/test/regress/expected/multi_mx_node_metadata.out index a4f49f320..933407024 100644 --- a/src/test/regress/expected/multi_mx_node_metadata.out +++ b/src/test/regress/expected/multi_mx_node_metadata.out @@ -860,6 +860,13 @@ NOTICE: drop cascades to default value for column a of table reference_table DROP TABLE ref_table; DROP TABLE reference_table; TRUNCATE pg_dist_colocation; +SELECT run_command_on_workers('TRUNCATE pg_dist_colocation'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"TRUNCATE TABLE") + (localhost,57638,t,"TRUNCATE TABLE") +(2 rows) + SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t; count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 5f5ea7b1e..b1c5c0088 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -70,10 +70,12 @@ ORDER BY 1; function citus_internal.replace_isolation_tester_func() function citus_internal.restore_isolation_tester_func() function citus_internal.upgrade_columnar_storage(regclass) + function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid) function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char") function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) + function citus_internal_delete_colocation_metadata(integer) function citus_internal_delete_shard_metadata(bigint) function citus_internal_global_blocked_processes() function citus_internal_local_blocked_processes() @@ -176,7 +178,7 @@ ORDER BY 1; function master_update_table_statistics(regclass) function notify_constraint_dropped() function pg_cancel_backend(bigint) - function pg_terminate_backend(bigint, bigint) + function pg_terminate_backend(bigint,bigint) function poolinfo_valid(text) function read_intermediate_result(text,citus_copy_format) function read_intermediate_results(text[],citus_copy_format) @@ -273,5 +275,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(257 rows) +(259 rows) diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index f5a67e148..f6391c0ce 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -46,6 +46,7 @@ SELECT * FROM rebalance_table_shards(); -- TODO: Figure out why this is necessary, rebalance_table_shards shouldn't -- insert stuff into pg_dist_colocation TRUNCATE pg_dist_colocation; +SELECT run_command_on_workers('TRUNCATE pg_dist_colocation'); ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; SELECT 1 FROM citus_activate_node('localhost', :worker_2_port); diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 6729b1cbf..ad9169fac 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -4,6 +4,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 4; -- Delete orphaned entries from pg_dist_colocation DELETE FROM pg_dist_colocation where colocationid = 5 or colocationid = 6; +SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation where colocationid = 5 or colocationid = 6'); -- =================================================================== -- create test utility function @@ -161,6 +162,7 @@ SELECT find_shard_interval_index(1300016); SELECT count(*) FROM pg_dist_partition WHERE colocationid IN (4, 5); DELETE FROM pg_dist_colocation WHERE colocationid IN (4, 5); +SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation WHERE colocationid IN (4, 5)'); SET citus.shard_count = 2; @@ -213,6 +215,12 @@ SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; +-- check to see whether metadata is synced +SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$ +SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c + WHERE colocationid >= 1 AND colocationid < 1000 +$$); + SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY logicalrelid; @@ -221,10 +229,21 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition DROP TABLE table1_groupA; SELECT * FROM pg_dist_colocation WHERE colocationid = 4; +-- check to see whether metadata is synced +SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$ +SELECT array_agg(row_to_json(c)) FROM pg_dist_colocation c WHERE colocationid = 4 +$$); + + -- dropping all tables in a colocation group also deletes the colocation group DROP TABLE table2_groupA; SELECT * FROM pg_dist_colocation WHERE colocationid = 4; +-- check to see whether metadata is synced +SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$ +SELECT array_agg(row_to_json(c)) FROM pg_dist_colocation c WHERE colocationid = 4 +$$); + -- create dropped colocation group again SET citus.shard_count = 2; @@ -350,6 +369,7 @@ ORDER BY ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; DELETE FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000; +SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000'); UPDATE pg_dist_partition SET colocationid = 0 WHERE colocationid >= 1 AND colocationid < 1000; @@ -401,6 +421,12 @@ SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; +-- check to see whether metadata is synced +SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$ +SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c + WHERE colocationid >= 1 AND colocationid < 1000 +$$); + SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid, logicalrelid; @@ -427,6 +453,12 @@ SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; +-- check to see whether metadata is synced +SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$ +SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c + WHERE colocationid >= 1 AND colocationid < 1000 +$$); + SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid, logicalrelid; diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 13c984396..725ca9024 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -129,7 +129,7 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE SELECT "Column", "Type", "Definition" FROM index_attrs WHERE relid = 'mx_testing_schema.mx_index'::regclass; --- Check that pg_dist_colocation is not synced +-- Check that pg_dist_colocation is synced SELECT * FROM pg_dist_colocation ORDER BY colocationid; -- Make sure that truncate trigger has been set for the MX table on worker @@ -637,6 +637,9 @@ ORDER BY SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset +-- make sure we have the pg_dist_colocation record on the worker +SELECT count(*) FROM pg_dist_colocation WHERE distributioncolumntype = 0; + -- Check that DDL commands are propagated to reference tables on workers \c - - - :master_port ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0; diff --git a/src/test/regress/sql/multi_mx_node_metadata.sql b/src/test/regress/sql/multi_mx_node_metadata.sql index a54917f87..e9b375337 100644 --- a/src/test/regress/sql/multi_mx_node_metadata.sql +++ b/src/test/regress/sql/multi_mx_node_metadata.sql @@ -388,6 +388,7 @@ DROP SEQUENCE sequence CASCADE; DROP TABLE ref_table; DROP TABLE reference_table; TRUNCATE pg_dist_colocation; +SELECT run_command_on_workers('TRUNCATE pg_dist_colocation'); SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; From ddf7cf29f3418bedce5ff56cf1c42a19431381a6 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 3 Mar 2022 12:48:48 +0100 Subject: [PATCH 2/2] Sync pg_dist_colocation as a batch --- .../distributed/metadata/metadata_sync.c | 168 +++++++++++++----- .../regress/expected/multi_metadata_sync.out | 36 ++-- 2 files changed, 139 insertions(+), 65 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index fe03bedb6..dc501923e 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -132,6 +132,9 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount, Oid distributionColumnType, Oid distributionColumnCollation); static char * ColocationGroupDeleteCommand(uint32 colocationId); +static char * RemoteTypeIdExpression(Oid typeId); +static char * RemoteCollationIdExpression(Oid colocationId); + PG_FUNCTION_INFO_V1(start_metadata_sync_to_node); PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node); @@ -3258,43 +3261,67 @@ ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicatio appendStringInfo(insertColocationCommand, "SELECT pg_catalog.citus_internal_add_colocation_metadata(" - "%d, %d, %d, ", + "%d, %d, %d, %s, %s)", colocationId, shardCount, - replicationFactor); + replicationFactor, + RemoteTypeIdExpression(distributionColumnType), + RemoteCollationIdExpression(distributionColumnCollation)); + + return insertColocationCommand->data; +} + + +/* + * RemoteTypeIdExpression returns an expression in text form that can + * be used to obtain the OID of a type on a different node when included + * in a query string. + */ +static char * +RemoteTypeIdExpression(Oid typeId) +{ + /* by default, use 0 (InvalidOid) */ + char *expression = "0"; /* we also have pg_dist_colocation entries for reference tables */ - if (distributionColumnType != InvalidOid) + if (typeId != InvalidOid) { - char *typeName = format_type_extended(distributionColumnType, -1, + char *typeName = format_type_extended(typeId, -1, FORMAT_TYPE_FORCE_QUALIFY | FORMAT_TYPE_ALLOW_INVALID); /* format_type_extended returns ??? in case of an unknown type */ if (strcmp(typeName, "???") != 0) { - appendStringInfo(insertColocationCommand, + StringInfo regtypeExpression = makeStringInfo(); + + appendStringInfo(regtypeExpression, "%s::regtype", quote_literal_cstr(typeName)); - } - else - { - /* pg_dist_colocation contains an invalid type ID, just insert 0 */ - appendStringInfoString(insertColocationCommand, "0"); + + expression = regtypeExpression->data; } } - else + + return expression; +} + + +/* + * RemoteCollationIdExpression returns an expression in text form that can + * be used to obtain the OID of a type on a different node when included + * in a query string. Currently this is a sublink because regcollation type + * is not available in PG12. + */ +static char * +RemoteCollationIdExpression(Oid colocationId) +{ + /* by default, use 0 (InvalidOid) */ + char *expression = "0"; + + if (colocationId != InvalidOid) { - appendStringInfoString(insertColocationCommand, "0"); - } - - appendStringInfoString(insertColocationCommand, ", "); - - if (distributionColumnCollation != InvalidOid) - { - /* would be great if we could use regcollation, but it's not available on PG12 */ - - Datum collationIdDatum = ObjectIdGetDatum(distributionColumnCollation); + Datum collationIdDatum = ObjectIdGetDatum(colocationId); HeapTuple collationTuple = SearchSysCache1(COLLOID, collationIdDatum); if (HeapTupleIsValid(collationTuple)) @@ -3304,29 +3331,21 @@ ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicatio char *collationName = NameStr(collationform->collname); char *collationSchemaName = get_namespace_name(collationform->collnamespace); - appendStringInfo(insertColocationCommand, + StringInfo colocationIdQuery = makeStringInfo(); + appendStringInfo(colocationIdQuery, "(select oid from pg_collation" " where collname = %s" " and collnamespace = %s::regnamespace)", quote_literal_cstr(collationName), quote_literal_cstr(collationSchemaName)); - } - else - { - /* pg_dist_colocation contains an invalid collation, just insert 0 */ - appendStringInfoString(insertColocationCommand, "0"); + + expression = colocationIdQuery->data; } ReleaseSysCache(collationTuple); } - else - { - appendStringInfoString(insertColocationCommand, "0"); - } - appendStringInfoString(insertColocationCommand, ")"); - - return insertColocationCommand->data; + return expression; } @@ -3369,7 +3388,14 @@ ColocationGroupDeleteCommand(uint32 colocationId) List * ColocationGroupCreateCommandList(void) { - List *commandList = NIL; + bool hasColocations = false; + + StringInfo colocationGroupCreateCommand = makeStringInfo(); + appendStringInfo(colocationGroupCreateCommand, + "WITH colocation_group_data (colocationid, shardcount, " + "replicationfactor, distributioncolumntype, " + "distributioncolumncollationname, " + "distributioncolumncollationschema) AS (VALUES "); Relation pgDistColocation = table_open(DistColocationRelationId(), AccessShareLock); @@ -3381,17 +3407,59 @@ ColocationGroupCreateCommandList(void) while (HeapTupleIsValid(colocationTuple)) { + if (hasColocations) + { + appendStringInfo(colocationGroupCreateCommand, ", "); + } + + hasColocations = true; + Form_pg_dist_colocation colocationForm = (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); - char *command = - ColocationGroupCreateCommand(colocationForm->colocationid, - colocationForm->shardcount, - colocationForm->replicationfactor, - colocationForm->distributioncolumntype, - colocationForm->distributioncolumncollation); + appendStringInfo(colocationGroupCreateCommand, + "(%d, %d, %d, %s, ", + colocationForm->colocationid, + colocationForm->shardcount, + colocationForm->replicationfactor, + RemoteTypeIdExpression(colocationForm->distributioncolumntype)); - commandList = lappend(commandList, command); + /* + * For collations, include the names in the VALUES section and then + * join with pg_collation. + */ + Oid distributionColumCollation = colocationForm->distributioncolumncollation; + if (distributionColumCollation != InvalidOid) + { + Datum collationIdDatum = ObjectIdGetDatum(distributionColumCollation); + HeapTuple collationTuple = SearchSysCache1(COLLOID, collationIdDatum); + + if (HeapTupleIsValid(collationTuple)) + { + Form_pg_collation collationform = + (Form_pg_collation) GETSTRUCT(collationTuple); + char *collationName = NameStr(collationform->collname); + char *collationSchemaName = get_namespace_name( + collationform->collnamespace); + + appendStringInfo(colocationGroupCreateCommand, + "%s, %s)", + quote_literal_cstr(collationName), + quote_literal_cstr(collationSchemaName)); + + ReleaseSysCache(collationTuple); + } + else + { + appendStringInfo(colocationGroupCreateCommand, + "NULL, NULL)"); + } + } + else + { + appendStringInfo(colocationGroupCreateCommand, + "NULL, NULL)"); + } colocationTuple = systable_getnext(scanDescriptor); } @@ -3399,5 +3467,19 @@ ColocationGroupCreateCommandList(void) systable_endscan(scanDescriptor); table_close(pgDistColocation, AccessShareLock); - return commandList; + if (!hasColocations) + { + return NIL; + } + + appendStringInfo(colocationGroupCreateCommand, + ") SELECT pg_catalog.citus_internal_add_colocation_metadata(" + "colocationid, shardcount, replicationfactor, " + "distributioncolumntype, coalesce(c.oid, 0)) " + "FROM colocation_group_data d LEFT JOIN pg_collation c " + "ON (d.distributioncolumncollationname = c.collname " + "AND d.distributioncolumncollationschema::regnamespace" + " = c.collnamespace)"); + + return list_make1(colocationGroupCreateCommand->data); } diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 78cf5caaf..0e537fe65 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -59,7 +59,7 @@ ALTER ROLE CURRENT_USER WITH PASSWORD 'dummypassword'; -- Show that, with no MX tables, activate node snapshot contains only the delete commands, -- pg_dist_node entries, pg_dist_object entries and roles. SELECT unnest(activate_node_snapshot()) order by 1; - unnest + unnest --------------------------------------------------------------------- CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres DELETE FROM citus.pg_dist_object @@ -76,7 +76,6 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') - SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT worker_create_or_alter_role('postgres', null, 'ALTER ROLE postgres SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 0 PASSWORD ''md5c53670dddfc3bb4b5675c7872bc2249a'' VALID UNTIL ''2052-05-05 00:00:00-07''') SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition @@ -89,6 +88,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' UPDATE pg_dist_local_group SET groupid = 1 + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; (29 rows) @@ -146,8 +146,6 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's') - SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) - SELECT pg_catalog.citus_internal_add_colocation_metadata(2, 8, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -164,10 +162,11 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' UPDATE pg_dist_local_group SET groupid = 1 + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(42 rows) +(41 rows) -- Show that CREATE INDEX commands are included in the activate node snapshot CREATE INDEX mx_index ON mx_test_table(col_2); @@ -196,8 +195,6 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's') - SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) - SELECT pg_catalog.citus_internal_add_colocation_metadata(2, 8, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -214,10 +211,11 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' UPDATE pg_dist_local_group SET groupid = 1 + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(43 rows) +(42 rows) -- Show that schema changes are included in the activate node snapshot CREATE SCHEMA mx_testing_schema; @@ -248,8 +246,6 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's') - SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) - SELECT pg_catalog.citus_internal_add_colocation_metadata(2, 8, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -266,10 +262,11 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' UPDATE pg_dist_local_group SET groupid = 1 + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(44 rows) +(43 rows) -- Show that append distributed tables are not included in the activate node snapshot CREATE TABLE non_mx_test_table (col_1 int, col_2 text); @@ -306,8 +303,6 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's') - SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) - SELECT pg_catalog.citus_internal_add_colocation_metadata(2, 8, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -324,10 +319,11 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' UPDATE pg_dist_local_group SET groupid = 1 + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(44 rows) +(43 rows) -- Show that range distributed tables are not included in the activate node snapshot UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass; @@ -357,8 +353,6 @@ SELECT unnest(activate_node_snapshot()) order by 1; RESET ROLE SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''') SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's') - SELECT pg_catalog.citus_internal_add_colocation_metadata(1, 1, -1, 0, 0) - SELECT pg_catalog.citus_internal_add_colocation_metadata(2, 8, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -375,10 +369,11 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' UPDATE pg_dist_local_group SET groupid = 1 + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(44 rows) +(43 rows) -- Test start_metadata_sync_to_node and citus_activate_node UDFs -- Ensure that hasmetadata=false for all nodes @@ -1900,10 +1895,6 @@ SELECT unnest(activate_node_snapshot()) order by 1; SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10005, 's') SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10003, 't') SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10005, 's') - SELECT pg_catalog.citus_internal_add_colocation_metadata(10002, 7, 1, 'integer'::regtype, 0) - SELECT pg_catalog.citus_internal_add_colocation_metadata(10003, 1, -1, 0, 0) - SELECT pg_catalog.citus_internal_add_colocation_metadata(10004, 3, 1, 'integer'::regtype, 0) - SELECT pg_catalog.citus_internal_add_colocation_metadata(10005, 4, 1, 'integer'::regtype, 0) SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint') @@ -1927,6 +1918,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; SET citus.enable_ddl_propagation TO 'on' SET citus.enable_ddl_propagation TO 'on' UPDATE pg_dist_local_group SET groupid = 1 + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10002, 7, 1, 'integer'::regtype, NULL, NULL), (10003, 1, -1, 0, NULL, NULL), (10004, 3, 1, 'integer'::regtype, NULL, NULL), (10005, 4, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_test_schema_1', 'mx_table_1']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_test_schema_2', 'mx_table_2']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_ref']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'dist_table_1']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_sequence_0']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_sequence_1']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 5, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 5, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 5, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 5, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310020, 1, 0, 1, 100020), (1310021, 1, 0, 5, 100021), (1310022, 1, 0, 1, 100022), (1310023, 1, 0, 5, 100023), (1310024, 1, 0, 1, 100024)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; @@ -1940,7 +1932,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.dist_table_1'::regclass, 1310074, 't'::"char", '-2147483648', '-1073741825'), ('public.dist_table_1'::regclass, 1310075, 't'::"char", '-1073741824', '-1'), ('public.dist_table_1'::regclass, 1310076, 't'::"char", '0', '1073741823'), ('public.dist_table_1'::regclass, 1310077, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_ref'::regclass, 1310073, 't'::"char", NULL, NULL)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.test_table'::regclass, 1310083, 't'::"char", '-2147483648', '-1073741825'), ('public.test_table'::regclass, 1310084, 't'::"char", '-1073741824', '-1'), ('public.test_table'::regclass, 1310085, 't'::"char", '0', '1073741823'), ('public.test_table'::regclass, 1310086, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -(89 rows) +(86 rows) -- shouldn't work since test_table is MX ALTER TABLE test_table ADD COLUMN id3 bigserial;