diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 541358ca5..ab0907c7f 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,8 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 6.1-18 \ + 6.1-19 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -129,9 +130,12 @@ $(EXTENSION)--6.1-16.sql: $(EXTENSION)--6.1-15.sql $(EXTENSION)--6.1-15--6.1-16. cat $^ > $@ $(EXTENSION)--6.1-17.sql: $(EXTENSION)--6.1-16.sql $(EXTENSION)--6.1-16--6.1-17.sql cat $^ > $@ +$(EXTENSION)--6.1-18.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.1-18.sql + cat $^ > $@ +$(EXTENSION)--6.1-19.sql: $(EXTENSION)--6.1-18.sql $(EXTENSION)--6.1-18--6.1-19.sql + cat $^ > $@ NO_PGXS = 1 - SHLIB_LINK = $(libpq) include $(citus_top_builddir)/Makefile.global diff --git a/src/backend/distributed/citus--6.1-17--6.1-18.sql b/src/backend/distributed/citus--6.1-17--6.1-18.sql new file mode 100644 index 000000000..6ae1b101a --- /dev/null +++ b/src/backend/distributed/citus--6.1-17--6.1-18.sql @@ -0,0 +1,22 @@ +/* citus--6.1-17--6.1-18.sql */ + +SET search_path = 'pg_catalog'; + +DROP FUNCTION IF EXISTS master_add_node(text, integer); + +CREATE FUNCTION master_add_node(nodename text, + nodeport integer, + activate_node boolean DEFAULT TRUE, + OUT nodeid integer, + OUT groupid integer, + OUT nodename text, + OUT nodeport integer, + OUT noderack text, + OUT hasmetadata boolean) + RETURNS record + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_add_node$$; +COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer, activate_node boolean) + IS 'add node to the cluster'; + +RESET search_path; diff --git a/src/backend/distributed/citus--6.1-18--6.1-19.sql b/src/backend/distributed/citus--6.1-18--6.1-19.sql new file mode 100644 index 000000000..72be1cbcf --- /dev/null +++ b/src/backend/distributed/citus--6.1-18--6.1-19.sql @@ -0,0 +1,13 @@ +/* citus--6.1-18--6.1-19.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION master_activate_node(nodename text, + nodeport integer) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_activate_node$$; +COMMENT ON FUNCTION master_activate_node(nodename text, nodeport integer) + IS 'add node to the cluster'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 57d2534ec..0aeb489d8 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-17' +default_version = '6.1-19' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 361ba7081..bf11e51f7 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -131,6 +131,7 @@ 
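The two upgrade scripts above split node addition into an optional two-step flow: master_add_node() gains an activate_node flag that defaults to true (so existing callers keep the old behavior), and the new master_activate_node() performs the deferred reference table replication. A minimal usage sketch; the hostname and port are illustrative and mirror the regression tests later in this diff:

    -- add a worker but defer reference table replication
    SELECT master_add_node('localhost', 57638, activate_node := false);

    -- later, replicate all reference tables to the node and make it fully usable
    SELECT master_activate_node('localhost', 57638);

    -- the default still adds and activates in a single step
    SELECT master_add_node('localhost', 57638);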
create_distributed_table(PG_FUNCTION_ARGS) char *colocateWithTableName = NULL; EnsureSchemaNode(); + EnsureAllNodesActivated(); /* guard against a binary update without a function update */ if (PG_NARGS() >= 4) @@ -193,6 +194,8 @@ create_reference_table(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); + EnsureAllNodesActivated(); + CreateReferenceTable(relationId); PG_RETURN_VOID(); diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 880abac71..aa23d33e5 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -32,6 +32,7 @@ #include "distributed/multi_join_order.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" +#include "distributed/reference_table_utils.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" @@ -67,6 +68,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) Oid distributedTableId = ResolveRelationId(tableNameText); EnsureSchemaNode(); + EnsureAllNodesActivated(); CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 789e2b816..7bc51c4a2 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -34,6 +34,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/placement_connection.h" +#include "distributed/reference_table_utils.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" @@ -90,6 +91,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) EnsureTablePermissions(relationId, ACL_INSERT); CheckDistributedTable(relationId); + EnsureAllNodesActivated(); /* * We check whether the table is a foreign table or not. 
If it is, we set diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index b69ec3139..f466366eb 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -35,6 +35,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_join_order.h" #include "distributed/pg_dist_node.h" +#include "distributed/reference_table_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "foreign/foreign.h" @@ -84,6 +85,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) List *createMetadataCommandList = NIL; EnsureSchemaNode(); + EnsureAllNodesActivated(); EnsureSuperUser(); PreventTransactionChain(true, "start_metadata_sync_to_node"); diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 169510dc4..de855213d 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -51,6 +51,7 @@ int GroupSize = 1; /* local function forward declarations */ +static void ActivateNode(char *nodeName, int nodePort); static void RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove); static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, bool hasMetadata, bool *nodeAlreadyExists); @@ -70,17 +71,20 @@ PG_FUNCTION_INFO_V1(master_remove_node); PG_FUNCTION_INFO_V1(master_disable_node); PG_FUNCTION_INFO_V1(master_initialize_node_metadata); PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); +PG_FUNCTION_INFO_V1(master_activate_node); /* * master_add_node function adds a new node to the cluster and returns its data. It also - * replicates all reference tables to the new node. + * replicates all reference tables to the new node if replicateReferenceTables is true. */ Datum master_add_node(PG_FUNCTION_ARGS) { text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); + bool activateNode = PG_GETARG_BOOL(2); + char *nodeNameString = text_to_cstring(nodeName); int32 groupId = 0; char *nodeRack = WORKER_DEFAULT_RACK; @@ -96,9 +100,9 @@ master_add_node(PG_FUNCTION_ARGS) * reference tables to all nodes however, it skips nodes which already has healthy * placement of particular reference table. */ - if (!nodeAlreadyExists) + if (!nodeAlreadyExists && activateNode) { - ReplicateAllReferenceTablesToAllNodes(); + ActivateNode(nodeNameString, nodePort); } PG_RETURN_CSTRING(returnData); @@ -262,6 +266,43 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) } +/* + * For the time-being master_activate_node is a wrapper around + * ReplicateAllReferenceTablesToNode(). + */ +Datum +master_activate_node(PG_FUNCTION_ARGS) +{ + text *nodeName = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + + char *nodeNameString = text_to_cstring(nodeName); + WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort); + + if (workerNode == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + (errmsg("cannot replicate reference tables to a " + "non-existing node")))); + } + + ActivateNode(nodeNameString, nodePort); + + PG_RETURN_VOID(); +} + + +/* + * ActivateNode activates the node with nodeName and nodePort. Currently, activation + * includes only replicating the reference tables. 
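ActivateNode is reached from master_add_node() when activate_node is true (the default) and from master_activate_node(), which first verifies that the node has been added. A sketch of that validation path, with a hostname that is purely illustrative:

    SELECT master_activate_node('unknown-host', 5432);
    ERROR:  cannot replicate reference tables to a non-existing node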
+ */ +static void +ActivateNode(char *nodeName, int nodePort) +{ + ReplicateAllReferenceTablesToNode(nodeName, nodePort); +} + + /* * FindWorkerNode searches over the worker nodes and returns the workerNode * if it already exists. Else, the function returns NULL. diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 5b8eedfef..d10021787 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -36,7 +36,11 @@ /* local function forward declarations */ static void ReplicateSingleShardTableToAllWorkers(Oid relationId); static void ReplicateShardToAllWorkers(ShardInterval *shardInterval); +static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, + int nodePort); static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId); +static void EnsureReferenceTablesReplicatedToAllNodes(void); +static WorkerNode * FindUnderReplicatedWorkerNode(List *finalizedShardPlacementList); static int CompareOids(const void *leftElement, const void *rightElement); /* exports for SQL callable functions */ @@ -114,22 +118,46 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) /* - * ReplicateAllReferenceTablesToAllNodes function finds all reference tables and - * replicates them to all worker nodes. It also modifies pg_dist_colocation table to - * update the replication factor column. This function skips a worker node if that node - * already has healthy placement of a particular reference table to prevent unnecessary - * data transfer. + * ReplicateAllReferenceTablesToAllNodes replicates all reference tables to all worker + * nodes. It also modifies pg_dist_colocation table to update the replication factor + * column. This function skips a worker node if that node already has healthy placement + * of a particular reference table to prevent unnecessary data transfer. */ void ReplicateAllReferenceTablesToAllNodes() { - List *referenceTableList = ReferenceTableOidList(); - ListCell *referenceTableCell = NULL; - Relation pgDistNode = NULL; List *workerNodeList = NIL; - int workerCount = 0; + ListCell *workerNodeCell = NULL; + /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ + pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + workerNodeList = WorkerNodeList(); + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + + ReplicateAllReferenceTablesToNode(workerNode->workerName, workerNode->workerPort); + } + + heap_close(pgDistNode, NoLock); +} + + +/* + * ReplicateAllReferenceTablesToNode function finds all reference tables and + * replicates them the given worker node. It also modifies pg_dist_colocation table to + * update the replication factor column when necessary. This function skips a worker node + * if that node already has healthy placement of a particular reference table to prevent + * unnecessary data transfer. 
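ReplicateAllReferenceTablesToNode's effect can be observed from the catalogs this file manipulates; a sketch, with my_reference_table standing in for any reference table:

    SELECT nodename, nodeport, shardstate
    FROM pg_dist_shard_placement
    WHERE shardid IN (SELECT shardid FROM pg_dist_shard
                      WHERE logicalrelid = 'my_reference_table'::regclass);

After replication, the replication factor column of the reference table colocation group in pg_dist_colocation matches the current worker count again (see the UpdateColocationGroupReplicationFactor() call in the function body).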
+ */ +void +ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) +{ + List *referenceTableList = ReferenceTableOidList(); + ListCell *referenceTableCell = NULL; + List *workerNodeList = WorkerNodeList(); + uint32 workerCount = 0; Oid firstReferenceTableId = InvalidOid; uint32 referenceTableColocationId = INVALID_COLOCATION_ID; @@ -139,12 +167,6 @@ ReplicateAllReferenceTablesToAllNodes() return; } - /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ - pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); - workerNodeList = WorkerNodeList(); - workerCount = list_length(workerNodeList); - - /* * We sort the reference table list to prevent deadlocks in concurrent * ReplicateAllReferenceTablesToAllNodes calls. @@ -156,14 +178,10 @@ ReplicateAllReferenceTablesToAllNodes() List *shardIntervalList = LoadShardIntervalList(referenceTableId); ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); uint64 shardId = shardInterval->shardId; - char *relationName = get_rel_name(referenceTableId); LockShardDistributionMetadata(shardId, ExclusiveLock); - ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to all workers", - relationName))); - - ReplicateShardToAllWorkers(shardInterval); + ReplicateShardToNode(shardInterval, nodeName, nodePort); } /* @@ -171,10 +189,10 @@ ReplicateAllReferenceTablesToAllNodes() * colocation group of reference tables so that worker count will be equal to * replication factor again. */ + workerCount = list_length(workerNodeList); firstReferenceTableId = linitial_oid(referenceTableList); referenceTableColocationId = TableColocationId(firstReferenceTableId); UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); - heap_close(pgDistNode, NoLock); } @@ -228,25 +246,15 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId) /* - * ReplicateShardToAllWorkers function replicates given shard to the given worker nodes - * in a separate transactions. While replicating, it only replicates the shard to the - * workers which does not have a healthy replica of the shard. This function also modifies - * metadata by inserting/updating related rows in pg_dist_shard_placement. However, this - * function does not obtain any lock on shard resource and shard metadata. It is caller's + * ReplicateShardToAllWorkers function replicates given shard to the all worker nodes + * in separate transactions. While replicating, it only replicates the shard to the + * workers which does not have a healthy replica of the shard. However, this function + * does not obtain any lock on shard resource and shard metadata. It is caller's * responsibility to take those locks. 
*/ static void ReplicateShardToAllWorkers(ShardInterval *shardInterval) { - uint64 shardId = shardInterval->shardId; - List *shardPlacementList = ShardPlacementList(shardId); - bool missingOk = false; - ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); - char *srcNodeName = sourceShardPlacement->nodeName; - uint32 srcNodePort = sourceShardPlacement->nodePort; - char *tableOwner = TableOwner(shardInterval->relationId); - List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort); - /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); List *workerNodeList = WorkerNodeList(); @@ -263,59 +271,84 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval) WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); char *nodeName = workerNode->workerName; uint32 nodePort = workerNode->workerPort; - bool missingWorkerOk = true; - ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, - nodeName, nodePort, - missingWorkerOk); - - /* - * Although this function is used for reference tables and reference table shard - * placements always have shardState = FILE_FINALIZED, in case of an upgrade of - * a non-reference table to reference table, unhealty placements may exist. In - * this case, we repair the shard placement and update its state in - * pg_dist_shard_placement table. - */ - if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) - { - uint64 placementId = 0; - - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, - ddlCommandList); - if (targetPlacement == NULL) - { - placementId = GetNextPlacementId(); - InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0, - nodeName, nodePort); - } - else - { - placementId = targetPlacement->placementId; - UpdateShardPlacementState(placementId, FILE_FINALIZED); - } - - /* - * Although ReplicateShardToAllWorkers is used only for reference tables, - * during the upgrade phase, the placements are created before the table is - * marked as a reference table. All metadata (including the placement - * metadata) will be copied to workers after all reference table changed - * are finished. - */ - if (ShouldSyncTableMetadata(shardInterval->relationId)) - { - char *placementCommand = PlacementUpsertCommand(shardId, placementId, - FILE_FINALIZED, 0, - nodeName, nodePort); - - SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand); - } - } + ReplicateShardToNode(shardInterval, nodeName, nodePort); } heap_close(pgDistNode, NoLock); } +/* + * ReplicateShardToNode function replicates given shard to the given worker node + * in a separate transaction. While replicating, it only replicates the shard to the + * workers which does not have a healthy replica of the shard. This function also modifies + * metadata by inserting/updating related rows in pg_dist_shard_placement. 
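Since placements that are already FILE_FINALIZED are skipped, repeating the replication against a node that holds healthy copies transfers no data; for instance, running master_activate_node() twice in a row for the same worker copies the reference tables only once (port is illustrative):

    SELECT master_activate_node('localhost', 57638);
    SELECT master_activate_node('localhost', 57638);  -- nothing left to copy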
+ */ +static void +ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) +{ + uint64 shardId = shardInterval->shardId; + List *shardPlacementList = ShardPlacementList(shardId); + bool missingOk = false; + ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); + char *srcNodeName = sourceShardPlacement->nodeName; + uint32 srcNodePort = sourceShardPlacement->nodePort; + char *tableOwner = TableOwner(shardInterval->relationId); + List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort); + bool missingWorkerOk = true; + ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, + nodeName, nodePort, + missingWorkerOk); + + /* + * Although this function is used for reference tables and reference table shard + * placements always have shardState = FILE_FINALIZED, in case of an upgrade of + * a non-reference table to reference table, unhealty placements may exist. In + * this case, we repair the shard placement and update its state in + * pg_dist_shard_placement table. + */ + if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) + { + uint64 placementId = 0; + + ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d", + get_rel_name(shardInterval->relationId), nodeName, + nodePort))); + + SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, + ddlCommandList); + if (targetPlacement == NULL) + { + placementId = GetNextPlacementId(); + InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0, + nodeName, nodePort); + } + else + { + placementId = targetPlacement->placementId; + UpdateShardPlacementState(placementId, FILE_FINALIZED); + } + + /* + * Although ReplicateShardToAllWorkers is used only for reference tables, + * during the upgrade phase, the placements are created before the table is + * marked as a reference table. All metadata (including the placement + * metadata) will be copied to workers after all reference table changed + * are finished. + */ + if (ShouldSyncTableMetadata(shardInterval->relationId)) + { + char *placementCommand = PlacementUpsertCommand(shardId, placementId, + FILE_FINALIZED, 0, + nodeName, nodePort); + + SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand); + } + } +} + + /* * ConvertToReferenceTableMetadata accepts a broadcast table and modifies its metadata to * reference table metadata. To do this, this function updates pg_dist_partition, @@ -417,6 +450,91 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort) } +/* + * Errors out if all nodes in the cluster is not activated. Currently only checks + * whether all reference tables are replicate to all nodes. If not, the function + * errors out. + */ +void +EnsureAllNodesActivated() +{ + EnsureReferenceTablesReplicatedToAllNodes(); +} + + +/* + * EnsureReferenceTablesReplicatedToAllNodes errors out if there exists any + * under-replicated reference tables in the cluster. 
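The check boils down to comparing, for each reference table's single shard, the number of finalized placements with the number of worker nodes; roughly the following, shown only as a sketch with my_reference_table as a placeholder:

    SELECT count(*) FROM pg_dist_shard_placement
    WHERE shardid = (SELECT shardid FROM pg_dist_shard
                     WHERE logicalrelid = 'my_reference_table'::regclass)
      AND shardstate = 1;                 -- finalized placements
    SELECT count(*) FROM pg_dist_node;    -- expected number of placements

If the two counts differ, the error raised below names one of the workers that is missing a placement.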
+ */ +static void +EnsureReferenceTablesReplicatedToAllNodes() +{ + List *referenceTableOidList = ReferenceTableOidList(); + ListCell *referenceTableOidCell = NULL; + List *workerNodeList = WorkerNodeList(); + int workerCount = list_length(workerNodeList); + + foreach(referenceTableOidCell, referenceTableOidList) + { + Oid tableId = lfirst_oid(referenceTableOidCell); + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId); + ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[0]; + uint64 shardId = shardInterval->shardId; + List *finalizedShardPlacementList = FinalizedShardPlacementList(shardId); + + if (list_length(finalizedShardPlacementList) != workerCount) + { + WorkerNode *missingWorkerNode = + FindUnderReplicatedWorkerNode(finalizedShardPlacementList); + + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("operation is not allowed because %s:%d is not " + "activated", missingWorkerNode->workerName, + missingWorkerNode->workerPort), + errdetail("At least one of the reference tables is " + "under-replicated: \"%s\"", get_rel_name(tableId)), + errhint("Use SELECT master_activate_node('%s', " + "%d) to replicate under-replicated reference tables " + "to the given node.", + missingWorkerNode->workerName, + missingWorkerNode->workerPort))); + } + } +} + + +/* + * FindUnderReplicatedWorkerNode iterates over worker list and tries to find + * a worker node where finalizedShardPlacementList does not have a placement + * on it. If found the workerNode is returned. Else, the function returns NULL. + * + * This function does an O(n^2) search and thus should be used with caution. + */ +static WorkerNode * +FindUnderReplicatedWorkerNode(List *finalizedShardPlacementList) +{ + List *workerNodeList = WorkerNodeList(); + ListCell *workerNodeCell = NULL; + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + ShardPlacement *placement = NULL; + bool missinOk = true; + + placement = SearchShardPlacementInList(finalizedShardPlacementList, + workerNode->workerName, + workerNode->workerPort, missinOk); + if (placement == NULL) + { + return workerNode; + } + } + + return NULL; +} + + /* * ReferenceTableOidList function scans pg_dist_partition to create a list of all * reference tables. To create the list, it performs sequential scan. 
Since it is not diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 5a84c6e36..8ec56f495 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -14,8 +14,10 @@ extern uint32 CreateReferenceTableColocationId(void); extern void ReplicateAllReferenceTablesToAllNodes(void); +extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort); extern void DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort); +extern void EnsureAllNodesActivated(void); extern List * ReferenceTableOidList(void); #endif /* REFERENCE_TABLE_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 294f70af7..6a79b47ed 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -75,6 +75,8 @@ ALTER EXTENSION citus UPDATE TO '6.1-14'; ALTER EXTENSION citus UPDATE TO '6.1-15'; ALTER EXTENSION citus UPDATE TO '6.1-16'; ALTER EXTENSION citus UPDATE TO '6.1-17'; +ALTER EXTENSION citus UPDATE TO '6.1-18'; +ALTER EXTENSION citus UPDATE TO '6.1-19'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 64a908322..efe9d278e 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -1286,7 +1286,8 @@ SELECT create_reference_table('mx_ref'); SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE logicalrelid='mx_ref'::regclass; +WHERE logicalrelid='mx_ref'::regclass +ORDER BY shardid, nodename, nodeport; shardid | nodename | nodeport ---------+-----------+---------- 1310184 | localhost | 57637 @@ -1295,7 +1296,8 @@ WHERE logicalrelid='mx_ref'::regclass; \c - - - :worker_1_port SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE logicalrelid='mx_ref'::regclass; +WHERE logicalrelid='mx_ref'::regclass +ORDER BY shardid, nodename, nodeport; shardid | nodename | nodeport ---------+-----------+---------- 1310184 | localhost | 57637 @@ -1303,7 +1305,7 @@ WHERE logicalrelid='mx_ref'::regclass; \c - - - :master_port SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "mx_ref" to all workers +NOTICE: Replicating reference table "mx_ref" to the node localhost:57638 master_add_node --------------------------------- (5,5,localhost,57638,default,f) @@ -1311,7 +1313,8 @@ NOTICE: Replicating reference table "mx_ref" to all workers SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE logicalrelid='mx_ref'::regclass; +WHERE logicalrelid='mx_ref'::regclass +ORDER BY shardid, nodename, nodeport; shardid | nodename | nodeport ---------+-----------+---------- 1310184 | localhost | 57637 @@ -1321,7 +1324,8 @@ WHERE logicalrelid='mx_ref'::regclass; \c - - - :worker_1_port SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE logicalrelid='mx_ref'::regclass; +WHERE logicalrelid='mx_ref'::regclass +ORDER BY shardid, nodename, nodeport; shardid | nodename | nodeport ---------+-----------+---------- 1310184 | localhost | 57637 diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out 
b/src/test/regress/expected/multi_remove_node_reference_table.out index 17a81a35b..dd03dd67d 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -68,7 +68,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -98,7 +100,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -124,7 +128,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -152,7 +158,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -164,7 +172,7 @@ SELECT master_remove_node('localhost', :worker_2_port); ERROR: could not find valid entry for node "localhost:57638" -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 master_add_node --------------------------------------------- (1380001,1380001,localhost,57638,default,f) @@ -183,7 +191,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -213,7 +223,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -241,7 +253,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -270,7 +284,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -291,7 +307,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -321,7 +339,9 @@ SELECT FROM 
pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -349,7 +369,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -378,7 +400,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -387,7 +411,7 @@ WHERE \c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 master_add_node --------------------------------------------- (1380002,1380002,localhost,57638,default,f) @@ -406,7 +430,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -435,7 +461,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -465,7 +493,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -500,7 +530,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -516,7 +548,7 @@ SELECT * FROM remove_node_reference_table; \c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 master_add_node --------------------------------------------- (1380003,1380003,localhost,57638,default,f) @@ -535,7 +567,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -564,7 +598,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ 
-595,7 +631,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -624,7 +662,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -641,7 +681,7 @@ Table "public.remove_node_reference_table" -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 master_add_node --------------------------------------------- (1380004,1380004,localhost,57638,default,f) @@ -682,8 +722,8 @@ FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port -ORDER BY - shardid; +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -714,7 +754,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -769,7 +811,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -779,8 +823,8 @@ WHERE -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers -NOTICE: Replicating reference table "table1" to all workers +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 +NOTICE: Replicating reference table "table1" to the node localhost:57638 master_add_node --------------------------------------------- (1380005,1380005,localhost,57638,default,f) @@ -801,7 +845,7 @@ FROM WHERE nodeport = :worker_2_port ORDER BY - shardid; + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -831,7 +875,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1380000 | 1 | 0 | localhost | 57638 @@ -859,7 +905,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -887,7 +935,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; shardid | shardstate | shardlength | nodename | nodeport 
---------+------------+-------------+----------+---------- (0 rows) @@ -896,8 +946,8 @@ WHERE \c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers -NOTICE: Replicating reference table "table1" to all workers +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 +NOTICE: Replicating reference table "table1" to the node localhost:57638 master_add_node --------------------------------------------- (1380006,1380006,localhost,57638,default,f) diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 3c4e6147f..a386f00b4 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -71,7 +71,6 @@ SELECT create_reference_table('replicate_reference_table_unhealthy'); UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_unhealthy" to all workers ERROR: could not find any healthy placement for shard 1370000 -- verify node is not added SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; @@ -123,7 +122,7 @@ WHERE colocationid IN (1 row) SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers +NOTICE: Replicating reference table "replicate_reference_table_valid" to the node localhost:57638 master_add_node --------------------------------------------- (1370002,1370002,localhost,57638,default,f) @@ -244,7 +243,7 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers +NOTICE: Replicating reference table "replicate_reference_table_rollback" to the node localhost:57638 master_add_node --------------------------------------------- (1370003,1370003,localhost,57638,default,f) @@ -306,7 +305,7 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_commit" to all workers +NOTICE: Replicating reference table "replicate_reference_table_commit" to the node localhost:57638 master_add_node --------------------------------------------- (1370004,1370004,localhost,57638,default,f) @@ -400,13 +399,14 @@ ORDER BY logicalrelid; BEGIN; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers +NOTICE: Replicating reference table "replicate_reference_table_reference_one" to the node localhost:57638 master_add_node --------------------------------------------- (1370005,1370005,localhost,57638,default,f) (1 row) SELECT upgrade_to_reference_table('replicate_reference_table_hash'); +NOTICE: Replicating reference table "replicate_reference_table_hash" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -481,7 +481,7 @@ SELECT create_reference_table('replicate_reference_table_insert'); BEGIN; INSERT INTO replicate_reference_table_insert VALUES(1); SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_insert" to all workers +NOTICE: Replicating reference table 
"replicate_reference_table_insert" to the node localhost:57638 ERROR: cannot open new connections after the first modification command within a transaction ROLLBACK; DROP TABLE replicate_reference_table_insert; @@ -496,7 +496,7 @@ SELECT create_reference_table('replicate_reference_table_copy'); BEGIN; COPY replicate_reference_table_copy FROM STDIN; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_copy" to all workers +NOTICE: Replicating reference table "replicate_reference_table_copy" to the node localhost:57638 ERROR: cannot open new connections after the first modification command within a transaction ROLLBACK; DROP TABLE replicate_reference_table_copy; @@ -513,7 +513,7 @@ ALTER TABLE replicate_reference_table_ddl ADD column2 int; NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_ddl" to all workers +NOTICE: Replicating reference table "replicate_reference_table_ddl" to the node localhost:57638 ERROR: cannot open new connections after the first modification command within a transaction ROLLBACK; DROP TABLE replicate_reference_table_ddl; @@ -527,7 +527,7 @@ SELECT create_reference_table('replicate_reference_table_drop'); BEGIN; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers +NOTICE: Replicating reference table "replicate_reference_table_drop" to the node localhost:57638 master_add_node --------------------------------------------- (1370009,1370009,localhost,57638,default,f) @@ -571,7 +571,7 @@ WHERE colocationid IN (1 row) SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "table1" to all workers +NOTICE: Replicating reference table "table1" to the node localhost:57638 master_add_node --------------------------------------------- (1370010,1370010,localhost,57638,default,f) @@ -602,6 +602,130 @@ WHERE colocationid IN DROP TABLE replicate_reference_table_schema.table1; DROP SCHEMA replicate_reference_table_schema CASCADE; +-- do some tests with manually disabling reference table replication +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE initially_not_replicated_reference_table (key int); +SELECT create_reference_table('initially_not_replicated_reference_table'); + create_reference_table +------------------------ + +(1 row) + +CREATE TABLE initially_not_replicated_reference_table_second (key int); +SELECT create_reference_table('initially_not_replicated_reference_table_second'); + create_reference_table +------------------------ + +(1 row) + +CREATE TABLE append_test_table(key int); +SELECT create_distributed_table('append_test_table', 'key', 'append'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT master_add_node('localhost', :worker_2_port, activate_node := false); + master_add_node +--------------------------------------------- + (1370011,1370011,localhost,57638,default,f) +(1 row) + +-- we should see only two shard placements +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT + shardid + FROM + pg_dist_shard + WHERE + logicalrelid IN + ('initially_not_replicated_reference_table', 
'initially_not_replicated_reference_table_second')) +ORDER BY 1,4,5; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370012 | 1 | 0 | localhost | 57637 + 1370013 | 1 | 0 | localhost | 57637 +(2 rows) + +-- now, see that certain operations are disallowed +CREATE TABLE disallow_test_table (key int); +SELECT create_reference_table('disallow_test_table'); +ERROR: operation is not allowed because localhost:57638 is not activated +DETAIL: At least one of the reference tables is under-replicated: "initially_not_replicated_reference_table" +HINT: Use SELECT master_activate_node('localhost', 57638) to replicate under-replicated reference tables to the given node. +SELECT create_distributed_table('disallow_test_table', 'key'); +ERROR: operation is not allowed because localhost:57638 is not activated +DETAIL: At least one of the reference tables is under-replicated: "initially_not_replicated_reference_table" +HINT: Use SELECT master_activate_node('localhost', 57638) to replicate under-replicated reference tables to the given node. +SELECT master_create_empty_shard('append_test_table'); +ERROR: operation is not allowed because localhost:57638 is not activated +DETAIL: At least one of the reference tables is under-replicated: "initially_not_replicated_reference_table" +HINT: Use SELECT master_activate_node('localhost', 57638) to replicate under-replicated reference tables to the given node. +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); +ERROR: operation is not allowed because localhost:57638 is not activated +DETAIL: At least one of the reference tables is under-replicated: "initially_not_replicated_reference_table" +HINT: Use SELECT master_activate_node('localhost', 57638) to replicate under-replicated reference tables to the given node. +COPY append_test_table FROM STDIN; +ERROR: operation is not allowed because localhost:57638 is not activated +DETAIL: At least one of the reference tables is under-replicated: "initially_not_replicated_reference_table" +HINT: Use SELECT master_activate_node('localhost', 57638) to replicate under-replicated reference tables to the given node. 
+SELECT master_activate_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "initially_not_replicated_reference_table" to the node localhost:57638 +NOTICE: Replicating reference table "initially_not_replicated_reference_table_second" to the node localhost:57638 + master_activate_node +---------------------- + +(1 row) + +-- we should see the four shard placements +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT + shardid + FROM + pg_dist_shard + WHERE + logicalrelid IN + ('initially_not_replicated_reference_table', 'initially_not_replicated_reference_table_second')) +ORDER BY 1,4,5; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370012 | 1 | 0 | localhost | 57637 + 1370012 | 1 | 0 | localhost | 57638 + 1370013 | 1 | 0 | localhost | 57637 + 1370013 | 1 | 0 | localhost | 57638 +(4 rows) + +-- this should have no effect +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------------------- + (1370011,1370011,localhost,57638,default,f) +(1 row) + +-- now, should be able to create tables +CREATE TABLE initially_not_replicated_reference_table_third (key int); +SELECT create_reference_table('initially_not_replicated_reference_table_third'); + create_reference_table +------------------------ + +(1 row) + +-- drop unnecassary tables +DROP TABLE initially_not_replicated_reference_table_third, initially_not_replicated_reference_table_second, + initially_not_replicated_reference_table, append_test_table; -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); DROP TABLE tmp_shard_placement; diff --git a/src/test/regress/expected/multi_upgrade_reference_table.out b/src/test/regress/expected/multi_upgrade_reference_table.out index fb6e523d5..47a8cf49c 100644 --- a/src/test/regress/expected/multi_upgrade_reference_table.out +++ b/src/test/regress/expected/multi_upgrade_reference_table.out @@ -95,6 +95,7 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1'); UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass; SELECT upgrade_to_reference_table('upgrade_reference_table_composite'); +NOTICE: Replicating reference table "upgrade_reference_table_composite" to the node localhost:57638 ERROR: type "public.upgrade_test_composite_type" does not exist CONTEXT: while executing command on localhost:57638 DROP TABLE upgrade_reference_table_composite; @@ -165,6 +166,7 @@ WHERE shardid IN (1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_append'); +NOTICE: Replicating reference table "upgrade_reference_table_append" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -277,6 +279,7 @@ WHERE shardid IN (1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker'); +NOTICE: Replicating reference table "upgrade_reference_table_one_worker" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -621,6 +624,7 @@ WHERE shardid IN BEGIN; SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback'); +NOTICE: Replicating reference table "upgrade_reference_table_transaction_rollback" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -733,6 +737,7 @@ WHERE shardid IN BEGIN; SELECT 
upgrade_to_reference_table('upgrade_reference_table_transaction_commit'); +NOTICE: Replicating reference table "upgrade_reference_table_transaction_commit" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -978,6 +983,7 @@ ORDER BY nodeport; SELECT upgrade_to_reference_table('upgrade_reference_table_mx'); +NOTICE: Replicating reference table "upgrade_reference_table_mx" to the node localhost:57638 upgrade_to_reference_table ---------------------------- diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 647cedde4..081011407 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -75,6 +75,8 @@ ALTER EXTENSION citus UPDATE TO '6.1-14'; ALTER EXTENSION citus UPDATE TO '6.1-15'; ALTER EXTENSION citus UPDATE TO '6.1-16'; ALTER EXTENSION citus UPDATE TO '6.1-17'; +ALTER EXTENSION citus UPDATE TO '6.1-18'; +ALTER EXTENSION citus UPDATE TO '6.1-19'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 394397d32..6a0e27c39 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -557,24 +557,28 @@ SELECT create_reference_table('mx_ref'); SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE logicalrelid='mx_ref'::regclass; +WHERE logicalrelid='mx_ref'::regclass +ORDER BY shardid, nodename, nodeport; \c - - - :worker_1_port SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE logicalrelid='mx_ref'::regclass; +WHERE logicalrelid='mx_ref'::regclass +ORDER BY shardid, nodename, nodeport; \c - - - :master_port SELECT master_add_node('localhost', :worker_2_port); SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE logicalrelid='mx_ref'::regclass; +WHERE logicalrelid='mx_ref'::regclass +ORDER BY shardid, nodename, nodeport; \c - - - :worker_1_port SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE logicalrelid='mx_ref'::regclass; +WHERE logicalrelid='mx_ref'::regclass +ORDER BY shardid, nodename, nodeport; \c - - - :master_port INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql index 37d9f8aa3..eee581ef1 100644 --- a/src/test/regress/sql/multi_remove_node_reference_table.sql +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -46,7 +46,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -64,7 +66,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -78,7 +82,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -96,7 +102,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -116,7 +124,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + 
nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -134,7 +144,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -150,7 +162,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -168,7 +182,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -182,7 +198,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -200,7 +218,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -216,7 +236,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -234,7 +256,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -251,7 +275,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -269,7 +295,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -286,7 +314,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -307,7 +337,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM remove_node_reference_table; @@ -327,7 +359,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -345,7 +379,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -362,7 +398,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -380,7 +418,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -412,8 +452,8 @@ FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port -ORDER BY - shardid; +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -431,7 +471,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -463,7 +505,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -483,7 +527,7 @@ FROM WHERE nodeport = :worker_2_port ORDER BY - shardid; + 
shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -501,7 +545,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port @@ -515,7 +561,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; SELECT * FROM pg_dist_colocation @@ -533,7 +581,9 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY + shardid, nodename, nodeport; \c - - - :master_port diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 39931af95..b58cdc7c1 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -391,6 +391,76 @@ WHERE colocationid IN DROP TABLE replicate_reference_table_schema.table1; DROP SCHEMA replicate_reference_table_schema CASCADE; +-- do some tests with manually disabling reference table replication +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE TABLE initially_not_replicated_reference_table (key int); +SELECT create_reference_table('initially_not_replicated_reference_table'); + +CREATE TABLE initially_not_replicated_reference_table_second (key int); +SELECT create_reference_table('initially_not_replicated_reference_table_second'); + +CREATE TABLE append_test_table(key int); +SELECT create_distributed_table('append_test_table', 'key', 'append'); + +SELECT master_add_node('localhost', :worker_2_port, activate_node := false); + +-- we should see only two shard placements +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT + shardid + FROM + pg_dist_shard + WHERE + logicalrelid IN + ('initially_not_replicated_reference_table', 'initially_not_replicated_reference_table_second')) +ORDER BY 1,4,5; + + +-- now, see that certain operations are disallowed +CREATE TABLE disallow_test_table (key int); +SELECT create_reference_table('disallow_test_table'); +SELECT create_distributed_table('disallow_test_table', 'key'); +SELECT master_create_empty_shard('append_test_table'); +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + +COPY append_test_table FROM STDIN; +1 +2 +3 +\. + +SELECT master_activate_node('localhost', :worker_2_port); + +-- we should see the four shard placements +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT + shardid + FROM + pg_dist_shard + WHERE + logicalrelid IN + ('initially_not_replicated_reference_table', 'initially_not_replicated_reference_table_second')) +ORDER BY 1,4,5; + +-- this should have no effect +SELECT master_add_node('localhost', :worker_2_port); + +-- now, should be able to create tables +CREATE TABLE initially_not_replicated_reference_table_third (key int); +SELECT create_reference_table('initially_not_replicated_reference_table_third'); + +-- drop unnecassary tables +DROP TABLE initially_not_replicated_reference_table_third, initially_not_replicated_reference_table_second, + initially_not_replicated_reference_table, append_test_table; -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);