diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 6e6e1b52a..361ba7081 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -80,7 +80,6 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN int shardCount, int replicationFactor); static Oid ColumnType(Oid relationId, char *columnName); - /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); PG_FUNCTION_INFO_V1(create_distributed_table); @@ -177,18 +176,7 @@ create_distributed_table(PG_FUNCTION_ARGS) if (ShouldSyncTableMetadata(relationId)) { - List *commandList = GetDistributedTableDDLEvents(relationId); - ListCell *commandCell = NULL; - - SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); - - /* send the commands one by one */ - foreach(commandCell, commandList) - { - char *command = (char *) lfirst(commandCell); - - SendCommandToWorkers(WORKERS_WITH_METADATA, command); - } + CreateTableMetadataOnWorkers(relationId); } PG_RETURN_VOID(); @@ -244,6 +232,8 @@ CreateReferenceTable(Oid relationId) /* now, create the single shard replicated to all nodes */ CreateReferenceTableShard(relationId); + + CreateTableMetadataOnWorkers(relationId); } diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index b5bafb40c..7345d18f5 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -400,6 +400,12 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) columnValues = palloc0(columnCount * sizeof(Datum)); columnNulls = palloc0(columnCount * sizeof(bool)); + /* we don't support copy to reference tables from workers */ + if (partitionMethod == DISTRIBUTE_BY_NONE) + { + EnsureSchemaNode(); + } + /* load the list of shards and verify that we have shards to copy into */ shardIntervalList = LoadShardIntervalList(tableId); if (shardIntervalList == NIL) diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index a7890d627..5534de9aa 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -307,10 +307,9 @@ GetNextShardId() /* - * master_get_new_placementid allocates and returns a unique placementId for - * the placement to be created. This allocation occurs both in shared memory - * and in write ahead logs; writing to logs avoids the risk of having shardId - * collisions. + * master_get_new_placementid is a user facing wrapper function around + * GetNextPlacementId() which allocates and returns a unique placement id for the + * placement to be created. * * NB: This can be called by any user; for now we have decided that that's * ok. We might want to restrict this to users part of a specific role or such @@ -318,25 +317,51 @@ GetNextShardId() */ Datum master_get_new_placementid(PG_FUNCTION_ARGS) +{ + uint64 placementId = 0; + Datum placementIdDatum = 0; + + EnsureSchemaNode(); + + placementId = GetNextPlacementId(); + placementIdDatum = Int64GetDatum(placementId); + + PG_RETURN_DATUM(placementIdDatum); +} + + +/* + * GetNextPlacementId allocates and returns a unique placementId for + * the placement to be created. 
This allocation occurs both in shared memory + * and in write ahead logs; writing to logs avoids the risk of having placementId + * collisions. + * + * NB: This can be called by any user; for now we have decided that that's + * ok. We might want to restrict this to users part of a specific role or such + * at some later point. + */ +uint64 +GetNextPlacementId(void) { text *sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME); Oid sequenceId = ResolveRelationId(sequenceName); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); Oid savedUserId = InvalidOid; int savedSecurityContext = 0; - Datum shardIdDatum = 0; - - EnsureSchemaNode(); + Datum placementIdDatum = 0; + uint64 placementId = 0; GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - /* generate new and unique shardId from sequence */ - shardIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + /* generate new and unique placement id from sequence */ + placementIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); SetUserIdAndSecContext(savedUserId, savedSecurityContext); - PG_RETURN_DATUM(shardIdDatum); + placementId = DatumGetInt64(placementIdDatum); + + return placementId; } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 34dc859a3..d0bbf5e05 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -63,11 +63,11 @@ PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node); /* * start_metadata_sync_to_node function creates the metadata in a worker for preparing the - * worker for accepting MX-table queries. The function first sets the localGroupId of the - * worker so that the worker knows which tuple in pg_dist_node table represents itself. - * After that, SQL statetemens for re-creating metadata about mx distributed - * tables are sent to the worker. Finally, the hasmetadata column of the target node in - * pg_dist_node is marked as true. + * worker for accepting queries. The function first sets the localGroupId of the worker + * so that the worker knows which tuple in pg_dist_node table represents itself. After + * that, SQL statements for re-creating metadata of MX-eligible distributed tables are + * sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node + * is marked as true. */ Datum start_metadata_sync_to_node(PG_FUNCTION_ARGS) @@ -132,7 +132,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) /* * stop_metadata_sync_to_node function sets the hasmetadata column of the specified node * to false in pg_dist_node table, thus indicating that the specified worker node does not - * receive DDL changes anymore and cannot be used for issuing mx queries. + * receive DDL changes anymore and cannot be used for issuing queries. */ Datum stop_metadata_sync_to_node(PG_FUNCTION_ARGS) @@ -159,19 +159,24 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS) /* - * ShouldSyncTableMetadata checks if a distributed table has streaming replication model - * and hash distribution. In that case the distributed table is considered an MX table, - * and its metadata is required to exist on the worker nodes. + * ShouldSyncTableMetadata checks if the metadata of a distributed table should be + * propagated to metadata workers, i.e. the table is an MX table or reference table. 
+ * Tables with the streaming replication model (which implies a replication factor of 1) + * and hash distribution are considered MX tables, while tables whose partition method is + * DISTRIBUTE_BY_NONE are reference tables. */ bool ShouldSyncTableMetadata(Oid relationId) { DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(relationId); - bool usesHashDistribution = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH); - bool usesStreamingReplication = + + bool hashDistributed = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH); + bool streamingReplicated = (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING); - if (usesStreamingReplication && usesHashDistribution) + bool mxTable = (streamingReplicated && hashDistributed); + bool referenceTable = (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE); + + if (mxTable || referenceTable) { return true; } @@ -199,7 +204,7 @@ MetadataCreateCommands(void) { List *metadataSnapshotCommandList = NIL; List *distributedTableList = DistributedTableList(); - List *mxTableList = NIL; + List *propagatedTableList = NIL; List *workerNodeList = WorkerNodeList(); ListCell *distributedTableCell = NULL; char *nodeListInsertCommand = NULL; @@ -209,19 +214,19 @@ MetadataCreateCommands(void) metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, nodeListInsertCommand); - /* create the list of mx tables */ + /* create the list of tables whose metadata will be created */ foreach(distributedTableCell, distributedTableList) { DistTableCacheEntry *cacheEntry = (DistTableCacheEntry *) lfirst(distributedTableCell); if (ShouldSyncTableMetadata(cacheEntry->relationId)) { - mxTableList = lappend(mxTableList, cacheEntry); + propagatedTableList = lappend(propagatedTableList, cacheEntry); } } - /* create the mx tables, but not the metadata */ - foreach(distributedTableCell, mxTableList) + /* create the tables, but not the metadata */ + foreach(distributedTableCell, propagatedTableList) { DistTableCacheEntry *cacheEntry = (DistTableCacheEntry *) lfirst(distributedTableCell); @@ -240,7 +245,7 @@ MetadataCreateCommands(void) } /* construct the foreign key constraints after all tables are created */ - foreach(distributedTableCell, mxTableList) + foreach(distributedTableCell, propagatedTableList) { DistTableCacheEntry *cacheEntry = (DistTableCacheEntry *) lfirst(distributedTableCell); @@ -253,7 +258,7 @@ MetadataCreateCommands(void) } /* after all tables are created, create the metadata */ - foreach(distributedTableCell, mxTableList) + foreach(distributedTableCell, propagatedTableList) { DistTableCacheEntry *cacheEntry = (DistTableCacheEntry *) lfirst(distributedTableCell); @@ -323,7 +328,7 @@ GetDistributedTableDDLEvents(Oid relationId) metadataCommand = DistributionCreateCommand(cacheEntry); commandList = lappend(commandList, metadataCommand); - /* commands to create the truncate trigger of the mx table */ + /* commands to create the truncate trigger of the table */ truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId); commandList = lappend(commandList, truncateTriggerCreateCommand); @@ -436,19 +441,30 @@ DistributionCreateCommand(DistTableCacheEntry *cacheEntry) char *partitionKeyString = cacheEntry->partitionKeyString; char *qualifiedRelationName = generate_qualified_relation_name(relationId); - char *partitionKeyColumnName = ColumnNameToColumn(relationId, partitionKeyString); uint32 colocationId = cacheEntry->colocationId; char replicationModel = cacheEntry->replicationModel; + StringInfo tablePartitionKeyString = makeStringInfo(); + + if (distributionMethod == 
DISTRIBUTE_BY_NONE) + { + appendStringInfo(tablePartitionKeyString, "NULL"); + } + else + { + char *partitionKeyColumnName = ColumnNameToColumn(relationId, partitionKeyString); + appendStringInfo(tablePartitionKeyString, "column_name_to_column(%s,%s)", + quote_literal_cstr(qualifiedRelationName), + quote_literal_cstr(partitionKeyColumnName)); + } appendStringInfo(insertDistributionCommand, "INSERT INTO pg_dist_partition " "(logicalrelid, partmethod, partkey, colocationid, repmodel) " "VALUES " - "(%s::regclass, '%c', column_name_to_column(%s,%s), %d, '%c')", + "(%s::regclass, '%c', %s, %d, '%c')", quote_literal_cstr(qualifiedRelationName), distributionMethod, - quote_literal_cstr(qualifiedRelationName), - quote_literal_cstr(partitionKeyColumnName), + tablePartitionKeyString->data, colocationId, replicationModel); @@ -511,7 +527,6 @@ ShardListInsertCommand(List *shardIntervalList) StringInfo insertShardCommand = makeStringInfo(); int shardCount = list_length(shardIntervalList); int processedShardCount = 0; - int processedShardPlacementCount = 0; /* if there are no shards, return empty list */ if (shardCount == 0) @@ -519,13 +534,6 @@ ShardListInsertCommand(List *shardIntervalList) return commandList; } - /* generate the shard placement query without any values yet */ - appendStringInfo(insertPlacementCommand, - "INSERT INTO pg_dist_shard_placement " - "(shardid, shardstate, shardlength," - " nodename, nodeport, placementid) " - "VALUES "); - /* add placements to insertPlacementCommand */ foreach(shardIntervalCell, shardIntervalList) { @@ -533,25 +541,33 @@ ShardListInsertCommand(List *shardIntervalList) uint64 shardId = shardInterval->shardId; List *shardPlacementList = FinalizedShardPlacementList(shardId); - ShardPlacement *placement = NULL; + ListCell *shardPlacementCell = NULL; - /* the function only handles single placement per shard */ - Assert(list_length(shardPlacementList) == 1); - - placement = (ShardPlacement *) linitial(shardPlacementList); - - appendStringInfo(insertPlacementCommand, - "(%lu, 1, %lu, %s, %d, %lu)", - shardId, - placement->shardLength, - quote_literal_cstr(placement->nodeName), - placement->nodePort, - placement->placementId); - - processedShardPlacementCount++; - if (processedShardPlacementCount != shardCount) + foreach(shardPlacementCell, shardPlacementList) { - appendStringInfo(insertPlacementCommand, ","); + ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); + + if (insertPlacementCommand->len == 0) + { + /* generate the shard placement query without any values yet */ + appendStringInfo(insertPlacementCommand, + "INSERT INTO pg_dist_shard_placement " + "(shardid, shardstate, shardlength," + " nodename, nodeport, placementid) " + "VALUES "); + } + else + { + appendStringInfo(insertPlacementCommand, ","); + } + + appendStringInfo(insertPlacementCommand, + "(%lu, 1, %lu, %s, %d, %lu)", + shardId, + placement->shardLength, + quote_literal_cstr(placement->nodeName), + placement->nodePort, + placement->placementId); } } @@ -573,17 +589,36 @@ ShardListInsertCommand(List *shardIntervalList) Oid distributedRelationId = shardInterval->relationId; char *qualifiedRelationName = generate_qualified_relation_name( distributedRelationId); + StringInfo minHashToken = makeStringInfo(); + StringInfo maxHashToken = makeStringInfo(); - int minHashToken = DatumGetInt32(shardInterval->minValue); - int maxHashToken = DatumGetInt32(shardInterval->maxValue); + if (shardInterval->minValueExists) + { + appendStringInfo(minHashToken, "'%d'", DatumGetInt32( + 
shardInterval->minValue)); } + else + { + appendStringInfo(minHashToken, "NULL"); + } + + if (shardInterval->maxValueExists) + { + appendStringInfo(maxHashToken, "'%d'", DatumGetInt32( + shardInterval->maxValue)); + } + else + { + appendStringInfo(maxHashToken, "NULL"); + } appendStringInfo(insertShardCommand, - "(%s::regclass, %lu, '%c', '%d', '%d')", + "(%s::regclass, %lu, '%c', %s, %s)", quote_literal_cstr(qualifiedRelationName), shardId, shardInterval->storageType, - minHashToken, - maxHashToken); + minHashToken->data, + maxHashToken->data); processedShardCount++; if (processedShardCount != shardCount) @@ -635,6 +670,24 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId) } +/* + * PlacementUpsertCommand creates a SQL command for upserting a pg_dist_shard_placement + * entry with the given properties. In the case of a conflict on placementId, the command + * updates all properties (excluding the placementId) with the given ones. + */ +char * +PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, + uint64 shardLength, char *nodeName, uint32 nodePort) +{ + StringInfo command = makeStringInfo(); + + appendStringInfo(command, UPSERT_PLACEMENT, shardId, shardState, shardLength, + quote_literal_cstr(nodeName), nodePort, placementId); + + return command->data; +} + + /* * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id * of a worker and returns the command in a string. @@ -899,3 +952,29 @@ HasMetadataWorkers(void) return false; } + + +/* + * CreateTableMetadataOnWorkers creates the list of commands needed to create the + * given distributed table and sends these commands to all metadata workers, i.e. workers + * with hasmetadata=true. Before sending the commands, in order to prevent recursive + * propagation, DDL propagation on workers is disabled with a + * `SET citus.enable_ddl_propagation TO off;` command. 
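+ *
+ * For illustration, the command sequence sent to each metadata worker looks roughly like
+ * the following; the table name and metadata values are hypothetical, and the exact DDL
+ * comes from GetDistributedTableDDLEvents:
+ *
+ *   SET citus.enable_ddl_propagation TO 'off';
+ *   CREATE TABLE public.mx_table (col_1 int, col_2 text);
+ *   INSERT INTO pg_dist_partition
+ *     (logicalrelid, partmethod, partkey, colocationid, repmodel)
+ *   VALUES ('public.mx_table'::regclass, 'h',
+ *           column_name_to_column('public.mx_table', 'col_1'), 1, 's');
+ *   -- ... plus the truncate trigger command and any shard/placement metadata inserts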
+ */ +void +CreateTableMetadataOnWorkers(Oid relationId) +{ + List *commandList = GetDistributedTableDDLEvents(relationId); + ListCell *commandCell = NULL; + + /* prevent recursive propagation */ + SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + + /* send the commands one by one */ + foreach(commandCell, commandList) + { + char *command = (char *) lfirst(commandCell); + + SendCommandToWorkers(WORKERS_WITH_METADATA, command); + } +} diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index cde71d763..4ea766ec8 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -364,12 +364,18 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) bool hasShardPlacements = false; WorkerNode *workerNode = NULL; List *referenceTableList = NIL; + uint32 deletedNodeId = INVALID_PLACEMENT_ID; EnsureSchemaNode(); EnsureSuperUser(); workerNode = FindWorkerNode(nodeName, nodePort); + if (workerNode != NULL) + { + deletedNodeId = workerNode->nodeId; + } + DeleteNodeRow(nodeName, nodePort); DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort); @@ -410,7 +416,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) } } - nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); + nodeDeleteCommand = NodeDeleteCommand(deletedNodeId); /* make sure we don't have any lingering session lifespan connections */ CloseNodeConnectionsAfterTransaction(nodeName, nodePort); @@ -728,6 +734,7 @@ DeleteNodeRow(char *nodeName, int32 nodePort) NULL, scanKeyCount, scanKey); heapTuple = systable_getnext(heapScan); + if (!HeapTupleIsValid(heapTuple)) { ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 3b67919f6..5b8eedfef 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -20,6 +20,7 @@ #include "distributed/master_protocol.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_logical_planner.h" #include "distributed/reference_table_utils.h" #include "distributed/resource_lock.h" @@ -54,6 +55,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) List *shardIntervalList = NIL; ShardInterval *shardInterval = NULL; uint64 shardId = INVALID_SHARD_ID; + DistTableCacheEntry *tableEntry = NULL; EnsureSchemaNode(); @@ -67,7 +69,9 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) "create_reference_table('%s');", relationName))); } - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + tableEntry = DistributedTableCacheEntry(relationId); + + if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -76,6 +80,16 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) relationName))); } + if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING) + { + char *relationName = get_rel_name(relationId); + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot upgrade to reference table"), + errdetail("Upgrade is only supported for statement-based " + "replicated tables but \"%s\" is streaming replicated", + relationName))); + } + shardIntervalList = LoadShardIntervalList(relationId); if (list_length(shardIntervalList) 
!= 1) { @@ -198,11 +212,18 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId) ReplicateShardToAllWorkers(shardInterval); /* - * After copying the shards, we need to update metadata tables to mark this table as - * reference table. We modify pg_dist_partition, pg_dist_colocation and pg_dist_shard - * tables in ConvertToReferenceTableMetadata function. + * We need to update metadata tables to mark this table as a reference table. We modify + * pg_dist_partition, pg_dist_colocation and pg_dist_shard tables in + * ConvertToReferenceTableMetadata function. */ ConvertToReferenceTableMetadata(relationId, shardId); + + /* + * After the table has been officially marked as a reference table, we need to create + * the table on the metadata workers and insert its pg_dist_partition, pg_dist_shard and + * existing pg_dist_shard_placement rows there. + */ + CreateTableMetadataOnWorkers(relationId); } @@ -248,18 +269,45 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval) nodeName, nodePort, missingWorkerOk); + /* + * Although this function is used for reference tables and reference table shard + * placements always have shardState = FILE_FINALIZED, in case of an upgrade of + * a non-reference table to a reference table, unhealthy placements may exist. In + * this case, we repair the shard placement and update its state in + * pg_dist_shard_placement table. + */ if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) { + uint64 placementId = 0; + SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, ddlCommandList); if (targetPlacement == NULL) { - InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED, 0, + placementId = GetNextPlacementId(); + InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0, nodeName, nodePort); } else { - UpdateShardPlacementState(targetPlacement->placementId, FILE_FINALIZED); + placementId = targetPlacement->placementId; + UpdateShardPlacementState(placementId, FILE_FINALIZED); + } + + /* + * Although ReplicateShardToAllWorkers is used only for reference tables, + * during the upgrade phase, the placements are created before the table is + * marked as a reference table. All metadata (including the placement + * metadata) will be copied to workers after all reference table changes + * are finished. 
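+ *
+ * For illustration, the upsert that PlacementUpsertCommand builds from the
+ * UPSERT_PLACEMENT template (defined in metadata_sync.h) looks roughly like the
+ * following; the shard id, placement id, node name and port are hypothetical:
+ *
+ *   INSERT INTO pg_dist_shard_placement
+ *     (shardid, shardstate, shardlength, nodename, nodeport, placementid)
+ *   VALUES (1310183, 1, 0, 'localhost', 57637, 100183)
+ *   ON CONFLICT (placementid) DO UPDATE SET
+ *     shardid = EXCLUDED.shardid, shardstate = EXCLUDED.shardstate,
+ *     shardlength = EXCLUDED.shardlength, nodename = EXCLUDED.nodename,
+ *     nodeport = EXCLUDED.nodeport;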
+ */ + if (ShouldSyncTableMetadata(shardInterval->relationId)) + { + char *placementCommand = PlacementUpsertCommand(shardId, placementId, + FILE_FINALIZED, 0, + nodeName, nodePort); + + SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand); } } } @@ -354,10 +402,17 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort) List *shardIntervalList = LoadShardIntervalList(referenceTableId); ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); uint64 shardId = shardInterval->shardId; + uint64 placementId = INVALID_PLACEMENT_ID; + StringInfo deletePlacementCommand = makeStringInfo(); LockShardDistributionMetadata(shardId, ExclusiveLock); - DeleteShardPlacementRow(shardId, workerName, workerPort); + placementId = DeleteShardPlacementRow(shardId, workerName, workerPort); + + appendStringInfo(deletePlacementCommand, + "DELETE FROM pg_dist_shard_placement WHERE placementid=%lu", + placementId); + SendCommandToWorkers(WORKERS_WITH_METADATA, deletePlacementCommand->data); } } diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 3f7c13dc6..72a2baebf 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -97,6 +97,7 @@ extern bool SchemaNode(void); /* Function declarations local to the distributed module */ extern bool CStoreTable(Oid relationId); extern uint64 GetNextShardId(void); +extern uint64 GetNextPlacementId(void); extern Oid ResolveRelationId(text *relationName); extern List * GetTableDDLEvents(Oid relationId); extern List * GetTableForeignConstraintCommands(Oid relationId); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 56e91e5f8..c9fef4e1c 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -31,6 +31,9 @@ extern List * ShardListInsertCommand(List *shardIntervalList); extern char * NodeDeleteCommand(uint32 nodeId); extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); extern char * CreateSchemaDDLCommand(Oid schemaId); +extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, + uint64 shardLength, char *nodeName, uint32 nodePort); +extern void CreateTableMetadataOnWorkers(Oid relationId); #define DELETE_ALL_NODES "TRUNCATE pg_dist_node" @@ -38,6 +41,16 @@ extern char * CreateSchemaDDLCommand(Oid schemaId); "SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition" #define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'" #define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s)" +#define UPSERT_PLACEMENT "INSERT INTO pg_dist_shard_placement " \ + "(shardid, shardstate, shardlength, " \ + "nodename, nodeport, placementid) " \ + "VALUES (%lu, %d, %lu, %s, %d, %lu) " \ + "ON CONFLICT (placementid) DO UPDATE SET " \ + "shardid = EXCLUDED.shardid, " \ + "shardstate = EXCLUDED.shardstate, " \ + "shardlength = EXCLUDED.shardlength, " \ + "nodename = EXCLUDED.nodename, " \ + "nodeport = EXCLUDED.nodeport" #endif /* METADATA_SYNC_H */ diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 8522ddd0c..64a908322 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -8,6 +8,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1310000; SELECT 
nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_placement_id \gset ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000; +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset -- Create the necessary test utility function CREATE FUNCTION master_metadata_snapshot() RETURNS text[] @@ -23,8 +25,8 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; --------------+------------+---------+--------------+---------- (0 rows) --- Show that, with no MX tables, metadata snapshot contains only the delete commands and --- pg_dist_node entries +-- Show that, with no MX tables, metadata snapshot contains only the delete commands, +-- pg_dist_node entries and reference tables SELECT unnest(master_metadata_snapshot()); unnest ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- @@ -1120,7 +1122,7 @@ SELECT create_distributed_table('mx_table', 'a'); SELECT master_add_node('localhost', :worker_2_port); master_add_node --------------------------------- - (3,3,localhost,57638,default,f) + (4,4,localhost,57638,default,f) (1 row) SELECT start_metadata_sync_to_node('localhost', :worker_2_port); @@ -1147,20 +1149,20 @@ SELECT * FROM mx_table ORDER BY a; \c - mx_user - :worker_2_port SELECT nextval('mx_table_b_seq'); - nextval ------------------ - 844424930131969 + nextval +------------------ + 1125899906842625 (1 row) INSERT INTO mx_table (a) VALUES (39); INSERT INTO mx_table (a) VALUES (40); SELECT * FROM mx_table ORDER BY a; - a | b -----+----------------- - 37 | 281474976710658 - 38 | 281474976710659 - 39 | 844424930131970 - 40 | 844424930131971 + a | b +----+------------------ + 37 | 281474976710658 + 38 | 281474976710659 + 39 | 1125899906842626 + 40 | 1125899906842627 (4 rows) \c - mx_user - :master_port @@ -1181,12 +1183,161 @@ DROP USER mx_user; DROP USER mx_user; \c - - - :worker_2_port DROP USER mx_user; +-- Check that create_reference_table creates the metadata on workers +\c - - - :master_port +CREATE TABLE mx_ref (col_1 int, col_2 text); +SELECT create_reference_table('mx_ref'); + create_reference_table +------------------------ + +(1 row) + +\d mx_ref + Table "public.mx_ref" + Column | Type | Modifiers +--------+---------+----------- + col_1 | integer | + col_2 | text | + +\c - - - :worker_1_port +\d mx_ref + Table "public.mx_ref" + Column | Type | Modifiers +--------+---------+----------- + col_1 | integer | + col_2 | text | + +SELECT + logicalrelid, partmethod, repmodel, shardid, placementid, nodename, nodeport +FROM + pg_dist_partition + NATURAL JOIN pg_dist_shard + NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_ref'::regclass; + logicalrelid | partmethod | repmodel | shardid | placementid | nodename | nodeport +--------------+------------+----------+---------+-------------+-----------+---------- + mx_ref | n | t | 1310183 | 100184 | localhost | 57638 + mx_ref | n | t | 1310183 | 100183 | localhost | 57637 +(2 rows) + + +SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset +-- Check that DDL commands are propagated to reference tables on workers +\c - - - :master_port +ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: 
SET citus.multi_shard_commit_protocol TO '2pc' +CREATE INDEX mx_ref_index ON mx_ref(col_1); +\d mx_ref + Table "public.mx_ref" + Column | Type | Modifiers +--------+---------+----------- + col_1 | integer | + col_2 | text | + col_3 | numeric | default 0 +Indexes: + "mx_ref_index" btree (col_1) + +\c - - - :worker_1_port +\d mx_ref + Table "public.mx_ref" + Column | Type | Modifiers +--------+---------+----------- + col_1 | integer | + col_2 | text | + col_3 | numeric | default 0 +Indexes: + "mx_ref_index" btree (col_1) + + +-- Check that metada is cleaned successfully upon drop table +\c - - - :master_port +DROP TABLE mx_ref; +\d mx_ref +\c - - - :worker_1_port +\d mx_ref +SELECT * FROM pg_dist_shard WHERE shardid=:ref_table_shardid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue +--------------+---------+--------------+---------------+--------------- +(0 rows) + +SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid; + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+----------+----------+------------- +(0 rows) + +-- Check that master_add_node propagates the metadata about new placements of a reference table +\c - - - :master_port +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE mx_ref (col_1 int, col_2 text); +SELECT create_reference_table('mx_ref'); + create_reference_table +------------------------ + +(1 row) + +SELECT shardid, nodename, nodeport +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE logicalrelid='mx_ref'::regclass; + shardid | nodename | nodeport +---------+-----------+---------- + 1310184 | localhost | 57637 +(1 row) + +\c - - - :worker_1_port +SELECT shardid, nodename, nodeport +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE logicalrelid='mx_ref'::regclass; + shardid | nodename | nodeport +---------+-----------+---------- + 1310184 | localhost | 57637 +(1 row) + +\c - - - :master_port +SELECT master_add_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "mx_ref" to all workers + master_add_node +--------------------------------- + (5,5,localhost,57638,default,f) +(1 row) + +SELECT shardid, nodename, nodeport +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE logicalrelid='mx_ref'::regclass; + shardid | nodename | nodeport +---------+-----------+---------- + 1310184 | localhost | 57637 + 1310184 | localhost | 57638 +(2 rows) + +\c - - - :worker_1_port +SELECT shardid, nodename, nodeport +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE logicalrelid='mx_ref'::regclass; + shardid | nodename | nodeport +---------+-----------+---------- + 1310184 | localhost | 57637 + 1310184 | localhost | 57638 +(2 rows) + +\c - - - :master_port +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement; -- Cleanup \c - - - :master_port DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1 DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_testing_schema.mx_test_table; +DROP TABLE mx_ref; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); stop_metadata_sync_to_node ---------------------------- @@ -1202,5 
+1353,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); RESET citus.shard_count; RESET citus.shard_replication_factor; RESET citus.multi_shard_commit_protocol; +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index aecaf0191..c8308dc38 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -1018,6 +1018,9 @@ ORDER BY s.logicalrelid, sp.shardstate; reference_failure_test | 1 | 2 (1 row) +-- connect back to the worker and set rename the test_user back +\c - :default_user - :worker_1_port +ALTER USER test_user_new RENAME TO test_user; -- connect back to the master with the proper user to continue the tests \c - :default_user - :master_port DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second; diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out index e10a80f06..17a81a35b 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -10,6 +10,13 @@ ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1380000; -- create copy of pg_dist_shard_placement to reload after the test CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +-- make worker 1 receive metadata changes +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + -- remove non-existing node SELECT master_remove_node('localhost', 55555); ERROR: could not find valid entry for node "localhost:55555" @@ -78,6 +85,27 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port SELECT master_remove_node('localhost', :worker_2_port); master_remove_node -------------------- @@ -112,6 +140,25 @@ WHERE colocationid IN 1380000 | 1 | 1 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +\c - - - :master_port -- remove same node twice SELECT master_remove_node('localhost', :worker_2_port); ERROR: could not find valid entry for node "localhost:57638" @@ -153,6 +200,27 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node 
WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port BEGIN; SELECT master_remove_node('localhost', :worker_2_port); master_remove_node @@ -190,6 +258,26 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port -- remove node in a transaction and COMMIT -- status before master_remove_node SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; @@ -220,6 +308,27 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port BEGIN; SELECT master_remove_node('localhost', :worker_2_port); master_remove_node @@ -256,6 +365,26 @@ WHERE colocationid IN 1380000 | 1 | 1 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +\c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers @@ -294,6 +423,27 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port + BEGIN; INSERT INTO remove_node_reference_table VALUES(1); SELECT master_remove_node('localhost', :worker_2_port); @@ -338,6 +488,32 @@ SELECT * FROM remove_node_reference_table; 1 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +SELECT * FROM remove_node_reference_table; + column1 
+--------- + 1 +(1 row) + + +\c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers @@ -376,6 +552,26 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 +(1 row) + + +\c - - - :master_port BEGIN; ALTER TABLE remove_node_reference_table ADD column2 int; NOTICE: using one-phase commit for distributed DDL commands @@ -415,6 +611,26 @@ WHERE colocationid IN 1380000 | 1 | 1 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +\c - - - :master_port -- verify table structure is changed \d remove_node_reference_table Table "public.remove_node_reference_table" @@ -485,6 +701,28 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) + +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 + 1380001 | 1 | 0 | localhost | 57638 +(2 rows) + + +\c - - - :master_port SELECT master_remove_node('localhost', :worker_2_port); master_remove_node -------------------- @@ -519,6 +757,26 @@ WHERE colocationid IN 1380000 | 1 | 1 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +\c - - - :master_port + -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers @@ -561,6 +819,28 @@ WHERE colocationid IN 1380000 | 1 | 2 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 1 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1380000 | 1 | 0 | localhost | 57638 + 1380001 | 1 | 0 | localhost | 57638 +(2 rows) + + +\c - - - :master_port + SELECT master_disable_node('localhost', :worker_2_port); master_disable_node --------------------- @@ -595,6 +875,25 @@ WHERE colocationid IN 1380000 | 1 | 1 | 0 (1 row) +\c - - - :worker_1_port +SELECT COUNT(*) FROM 
pg_dist_node WHERE nodeport = :worker_2_port; + count +------- + 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+----------+---------- +(0 rows) + + +\c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers @@ -608,6 +907,12 @@ NOTICE: Replicating reference table "table1" to all workers DROP TABLE remove_node_reference_table; DROP TABLE remove_node_reference_table_schema.table1; DROP SCHEMA remove_node_reference_table_schema CASCADE; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); DROP TABLE tmp_shard_placement; diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index e9d21e600..3c4e6147f 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -360,6 +360,8 @@ SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); (1 row) +-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass; CREATE TABLE replicate_reference_table_reference_two(column1 int); -- status before master_add_node SELECT @@ -388,11 +390,12 @@ SELECT FROM pg_dist_partition WHERE - logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two') +ORDER BY logicalrelid; logicalrelid | partmethod | colocationid | repmodel -----------------------------------------+------------+--------------+---------- replicate_reference_table_reference_one | n | 1370004 | t - replicate_reference_table_hash | h | 1370005 | s + replicate_reference_table_hash | h | 1370005 | c (2 rows) BEGIN; @@ -448,7 +451,9 @@ SELECT FROM pg_dist_partition WHERE - logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two') +ORDER BY + logicalrelid; logicalrelid | partmethod | colocationid | repmodel -----------------------------------------+------------+--------------+---------- replicate_reference_table_reference_one | n | 1370004 | t diff --git a/src/test/regress/expected/multi_truncate.out b/src/test/regress/expected/multi_truncate.out index 0e17df7de..7f7349d4f 100644 --- a/src/test/regress/expected/multi_truncate.out +++ b/src/test/regress/expected/multi_truncate.out @@ -46,7 +46,7 @@ SELECT master_create_empty_shard('test_truncate_append'); (1 row) -- verify 3 shards are presents -SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass; +SELECT shardid FROM pg_dist_shard where logicalrelid = 
'test_truncate_append'::regclass ORDER BY shardid; shardid --------- 1210000 @@ -113,7 +113,7 @@ SELECT count(*) FROM test_truncate_range; (1 row) -- verify 3 shards are presents -SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass; +SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid; shardid --------- 1210003 @@ -130,7 +130,7 @@ SELECT count(*) FROM test_truncate_range; (1 row) -- verify 3 shards are still present -SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass; +SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid; shardid --------- 1210003 @@ -190,7 +190,7 @@ SELECT count(*) FROM test_truncate_hash; (1 row) -- verify 4 shards are present -SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass; +SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid; shardid --------- (0 rows) @@ -221,7 +221,7 @@ SELECT count(*) FROM test_truncate_hash; (1 row) -- verify 4 shards are still presents -SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass; +SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid; shardid --------- 1210006 diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index 1d4be75d6..aea3b2b89 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -28,6 +28,13 @@ SELECT create_distributed_table('mx_table_2', 'col_1'); (1 row) +CREATE TABLE mx_ref_table (col_1 int, col_2 text); +SELECT create_reference_table('mx_ref_table'); + create_reference_table +------------------------ + +(1 row) + -- Check that the created tables are colocated MX tables SELECT logicalrelid, repmodel, colocationid FROM pg_dist_partition @@ -46,6 +53,9 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port); (1 row) COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); +INSERT INTO mx_ref_table VALUES (-37, 'morbi'); +INSERT INTO mx_ref_table VALUES (-78, 'sapien'); +INSERT INTO mx_ref_table VALUES (-34, 'augue'); SELECT * FROM mx_table ORDER BY col_1; col_1 | col_2 | col_3 -------+----------+------- @@ -98,6 +108,38 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; 5 (1 row) +-- INSERT/UPDATE/DELETE/COPY on reference tables +SELECT * FROM mx_ref_table ORDER BY col_1; + col_1 | col_2 +-------+-------- + -78 | sapien + -37 | morbi + -34 | augue +(3 rows) + +INSERT INTO mx_ref_table (col_1, col_2) VALUES (-6, 'vestibulum'); +ERROR: cannot perform distributed planning for the given modification +DETAIL: Modifications to reference tables are supported only from the schema node. +UPDATE mx_ref_table SET col_2 = 'habitant' WHERE col_1 = -37; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Modifications to reference tables are supported only from the schema node. +DELETE FROM mx_ref_table WHERE col_1 = -78; +ERROR: cannot perform distributed planning for the given modification +DETAIL: Modifications to reference tables are supported only from the schema node. +COPY mx_ref_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); +ERROR: operation is not allowed on this node +HINT: Connect to the schema node and run it again. 
+SELECT * FROM mx_ref_table ORDER BY col_1; + col_1 | col_2 +-------+-------- + -78 | sapien + -37 | morbi + -34 | augue +(3 rows) + +\c - - - :master_port +DROP TABLE mx_ref_table; +\c - - - :worker_1_port -- DDL commands \d mx_table Table "public.mx_table" diff --git a/src/test/regress/expected/multi_upgrade_reference_table.out b/src/test/regress/expected/multi_upgrade_reference_table.out index 954e57ff8..cb1719cd5 100644 --- a/src/test/regress/expected/multi_upgrade_reference_table.out +++ b/src/test/regress/expected/multi_upgrade_reference_table.out @@ -12,6 +12,7 @@ SELECT upgrade_to_reference_table('upgrade_reference_table_local'); ERROR: cannot upgrade to reference table DETAIL: Relation "upgrade_reference_table_local" is not distributed. HINT: Instead, you can use; create_reference_table('upgrade_reference_table_local'); +DROP TABLE upgrade_reference_table_local; -- test with table which has more than one shard SET citus.shard_count TO 4; CREATE TABLE upgrade_reference_table_multiple_shard(column1 int); @@ -24,6 +25,7 @@ SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'colum SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard'); ERROR: cannot upgrade to reference table DETAIL: Relation "upgrade_reference_table_multiple_shard" shard count is not one. Only relations with one shard can be upgraded to reference tables. +DROP TABLE upgrade_reference_table_multiple_shard; -- test with table which has no shard CREATE TABLE upgrade_reference_table_no_shard(column1 int); SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append'); @@ -35,6 +37,7 @@ SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', ' SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard'); ERROR: cannot upgrade to reference table DETAIL: Relation "upgrade_reference_table_no_shard" shard count is not one. Only relations with one shard can be upgraded to reference tables. +DROP TABLE upgrade_reference_table_no_shard; -- test with table with foreign keys SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; @@ -52,12 +55,17 @@ SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1' (1 row) +-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referenced'::regclass; +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referencing'::regclass; SELECT upgrade_to_reference_table('upgrade_reference_table_referenced'); ERROR: cannot upgrade to reference table DETAIL: Relation "upgrade_reference_table_referenced" is part of a foreign constraint. Foreign key constraints are not allowed from or to reference tables. SELECT upgrade_to_reference_table('upgrade_reference_table_referencing'); ERROR: cannot upgrade to reference table DETAIL: Relation "upgrade_reference_table_referencing" is part of a foreign constraint. Foreign key constraints are not allowed from or to reference tables. 
+DROP TABLE upgrade_reference_table_referencing; +DROP TABLE upgrade_reference_table_referenced; -- test with no healthy placements CREATE TABLE upgrade_reference_table_unhealthy(column1 int); SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1'); @@ -66,9 +74,11 @@ SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1'); (1 row) +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass; UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006; SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy'); ERROR: could not find any healthy placement for shard 1360006 +DROP TABLE upgrade_reference_table_unhealthy; -- test with table containing composite type CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text); \c - - - :worker_1_port @@ -83,9 +93,11 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1'); (1 row) +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass; SELECT upgrade_to_reference_table('upgrade_reference_table_composite'); ERROR: type "public.upgrade_test_composite_type" does not exist CONTEXT: while executing command on localhost:57638 +DROP TABLE upgrade_reference_table_composite; -- test with reference table CREATE TABLE upgrade_reference_table_reference(column1 int); SELECT create_reference_table('upgrade_reference_table_reference'); @@ -97,6 +109,7 @@ SELECT create_reference_table('upgrade_reference_table_reference'); SELECT upgrade_to_reference_table('upgrade_reference_table_reference'); ERROR: cannot upgrade to reference table DETAIL: Relation "upgrade_reference_table_reference" is already a reference table +DROP TABLE upgrade_reference_table_reference; -- test valid cases, append distributed table CREATE TABLE upgrade_reference_table_append(column1 int); SELECT create_distributed_table('upgrade_reference_table_append', 'column1', 'append'); @@ -166,7 +179,7 @@ WHERE logicalrelid = 'upgrade_reference_table_append'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 1360002 | t + n | t | 1360005 | t (1 row) SELECT @@ -188,7 +201,7 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360002 | 1 | 2 | 0 + 1360005 | 1 | 2 | 0 (1 row) SELECT @@ -206,6 +219,8 @@ ORDER BY 1360009 | 1 | 0 | localhost | 57638 (2 rows) + +DROP TABLE upgrade_reference_table_append; -- test valid cases, shard exists at one worker CREATE TABLE upgrade_reference_table_one_worker(column1 int); SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1'); @@ -214,6 +229,7 @@ SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1') (1 row) +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_one_worker'::regclass; -- situation before upgrade_reference_table SELECT partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel @@ -223,7 +239,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 1360001 | s + h | f | 1360006 | c (1 row) SELECT @@ -245,7 +261,7 @@ WHERE colocationid IN WHERE logicalrelid = 
'upgrade_reference_table_one_worker'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360001 | 1 | 1 | 23 + 1360006 | 1 | 1 | 23 (1 row) SELECT @@ -275,7 +291,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 1360002 | t + n | t | 1360007 | t (1 row) SELECT @@ -297,7 +313,7 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360002 | 1 | 2 | 0 + 1360007 | 1 | 2 | 0 (1 row) SELECT @@ -315,6 +331,8 @@ ORDER BY 1360010 | 1 | 0 | localhost | 57638 (2 rows) + +DROP TABLE upgrade_reference_table_one_worker; -- test valid cases, shard exists at both workers but one is unhealthy SET citus.shard_replication_factor TO 2; CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int); @@ -334,7 +352,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 1360003 | c + h | f | 1360008 | c (1 row) SELECT @@ -356,7 +374,7 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360003 | 1 | 2 | 23 + 1360008 | 1 | 2 | 23 (1 row) SELECT @@ -389,7 +407,7 @@ WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 1360002 | t + n | t | 1360009 | t (1 row) SELECT @@ -411,7 +429,7 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360002 | 1 | 2 | 0 + 1360009 | 1 | 2 | 0 (1 row) SELECT @@ -429,6 +447,8 @@ ORDER BY 1360011 | 1 | 0 | localhost | 57638 (2 rows) + +DROP TABLE upgrade_reference_table_one_unhealthy; -- test valid cases, shard exists at both workers and both are healthy CREATE TABLE upgrade_reference_table_both_healthy(column1 int); SELECT create_distributed_table('upgrade_reference_table_both_healthy', 'column1'); @@ -446,7 +466,7 @@ WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 1360004 | c + h | f | 1360010 | c (1 row) SELECT @@ -468,7 +488,7 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360004 | 1 | 2 | 23 + 1360010 | 1 | 2 | 23 (1 row) SELECT @@ -501,7 +521,7 @@ WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 1360002 | t + n | t | 1360011 | t (1 row) SELECT @@ -523,7 +543,7 @@ WHERE colocationid IN WHERE logicalrelid = 
'upgrade_reference_table_both_healthy'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360002 | 1 | 2 | 0 + 1360011 | 1 | 2 | 0 (1 row) SELECT @@ -541,6 +561,8 @@ ORDER BY 1360012 | 1 | 0 | localhost | 57638 (2 rows) + +DROP TABLE upgrade_reference_table_both_healthy; -- test valid cases, do it in transaction and ROLLBACK SET citus.shard_replication_factor TO 1; CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int); @@ -550,6 +572,7 @@ SELECT create_distributed_table('upgrade_reference_table_transaction_rollback', (1 row) +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_rollback'::regclass; -- situation before upgrade_reference_table SELECT partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel @@ -559,7 +582,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 1360001 | s + h | f | 1360012 | c (1 row) SELECT @@ -581,7 +604,7 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360001 | 1 | 1 | 23 + 1360012 | 1 | 1 | 23 (1 row) SELECT @@ -613,7 +636,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 1360001 | s + h | f | 1360012 | c (1 row) SELECT @@ -627,6 +650,9 @@ WHERE 1360013 | f | f (1 row) +-- eliminate the intermediate duplicate rows in pg_dist_colocation +VACUUM ANALYZE pg_dist_colocation; + SELECT * FROM pg_dist_colocation WHERE colocationid IN @@ -635,7 +661,7 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360001 | 1 | 1 | 23 + 1360012 | 1 | 1 | 23 (1 row) SELECT @@ -650,6 +676,8 @@ WHERE shardid IN 1360013 | 1 | 0 | localhost | 57637 (1 row) + +DROP TABLE upgrade_reference_table_transaction_rollback; -- test valid cases, do it in transaction and COMMIT SET citus.shard_replication_factor TO 1; CREATE TABLE upgrade_reference_table_transaction_commit(column1 int); @@ -659,6 +687,7 @@ SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'c (1 row) +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_commit'::regclass; -- situation before upgrade_reference_table SELECT partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel @@ -668,7 +697,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - h | f | 1360001 | s + h | f | 1360014 | c (1 row) SELECT @@ -690,7 +719,7 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360001 | 1 | 1 | 23 + 1360014 | 1 | 1 | 23 (1 row) SELECT @@ 
-722,7 +751,7 @@ WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; partmethod | partkeyisnull | colocationid | repmodel ------------+---------------+--------------+---------- - n | t | 1360002 | t + n | t | 1360015 | t (1 row) SELECT @@ -744,7 +773,7 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1360002 | 1 | 2 | 0 + 1360015 | 1 | 2 | 0 (1 row) SELECT @@ -771,19 +800,286 @@ Table "public.upgrade_reference_table_transaction_commit_1360014" column1 | integer | \c - - - :master_port --- drop used tables to clean the workspace -DROP TABLE upgrade_reference_table_local; -DROP TABLE upgrade_reference_table_multiple_shard; -DROP TABLE upgrade_reference_table_no_shard; -DROP TABLE upgrade_reference_table_referencing; -DROP TABLE upgrade_reference_table_referenced; -DROP TABLE upgrade_reference_table_unhealthy; -DROP TABLE upgrade_reference_table_composite; -DROP TYPE upgrade_test_composite_type; -DROP TABLE upgrade_reference_table_reference; -DROP TABLE upgrade_reference_table_append; -DROP TABLE upgrade_reference_table_one_worker; -DROP TABLE upgrade_reference_table_one_unhealthy; -DROP TABLE upgrade_reference_table_both_healthy; -DROP TABLE upgrade_reference_table_transaction_rollback; DROP TABLE upgrade_reference_table_transaction_commit; +-- create an mx table +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE upgrade_reference_table_mx(column1 int); +SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +-- verify that streaming replicated tables cannot be upgraded to reference tables +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + h | f | 1360016 | s +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360015 | f | f +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1360016 | 1 | 1 | 23 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) +ORDER BY nodeport; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360015 | 1 | 0 | localhost | 57637 +(1 row) + + +SELECT upgrade_to_reference_table('upgrade_reference_table_mx'); +ERROR: cannot upgrade to reference table +DETAIL: Upgrade is only supported for statement-based replicated tables but "upgrade_reference_table_mx" is streaming replicated + +-- situation after upgrade_reference_table 
+SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + h | f | 1360016 | s +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360015 | f | f +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1360016 | 1 | 1 | 23 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) +ORDER BY nodeport; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360015 | 1 | 0 | localhost | 57637 +(1 row) + +DROP TABLE upgrade_reference_table_mx; +-- test valid cases, do it with MX +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +CREATE TABLE upgrade_reference_table_mx(column1 int); +SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); + create_distributed_table +-------------------------- + +(1 row) + +UPDATE pg_dist_shard_placement SET shardstate = 3 +WHERE nodeport = :worker_2_port AND + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='upgrade_reference_table_mx'::regclass); + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + h | f | 1360017 | c +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360016 | f | f +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1360017 | 1 | 2 | 23 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) +ORDER BY nodeport; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360016 | 1 | 0 | localhost | 57637 + 1360016 | 3 | 0 | 
localhost | 57638 +(2 rows) + + +SELECT upgrade_to_reference_table('upgrade_reference_table_mx'); + upgrade_to_reference_table +---------------------------- + +(1 row) + + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + n | t | 1360018 | t +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360016 | t | t +(1 row) + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1360018 | 1 | 2 | 0 +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) +ORDER BY nodeport; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360016 | 1 | 0 | localhost | 57637 + 1360016 | 1 | 0 | localhost | 57638 +(2 rows) + + +-- situation on metadata worker +\c - - - :worker_1_port +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + partmethod | partkeyisnull | colocationid | repmodel +------------+---------------+--------------+---------- + n | t | 1360018 | t +(1 row) + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + shardid | shardminvalueisnull | shardmaxvalueisnull +---------+---------------------+--------------------- + 1360016 | t | t +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) +ORDER BY nodeport; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1360016 | 1 | 0 | localhost | 57637 + 1360016 | 1 | 0 | localhost | 57638 +(2 rows) + + +\c - - - :master_port +DROP TABLE upgrade_reference_table_mx; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 61df18fc3..394397d32 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -13,6 +13,9 @@ SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_pla \gset ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000; +SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset +SELECT 
nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset + -- Create the necessary test utility function CREATE FUNCTION master_metadata_snapshot() RETURNS text[] @@ -25,8 +28,8 @@ COMMENT ON FUNCTION master_metadata_snapshot() -- Show that none of the existing tables are qualified to be MX tables SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; --- Show that, with no MX tables, metadata snapshot contains only the delete commands and --- pg_dist_node entries +-- Show that, with no MX tables, metadata snapshot contains only the delete commands, +-- pg_dist_node entries and reference tables SELECT unnest(master_metadata_snapshot()); -- Create a test table with constraints and SERIAL @@ -506,11 +509,83 @@ DROP USER mx_user; \c - - - :worker_2_port DROP USER mx_user; +-- Check that create_reference_table creates the metadata on workers +\c - - - :master_port +CREATE TABLE mx_ref (col_1 int, col_2 text); +SELECT create_reference_table('mx_ref'); +\d mx_ref + +\c - - - :worker_1_port +\d mx_ref +SELECT + logicalrelid, partmethod, repmodel, shardid, placementid, nodename, nodeport +FROM + pg_dist_partition + NATURAL JOIN pg_dist_shard + NATURAL JOIN pg_dist_shard_placement +WHERE + logicalrelid = 'mx_ref'::regclass; + +SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset + +-- Check that DDL commands are propagated to reference tables on workers +\c - - - :master_port +ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0; +CREATE INDEX mx_ref_index ON mx_ref(col_1); +\d mx_ref + +\c - - - :worker_1_port +\d mx_ref + +-- Check that metadata is cleaned successfully upon drop table +\c - - - :master_port +DROP TABLE mx_ref; +\d mx_ref + +\c - - - :worker_1_port +\d mx_ref +SELECT * FROM pg_dist_shard WHERE shardid=:ref_table_shardid; +SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid; + +-- Check that master_add_node propagates the metadata about new placements of a reference table +\c - - - :master_port +CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +SELECT master_remove_node('localhost', :worker_2_port); +CREATE TABLE mx_ref (col_1 int, col_2 text); +SELECT create_reference_table('mx_ref'); + +SELECT shardid, nodename, nodeport +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE logicalrelid='mx_ref'::regclass; + +\c - - - :worker_1_port +SELECT shardid, nodename, nodeport +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE logicalrelid='mx_ref'::regclass; + +\c - - - :master_port +SELECT master_add_node('localhost', :worker_2_port); + +SELECT shardid, nodename, nodeport +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE logicalrelid='mx_ref'::regclass; + +\c - - - :worker_1_port +SELECT shardid, nodename, nodeport +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement +WHERE logicalrelid='mx_ref'::regclass; + +\c - - - :master_port +INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); +DROP TABLE tmp_shard_placement; + -- Cleanup \c - - - :master_port DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_testing_schema.mx_test_table; +DROP TABLE mx_ref; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); @@ -518,5 +593,7 @@ RESET citus.shard_count; RESET 
citus.shard_replication_factor; RESET citus.multi_shard_commit_protocol; +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 8a2497e57..634740cc6 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -758,6 +758,10 @@ AND s.logicalrelid = 'reference_failure_test'::regclass GROUP BY s.logicalrelid, sp.shardstate ORDER BY s.logicalrelid, sp.shardstate; +-- connect back to the worker and rename the test_user back +\c - :default_user - :worker_1_port +ALTER USER test_user_new RENAME TO test_user; + -- connect back to the master with the proper user to continue the tests \c - :default_user - :master_port DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second; diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql index 5ab58e978..37d9f8aa3 100644 --- a/src/test/regress/sql/multi_remove_node_reference_table.sql +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -14,6 +14,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1380000; CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; +-- make worker 1 receive metadata changes +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -- remove non-existing node SELECT master_remove_node('localhost', 55555); @@ -52,6 +54,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port SELECT master_remove_node('localhost', :worker_2_port); @@ -72,6 +87,18 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port -- remove same node twice SELECT master_remove_node('localhost', :worker_2_port); @@ -97,6 +124,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port BEGIN; SELECT master_remove_node('localhost', :worker_2_port); @@ -119,6 +159,18 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, 
nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port -- remove node in a transaction and COMMIT @@ -138,6 +190,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port BEGIN; SELECT master_remove_node('localhost', :worker_2_port); @@ -159,6 +224,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); @@ -182,6 +260,19 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port + BEGIN; INSERT INTO remove_node_reference_table VALUES(1); SELECT master_remove_node('localhost', :worker_2_port); @@ -207,6 +298,21 @@ WHERE colocationid IN --verify the data is inserted SELECT * FROM remove_node_reference_table; +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +SELECT * FROM remove_node_reference_table; + +\c - - - :master_port + -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); @@ -230,6 +336,19 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port + BEGIN; ALTER TABLE remove_node_reference_table ADD column2 int; SELECT master_remove_node('localhost', :worker_2_port); @@ -251,6 +370,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port -- verify table structure is changed \d remove_node_reference_table @@ -289,6 +421,19 @@ WHERE colocationid IN (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); + +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port SELECT master_remove_node('localhost', 
:worker_2_port); @@ -309,6 +454,19 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port + -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); @@ -334,6 +492,19 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port + SELECT master_disable_node('localhost', :worker_2_port); -- status after master_disable_node @@ -353,6 +524,19 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'remove_node_reference_table'::regclass); +\c - - - :worker_1_port + +SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + nodeport = :worker_2_port; + +\c - - - :master_port + -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); @@ -362,6 +546,7 @@ DROP TABLE remove_node_reference_table; DROP TABLE remove_node_reference_table_schema.table1; DROP SCHEMA remove_node_reference_table_schema CASCADE; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 330e62fee..39931af95 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -231,6 +231,9 @@ SET citus.shard_replication_factor TO 1; CREATE TABLE replicate_reference_table_hash(column1 int); SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); +-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass; + CREATE TABLE replicate_reference_table_reference_two(column1 int); -- status before master_add_node @@ -253,7 +256,8 @@ SELECT FROM pg_dist_partition WHERE - logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two') +ORDER BY logicalrelid; BEGIN; SELECT master_add_node('localhost', :worker_2_port); @@ -283,7 +287,9 @@ SELECT FROM pg_dist_partition WHERE - logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); + logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two') +ORDER BY + logicalrelid; DROP TABLE replicate_reference_table_reference_one; DROP TABLE replicate_reference_table_hash; diff --git a/src/test/regress/sql/multi_truncate.sql 
b/src/test/regress/sql/multi_truncate.sql index dcd5ea306..b1a7399ff 100644 --- a/src/test/regress/sql/multi_truncate.sql +++ b/src/test/regress/sql/multi_truncate.sql @@ -31,7 +31,7 @@ SELECT master_create_empty_shard('test_truncate_append'); SELECT master_create_empty_shard('test_truncate_append'); -- verify 3 shards are presents -SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass; +SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass ORDER BY shardid; TRUNCATE TABLE test_truncate_append; @@ -79,7 +79,7 @@ INSERT INTO test_truncate_range values (100); SELECT count(*) FROM test_truncate_range; -- verify 3 shards are presents -SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass; +SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid; TRUNCATE TABLE test_truncate_range; @@ -87,7 +87,7 @@ TRUNCATE TABLE test_truncate_range; SELECT count(*) FROM test_truncate_range; -- verify 3 shards are still present -SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass; +SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid; -- verify that truncate can be aborted INSERT INTO test_truncate_range VALUES (1); @@ -117,7 +117,7 @@ INSERT INTO test_truncate_hash values (100); SELECT count(*) FROM test_truncate_hash; -- verify 4 shards are present -SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass; +SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid; TRUNCATE TABLE test_truncate_hash; @@ -136,7 +136,7 @@ TRUNCATE TABLE test_truncate_hash; SELECT count(*) FROM test_truncate_hash; -- verify 4 shards are still presents -SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass; +SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid; -- verify that truncate can be aborted INSERT INTO test_truncate_hash VALUES (1); diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index e629c2926..ceac8b0f4 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -25,6 +25,9 @@ SELECT create_distributed_table('mx_table', 'col_1'); CREATE TABLE mx_table_2 (col_1 int, col_2 text, col_3 BIGSERIAL); SELECT create_distributed_table('mx_table_2', 'col_1'); +CREATE TABLE mx_ref_table (col_1 int, col_2 text); +SELECT create_reference_table('mx_ref_table'); + -- Check that the created tables are colocated MX tables SELECT logicalrelid, repmodel, colocationid FROM pg_dist_partition @@ -41,6 +44,10 @@ COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); 65832, 'amet' \. 
+INSERT INTO mx_ref_table VALUES (-37, 'morbi'); +INSERT INTO mx_ref_table VALUES (-78, 'sapien'); +INSERT INTO mx_ref_table VALUES (-34, 'augue'); + SELECT * FROM mx_table ORDER BY col_1; -- Try commands from metadata worker @@ -73,6 +80,18 @@ INSERT INTO pg_dist_shard SELECT * FROM pg_dist_shard_temp; SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; +-- INSERT/UPDATE/DELETE/COPY on reference tables +SELECT * FROM mx_ref_table ORDER BY col_1; +INSERT INTO mx_ref_table (col_1, col_2) VALUES (-6, 'vestibulum'); +UPDATE mx_ref_table SET col_2 = 'habitant' WHERE col_1 = -37; +DELETE FROM mx_ref_table WHERE col_1 = -78; +COPY mx_ref_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); +SELECT * FROM mx_ref_table ORDER BY col_1; + +\c - - - :master_port +DROP TABLE mx_ref_table; +\c - - - :worker_1_port + -- DDL commands \d mx_table CREATE INDEX mx_test_index ON mx_table(col_1); diff --git a/src/test/regress/sql/multi_upgrade_reference_table.sql b/src/test/regress/sql/multi_upgrade_reference_table.sql index 90f73eb03..210c09662 100644 --- a/src/test/regress/sql/multi_upgrade_reference_table.sql +++ b/src/test/regress/sql/multi_upgrade_reference_table.sql @@ -11,17 +11,20 @@ ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000; -- test with not distributed table CREATE TABLE upgrade_reference_table_local(column1 int); SELECT upgrade_to_reference_table('upgrade_reference_table_local'); +DROP TABLE upgrade_reference_table_local; -- test with table which has more than one shard SET citus.shard_count TO 4; CREATE TABLE upgrade_reference_table_multiple_shard(column1 int); SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'column1'); SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard'); +DROP TABLE upgrade_reference_table_multiple_shard; -- test with table which has no shard CREATE TABLE upgrade_reference_table_no_shard(column1 int); SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append'); SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard'); +DROP TABLE upgrade_reference_table_no_shard; -- test with table with foreign keys SET citus.shard_count TO 1; @@ -32,14 +35,23 @@ SELECT create_distributed_table('upgrade_reference_table_referenced', 'column1') CREATE TABLE upgrade_reference_table_referencing(column1 int REFERENCES upgrade_reference_table_referenced(column1)); SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1'); +-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referenced'::regclass; +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referencing'::regclass; + SELECT upgrade_to_reference_table('upgrade_reference_table_referenced'); SELECT upgrade_to_reference_table('upgrade_reference_table_referencing'); +DROP TABLE upgrade_reference_table_referencing; +DROP TABLE upgrade_reference_table_referenced; + -- test with no healthy placements CREATE TABLE upgrade_reference_table_unhealthy(column1 int); SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1'); +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass; UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006; SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy'); 
+DROP TABLE upgrade_reference_table_unhealthy; -- test with table containing composite type CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text); @@ -52,12 +64,15 @@ SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; CREATE TABLE upgrade_reference_table_composite(column1 int, column2 upgrade_test_composite_type); SELECT create_distributed_table('upgrade_reference_table_composite', 'column1'); +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass; SELECT upgrade_to_reference_table('upgrade_reference_table_composite'); +DROP TABLE upgrade_reference_table_composite; -- test with reference table CREATE TABLE upgrade_reference_table_reference(column1 int); SELECT create_reference_table('upgrade_reference_table_reference'); SELECT upgrade_to_reference_table('upgrade_reference_table_reference'); +DROP TABLE upgrade_reference_table_reference; -- test valid cases, append distributed table CREATE TABLE upgrade_reference_table_append(column1 int); @@ -133,10 +148,13 @@ WHERE shardid IN WHERE logicalrelid = 'upgrade_reference_table_append'::regclass) ORDER BY nodeport; + +DROP TABLE upgrade_reference_table_append; -- test valid cases, shard exists at one worker CREATE TABLE upgrade_reference_table_one_worker(column1 int); SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1'); +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_one_worker'::regclass; -- situation before upgrade_reference_table SELECT @@ -201,6 +219,8 @@ WHERE shardid IN WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass) ORDER BY nodeport; + +DROP TABLE upgrade_reference_table_one_worker; -- test valid cases, shard exists at both workers but one is unhealthy SET citus.shard_replication_factor TO 2; @@ -273,6 +293,8 @@ WHERE shardid IN WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass) ORDER BY nodeport; + +DROP TABLE upgrade_reference_table_one_unhealthy; -- test valid cases, shard exists at both workers and both are healthy CREATE TABLE upgrade_reference_table_both_healthy(column1 int); @@ -343,11 +365,14 @@ WHERE shardid IN WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass) ORDER BY nodeport; + +DROP TABLE upgrade_reference_table_both_healthy; -- test valid cases, do it in transaction and ROLLBACK SET citus.shard_replication_factor TO 1; CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int); SELECT create_distributed_table('upgrade_reference_table_transaction_rollback', 'column1'); +UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_rollback'::regclass; -- situation before upgrade_reference_table SELECT @@ -396,8 +421,11 @@ SELECT FROM pg_dist_shard WHERE - logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; + +-- eliminate the intermediate duplicate rows in pg_dist_colocation +VACUUM ANALYZE pg_dist_colocation; + SELECT * FROM pg_dist_colocation WHERE colocationid IN @@ -412,11 +440,14 @@ WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + +DROP TABLE upgrade_reference_table_transaction_rollback; -- test valid cases, do it in transaction and COMMIT SET citus.shard_replication_factor TO 1; CREATE TABLE upgrade_reference_table_transaction_commit(column1 int); SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'column1'); +UPDATE 
pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_commit'::regclass; -- situation before upgrade_reference_table SELECT @@ -489,20 +520,185 @@ ORDER BY \d upgrade_reference_table_transaction_commit_* \c - - - :master_port - --- drop used tables to clean the workspace -DROP TABLE upgrade_reference_table_local; -DROP TABLE upgrade_reference_table_multiple_shard; -DROP TABLE upgrade_reference_table_no_shard; -DROP TABLE upgrade_reference_table_referencing; -DROP TABLE upgrade_reference_table_referenced; -DROP TABLE upgrade_reference_table_unhealthy; -DROP TABLE upgrade_reference_table_composite; -DROP TYPE upgrade_test_composite_type; -DROP TABLE upgrade_reference_table_reference; -DROP TABLE upgrade_reference_table_append; -DROP TABLE upgrade_reference_table_one_worker; -DROP TABLE upgrade_reference_table_one_unhealthy; -DROP TABLE upgrade_reference_table_both_healthy; -DROP TABLE upgrade_reference_table_transaction_rollback; DROP TABLE upgrade_reference_table_transaction_commit; + +-- create an mx table +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +CREATE TABLE upgrade_reference_table_mx(column1 int); +SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); + +-- verify that streaming replicated tables cannot be upgraded to reference tables +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) +ORDER BY nodeport; + + +SELECT upgrade_to_reference_table('upgrade_reference_table_mx'); + + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) +ORDER BY nodeport; + +DROP TABLE upgrade_reference_table_mx; + +-- test valid cases, do it with MX +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 2; +CREATE TABLE upgrade_reference_table_mx(column1 int); +SELECT create_distributed_table('upgrade_reference_table_mx', 'column1'); +UPDATE pg_dist_shard_placement SET shardstate = 3 +WHERE nodeport = :worker_2_port AND + shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='upgrade_reference_table_mx'::regclass); + +SELECT start_metadata_sync_to_node('localhost', 
:worker_1_port); + +-- situation before upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) +ORDER BY nodeport; + + +SELECT upgrade_to_reference_table('upgrade_reference_table_mx'); + + +-- situation after upgrade_reference_table +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + +SELECT * +FROM pg_dist_colocation +WHERE colocationid IN + (SELECT colocationid + FROM pg_dist_partition + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) +ORDER BY nodeport; + +-- situation on metadata worker +\c - - - :worker_1_port +SELECT + partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel +FROM + pg_dist_partition +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + +SELECT + shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull +FROM + pg_dist_shard +WHERE + logicalrelid = 'upgrade_reference_table_mx'::regclass; + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM pg_dist_shard_placement +WHERE shardid IN + (SELECT shardid + FROM pg_dist_shard + WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) +ORDER BY nodeport; + +\c - - - :master_port +DROP TABLE upgrade_reference_table_mx; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +