From 15af1637aa6f854df9d51c9f0100798f9a92e376 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Tue, 5 Nov 2019 09:39:51 -0800 Subject: [PATCH] Replicate reference tables to coordinator. --- CONTRIBUTING.md | 2 +- .../distributed/commands/dependencies.c | 2 +- src/backend/distributed/commands/function.c | 2 +- .../distributed/commands/utility_hook.c | 48 ++++ .../executor/intermediate_results.c | 2 +- .../executor/multi_server_executor.c | 2 +- .../executor/multi_task_tracker_executor.c | 2 +- .../distributed/executor/subplan_execution.c | 2 +- .../master/citus_create_restore_point.c | 2 +- .../distributed/master/master_create_shards.c | 19 +- .../master/master_delete_protocol.c | 16 +- .../master/master_metadata_utility.c | 1 + .../distributed/master/master_node_protocol.c | 2 +- .../master/master_stage_protocol.c | 2 +- .../distributed/master/worker_node_manager.c | 121 +++++++-- .../distributed/metadata/metadata_sync.c | 16 +- .../planner/multi_physical_planner.c | 6 +- .../planner/multi_router_planner.c | 2 +- .../distributed/sql/citus--9.0-2--9.1-1.sql | 2 + .../udfs/master_add_inactive_node/9.1-1.sql | 15 ++ .../udfs/master_add_inactive_node/latest.sql | 15 ++ .../sql/udfs/master_add_node/9.1-1.sql | 14 + .../sql/udfs/master_add_node/latest.sql | 14 + src/backend/distributed/test/metadata_sync.c | 2 +- .../distributed/transaction/backend_data.c | 2 +- .../transaction/citus_dist_stat_activity.c | 4 +- .../transaction/worker_transaction.c | 3 +- .../distributed/utils/metadata_cache.c | 4 +- src/backend/distributed/utils/node_metadata.c | 59 ++-- .../distributed/utils/reference_table_utils.c | 52 ++-- src/backend/distributed/utils/resource_lock.c | 2 +- .../distributed/utils/statistics_collection.c | 2 +- .../worker/worker_shard_visibility.c | 17 +- .../distributed/commands/utility_hook.h | 1 + .../distributed/reference_table_utils.h | 1 + src/include/distributed/worker_manager.h | 23 +- src/test/regress/expected/add_coordinator.out | 22 ++ .../foreign_key_to_reference_table.out | 54 ++-- ...licate_reference_tables_to_coordinator.out | 143 ++++++++++ .../expected/multi_mx_add_coordinator.out | 90 +++++++ .../expected/multi_reference_table.out | 53 ++-- .../multi_replicate_reference_table.out | 75 +++--- .../expected/multi_transaction_recovery.out | 46 +++- .../multi_upgrade_reference_table.out | 253 +++++++++--------- .../regress/expected/remove_coordinator.out | 7 + ...licate_reference_tables_to_coordinator.out | 103 +++++++ src/test/regress/isolation_schedule | 1 + src/test/regress/multi_mx_schedule | 1 + src/test/regress/multi_schedule | 12 + .../regress/multi_task_tracker_extra_schedule | 2 +- ...lation_distributed_deadlock_detection.spec | 2 +- ...icate_reference_tables_to_coordinator.spec | 134 ++++++++++ src/test/regress/sql/add_coordinator.sql | 14 + .../sql/foreign_key_to_reference_table.sql | 4 + .../regress/sql/multi_mx_add_coordinator.sql | 46 ++++ .../regress/sql/multi_reference_table.sql | 32 ++- .../sql/multi_replicate_reference_table.sql | 57 ++-- .../sql/multi_transaction_recovery.sql | 31 ++- .../sql/multi_upgrade_reference_table.sql | 120 +++++---- src/test/regress/sql/remove_coordinator.sql | 2 + ...licate_reference_tables_to_coordinator.sql | 52 ++++ 61 files changed, 1424 insertions(+), 413 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/master_add_inactive_node/9.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/master_add_inactive_node/latest.sql create mode 100644 src/backend/distributed/sql/udfs/master_add_node/9.1-1.sql create mode 
100644 src/backend/distributed/sql/udfs/master_add_node/latest.sql create mode 100644 src/test/regress/expected/add_coordinator.out create mode 100644 src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out create mode 100644 src/test/regress/expected/multi_mx_add_coordinator.out create mode 100644 src/test/regress/expected/remove_coordinator.out create mode 100644 src/test/regress/expected/replicate_reference_tables_to_coordinator.out create mode 100644 src/test/regress/specs/isolation_replicate_reference_tables_to_coordinator.spec create mode 100644 src/test/regress/sql/add_coordinator.sql create mode 100644 src/test/regress/sql/multi_mx_add_coordinator.sql create mode 100644 src/test/regress/sql/remove_coordinator.sql create mode 100644 src/test/regress/sql/replicate_reference_tables_to_coordinator.sql diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8f89c1f53..4e237eaea 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -161,7 +161,7 @@ function. Then copy the `latest.sql` file to `{version}.sql`, where `{version}` is the version for which this sql change is, e.g. `{9.0-1.sql}`. Now that you've created this stable snapshot of the function definition for your version you -should use it in your actual sql file, .e.g. +should use it in your actual sql file, e.g. `src/backend/distributed/sql/citus--8.3-1--9.0-1.sql`. You do this by using C style `#include` statements like this: ``` diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 7bda42c35..c952babd4 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -93,7 +93,7 @@ EnsureDependenciesExistsOnAllNodes(const ObjectAddress *target) * either get it now, or get it in master_add_node after this transaction finishes and * the pg_dist_object record becomes visible. 
*/ - workerNodeList = ActivePrimaryNodeList(RowShareLock); + workerNodeList = ActivePrimaryWorkerNodeList(RowShareLock); /* * right after we acquired the lock we mark our objects as distributed, these changes diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index d288d1cba..ddb635510 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -1074,7 +1074,7 @@ EnsureSequentialModeForFunctionDDL(void) static void TriggerSyncMetadataToPrimaryNodes(void) { - List *workerList = ActivePrimaryNodeList(ShareLock); + List *workerList = ActivePrimaryWorkerNodeList(ShareLock); ListCell *workerCell = NULL; bool triggerMetadataSync = false; diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index fe251238d..3d8d01e95 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -65,6 +65,7 @@ bool EnableDDLPropagation = true; /* ddl propagation is enabled */ PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */ static bool shouldInvalidateForeignKeyGraph = false; static int activeAlterTables = 0; +static int activeDropSchemaOrDBs = 0; /* Local functions forward declarations for helper functions */ @@ -76,6 +77,7 @@ static List * PlanRenameAttributeStmt(RenameStmt *stmt, const char *queryString) static List * PlanAlterOwnerStmt(AlterOwnerStmt *stmt, const char *queryString); static List * PlanAlterObjectDependsStmt(AlterObjectDependsStmt *stmt, const char *queryString); +static bool IsDropSchemaOrDB(Node *parsetree); /* @@ -655,6 +657,11 @@ multi_ProcessUtility(PlannedStmt *pstmt, activeAlterTables++; } + if (IsDropSchemaOrDB(parsetree)) + { + activeDropSchemaOrDBs++; + } + standard_ProcessUtility(pstmt, queryString, context, params, queryEnv, dest, completionTag); @@ -678,6 +685,11 @@ multi_ProcessUtility(PlannedStmt *pstmt, { activeAlterTables--; } + + if (IsDropSchemaOrDB(parsetree)) + { + activeDropSchemaOrDBs--; + } } PG_CATCH(); { @@ -686,6 +698,11 @@ multi_ProcessUtility(PlannedStmt *pstmt, activeAlterTables--; } + if (IsDropSchemaOrDB(parsetree)) + { + activeDropSchemaOrDBs--; + } + PG_RE_THROW(); } PG_END_TRY(); @@ -796,6 +813,26 @@ multi_ProcessUtility(PlannedStmt *pstmt, } +/* + * IsDropSchemaOrDB returns true if parsetree represents DROP SCHEMA ...or + * a DROP DATABASE. + */ +static bool +IsDropSchemaOrDB(Node *parsetree) +{ + DropStmt *dropStatement = NULL; + + if (!IsA(parsetree, DropStmt)) + { + return false; + } + + dropStatement = (DropStmt *) parsetree; + return (dropStatement->removeType == OBJECT_SCHEMA) || + (dropStatement->removeType == OBJECT_DATABASE); +} + + /* * PlanRenameAttributeStmt called for RenameStmt's that are targetting an attribute eg. * type attributes. Based on the relation type the attribute gets renamed it dispatches to @@ -1198,3 +1235,14 @@ AlterTableInProgress(void) { return activeAlterTables > 0; } + + +/* + * DropSchemaOrDBInProgress returns true if we're processing a DROP SCHEMA + * or a DROP DATABASE command right now. 
+ */ +bool +DropSchemaOrDBInProgress(void) +{ + return activeDropSchemaOrDBs > 0; +} diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 021ebb43e..acf0cbffb 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -127,7 +127,7 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS) */ BeginOrContinueCoordinatedTransaction(); - nodeList = ActivePrimaryNodeList(NoLock); + nodeList = ActivePrimaryWorkerNodeList(NoLock); estate = CreateExecutorState(); resultDest = (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString, estate, nodeList, diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 53f7f8aac..714f87dd2 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -114,7 +114,7 @@ JobExecutorType(DistributedPlan *distributedPlan) } else { - List *workerNodeList = ActiveReadableNodeList(); + List *workerNodeList = ActiveReadableWorkerNodeList(); int workerNodeCount = list_length(workerNodeList); int taskCount = list_length(job->taskList); double tasksPerNode = taskCount / ((double) workerNodeCount); diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 2e4b37b63..5c3d4c2d5 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -212,7 +212,7 @@ MultiTaskTrackerExecute(Job *job) * assigning and checking the status of tasks. The second (temporary) hash * helps us in fetching results data from worker nodes to the master node. 
*/ - workerNodeList = ActivePrimaryNodeList(NoLock); + workerNodeList = ActivePrimaryWorkerNodeList(NoLock); taskTrackerCount = (uint32) list_length(workerNodeList); /* connect as the current user for running queries */ diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index 328ee6af1..81525d4f1 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -56,7 +56,7 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) */ BeginOrContinueCoordinatedTransaction(); - nodeList = ActiveReadableNodeList(); + nodeList = ActiveReadableWorkerNodeList(); foreach(subPlanCell, subPlanList) { diff --git a/src/backend/distributed/master/citus_create_restore_point.c b/src/backend/distributed/master/citus_create_restore_point.c index 838738647..f7d642556 100644 --- a/src/backend/distributed/master/citus_create_restore_point.c +++ b/src/backend/distributed/master/citus_create_restore_point.c @@ -121,7 +121,7 @@ OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode) ListCell *workerNodeCell = NULL; int connectionFlags = FORCE_NEW_CONNECTION; - workerNodeList = ActivePrimaryNodeList(lockMode); + workerNodeList = ActivePrimaryWorkerNodeList(lockMode); foreach(workerNodeCell, workerNodeList) { diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index d9571c414..91099ac58 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -177,7 +177,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, LockRelationOid(DistNodeRelationId(), RowShareLock); /* load and sort the worker node list for deterministic placement */ - workerNodeList = ActivePrimaryShouldHaveShardsNodeList(NoLock); + workerNodeList = DistributedTablePlacementNodeList(NoLock); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); workerNodeCount = list_length(workerNodeList); @@ -348,8 +348,7 @@ void CreateReferenceTableShard(Oid distributedTableId) { char shardStorageType = 0; - List *workerNodeList = NIL; - int32 workerNodeCount = 0; + List *nodeList = NIL; List *existingShardList = NIL; uint64 shardId = INVALID_SHARD_ID; int workerStartIndex = 0; @@ -386,18 +385,16 @@ CreateReferenceTableShard(Oid distributedTableId) /* * load and sort the worker node list for deterministic placements - * create_reference_table has already acquired ActivePrimaryNodeList lock + * create_reference_table has already acquired pg_dist_node lock */ - workerNodeList = ActivePrimaryNodeList(NoLock); - workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + nodeList = ReferenceTablePlacementNodeList(ShareLock); + nodeList = SortList(nodeList, CompareWorkerNodes); + + replicationFactor = ReferenceTableReplicationFactor(); /* get the next shard id */ shardId = GetNextShardId(); - /* set the replication factor equal to the number of worker nodes */ - workerNodeCount = list_length(workerNodeList); - replicationFactor = workerNodeCount; - /* * Grabbing the shard metadata lock isn't technically necessary since * we already hold an exclusive lock on the partition table, but we'll @@ -410,7 +407,7 @@ CreateReferenceTableShard(Oid distributedTableId) shardMaxValue); insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId, - workerNodeList, workerStartIndex, + nodeList, workerStartIndex, replicationFactor); 
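As an illustration of the behaviour the CreateReferenceTableShard() changes above enable (not part of the diff): once the coordinator has been registered in pg_dist_node, a newly created reference table gets one placement per primary node, coordinator included, because the placement list now comes from ReferenceTablePlacementNodeList(). A minimal SQL sketch; the table name is hypothetical and the catalog queries mirror the ones used in the regression tests added by this patch.

-- assumes the coordinator was added with master_add_node(..., groupid => 0)
CREATE TABLE ref_example (a int);
SELECT create_reference_table('ref_example');

-- expect one row per primary node, including the coordinator (group 0)
SELECT nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN (SELECT shardid FROM pg_dist_shard
                  WHERE logicalrelid = 'ref_example'::regclass);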
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements, diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 7d1c01490..2747705c8 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -422,7 +422,21 @@ DropShards(Oid relationId, char *schemaName, char *relationName, quotedShardName); } - connection = GetPlacementConnection(connectionFlags, shardPlacement, NULL); + /* + * The active DROP SCHEMA/DATABASE ... CASCADE will drop the shard, if we + * try to drop it over another connection, we will get into a distributed + * deadlock. + */ + if (shardPlacement->groupId == COORDINATOR_GROUP_ID && + IsCoordinator() && + DropSchemaOrDBInProgress()) + { + DeleteShardPlacementRow(shardPlacement->placementId); + continue; + } + + connection = GetPlacementConnection(connectionFlags, shardPlacement, + NULL); RemoteTransactionBeginIfNecessary(connection); diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index dd834f1f3..cbc0aeb99 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -41,6 +41,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_placement.h" +#include "distributed/reference_table_utils.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "distributed/remote_commands.h" diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index ed8895399..98e4b6e05 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -475,7 +475,7 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS) /* switch to memory context appropriate for multiple function calls */ oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); - workerNodeList = ActiveReadableNodeList(); + workerNodeList = ActiveReadableWorkerNodeList(); workerNodeCount = (uint32) list_length(workerNodeList); functionContext->user_fctx = workerNodeList; diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 3ad0eb4ae..c17f92a38 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -160,7 +160,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) shardId = GetNextShardId(); /* if enough live groups, add an extra candidate node as backup */ - workerNodeList = ActivePrimaryNodeList(NoLock); + workerNodeList = DistributedTablePlacementNodeList(NoLock); if (list_length(workerNodeList) > ShardReplicationFactor) { diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index c53009785..47691a9ff 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -44,6 +44,9 @@ static List * PrimaryNodesNotInList(List *currentList); static WorkerNode * FindRandomNodeFromList(List *candidateWorkerNodeList); static bool OddNumber(uint32 number); static bool ListMember(List *currentList, WorkerNode *workerNode); +static bool NodeIsPrimaryWorker(WorkerNode *node); +static bool 
NodeCanHaveDistTablePlacements(WorkerNode *node); +static bool NodeIsReadableWorker(WorkerNode *node); /* ------------------------------------------------------------ @@ -294,12 +297,12 @@ WorkerGetNodeWithName(const char *hostname) /* - * ActivePrimaryNodeCount returns the number of groups with a primary in the cluster. + * ActivePrimaryWorkerNodeCount returns the number of groups with a primary in the cluster. */ uint32 -ActivePrimaryNodeCount(void) +ActivePrimaryWorkerNodeCount(void) { - List *workerNodeList = ActivePrimaryNodeList(NoLock); + List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); uint32 liveWorkerCount = list_length(workerNodeList); return liveWorkerCount; @@ -307,18 +310,28 @@ ActivePrimaryNodeCount(void) /* - * ActiveReadableNodeCount returns the number of groups with a node we can read from. + * ActiveReadableWorkerNodeCount returns the number of groups with a node we can read from. */ uint32 -ActiveReadableNodeCount(void) +ActiveReadableWorkerNodeCount(void) { - List *workerNodeList = ActiveReadableNodeList(); + List *workerNodeList = ActiveReadableWorkerNodeList(); uint32 liveWorkerCount = list_length(workerNodeList); return liveWorkerCount; } +/* + * NodeIsCoordinator returns true if the given node represents the coordinator. + */ +bool +NodeIsCoordinator(WorkerNode *node) +{ + return node->groupId == COORDINATOR_GROUP_ID; +} + + /* * ActiveNodeListFilterFunc returns a list of all active nodes that checkFunction * returns true for. @@ -358,37 +371,111 @@ FilterActiveNodeListFunc(LOCKMODE lockMode, bool (*checkFunction)(WorkerNode *)) /* - * ActivePrimaryNodeList returns a list of all the active primary nodes in workerNodeHash - * lockMode specifies which lock to use on pg_dist_node, this is necessary when - * the caller wouldn't want nodes to be added concurrent to their use of this list + * ActivePrimaryWorkerNodeList returns a list of all active primary worker nodes + * in workerNodeHash. lockMode specifies which lock to use on pg_dist_node, + * this is necessary when the caller wouldn't want nodes to be added concurrent + * to their use of this list. + */ +List * +ActivePrimaryWorkerNodeList(LOCKMODE lockMode) +{ + EnsureModificationsCanRun(); + return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryWorker); +} + + +/* + * ActivePrimaryNodeList returns a list of all active primary nodes in + * workerNodeHash. */ List * ActivePrimaryNodeList(LOCKMODE lockMode) { EnsureModificationsCanRun(); - return FilterActiveNodeListFunc(lockMode, WorkerNodeIsPrimary); + return FilterActiveNodeListFunc(lockMode, NodeIsPrimary); } /* - * ActivePrimaryShouldHaveShardsNodeList returns a list of all active, primary + * NodeIsPrimaryWorker returns true if the node is a primary worker node. + */ +static bool +NodeIsPrimaryWorker(WorkerNode *node) +{ + return !NodeIsCoordinator(node) && NodeIsPrimary(node); +} + + +/* + * ReferenceTablePlacementNodeList returns the set of nodes that should have + * reference table placements. This includes all primaries, including the + * coordinator if known. 
+ */ +List * +ReferenceTablePlacementNodeList(LOCKMODE lockMode) +{ + EnsureModificationsCanRun(); + return FilterActiveNodeListFunc(lockMode, NodeIsPrimary); +} + + +/* + * DistributedTablePlacementNodeList returns a list of all active, primary * worker nodes that can store new data, i.e shouldstoreshards is 'true' */ List * -ActivePrimaryShouldHaveShardsNodeList(LOCKMODE lockMode) +DistributedTablePlacementNodeList(LOCKMODE lockMode) { EnsureModificationsCanRun(); - return FilterActiveNodeListFunc(lockMode, WorkerNodeIsPrimaryShouldHaveShardsNode); + return FilterActiveNodeListFunc(lockMode, NodeCanHaveDistTablePlacements); } /* - * ActiveReadableNodeList returns a list of all nodes in workerNodeHash we can read from. + * NodeCanHaveDistTablePlacements returns true if the given node can have + * shards of a distributed table. + */ +static bool +NodeCanHaveDistTablePlacements(WorkerNode *node) +{ + if (!NodeIsPrimary(node)) + { + return false; + } + + return node->shouldHaveShards; +} + + +/* + * ActiveReadableWorkerNodeList returns a list of all nodes in workerNodeHash + * that are readable workers. + */ +List * +ActiveReadableWorkerNodeList(void) +{ + return FilterActiveNodeListFunc(NoLock, NodeIsReadableWorker); +} + + +/* + * ActiveReadableNodeList returns a list of all nodes in workerNodeHash + * that are readable workers. */ List * ActiveReadableNodeList(void) { - return FilterActiveNodeListFunc(NoLock, WorkerNodeIsReadable); + return FilterActiveNodeListFunc(NoLock, NodeIsReadable); +} + + +/* + * NodeIsReadableWorker returns true if the given node is a readable worker node. + */ +static bool +NodeIsReadableWorker(WorkerNode *node) +{ + return !NodeIsCoordinator(node) && NodeIsReadable(node); } @@ -414,7 +501,7 @@ PrimaryNodesNotInList(List *currentList) continue; } - if (WorkerNodeIsPrimary(workerNode)) + if (NodeIsPrimary(workerNode)) { workerNodeList = lappend(workerNodeList, workerNode); } @@ -524,7 +611,7 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize) WorkerNode * GetFirstPrimaryWorkerNode(void) { - List *workerNodeList = ActivePrimaryNodeList(NoLock); + List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); ListCell *workerNodeCell = NULL; WorkerNode *firstWorkerNode = NULL; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 0079e183a..8f5ef4395 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -124,9 +124,17 @@ StartMetadatSyncToNode(char *nodeNameString, int32 nodePort) escapedNodeName, nodePort))); } + if (NodeIsCoordinator(workerNode)) + { + ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains " + "metadata, skipping syncing the metadata", + nodeNameString, nodePort))); + return; + } + MarkNodeHasMetadata(nodeNameString, nodePort, true); - if (!WorkerNodeIsPrimary(workerNode)) + if (!NodeIsPrimary(workerNode)) { /* * If this is a secondary node we can't actually sync metadata to it; we assume @@ -346,7 +354,7 @@ MetadataCreateCommands(void) List *distributedTableList = DistributedTableList(); List *propagatedTableList = NIL; bool includeNodesFromOtherClusters = true; - List *workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters); + List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters); ListCell *distributedTableCell = NULL; char *nodeListInsertCommand = NULL; bool includeSequenceDefaults = true; @@ -1177,7 +1185,7 @@ SchemaOwnerName(Oid objectId) static bool 
HasMetadataWorkers(void) { - List *workerNodeList = ActivePrimaryNodeList(NoLock); + List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); ListCell *workerNodeCell = NULL; foreach(workerNodeCell, workerNodeList) @@ -1306,7 +1314,7 @@ SyncMetadataToNodes(void) return METADATA_SYNC_FAILED_LOCK; } - workerList = ActivePrimaryNodeList(NoLock); + workerList = ActivePrimaryWorkerNodeList(NoLock); foreach(workerCell, workerList) { diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 64fbe015a..9e1991921 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -1991,7 +1991,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, static uint32 HashPartitionCount(void) { - uint32 groupCount = ActiveReadableNodeCount(); + uint32 groupCount = ActiveReadableWorkerNodeCount(); double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode); @@ -5024,7 +5024,7 @@ GreedyAssignTaskList(List *taskList) uint32 taskCount = list_length(taskList); /* get the worker node list and sort the list */ - List *workerNodeList = ActiveReadableNodeList(); + List *workerNodeList = ActiveReadableWorkerNodeList(); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* @@ -5472,7 +5472,7 @@ AssignDualHashTaskList(List *taskList) * if subsequent jobs have a small number of tasks, we won't allocate the * tasks to the same worker repeatedly. */ - List *workerNodeList = ActiveReadableNodeList(); + List *workerNodeList = ActiveReadableWorkerNodeList(); uint32 workerNodeCount = (uint32) list_length(workerNodeList); uint32 beginningNodeIndex = jobId % workerNodeCount; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index acabc48cd..57ee6fe7f 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2120,7 +2120,7 @@ PlanRouterQuery(Query *originalQuery, } else if (replacePrunedQueryWithDummy) { - List *workerNodeList = ActiveReadableNodeList(); + List *workerNodeList = ActiveReadableWorkerNodeList(); if (workerNodeList != NIL) { int workerNodeCount = list_length(workerNodeList); diff --git a/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql b/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql index 6c55b16fe..253d63e77 100644 --- a/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql +++ b/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql @@ -6,6 +6,8 @@ COMMENT ON COLUMN pg_catalog.pg_dist_node.shouldhaveshards IS #include "udfs/master_drain_node/9.1-1.sql" #include "udfs/rebalance_table_shards/9.1-1.sql" #include "udfs/get_rebalance_table_shards_plan/9.1-1.sql" +#include "udfs/master_add_node/9.1-1.sql" +#include "udfs/master_add_inactive_node/9.1-1.sql" -- we don't maintain replication factor of reference tables anymore and just -- use -1 instead. 
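As an aside on the two UDF snapshots pulled in above (and spelled out in the new files below): master_add_node and master_add_inactive_node now default groupid to -1, so an explicit groupid => 0 can be used to register the coordinator itself while the default keeps meaning "assign the next group". A minimal sketch of the resulting workflow; the port number is an assumption for the example (the regression tests in this patch use :master_port).

-- register the coordinator in pg_dist_node
SELECT master_add_node('localhost', 5432, groupid => 0);

-- per the AddNodeMetadata() change in node_metadata.c, the coordinator row is
-- created with shouldhaveshards = false and hasmetadata = true
SELECT nodename, nodeport, groupid, shouldhaveshards, hasmetadata
FROM pg_dist_node
WHERE groupid = 0;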
diff --git a/src/backend/distributed/sql/udfs/master_add_inactive_node/9.1-1.sql b/src/backend/distributed/sql/udfs/master_add_inactive_node/9.1-1.sql new file mode 100644 index 000000000..d05c7f6b7 --- /dev/null +++ b/src/backend/distributed/sql/udfs/master_add_inactive_node/9.1-1.sql @@ -0,0 +1,15 @@ +-- Update the default groupId to -1 +DROP FUNCTION master_add_inactive_node(text, integer, integer, noderole, name); +CREATE FUNCTION master_add_inactive_node(nodename text, + nodeport integer, + groupid integer default -1, + noderole noderole default 'primary', + nodecluster name default 'default') + RETURNS INTEGER + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$master_add_inactive_node$$; +COMMENT ON FUNCTION master_add_inactive_node(nodename text,nodeport integer, + groupid integer, noderole noderole, + nodecluster name) + IS 'prepare node by adding it to pg_dist_node'; +REVOKE ALL ON FUNCTION master_add_inactive_node(text,int,int,noderole,name) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/master_add_inactive_node/latest.sql b/src/backend/distributed/sql/udfs/master_add_inactive_node/latest.sql new file mode 100644 index 000000000..d05c7f6b7 --- /dev/null +++ b/src/backend/distributed/sql/udfs/master_add_inactive_node/latest.sql @@ -0,0 +1,15 @@ +-- Update the default groupId to -1 +DROP FUNCTION master_add_inactive_node(text, integer, integer, noderole, name); +CREATE FUNCTION master_add_inactive_node(nodename text, + nodeport integer, + groupid integer default -1, + noderole noderole default 'primary', + nodecluster name default 'default') + RETURNS INTEGER + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$master_add_inactive_node$$; +COMMENT ON FUNCTION master_add_inactive_node(nodename text,nodeport integer, + groupid integer, noderole noderole, + nodecluster name) + IS 'prepare node by adding it to pg_dist_node'; +REVOKE ALL ON FUNCTION master_add_inactive_node(text,int,int,noderole,name) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/master_add_node/9.1-1.sql b/src/backend/distributed/sql/udfs/master_add_node/9.1-1.sql new file mode 100644 index 000000000..453a269d7 --- /dev/null +++ b/src/backend/distributed/sql/udfs/master_add_node/9.1-1.sql @@ -0,0 +1,14 @@ +-- Update the default groupId to -1 +DROP FUNCTION master_add_node(text, integer, integer, noderole, name); +CREATE FUNCTION master_add_node(nodename text, + nodeport integer, + groupid integer default -1, + noderole noderole default 'primary', + nodecluster name default 'default') + RETURNS INTEGER + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_add_node$$; +COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer, + groupid integer, noderole noderole, nodecluster name) + IS 'add node to the cluster'; +REVOKE ALL ON FUNCTION master_add_node(text,int,int,noderole,name) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/master_add_node/latest.sql b/src/backend/distributed/sql/udfs/master_add_node/latest.sql new file mode 100644 index 000000000..453a269d7 --- /dev/null +++ b/src/backend/distributed/sql/udfs/master_add_node/latest.sql @@ -0,0 +1,14 @@ +-- Update the default groupId to -1 +DROP FUNCTION master_add_node(text, integer, integer, noderole, name); +CREATE FUNCTION master_add_node(nodename text, + nodeport integer, + groupid integer default -1, + noderole noderole default 'primary', + nodecluster name default 'default') + RETURNS INTEGER + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_add_node$$; +COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer, + 
groupid integer, noderole noderole, nodecluster name) + IS 'add node to the cluster'; +REVOKE ALL ON FUNCTION master_add_node(text,int,int,noderole,name) FROM PUBLIC; diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index f571b2ee5..0450cb196 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -80,7 +80,7 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS) uint32 timeout = PG_GETARG_UINT32(0); int waitResult = 0; - List *workerList = ActivePrimaryNodeList(NoLock); + List *workerList = ActivePrimaryWorkerNodeList(NoLock); ListCell *workerCell = NULL; bool waitNotifications = false; MultiConnection *connection = NULL; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 5d938f2d0..403c79789 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -216,7 +216,7 @@ get_global_active_transactions(PG_FUNCTION_ARGS) { TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; - List *workerNodeList = ActivePrimaryNodeList(NoLock); + List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); ListCell *workerNodeCell = NULL; List *connectionList = NIL; ListCell *connectionCell = NULL; diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index 3226a97aa..c7ae102ed 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -317,7 +317,7 @@ CitusStatActivity(const char *statQuery) { List *citusStatsList = NIL; - List *workerNodeList = ActivePrimaryNodeList(NoLock); + List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); ListCell *workerNodeCell = NULL; char *nodeUser = NULL; List *connectionList = NIL; @@ -455,7 +455,7 @@ GetLocalNodeCitusDistStat(const char *statQuery) localGroupId = GetLocalGroupId(); /* get the current worker's node stats */ - workerNodeList = ActivePrimaryNodeList(NoLock); + workerNodeList = ActivePrimaryWorkerNodeList(NoLock); foreach(workerNodeCell, workerNodeList) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index ab7e2f770..2d8cd4342 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -111,13 +111,14 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, const char *command) List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { - List *workerNodeList = ActivePrimaryNodeList(lockMode); + List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); ListCell *workerNodeCell = NULL; List *result = NIL; foreach(workerNodeCell, workerNodeList) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + if (targetWorkerSet == WORKERS_WITH_METADATA && (!workerNode->hasMetadata || !workerNode->metadataSynced)) { diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 9f952e4ea..989edb199 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -627,7 +627,7 @@ LookupNodeForGroup(int32 groupId) foundAnyNodes = true; - if (WorkerNodeIsReadable(workerNode)) + if 
(NodeIsReadable(workerNode)) { return workerNode; } @@ -3026,7 +3026,7 @@ InitializeWorkerNodeCache(void) newWorkerNodeHash = hash_create("Worker Node Hash", maxTableSize, &info, hashFlags); /* read the list from pg_dist_node */ - workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters); + workerNodeList = ReadDistNode(includeNodesFromOtherClusters); newWorkerNodeCount = list_length(workerNodeList); newWorkerNodeArray = MemoryContextAlloc(MetadataCacheMemoryContext, diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 18aedcb53..e9b89562a 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -49,6 +49,7 @@ #include "utils/rel.h" #include "utils/relcache.h" +#define INVALID_GROUP_ID -1 /* default group size */ int GroupSize = 1; @@ -109,6 +110,7 @@ DefaultNodeMetadata() NodeMetadata nodeMetadata = { .nodeRack = WORKER_DEFAULT_RACK, .shouldHaveShards = true, + .groupId = INVALID_GROUP_ID, }; return nodeMetadata; } @@ -277,7 +279,7 @@ master_disable_node(PG_FUNCTION_ARGS) PG_TRY(); { - if (WorkerNodeIsPrimary(workerNode)) + if (NodeIsPrimary(workerNode)) { /* * Delete reference table placements so they are not taken into account @@ -363,7 +365,7 @@ master_set_node_property(PG_FUNCTION_ARGS) static void SetUpDistributedTableDependencies(WorkerNode *newWorkerNode) { - if (WorkerNodeIsPrimary(newWorkerNode)) + if (NodeIsPrimary(newWorkerNode)) { EnsureNoModificationsHaveBeenDone(); ReplicateAllDependenciesToNode(newWorkerNode->workerName, @@ -450,10 +452,10 @@ GroupForNode(char *nodeName, int nodePort) /* - * WorkerNodeIsPrimary returns whether the argument represents a primary node. + * NodeIsPrimary returns whether the argument represents a primary node. */ bool -WorkerNodeIsPrimary(WorkerNode *worker) +NodeIsPrimary(WorkerNode *worker) { Oid primaryRole = PrimaryNodeRoleId(); @@ -468,10 +470,10 @@ WorkerNodeIsPrimary(WorkerNode *worker) /* - * WorkerNodeIsSecondary returns whether the argument represents a secondary node. + * NodeIsSecondary returns whether the argument represents a secondary node. */ bool -WorkerNodeIsSecondary(WorkerNode *worker) +NodeIsSecondary(WorkerNode *worker) { Oid secondaryRole = SecondaryNodeRoleId(); @@ -486,36 +488,20 @@ WorkerNodeIsSecondary(WorkerNode *worker) /* - * WorkerNodeIsPrimaryShouldHaveShardsNode returns whether the argument represents a - * primary node that is a eligible for new data. - */ -bool -WorkerNodeIsPrimaryShouldHaveShardsNode(WorkerNode *worker) -{ - if (!WorkerNodeIsPrimary(worker)) - { - return false; - } - - return worker->shouldHaveShards; -} - - -/* - * WorkerNodeIsReadable returns whether we're allowed to send SELECT queries to this + * NodeIsReadable returns whether we're allowed to send SELECT queries to this * node. 
*/ bool -WorkerNodeIsReadable(WorkerNode *workerNode) +NodeIsReadable(WorkerNode *workerNode) { if (ReadFromSecondaries == USE_SECONDARY_NODES_NEVER && - WorkerNodeIsPrimary(workerNode)) + NodeIsPrimary(workerNode)) { return true; } if (ReadFromSecondaries == USE_SECONDARY_NODES_ALWAYS && - WorkerNodeIsSecondary(workerNode)) + NodeIsSecondary(workerNode)) { return true; } @@ -552,7 +538,7 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes) *groupContainsNodes = true; } - if (WorkerNodeIsPrimary(workerNode)) + if (NodeIsPrimary(workerNode)) { hash_seq_term(&status); return workerNode; @@ -668,7 +654,7 @@ master_update_node(PG_FUNCTION_ARGS) * though we currently only query secondaries on follower clusters * where these locks will have no effect. */ - if (WorkerNodeIsPrimary(workerNode)) + if (NodeIsPrimary(workerNode)) { /* * before acquiring the locks check if we want a background worker to help us to @@ -918,7 +904,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort) /* - * ReadWorkerNodes iterates over pg_dist_node table, converts each row + * ReadDistNode iterates over pg_dist_node table, converts each row * into it's memory representation (i.e., WorkerNode) and adds them into * a list. Lastly, the list is returned to the caller. * @@ -926,7 +912,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort) * by includeNodesFromOtherClusters. */ List * -ReadWorkerNodes(bool includeNodesFromOtherClusters) +ReadDistNode(bool includeNodesFromOtherClusters) { SysScanDesc scanDescriptor = NULL; ScanKeyData scanKey[1]; @@ -981,7 +967,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) char *nodeDeleteCommand = NULL; WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); - if (WorkerNodeIsPrimary(workerNode)) + if (NodeIsPrimary(workerNode)) { bool onlyConsiderActivePlacements = false; @@ -1024,7 +1010,7 @@ CountPrimariesWithMetadata(void) while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->hasMetadata && WorkerNodeIsPrimary(workerNode)) + if (workerNode->hasMetadata && NodeIsPrimary(workerNode)) { primariesWithMetadata++; } @@ -1074,11 +1060,18 @@ AddNodeMetadata(char *nodeName, int32 nodePort, } /* user lets Citus to decide on the group that the newly added node should be in */ - if (nodeMetadata->groupId == 0) + if (nodeMetadata->groupId == INVALID_GROUP_ID) { nodeMetadata->groupId = GetNextGroupId(); } + /* if this is a coordinator, we shouldn't place shards on it */ + if (nodeMetadata->groupId == COORDINATOR_GROUP_ID) + { + nodeMetadata->shouldHaveShards = false; + nodeMetadata->hasMetadata = true; + } + /* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */ if (nodeMetadata->nodeRole != InvalidOid && nodeMetadata->nodeRole == PrimaryNodeRoleId()) diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 1bd31ab8f..423746fdc 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -36,8 +36,8 @@ /* local function forward declarations */ -static void ReplicateSingleShardTableToAllWorkers(Oid relationId); -static void ReplicateShardToAllWorkers(ShardInterval *shardInterval); +static void ReplicateSingleShardTableToAllNodes(Oid relationId); +static void ReplicateShardToAllNodes(ShardInterval *shardInterval); static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort); static void ConvertToReferenceTableMetadata(Oid 
relationId, uint64 shardId); @@ -56,8 +56,6 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); List *shardIntervalList = NIL; - ShardInterval *shardInterval = NULL; - uint64 shardId = INVALID_SHARD_ID; DistTableCacheEntry *tableEntry = NULL; CheckCitusVersion(ERROR); @@ -95,6 +93,8 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) relationName))); } + LockRelationOid(relationId, AccessExclusiveLock); + shardIntervalList = LoadShardIntervalList(relationId); if (list_length(shardIntervalList) != 1) { @@ -106,13 +106,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) "reference tables.", relationName))); } - shardInterval = (ShardInterval *) linitial(shardIntervalList); - shardId = shardInterval->shardId; - - LockShardDistributionMetadata(shardId, ExclusiveLock); - LockShardResource(shardId, ExclusiveLock); - - ReplicateSingleShardTableToAllWorkers(relationId); + ReplicateSingleShardTableToAllNodes(relationId); PG_RETURN_VOID(); } @@ -184,12 +178,13 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) /* - * ReplicateSingleShardTableToAllWorkers accepts a broadcast table and replicates it to - * all worker nodes. It assumes that caller of this function ensures that given broadcast - * table has only one shard. + * ReplicateSingleShardTableToAllNodes accepts a broadcast table and replicates + * it to all worker nodes, and the coordinator if it has been added by the user + * to pg_dist_node. It assumes that caller of this function ensures that given + * broadcast table has only one shard. */ static void -ReplicateSingleShardTableToAllWorkers(Oid relationId) +ReplicateSingleShardTableToAllNodes(Oid relationId) { List *shardIntervalList = LoadShardIntervalList(relationId); ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); @@ -209,12 +204,12 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId) } /* - * ReplicateShardToAllWorkers function opens separate transactions (i.e., not part + * ReplicateShardToAllNodes function opens separate transactions (i.e., not part * of any coordinated transactions) to each worker and replicates given shard to all * workers. If a worker already has a healthy replica of given shard, it skips that * worker to prevent copying unnecessary data. */ - ReplicateShardToAllWorkers(shardInterval); + ReplicateShardToAllNodes(shardInterval); /* * We need to update metadata tables to mark this table as reference table. We modify @@ -233,20 +228,20 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId) /* - * ReplicateShardToAllWorkers function replicates given shard to the all worker nodes + * ReplicateShardToAllNodes function replicates given shard to all nodes * in separate transactions. While replicating, it only replicates the shard to the - * workers which does not have a healthy replica of the shard. However, this function + * nodes which does not have a healthy replica of the shard. However, this function * does not obtain any lock on shard resource and shard metadata. It is caller's * responsibility to take those locks. 
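A brief illustrative note, not part of the diff: upgrade_to_reference_table() above is the SQL entry point into ReplicateSingleShardTableToAllNodes(). A minimal usage sketch, assuming a distributed table that has exactly one shard (the table name is hypothetical):

-- copies the single shard to every node returned by ReferenceTablePlacementNodeList(),
-- i.e. all primaries plus the coordinator when it is present in pg_dist_node
SELECT upgrade_to_reference_table('single_shard_table');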
*/ static void -ReplicateShardToAllWorkers(ShardInterval *shardInterval) +ReplicateShardToAllNodes(ShardInterval *shardInterval) { List *workerNodeList = NULL; ListCell *workerNodeCell = NULL; /* prevent concurrent pg_dist_node changes */ - workerNodeList = ActivePrimaryNodeList(ShareLock); + workerNodeList = ReferenceTablePlacementNodeList(ShareLock); /* * We will iterate over all worker nodes and if a healthy placement does not exist @@ -325,7 +320,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) } /* - * Although ReplicateShardToAllWorkers is used only for reference tables, + * Although ReplicateShardToAllNodes is used only for reference tables, * during the upgrade phase, the placements are created before the table is * marked as a reference table. All metadata (including the placement * metadata) will be copied to workers after all reference table changed @@ -514,3 +509,16 @@ CompareOids(const void *leftElement, const void *rightElement) return 0; } } + + +/* + * ReferenceTableReplicationFactor returns the replication factor for + * reference tables. + */ +int +ReferenceTableReplicationFactor(void) +{ + List *nodeList = ReferenceTablePlacementNodeList(NoLock); + int replicationFactor = list_length(nodeList); + return replicationFactor; +} diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index cb44bbf79..6c4156bb0 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -233,7 +233,7 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList) static bool IsFirstWorkerNode() { - List *workerNodeList = ActivePrimaryNodeList(NoLock); + List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); WorkerNode *firstWorkerNode = NULL; workerNodeList = SortList(workerNodeList, CompareWorkerNodes); diff --git a/src/backend/distributed/utils/statistics_collection.c b/src/backend/distributed/utils/statistics_collection.c index 8da33ba03..5172d1680 100644 --- a/src/backend/distributed/utils/statistics_collection.c +++ b/src/backend/distributed/utils/statistics_collection.c @@ -108,7 +108,7 @@ CollectBasicUsageStatistics(void) distTableOids = DistTableOidList(); roundedDistTableCount = NextPow2(list_length(distTableOids)); roundedClusterSize = NextPow2(DistributedTablesSize(distTableOids)); - workerNodeCount = ActivePrimaryNodeCount(); + workerNodeCount = ActivePrimaryWorkerNodeCount(); metadataJsonbDatum = DistNodeMetadata(); metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out, metadataJsonbDatum)); diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index 27e1565b0..c79e1ba31 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -122,11 +122,18 @@ RelationIsAKnownShard(Oid shardRelationId) localGroupId = GetLocalGroupId(); if (localGroupId == 0) { - /* - * We're not interested in shards in the coordinator - * or non-mx worker nodes. - */ - return false; + bool coordinatorIsKnown = false; + PrimaryNodeForGroup(0, &coordinatorIsKnown); + + if (!coordinatorIsKnown) + { + /* + * We're not interested in shards in the coordinator + * or non-mx worker nodes, unless the coordinator is + * in pg_dist_node. 
+ */ + return false; + } } /* we're not interested in the relations that are not in the search path */ diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index a5955070c..359bcd176 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -58,5 +58,6 @@ extern void InvalidateForeignKeyGraphForDDL(void); extern List * DDLTaskList(Oid relationId, const char *commandString); extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands); extern bool AlterTableInProgress(void); +extern bool DropSchemaOrDBInProgress(void); #endif /* MULTI_UTILITY_H */ diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 0aeaf1c64..a7eaf34a0 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -21,6 +21,7 @@ extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort); extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId); extern List * ReferenceTableOidList(void); extern int CompareOids(const void *leftElement, const void *rightElement); +extern int ReferenceTableReplicationFactor(void); #endif /* REFERENCE_TABLE_UTILS_H_ */ diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 9e1f931aa..9e951fce7 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -33,6 +33,8 @@ #define WORKER_DEFAULT_CLUSTER "default" +#define COORDINATOR_GROUP_ID 0 + /* * In memory representation of pg_dist_node table elements. The elements are hold in * WorkerNodeHash table. @@ -67,22 +69,25 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, uint32 placementIndex); extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); -extern uint32 ActivePrimaryNodeCount(void); +extern uint32 ActivePrimaryWorkerNodeCount(void); +extern List * ActivePrimaryWorkerNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNodeList(LOCKMODE lockMode); -extern List * ActivePrimaryShouldHaveShardsNodeList(LOCKMODE lockMode); -extern uint32 ActiveReadableNodeCount(void); +extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); +extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode); +extern uint32 ActiveReadableWorkerNodeCount(void); +extern List * ActiveReadableWorkerNodeList(void); extern List * ActiveReadableNodeList(void); extern WorkerNode * GetWorkerNodeByNodeId(int nodeId); extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort); -extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters); +extern List * ReadDistNode(bool includeNodesFromOtherClusters); extern void EnsureCoordinator(void); -extern uint32 GroupForNode(char *nodeName, int32 nodePorT); +extern uint32 GroupForNode(char *nodeName, int32 nodePort); extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes); -extern bool WorkerNodeIsPrimary(WorkerNode *worker); -extern bool WorkerNodeIsSecondary(WorkerNode *worker); -extern bool WorkerNodeIsPrimaryShouldHaveShardsNode(WorkerNode *worker); -extern bool WorkerNodeIsReadable(WorkerNode *worker); +extern bool NodeIsPrimary(WorkerNode *worker); +extern bool NodeIsSecondary(WorkerNode *worker); +extern bool NodeIsReadable(WorkerNode *worker); +extern bool 
NodeIsCoordinator(WorkerNode *node); extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); diff --git a/src/test/regress/expected/add_coordinator.out b/src/test/regress/expected/add_coordinator.out new file mode 100644 index 000000000..41d89db06 --- /dev/null +++ b/src/test/regress/expected/add_coordinator.out @@ -0,0 +1,22 @@ +-- +-- ADD_COORDINATOR +-- +SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset +-- adding the same node again should return the existing nodeid +SELECT master_add_node('localhost', :master_port, groupid => 0) = :master_nodeid; + ?column? +---------- + t +(1 row) + +-- adding another node with groupid=0 should error out +SELECT master_add_node('localhost', 12345, groupid => 0) = :master_nodeid; +ERROR: group 0 already has a primary node +-- start_metadata_sync_to_node() for coordinator should raise a notice +SELECT start_metadata_sync_to_node('localhost', :master_port); +NOTICE: localhost:57636 is the coordinator and already contains metadata, skipping syncing the metadata + start_metadata_sync_to_node +----------------------------- + +(1 row) + diff --git a/src/test/regress/expected/foreign_key_to_reference_table.out b/src/test/regress/expected/foreign_key_to_reference_table.out index 2737d3d52..dd59151c2 100644 --- a/src/test/regress/expected/foreign_key_to_reference_table.out +++ b/src/test/regress/expected/foreign_key_to_reference_table.out @@ -527,12 +527,10 @@ SELECT count(*) FROM referencing_schema.referencing_table; 800 (1 row) +SET client_min_messages TO ERROR; DROP SCHEMA referenced_schema CASCADE; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to table referenced_schema.referenced_table -drop cascades to constraint fkey_ref on table referencing_schema.referencing_table DROP SCHEMA referencing_schema CASCADE; -NOTICE: drop cascades to table referencing_schema.referencing_table +RESET client_min_messages; -- on delete set update cascades properly CREATE TABLE referenced_table(test_column int, test_column2 int, PRIMARY KEY(test_column)); CREATE TABLE referencing_table(id int, ref_id int DEFAULT 1); @@ -819,8 +817,8 @@ INSERT INTO referenced_table SELECT x, x+1 FROM generate_series(0,1000) AS f(x); INSERT INTO referenced_table2 SELECT x, x+1 FROM generate_series(500,1500) AS f(x); -- should fail INSERT INTO referencing_table SELECT x, x+1 FROM generate_series(0,1500) AS f(x); -ERROR: insert or update on table "referencing_table_7000230" violates foreign key constraint "foreign_key_2_7000230" -DETAIL: Key (id)=(28) is not present in table "referenced_table2_7000225". +ERROR: insert or update on table "referencing_table_7000226" violates foreign key constraint "foreign_key_2_7000226" +DETAIL: Key (id)=(1) is not present in table "referenced_table2_7000225". 
-- should fail INSERT INTO referencing_table SELECT x, x+1 FROM generate_series(0,400) AS f(x); ERROR: insert or update on table "referencing_table_7000226" violates foreign key constraint "foreign_key_2_7000226" @@ -1188,24 +1186,24 @@ SELECT create_distributed_table('referencing_referencing_table', 'id'); ALTER TABLE referencing_table ADD CONSTRAINT fkey_ref FOREIGN KEY (ref_id, ref_id2) REFERENCES referenced_table(test_column, test_column2) ON DELETE CASCADE; SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_reference_table.referencing%' ORDER BY 1,2,3; - name | relid | refd_relid ------------------------------------------------+------------------------------------------------------------+------------------------------------------------ - fkey_ref_7000299 | fkey_reference_table.referencing_table_7000299 | fkey_reference_table.referenced_table_7000298 - fkey_ref_7000300 | fkey_reference_table.referencing_table_7000300 | fkey_reference_table.referenced_table_7000298 - fkey_ref_7000301 | fkey_reference_table.referencing_table_7000301 | fkey_reference_table.referenced_table_7000298 - fkey_ref_7000302 | fkey_reference_table.referencing_table_7000302 | fkey_reference_table.referenced_table_7000298 - fkey_ref_7000303 | fkey_reference_table.referencing_table_7000303 | fkey_reference_table.referenced_table_7000298 - fkey_ref_7000304 | fkey_reference_table.referencing_table_7000304 | fkey_reference_table.referenced_table_7000298 - fkey_ref_7000305 | fkey_reference_table.referencing_table_7000305 | fkey_reference_table.referenced_table_7000298 - fkey_ref_7000306 | fkey_reference_table.referencing_table_7000306 | fkey_reference_table.referenced_table_7000298 - referencing_referencing_table_id_fkey_7000307 | fkey_reference_table.referencing_referencing_table_7000307 | fkey_reference_table.referencing_table_7000299 - referencing_referencing_table_id_fkey_7000308 | fkey_reference_table.referencing_referencing_table_7000308 | fkey_reference_table.referencing_table_7000300 - referencing_referencing_table_id_fkey_7000309 | fkey_reference_table.referencing_referencing_table_7000309 | fkey_reference_table.referencing_table_7000301 - referencing_referencing_table_id_fkey_7000310 | fkey_reference_table.referencing_referencing_table_7000310 | fkey_reference_table.referencing_table_7000302 - referencing_referencing_table_id_fkey_7000311 | fkey_reference_table.referencing_referencing_table_7000311 | fkey_reference_table.referencing_table_7000303 - referencing_referencing_table_id_fkey_7000312 | fkey_reference_table.referencing_referencing_table_7000312 | fkey_reference_table.referencing_table_7000304 - referencing_referencing_table_id_fkey_7000313 | fkey_reference_table.referencing_referencing_table_7000313 | fkey_reference_table.referencing_table_7000305 - referencing_referencing_table_id_fkey_7000314 | fkey_reference_table.referencing_referencing_table_7000314 | fkey_reference_table.referencing_table_7000306 + name | relid | refd_relid +------------------------------------------------------+------------------------------------------------------------+------------------------------------------------ + fkey_ref_7000299 | fkey_reference_table.referencing_table_7000299 | fkey_reference_table.referenced_table_7000298 + fkey_ref_7000300 | fkey_reference_table.referencing_table_7000300 | fkey_reference_table.referenced_table_7000298 + fkey_ref_7000301 | fkey_reference_table.referencing_table_7000301 | fkey_reference_table.referenced_table_7000298 + fkey_ref_7000302 | 
fkey_reference_table.referencing_table_7000302 | fkey_reference_table.referenced_table_7000298 + fkey_ref_7000303 | fkey_reference_table.referencing_table_7000303 | fkey_reference_table.referenced_table_7000298 + fkey_ref_7000304 | fkey_reference_table.referencing_table_7000304 | fkey_reference_table.referenced_table_7000298 + fkey_ref_7000305 | fkey_reference_table.referencing_table_7000305 | fkey_reference_table.referenced_table_7000298 + fkey_ref_7000306 | fkey_reference_table.referencing_table_7000306 | fkey_reference_table.referenced_table_7000298 + referencing_referencing_table_id_ref_id_fkey_7000307 | fkey_reference_table.referencing_referencing_table_7000307 | fkey_reference_table.referencing_table_7000299 + referencing_referencing_table_id_ref_id_fkey_7000308 | fkey_reference_table.referencing_referencing_table_7000308 | fkey_reference_table.referencing_table_7000300 + referencing_referencing_table_id_ref_id_fkey_7000309 | fkey_reference_table.referencing_referencing_table_7000309 | fkey_reference_table.referencing_table_7000301 + referencing_referencing_table_id_ref_id_fkey_7000310 | fkey_reference_table.referencing_referencing_table_7000310 | fkey_reference_table.referencing_table_7000302 + referencing_referencing_table_id_ref_id_fkey_7000311 | fkey_reference_table.referencing_referencing_table_7000311 | fkey_reference_table.referencing_table_7000303 + referencing_referencing_table_id_ref_id_fkey_7000312 | fkey_reference_table.referencing_referencing_table_7000312 | fkey_reference_table.referencing_table_7000304 + referencing_referencing_table_id_ref_id_fkey_7000313 | fkey_reference_table.referencing_referencing_table_7000313 | fkey_reference_table.referencing_table_7000305 + referencing_referencing_table_id_ref_id_fkey_7000314 | fkey_reference_table.referencing_referencing_table_7000314 | fkey_reference_table.referencing_table_7000306 (16 rows) INSERT INTO referenced_table SELECT x, x+1 FROM generate_series(1,1000) AS f(x); @@ -1221,7 +1219,7 @@ SELECT max(ref_id) FROM referencing_referencing_table; DROP TABLE referenced_table CASCADE; NOTICE: drop cascades to constraint fkey_ref on table referencing_table DROP TABLE referencing_table CASCADE; -NOTICE: drop cascades to constraint referencing_referencing_table_id_fkey on table referencing_referencing_table +NOTICE: drop cascades to constraint referencing_referencing_table_id_ref_id_fkey on table referencing_referencing_table DROP TABLE referencing_referencing_table; -- test if create_distributed_table works in transactions with some edge cases -- the following checks if create_distributed_table works on foreign keys when @@ -1859,9 +1857,7 @@ ROLLBACK; DROP TABLE referenced_table CASCADE; NOTICE: drop cascades to constraint fkey_to_ref on table referencing_table_4 DROP TABLE referencing_table; +SET client_min_messages TO ERROR; DROP SCHEMA fkey_reference_table CASCADE; -NOTICE: drop cascades to 3 other objects -DETAIL: drop cascades to type foreign_details -drop cascades to view table_fkeys_in_workers -drop cascades to type composite SET search_path TO DEFAULT; +RESET client_min_messages; diff --git a/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out new file mode 100644 index 000000000..eae88c220 --- /dev/null +++ b/src/test/regress/expected/isolation_replicate_reference_tables_to_coordinator.out @@ -0,0 +1,143 @@ +Parsed test spec with 3 sessions + +starting permutation: s1-begin s2-begin s1-update-dist-table 
s2-lock-ref-table-placement-on-coordinator s1-lock-ref-table-placement-on-coordinator s2-update-dist-table deadlock-checker-call s1-end s2-end +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-update-dist-table: + update dist_table set b = 2 where a = 1; + +step s2-lock-ref-table-placement-on-coordinator: + DO $$ + DECLARE refshardid int; + BEGIN + SELECT shardid INTO refshardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass; + EXECUTE format('SELECT * from ref_table_%s FOR UPDATE', refshardid::text); + END + $$; + +step s1-lock-ref-table-placement-on-coordinator: + DO $$ + DECLARE refshardid int; + BEGIN + SELECT shardid INTO refshardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass; + EXECUTE format('SELECT * from ref_table_%s FOR UPDATE', refshardid::text); + END + $$; + +step s2-update-dist-table: + update dist_table set b = 2 where a = 1; + +step deadlock-checker-call: + SELECT check_distributed_deadlocks(); + +check_distributed_deadlocks + +t +step s1-lock-ref-table-placement-on-coordinator: <... completed> +step s2-update-dist-table: <... completed> +error in steps deadlock-checker-call s1-lock-ref-table-placement-on-coordinator s2-update-dist-table: ERROR: canceling the transaction since it was involved in a distributed deadlock +step s1-end: + END; + +step s2-end: + END; + +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-update-ref-table s2-sleep s2-view-dist s2-view-worker s2-end s1-end +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-update-ref-table: + update ref_table set a = a + 1; + +step s2-sleep: + SELECT pg_sleep(0.5); + +pg_sleep + + +step s2-view-dist: + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' ORDER BY query DESC; + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + + + update ref_table set a = a + 1; +coordinator_host57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression + + update ref_table set a = a + 1; +localhost 57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression +step s2-view-worker: + SELECT query, query_hostname, query_hostport, master_query_host_name, + master_query_host_port, state, wait_event_type, wait_event, usename, datname + FROM citus_worker_stat_activity + WHERE query NOT ILIKE '%pg_prepared_xacts%' AND + query NOT ILIKE '%COMMIT%' AND + query NOT ILIKE '%dump_local_wait_edges%' + ORDER BY query, query_hostport DESC; + +query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + +UPDATE public.ref_table_1400163 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +UPDATE public.ref_table_1400163 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +UPDATE public.ref_table_1400163 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)coordinator_host57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression +UPDATE public.ref_table_1400163 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)localhost 57636 
coordinator_host57636 idle in transactionClient ClientRead postgres regression +step s2-end: + END; + +step s1-end: + END; + +master_remove_node + + + +starting permutation: s1-begin s2-begin s1-update-ref-table s2-active-transactions s1-end s2-end +create_distributed_table + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-update-ref-table: + update ref_table set a = a + 1; + +step s2-active-transactions: + -- Admin should be able to see all transactions + SELECT count(*) FROM get_all_active_transactions(); + SELECT count(*) FROM get_global_active_transactions(); + +count + +2 +count + +6 +step s1-end: + END; + +step s2-end: + END; + +master_remove_node + + diff --git a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out new file mode 100644 index 000000000..bb347c2f6 --- /dev/null +++ b/src/test/regress/expected/multi_mx_add_coordinator.out @@ -0,0 +1,90 @@ +CREATE SCHEMA mx_add_coordinator; +SET search_path TO mx_add_coordinator,public; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 8; +SET citus.next_shard_id TO 7000000; +SET citus.next_placement_id TO 7000000; +SET citus.replication_model TO streaming; +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); + ?column? +---------- + 1 +(1 row) + +-- test that coordinator pg_dist_node entry is synced to the workers +SELECT wait_until_metadata_sync(); + wait_until_metadata_sync +-------------------------- + +(1 row) + +SELECT verify_metadata('localhost', :worker_1_port), + verify_metadata('localhost', :worker_2_port); + verify_metadata | verify_metadata +-----------------+----------------- + t | t +(1 row) + +CREATE TABLE ref(a int); +SELECT create_reference_table('ref'); + create_reference_table +------------------------ + +(1 row) + +-- test that changes from a metadata node is reflected in the coordinator placement +\c - - - :worker_1_port +SET search_path TO mx_add_coordinator,public; +INSERT INTO ref VALUES (1), (2), (3); +UPDATE ref SET a = a + 1; +DELETE FROM ref WHERE a > 3; +\c - - - :master_port +SET search_path TO mx_add_coordinator,public; +SELECT * FROM ref ORDER BY a; + a +--- + 2 + 3 +(2 rows) + +-- Clear pg_dist_transaction before removing the node. This is to keep the output +-- of multi_mx_transaction_recovery consistent. 
+SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +SELECT count(*) FROM run_command_on_workers('SELECT recover_prepared_transactions()'); + count +------- + 2 +(1 row) + +SELECT master_remove_node('localhost', :master_port); + master_remove_node +-------------------- + +(1 row) + +-- test that coordinator pg_dist_node entry was removed from the workers +SELECT wait_until_metadata_sync(); + wait_until_metadata_sync +-------------------------- + +(1 row) + +SELECT verify_metadata('localhost', :worker_1_port), + verify_metadata('localhost', :worker_2_port); + verify_metadata | verify_metadata +-----------------+----------------- + t | t +(1 row) + +DROP SCHEMA mx_add_coordinator CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table ref +drop cascades to table ref_7000000 +SET search_path TO DEFAULT; +RESET client_min_messages; diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 37c483d7b..72276a0ec 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -34,19 +34,19 @@ WHERE 1250000 | t | t (1 row) +SELECT count(*) active_primaries FROM pg_dist_node WHERE isactive AND noderole='primary' \gset SELECT - shardid, shardstate, nodename, nodeport + shardid, bool_and(shardstate = 1) all_placements_healthy, COUNT(distinct nodeport) = :active_primaries replicated_to_all FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass) -ORDER BY - placementid; - shardid | shardstate | nodename | nodeport ----------+------------+-----------+---------- - 1250000 | 1 | localhost | 57637 - 1250000 | 1 | localhost | 57638 -(2 rows) +GROUP BY shardid +ORDER BY shardid; + shardid | all_placements_healthy | replicated_to_all +---------+------------------------+------------------- + 1250000 | t | t +(1 row) -- check whether data was copied into distributed table SELECT * FROM reference_table_test; @@ -783,18 +783,16 @@ SELECT create_reference_table('reference_table_test_fourth'); (1 row) +\set VERBOSITY terse -- insert a row INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01'); -- now get the unique key violation INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01'); ERROR: duplicate key value violates unique constraint "reference_table_test_fourth_pkey_1250003" -DETAIL: Key (value_2)=(1) already exists. -CONTEXT: while executing command on localhost:57637 -- now get null constraint violation due to primary key INSERT INTO reference_table_test_fourth (value_1, value_3, value_4) VALUES (1, '1.0', '2016-12-01'); ERROR: null value in column "value_2" violates not-null constraint -DETAIL: Failing row contains (1, null, 1.0, 2016-12-01 00:00:00). 
-CONTEXT: while executing command on localhost:57637 +\set VERBOSITY default -- lets run some upserts INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01') ON CONFLICT DO NOTHING RETURNING *; value_1 | value_2 | value_3 | value_4 @@ -820,18 +818,17 @@ INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '10', '2016-12-01') ON C -- finally see that shard healths are OK SELECT - shardid, shardstate, nodename, nodeport + shardid, bool_and(shardstate = 1) all_placements_healthy, COUNT(distinct nodeport) = :active_primaries replicated_to_all FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test_fourth'::regclass) -ORDER BY - placementid; - shardid | shardstate | nodename | nodeport ----------+------------+-----------+---------- - 1250003 | 1 | localhost | 57637 - 1250003 | 1 | localhost | 57638 -(2 rows) +GROUP BY shardid +ORDER BY shardid; + shardid | all_placements_healthy | replicated_to_all +---------+------------------------+------------------- + 1250003 | t | t +(1 row) -- let's not run some update/delete queries on arbitrary columns DELETE FROM @@ -1439,13 +1436,13 @@ SELECT shardid, shardstate FROM pg_dist_shard_placement WHERE placementid = :a_p (1 row) -- some queries that are captured in functions -CREATE FUNCTION select_count_all() RETURNS bigint AS ' +CREATE OR REPLACE FUNCTION select_count_all() RETURNS bigint AS ' SELECT count(*) FROM reference_table_test; ' LANGUAGE SQL; -CREATE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp) +CREATE OR REPLACE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp) RETURNS void AS ' INSERT INTO reference_table_test VALUES ($1, $2, $3, $4); ' LANGUAGE SQL; @@ -1587,10 +1584,12 @@ BEGIN; ALTER TABLE reference_table_test ADD COLUMN value_dummy INT; INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02'); ROLLBACK; --- clean up tables +-- clean up tables, ... 
+SET client_min_messages TO ERROR; +DROP SEQUENCE example_ref_value_seq; DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, - reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite; + reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite, + colocated_table_test, colocated_table_test_2, append_reference_tmp_table; +DROP TYPE reference_comp_key; DROP SCHEMA reference_schema CASCADE; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to table reference_schema.reference_table_test_sixth -drop cascades to table reference_schema.reference_table_test_seventh +RESET client_min_messages; diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 15ca5ab7e..ab54c801b 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -104,7 +104,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -133,7 +134,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1370001 | 1 | 0 | localhost | 57638 @@ -157,7 +159,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1370001 | 1 | 0 | localhost | 57638 @@ -186,7 +189,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1370001 | 1 | 0 | localhost | 57638 @@ -224,7 +228,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -255,7 +260,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -286,7 +292,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -317,7 +324,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1370003 | 1 | 0 | localhost | 57638 @@ -384,29 +392,29 @@ WHERE colocationid IN 10004 | 1 | -1 | 0 (1 row) +SELECT colocationid AS reference_table_colocationid FROM pg_dist_colocation WHERE distributioncolumntype = 0 \gset SELECT - logicalrelid, partmethod, 
colocationid, repmodel + logicalrelid, partmethod, colocationid = :reference_table_colocationid, repmodel FROM pg_dist_partition WHERE logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two') ORDER BY logicalrelid; - logicalrelid | partmethod | colocationid | repmodel ------------------------------------------+------------+--------------+---------- - replicate_reference_table_reference_one | n | 10004 | t - replicate_reference_table_hash | h | 1360005 | c + logicalrelid | partmethod | ?column? | repmodel +-----------------------------------------+------------+----------+---------- + replicate_reference_table_reference_one | n | t | t + replicate_reference_table_hash | h | f | c (2 rows) BEGIN; +SET LOCAL client_min_messages TO ERROR; SELECT 1 FROM master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_reference_one" to the node localhost:57638 ?column? ---------- 1 (1 row) SELECT upgrade_to_reference_table('replicate_reference_table_hash'); -NOTICE: Replicating reference table "replicate_reference_table_hash" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -426,8 +434,7 @@ FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port -ORDER BY - shardid; +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1370004 | 1 | 0 | localhost | 57638 @@ -447,18 +454,18 @@ WHERE colocationid IN (1 row) SELECT - logicalrelid, partmethod, colocationid, repmodel + logicalrelid, partmethod, colocationid = :reference_table_colocationid, repmodel FROM pg_dist_partition WHERE logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two') ORDER BY logicalrelid; - logicalrelid | partmethod | colocationid | repmodel ------------------------------------------+------------+--------------+---------- - replicate_reference_table_reference_one | n | 10004 | t - replicate_reference_table_hash | n | 10004 | t - replicate_reference_table_reference_two | n | 10004 | t + logicalrelid | partmethod | ?column? 
| repmodel +-----------------------------------------+------------+----------+---------- + replicate_reference_table_reference_one | n | t | t + replicate_reference_table_hash | n | t | t + replicate_reference_table_reference_two | n | t | t (3 rows) DROP TABLE replicate_reference_table_reference_one; @@ -526,7 +533,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -558,7 +566,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -589,7 +598,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -618,7 +628,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1370011 | 1 | 0 | localhost | 57638 @@ -661,7 +672,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+----------+---------- (0 rows) @@ -681,7 +693,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- 1370012 | 1 | 0 | localhost | 57638 @@ -718,7 +731,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); 1 (1 row) --- we should see only one shard placements +-- we should see only one shard placements (other than coordinator) SELECT shardid, shardstate, shardlength, nodename, nodeport FROM @@ -730,6 +743,7 @@ WHERE pg_dist_shard WHERE logicalrelid = 'initially_not_replicated_reference_table'::regclass) + AND nodeport != :master_port ORDER BY 1,4,5; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- @@ -755,6 +769,7 @@ WHERE pg_dist_shard WHERE logicalrelid = 'initially_not_replicated_reference_table'::regclass) + AND nodeport != :master_port ORDER BY 1,4,5; shardid | shardstate | shardlength | nodename | nodeport ---------+------------+-------------+-----------+---------- diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index 65a316bd1..197a464a9 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -1,5 +1,16 @@ -- Tests for prepared transaction recovery SET citus.next_shard_id TO 1220000; +-- reference tables can have placements on the coordinator. Add it so +-- verify we recover transactions which do DML on coordinator placements +-- properly. +SET client_min_messages TO ERROR; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + ?column? 
+---------- + 1 +(1 row) + +RESET client_min_messages; -- enforce 1 connection per placement since -- the tests are prepared for that SET citus.force_max_query_parallelization TO ON; @@ -30,14 +41,25 @@ BEGIN; CREATE TABLE should_be_sorted_into_middle (value int); PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle'; \c - - - :master_port +BEGIN; +CREATE TABLE should_abort (value int); +PREPARE TRANSACTION 'citus_0_should_abort'; +BEGIN; +CREATE TABLE should_commit (value int); +PREPARE TRANSACTION 'citus_0_should_commit'; +BEGIN; +CREATE TABLE should_be_sorted_into_middle (value int); +PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle'; SET citus.force_max_query_parallelization TO ON; -- Add "fake" pg_dist_transaction records and run recovery -INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'); -INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'); +INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'), + (0, 'citus_0_should_commit'); +INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'), + (0, 'citus_0_should_be_forgotten'); SELECT recover_prepared_transactions(); recover_prepared_transactions ------------------------------- - 3 + 6 (1 row) SELECT count(*) FROM pg_dist_transaction; @@ -46,6 +68,18 @@ SELECT count(*) FROM pg_dist_transaction; 0 (1 row) +SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort'; + count +------- + 0 +(1 row) + +SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; + count +------- + 1 +(1 row) + -- Confirm that transactions were correctly rolled forward \c - - - :worker_1_port SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort'; @@ -328,3 +362,9 @@ SELECT pg_reload_conf(); DROP TABLE test_recovery_ref; DROP TABLE test_recovery; DROP TABLE test_recovery_single; +SELECT 1 FROM master_remove_node('localhost', :master_port); + ?column? +---------- + 1 +(1 row) + diff --git a/src/test/regress/expected/multi_upgrade_reference_table.out b/src/test/regress/expected/multi_upgrade_reference_table.out index ab35cc706..c3706d815 100644 --- a/src/test/regress/expected/multi_upgrade_reference_table.out +++ b/src/test/regress/expected/multi_upgrade_reference_table.out @@ -5,6 +5,12 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000; +-- We run this twice, once with coordinator node in pg_dist_node and once without. +-- Set client_min_messages to WARNING to discard NOTICE messages by +-- upgrade_to_reference_table() to make the output consistent in both cases. +-- We check that reference table placements were actually replicated by checking +-- pg_dist_placement. 
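-- (Illustrative sketch, not part of the expected output.) The replication checks in
-- this file all follow the same shape so that the output stays identical whether or
-- not the coordinator is registered: count the active primaries once, then compare
-- each shard's distinct placement ports against that count. With a hypothetical table
-- name 'some_reference_table', the pattern looks roughly like:
--
--   SELECT count(*) active_primaries FROM pg_dist_node
--   WHERE isactive AND noderole = 'primary' \gset
--
--   SELECT shardid, count(DISTINCT nodeport) = :active_primaries AS replicated_to_all
--   FROM pg_dist_shard_placement
--   WHERE shardid IN (SELECT shardid FROM pg_dist_shard
--                     WHERE logicalrelid = 'some_reference_table'::regclass)
--   GROUP BY shardid
--   ORDER BY shardid;
--
-- A shard counts as fully replicated when it has a placement on every active primary
-- node, however many nodes that currently is.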
+SET client_min_messages TO WARNING; -- test with not distributed table CREATE TABLE upgrade_reference_table_local(column1 int); SELECT upgrade_to_reference_table('upgrade_reference_table_local'); @@ -74,7 +80,8 @@ SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1'); (1 row) UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass; -UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006; +UPDATE pg_dist_shard_placement SET shardstate = 3 + WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_unhealthy'::regclass::oid); SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy'); ERROR: could not find any healthy placement for shard 1360006 DROP TABLE upgrade_reference_table_unhealthy; @@ -91,13 +98,13 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1'); UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass; SELECT upgrade_to_reference_table('upgrade_reference_table_composite'); -NOTICE: Replicating reference table "upgrade_reference_table_composite" to the node localhost:57638 upgrade_to_reference_table ---------------------------- (1 row) DROP TABLE upgrade_reference_table_composite; +DROP TYPE upgrade_test_composite_type; -- test with reference table CREATE TABLE upgrade_reference_table_reference(column1 int); SELECT create_reference_table('upgrade_reference_table_reference'); @@ -152,20 +159,22 @@ WHERE colocationid IN --------------+------------+-------------------+------------------------ (0 rows) +SELECT count(*) active_primaries FROM pg_dist_node WHERE isactive AND noderole='primary' \gset SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360009 | 1 | 8192 | localhost | 57637 + WHERE logicalrelid = 'upgrade_reference_table_append'::regclass) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? +---------+---------- + 1360009 | f (1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_append'); -NOTICE: Replicating reference table "upgrade_reference_table_append" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -206,19 +215,18 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_append'::regclass) -ORDER BY - nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360009 | 1 | 8192 | localhost | 57637 - 1360009 | 1 | 0 | localhost | 57638 -(2 rows) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? 
+---------+---------- + 1360009 | t +(1 row) DROP TABLE upgrade_reference_table_append; @@ -266,19 +274,20 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360010 | 1 | 0 | localhost | 57637 + WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? +---------+---------- + 1360010 | f (1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker'); -NOTICE: Replicating reference table "upgrade_reference_table_one_worker" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -319,19 +328,18 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass) -ORDER BY - nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360010 | 1 | 0 | localhost | 57637 - 1360010 | 1 | 0 | localhost | 57638 -(2 rows) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? +---------+---------- + 1360010 | t +(1 row) DROP TABLE upgrade_reference_table_one_worker; @@ -344,7 +352,8 @@ SELECT create_distributed_table('upgrade_reference_table_one_unhealthy', 'column (1 row) -UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360010 AND nodeport = :worker_1_port; +UPDATE pg_dist_shard_placement SET shardstate = 3 +WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass::oid) AND nodeport = :worker_1_port; -- situation before upgrade_reference_table SELECT partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel @@ -380,19 +389,19 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass) -ORDER BY - nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360011 | 1 | 0 | localhost | 57637 - 1360011 | 1 | 0 | localhost | 57638 -(2 rows) + AND shardstate = 1 +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? 
+---------+---------- + 1360011 | f +(1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy'); upgrade_to_reference_table @@ -435,19 +444,19 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass) -ORDER BY - nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360011 | 1 | 0 | localhost | 57637 - 1360011 | 1 | 0 | localhost | 57638 -(2 rows) + AND shardstate = 1 +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? +---------+---------- + 1360011 | t +(1 row) DROP TABLE upgrade_reference_table_one_unhealthy; @@ -494,19 +503,18 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass) -ORDER BY - nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360012 | 1 | 0 | localhost | 57637 - 1360012 | 1 | 0 | localhost | 57638 -(2 rows) +GROUP BY shardid +ORDER BY shardid; + shardid +--------- + 1360012 +(1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy'); upgrade_to_reference_table @@ -549,19 +557,18 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass) -ORDER BY - nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360012 | 1 | 0 | localhost | 57637 - 1360012 | 1 | 0 | localhost | 57638 -(2 rows) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? +---------+---------- + 1360012 | t +(1 row) DROP TABLE upgrade_reference_table_both_healthy; @@ -610,20 +617,21 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360013 | 1 | 0 | localhost | 57637 + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? 
+---------+---------- + 1360013 | f (1 row) BEGIN; SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback'); -NOTICE: Replicating reference table "upgrade_reference_table_transaction_rollback" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -665,15 +673,17 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360013 | 1 | 0 | localhost | 57637 + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? +---------+---------- + 1360013 | f (1 row) @@ -723,20 +733,21 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360014 | 1 | 0 | localhost | 57637 + WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? +---------+---------- + 1360014 | f (1 row) BEGIN; SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit'); -NOTICE: Replicating reference table "upgrade_reference_table_transaction_commit" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -778,19 +789,18 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass) -ORDER BY - nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360014 | 1 | 0 | localhost | 57637 - 1360014 | 1 | 0 | localhost | 57638 -(2 rows) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? 
+---------+---------- + 1360014 | t +(1 row) -- verify that shard is replicated to other worker \c - - - :worker_2_port @@ -849,16 +859,17 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) -ORDER BY nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360015 | 1 | 0 | localhost | 57637 +GROUP BY shardid +ORDER BY shardid; + shardid +--------- + 1360015 (1 row) @@ -901,16 +912,17 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) -ORDER BY nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360015 | 1 | 0 | localhost | 57637 +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? +---------+---------- + 1360015 | f (1 row) DROP TABLE upgrade_reference_table_mx; @@ -970,22 +982,22 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) -ORDER BY nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360016 | 1 | 0 | localhost | 57637 - 1360016 | 3 | 0 | localhost | 57638 -(2 rows) +GROUP BY shardid +ORDER BY shardid; + shardid +--------- + 1360016 +(1 row) +SET client_min_messages TO WARNING; SELECT upgrade_to_reference_table('upgrade_reference_table_mx'); -NOTICE: Replicating reference table "upgrade_reference_table_mx" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -1027,18 +1039,18 @@ WHERE colocationid IN (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) -ORDER BY nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360016 | 1 | 0 | localhost | 57637 - 1360016 | 1 | 0 | localhost | 57638 -(2 rows) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? +---------+---------- + 1360016 | t +(1 row) -- situation on metadata worker @@ -1066,18 +1078,18 @@ WHERE (1 row) SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) -ORDER BY nodeport; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1360016 | 1 | 0 | localhost | 57637 - 1360016 | 1 | 0 | localhost | 57638 -(2 rows) +GROUP BY shardid +ORDER BY shardid; + shardid | ?column? 
+---------+---------- + 1360016 | t +(1 row) \c - - - :master_port @@ -1088,3 +1100,4 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); (1 row) +RESET client_min_messages; diff --git a/src/test/regress/expected/remove_coordinator.out b/src/test/regress/expected/remove_coordinator.out new file mode 100644 index 000000000..f06ad8c6b --- /dev/null +++ b/src/test/regress/expected/remove_coordinator.out @@ -0,0 +1,7 @@ +-- removing coordinator from pg_dist_node should update pg_dist_colocation +SELECT master_remove_node('localhost', :master_port); + master_remove_node +-------------------- + +(1 row) + diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out new file mode 100644 index 000000000..3f46d4189 --- /dev/null +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -0,0 +1,103 @@ +-- +-- REPLICATE_REF_TABLES_ON_COORDINATOR +-- +CREATE SCHEMA replicate_ref_to_coordinator; +SET search_path TO 'replicate_ref_to_coordinator'; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 8000000; +SET citus.next_placement_id TO 8000000; +--- enable logging to see which tasks are executed locally +SET client_min_messages TO LOG; +SET citus.log_local_commands TO ON; +CREATE TABLE squares(a int, b int); +SELECT create_reference_table('squares'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i; +-- should be executed locally +SELECT count(*) FROM squares; +LOG: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares + count +------- + 10 +(1 row) + +-- create a second reference table +CREATE TABLE numbers(a int); +SELECT create_reference_table('numbers'); + create_reference_table +------------------------ + +(1 row) + +INSERT INTO numbers VALUES (20), (21); +LOG: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) VALUES (20), (21) +-- INSERT ... 
SELECT between reference tables +BEGIN; +EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers; + QUERY PLAN +----------------------------------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57636 dbname=regression + -> Insert on squares_8000000 citus_table_alias (cost=0.00..41.88 rows=2550 width=8) + -> Seq Scan on numbers_8000001 numbers (cost=0.00..41.88 rows=2550 width=8) +(7 rows) + +INSERT INTO squares SELECT a, a*a FROM numbers; +SELECT * FROM squares WHERE a >= 20 ORDER BY a; + a | b +----+----- + 20 | 400 + 21 | 441 +(2 rows) + +ROLLBACK; +BEGIN; +EXPLAIN INSERT INTO numbers SELECT a FROM squares WHERE a < 3; + QUERY PLAN +---------------------------------------------------------------------------------------------- + Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 1 + Tasks Shown: All + -> Task + Node: host=localhost port=57636 dbname=regression + -> Insert on numbers_8000001 citus_table_alias (cost=0.00..38.25 rows=753 width=4) + -> Seq Scan on squares_8000000 squares (cost=0.00..38.25 rows=753 width=4) + Filter: (a < 3) +(8 rows) + +INSERT INTO numbers SELECT a FROM squares WHERE a < 3; +SELECT * FROM numbers ORDER BY a; + a +---- + 1 + 2 + 20 + 21 +(4 rows) + +ROLLBACK; +-- Make sure we hide shard tables ... + SELECT citus_table_is_visible('numbers_8000001'::regclass::oid); + citus_table_is_visible +------------------------ + f +(1 row) + +-- clean-up +SET client_min_messages TO ERROR; +DROP SCHEMA replicate_ref_to_coordinator CASCADE; +-- Make sure the shard was dropped + SELECT 'numbers_8000001'::regclass::oid; +ERROR: relation "numbers_8000001" does not exist +LINE 1: SELECT 'numbers_8000001'::regclass::oid; + ^ +SET search_path TO DEFAULT; +RESET client_min_messages; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index a058a6ef4..c16e633c5 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -30,6 +30,7 @@ test: isolation_dump_local_wait_edges test: isolation_replace_wait_function test: isolation_distributed_deadlock_detection +test: isolation_replicate_reference_tables_to_coordinator # creating a restore point briefly blocks all # writes, run this test serially. diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index c82b45d17..d59dd021c 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -21,6 +21,7 @@ test: multi_test_helpers # the following test has to be run sequentially test: multi_mx_create_table test: multi_mx_hide_shard_names +test: multi_mx_add_coordinator test: multi_mx_modifications_to_reference_tables test: multi_mx_partitioning test: multi_mx_copy_data multi_mx_router_planner diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 8be0962ac..0127673ba 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -251,6 +251,18 @@ test: multi_upgrade_reference_table test: multi_replicate_reference_table test: multi_remove_node_reference_table +# -------- +# Replicating reference tables to coordinator. Add coordinator to pg_dist_node +# and rerun some of the tests. 
+# -------- +test: add_coordinator +test: multi_upgrade_reference_table +test: multi_replicate_reference_table +test: multi_reference_table +test: foreign_key_to_reference_table +test: replicate_reference_tables_to_coordinator +test: remove_coordinator + # ---------- # multi_transactional_drop_shards tests for dropping shards using connection API # ---------- diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index 09cec3b6f..78e93de6f 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -15,8 +15,8 @@ # --- test: multi_extension test: multi_cluster_management -test: multi_test_helpers test: multi_table_ddl +test: multi_test_helpers # ---------- # The following distributed tests depend on creating a partitioned table and diff --git a/src/test/regress/specs/isolation_distributed_deadlock_detection.spec b/src/test/regress/specs/isolation_distributed_deadlock_detection.spec index 34aa929af..056634fa7 100644 --- a/src/test/regress/specs/isolation_distributed_deadlock_detection.spec +++ b/src/test/regress/specs/isolation_distributed_deadlock_detection.spec @@ -368,7 +368,7 @@ step "s6-commit" COMMIT; } -# we disable the deamon during the regression tests in order to get consistent results +# we disable the daemon during the regression tests in order to get consistent results # thus we manually issue the deadlock detection session "deadlock-checker" diff --git a/src/test/regress/specs/isolation_replicate_reference_tables_to_coordinator.spec b/src/test/regress/specs/isolation_replicate_reference_tables_to_coordinator.spec new file mode 100644 index 000000000..9a7a13c5c --- /dev/null +++ b/src/test/regress/specs/isolation_replicate_reference_tables_to_coordinator.spec @@ -0,0 +1,134 @@ +setup +{ + SELECT citus_internal.replace_isolation_tester_func(); + SELECT citus_internal.refresh_isolation_tester_prepared_statement(); + + SELECT master_add_node('localhost', 57636); + + CREATE TABLE ref_table(a int primary key); + SELECT create_reference_table('ref_table'); + INSERT INTO ref_table VALUES (1), (3), (5), (7); + + CREATE TABLE dist_table(a int, b int); + SELECT create_distributed_table('dist_table', 'a'); +} + +teardown +{ + SELECT citus_internal.restore_isolation_tester_func(); + DROP TABLE ref_table, dist_table; + SELECT master_remove_node('localhost', 57636); +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-end" +{ + END; +} + +step "s1-update-dist-table" +{ + update dist_table set b = 2 where a = 1; +} + +step "s1-update-ref-table" +{ + update ref_table set a = a + 1; +} + +step "s1-lock-ref-table-placement-on-coordinator" +{ + DO $$ + DECLARE refshardid int; + BEGIN + SELECT shardid INTO refshardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass; + EXECUTE format('SELECT * from ref_table_%s FOR UPDATE', refshardid::text); + END + $$; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-end" +{ + END; +} + +step "s2-update-dist-table" +{ + update dist_table set b = 2 where a = 1; +} + +step "s2-lock-ref-table-placement-on-coordinator" +{ + DO $$ + DECLARE refshardid int; + BEGIN + SELECT shardid INTO refshardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass; + EXECUTE format('SELECT * from ref_table_%s FOR UPDATE', refshardid::text); + END + $$; +} + +step "s2-view-dist" +{ + SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, 
usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' ORDER BY query DESC; +} + +step "s2-view-worker" +{ + SELECT query, query_hostname, query_hostport, master_query_host_name, + master_query_host_port, state, wait_event_type, wait_event, usename, datname + FROM citus_worker_stat_activity + WHERE query NOT ILIKE '%pg_prepared_xacts%' AND + query NOT ILIKE '%COMMIT%' AND + query NOT ILIKE '%dump_local_wait_edges%' + ORDER BY query, query_hostport DESC; +} + + +step "s2-sleep" +{ + SELECT pg_sleep(0.5); +} + +step "s2-active-transactions" +{ + -- Admin should be able to see all transactions + SELECT count(*) FROM get_all_active_transactions(); + SELECT count(*) FROM get_global_active_transactions(); +} + +# we disable the daemon during the regression tests in order to get consistent results +# thus we manually issue the deadlock detection +session "deadlock-checker" + +# we issue the checker not only when there are deadlocks to ensure that we never cancel +# backend inappropriately +step "deadlock-checker-call" +{ + SELECT check_distributed_deadlocks(); +} + +# verify that locks on the placement of the reference table on the coordinator is +# taken into account when looking for distributed deadlocks +permutation "s1-begin" "s2-begin" "s1-update-dist-table" "s2-lock-ref-table-placement-on-coordinator" "s1-lock-ref-table-placement-on-coordinator" "s2-update-dist-table" "deadlock-checker-call" "s1-end" "s2-end" + +# verify that *_dist_stat_activity() functions return the correct result when query +# has a task on the coordinator. +permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-sleep" "s2-view-dist" "s2-view-worker" "s2-end" "s1-end" + +# verify that get_*_active_transactions() functions return the correct result when +# the query has a task on the coordinator. 
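# (Illustrative note, not part of the spec.) As used in the step above,
# get_all_active_transactions() reports the distributed transactions known to the node
# it is called on, while get_global_active_transactions() gathers them from every node
# in pg_dist_node; with the coordinator registered, the reference-table update is
# expected to show up both locally and in the larger global count.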
+permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-active-transactions" "s1-end" "s2-end" diff --git a/src/test/regress/sql/add_coordinator.sql b/src/test/regress/sql/add_coordinator.sql new file mode 100644 index 000000000..eb5e37778 --- /dev/null +++ b/src/test/regress/sql/add_coordinator.sql @@ -0,0 +1,14 @@ +-- +-- ADD_COORDINATOR +-- + +SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset + +-- adding the same node again should return the existing nodeid +SELECT master_add_node('localhost', :master_port, groupid => 0) = :master_nodeid; + +-- adding another node with groupid=0 should error out +SELECT master_add_node('localhost', 12345, groupid => 0) = :master_nodeid; + +-- start_metadata_sync_to_node() for coordinator should raise a notice +SELECT start_metadata_sync_to_node('localhost', :master_port); diff --git a/src/test/regress/sql/foreign_key_to_reference_table.sql b/src/test/regress/sql/foreign_key_to_reference_table.sql index 039b45890..65a886e9d 100644 --- a/src/test/regress/sql/foreign_key_to_reference_table.sql +++ b/src/test/regress/sql/foreign_key_to_reference_table.sql @@ -249,8 +249,10 @@ INSERT INTO referencing_schema.referencing_table SELECT x, x from generate_serie DELETE FROM referenced_schema.referenced_table WHERE id > 800; SELECT count(*) FROM referencing_schema.referencing_table; +SET client_min_messages TO ERROR; DROP SCHEMA referenced_schema CASCADE; DROP SCHEMA referencing_schema CASCADE; +RESET client_min_messages; -- on delete set update cascades properly CREATE TABLE referenced_table(test_column int, test_column2 int, PRIMARY KEY(test_column)); @@ -952,5 +954,7 @@ ROLLBACK; DROP TABLE referenced_table CASCADE; DROP TABLE referencing_table; +SET client_min_messages TO ERROR; DROP SCHEMA fkey_reference_table CASCADE; SET search_path TO DEFAULT; +RESET client_min_messages; diff --git a/src/test/regress/sql/multi_mx_add_coordinator.sql b/src/test/regress/sql/multi_mx_add_coordinator.sql new file mode 100644 index 000000000..b35949ddb --- /dev/null +++ b/src/test/regress/sql/multi_mx_add_coordinator.sql @@ -0,0 +1,46 @@ +CREATE SCHEMA mx_add_coordinator; +SET search_path TO mx_add_coordinator,public; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 8; +SET citus.next_shard_id TO 7000000; +SET citus.next_placement_id TO 7000000; +SET citus.replication_model TO streaming; +SET client_min_messages TO WARNING; + +SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); + +-- test that coordinator pg_dist_node entry is synced to the workers +SELECT wait_until_metadata_sync(); + +SELECT verify_metadata('localhost', :worker_1_port), + verify_metadata('localhost', :worker_2_port); + +CREATE TABLE ref(a int); +SELECT create_reference_table('ref'); + +-- test that changes from a metadata node is reflected in the coordinator placement +\c - - - :worker_1_port +SET search_path TO mx_add_coordinator,public; +INSERT INTO ref VALUES (1), (2), (3); +UPDATE ref SET a = a + 1; +DELETE FROM ref WHERE a > 3; + +\c - - - :master_port +SET search_path TO mx_add_coordinator,public; +SELECT * FROM ref ORDER BY a; + +-- Clear pg_dist_transaction before removing the node. This is to keep the output +-- of multi_mx_transaction_recovery consistent. 
+SELECT recover_prepared_transactions(); +SELECT count(*) FROM run_command_on_workers('SELECT recover_prepared_transactions()'); + +SELECT master_remove_node('localhost', :master_port); + +-- test that coordinator pg_dist_node entry was removed from the workers +SELECT wait_until_metadata_sync(); +SELECT verify_metadata('localhost', :worker_1_port), + verify_metadata('localhost', :worker_2_port); + +DROP SCHEMA mx_add_coordinator CASCADE; +SET search_path TO DEFAULT; +RESET client_min_messages; diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index 954a37131..8f5f96be2 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -23,14 +23,17 @@ FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass; + +SELECT count(*) active_primaries FROM pg_dist_node WHERE isactive AND noderole='primary' \gset + SELECT - shardid, shardstate, nodename, nodeport + shardid, bool_and(shardstate = 1) all_placements_healthy, COUNT(distinct nodeport) = :active_primaries replicated_to_all FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass) -ORDER BY - placementid; +GROUP BY shardid +ORDER BY shardid; -- check whether data was copied into distributed table SELECT * FROM reference_table_test; @@ -478,6 +481,8 @@ ORDER BY CREATE TABLE reference_table_test_fourth (value_1 int, value_2 float PRIMARY KEY, value_3 text, value_4 timestamp); SELECT create_reference_table('reference_table_test_fourth'); +\set VERBOSITY terse + -- insert a row INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01'); @@ -487,6 +492,8 @@ INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01'); -- now get null constraint violation due to primary key INSERT INTO reference_table_test_fourth (value_1, value_3, value_4) VALUES (1, '1.0', '2016-12-01'); +\set VERBOSITY default + -- lets run some upserts INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01') ON CONFLICT DO NOTHING RETURNING *; INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '10', '2016-12-01') ON CONFLICT (value_2) DO @@ -499,13 +506,13 @@ INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '10', '2016-12-01') ON C -- finally see that shard healths are OK SELECT - shardid, shardstate, nodename, nodeport + shardid, bool_and(shardstate = 1) all_placements_healthy, COUNT(distinct nodeport) = :active_primaries replicated_to_all FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test_fourth'::regclass) -ORDER BY - placementid; +GROUP BY shardid +ORDER BY shardid; -- let's not run some update/delete queries on arbitrary columns DELETE FROM @@ -909,14 +916,14 @@ SELECT master_copy_shard_placement(:a_shard_id, 'localhost', :worker_2_port, 'lo SELECT shardid, shardstate FROM pg_dist_shard_placement WHERE placementid = :a_placement_id; -- some queries that are captured in functions -CREATE FUNCTION select_count_all() RETURNS bigint AS ' +CREATE OR REPLACE FUNCTION select_count_all() RETURNS bigint AS ' SELECT count(*) FROM reference_table_test; ' LANGUAGE SQL; -CREATE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp) +CREATE OR REPLACE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp) RETURNS void AS ' INSERT INTO reference_table_test VALUES ($1, $2, $3, $4); 
' LANGUAGE SQL; @@ -992,7 +999,12 @@ ALTER TABLE reference_table_test ADD COLUMN value_dummy INT; INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02'); ROLLBACK; --- clean up tables +-- clean up tables, ... +SET client_min_messages TO ERROR; +DROP SEQUENCE example_ref_value_seq; DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third, - reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite; + reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite, + colocated_table_test, colocated_table_test_2, append_reference_tmp_table; +DROP TYPE reference_comp_key; DROP SCHEMA reference_schema CASCADE; +RESET client_min_messages; diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index c116e1327..9e1686a8b 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -3,7 +3,6 @@ -- -- Tests that check that reference tables are replicated when adding new nodes. - SET citus.next_shard_id TO 1370000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1370000; @@ -71,7 +70,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -88,7 +88,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -106,7 +107,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -123,7 +125,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -147,7 +150,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -166,7 +170,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -188,7 +193,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -207,7 +213,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -251,8 +258,10 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); +SELECT colocationid AS reference_table_colocationid FROM pg_dist_colocation WHERE distributioncolumntype = 0 \gset + SELECT - logicalrelid, partmethod, colocationid, repmodel + logicalrelid, partmethod, colocationid = :reference_table_colocationid, repmodel FROM pg_dist_partition WHERE @@ -260,6 +269,7 @@ WHERE ORDER BY logicalrelid; BEGIN; +SET LOCAL client_min_messages TO ERROR; SELECT 1 FROM master_add_node('localhost', :worker_2_port); SELECT upgrade_to_reference_table('replicate_reference_table_hash'); SELECT create_reference_table('replicate_reference_table_reference_two'); @@ -272,8 +282,7 @@ FROM pg_dist_shard_placement WHERE nodeport = 
:worker_2_port -ORDER BY - shardid; +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -283,7 +292,7 @@ WHERE colocationid IN WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass); SELECT - logicalrelid, partmethod, colocationid, repmodel + logicalrelid, partmethod, colocationid = :reference_table_colocationid, repmodel FROM pg_dist_partition WHERE @@ -350,7 +359,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -370,7 +380,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation WHERE colocationid = 1370009; @@ -387,7 +398,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -404,7 +416,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT * FROM pg_dist_colocation @@ -434,7 +447,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; SELECT 1 FROM master_add_node('localhost', :worker_2_port); @@ -444,7 +458,8 @@ SELECT FROM pg_dist_shard_placement WHERE - nodeport = :worker_2_port; + nodeport = :worker_2_port +ORDER BY shardid, nodeport; -- verify constraints have been created on the new node SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname like ''ref_table%'';'); @@ -459,7 +474,7 @@ SELECT create_reference_table('initially_not_replicated_reference_table'); SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); --- we should see only one shard placements +-- we should see only one shard placement (other than the coordinator) SELECT shardid, shardstate, shardlength, nodename, nodeport FROM @@ -471,6 +486,7 @@ WHERE pg_dist_shard WHERE logicalrelid = 'initially_not_replicated_reference_table'::regclass) + AND nodeport != :master_port ORDER BY 1,4,5; -- we should see the two shard placements after activation @@ -487,6 +503,7 @@ WHERE pg_dist_shard WHERE logicalrelid = 'initially_not_replicated_reference_table'::regclass) + AND nodeport != :master_port ORDER BY 1,4,5; -- this should have no effect diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql index 0c914d9f3..3b6efefba 100644 --- a/src/test/regress/sql/multi_transaction_recovery.sql +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -1,6 +1,13 @@ -- Tests for prepared transaction recovery SET citus.next_shard_id TO 1220000; +-- Reference tables can have placements on the coordinator. Add the coordinator so +-- we can verify that transactions which do DML on coordinator placements are +-- recovered properly.
+SET client_min_messages TO ERROR; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +RESET client_min_messages; + -- enforce 1 connection per placement since -- the tests are prepared for that SET citus.force_max_query_parallelization TO ON; @@ -28,14 +35,32 @@ CREATE TABLE should_be_sorted_into_middle (value int); PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle'; \c - - - :master_port + +BEGIN; +CREATE TABLE should_abort (value int); +PREPARE TRANSACTION 'citus_0_should_abort'; + +BEGIN; +CREATE TABLE should_commit (value int); +PREPARE TRANSACTION 'citus_0_should_commit'; + +BEGIN; +CREATE TABLE should_be_sorted_into_middle (value int); +PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle'; + SET citus.force_max_query_parallelization TO ON; -- Add "fake" pg_dist_transaction records and run recovery -INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'); -INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'); +INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'), + (0, 'citus_0_should_commit'); +INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'), + (0, 'citus_0_should_be_forgotten'); SELECT recover_prepared_transactions(); SELECT count(*) FROM pg_dist_transaction; +SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort'; +SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; + -- Confirm that transactions were correctly rolled forward \c - - - :worker_1_port SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort'; @@ -175,3 +200,5 @@ SELECT pg_reload_conf(); DROP TABLE test_recovery_ref; DROP TABLE test_recovery; DROP TABLE test_recovery_single; + +SELECT 1 FROM master_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/multi_upgrade_reference_table.sql b/src/test/regress/sql/multi_upgrade_reference_table.sql index 1f94dd51b..8c79ca93a 100644 --- a/src/test/regress/sql/multi_upgrade_reference_table.sql +++ b/src/test/regress/sql/multi_upgrade_reference_table.sql @@ -7,6 +7,13 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000; +-- We run this twice, once with the coordinator node in pg_dist_node and once without. +-- Set client_min_messages to WARNING to discard the NOTICE messages emitted by +-- upgrade_to_reference_table(), so that the output is consistent in both cases. +-- We verify that reference table placements were actually replicated by checking +-- pg_dist_placement.
+SET client_min_messages TO WARNING; + -- test with not distributed table CREATE TABLE upgrade_reference_table_local(column1 int); SELECT upgrade_to_reference_table('upgrade_reference_table_local'); @@ -48,7 +55,8 @@ DROP TABLE upgrade_reference_table_referenced; CREATE TABLE upgrade_reference_table_unhealthy(column1 int); SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1'); UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass; -UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006; +UPDATE pg_dist_shard_placement SET shardstate = 3 + WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_unhealthy'::regclass::oid); SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy'); DROP TABLE upgrade_reference_table_unhealthy; @@ -62,6 +70,7 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1'); UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass; SELECT upgrade_to_reference_table('upgrade_reference_table_composite'); DROP TABLE upgrade_reference_table_composite; +DROP TYPE upgrade_test_composite_type; -- test with reference table CREATE TABLE upgrade_reference_table_reference(column1 int); @@ -102,13 +111,17 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); +SELECT count(*) active_primaries FROM pg_dist_node WHERE isactive AND noderole='primary' \gset + SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); + WHERE logicalrelid = 'upgrade_reference_table_append'::regclass) +GROUP BY shardid +ORDER BY shardid; SELECT upgrade_to_reference_table('upgrade_reference_table_append'); @@ -135,14 +148,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_append'::regclass) -ORDER BY - nodeport; +GROUP BY shardid +ORDER BY shardid; DROP TABLE upgrade_reference_table_append; @@ -174,12 +187,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); + WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass) +GROUP BY shardid +ORDER BY shardid; SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker'); @@ -206,14 +221,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass) -ORDER BY - nodeport; +GROUP BY shardid +ORDER BY shardid; DROP TABLE upgrade_reference_table_one_worker; @@ -221,7 
+236,8 @@ DROP TABLE upgrade_reference_table_one_worker; SET citus.shard_replication_factor TO 2; CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int); SELECT create_distributed_table('upgrade_reference_table_one_unhealthy', 'column1'); -UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360010 AND nodeport = :worker_1_port; +UPDATE pg_dist_shard_placement SET shardstate = 3 +WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass::oid) AND nodeport = :worker_1_port; -- situation before upgrade_reference_table SELECT @@ -246,14 +262,15 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass) -ORDER BY - nodeport; + AND shardstate = 1 +GROUP BY shardid +ORDER BY shardid; SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy'); @@ -280,14 +297,15 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass) -ORDER BY - nodeport; + AND shardstate = 1 +GROUP BY shardid +ORDER BY shardid; DROP TABLE upgrade_reference_table_one_unhealthy; @@ -318,14 +336,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass) -ORDER BY - nodeport; +GROUP BY shardid +ORDER BY shardid; SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy'); @@ -352,14 +370,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass) -ORDER BY - nodeport; +GROUP BY shardid +ORDER BY shardid; DROP TABLE upgrade_reference_table_both_healthy; @@ -392,12 +410,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass) +GROUP BY shardid +ORDER BY shardid; BEGIN; SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback'); @@ -425,13 +445,16 @@ WHERE colocationid IN FROM pg_dist_partition WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement 
WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); + WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass) +GROUP BY shardid +ORDER BY shardid; DROP TABLE upgrade_reference_table_transaction_rollback; @@ -464,12 +487,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); + WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass) +GROUP BY shardid +ORDER BY shardid; BEGIN; SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit'); @@ -498,14 +523,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass) -ORDER BY - nodeport; +GROUP BY shardid +ORDER BY shardid; -- verify that shard is replicated to other worker \c - - - :worker_2_port @@ -544,13 +569,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) -ORDER BY nodeport; +GROUP BY shardid +ORDER BY shardid; SELECT upgrade_to_reference_table('upgrade_reference_table_mx'); @@ -579,13 +605,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) -ORDER BY nodeport; +GROUP BY shardid +ORDER BY shardid; DROP TABLE upgrade_reference_table_mx; @@ -624,15 +651,16 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) -ORDER BY nodeport; +GROUP BY shardid +ORDER BY shardid; - +SET client_min_messages TO WARNING; SELECT upgrade_to_reference_table('upgrade_reference_table_mx'); @@ -659,13 +687,14 @@ WHERE colocationid IN WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass); SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) -ORDER BY nodeport; +GROUP BY shardid +ORDER BY shardid; -- situation on metadata worker \c - - - :worker_1_port @@ -684,15 +713,16 @@ WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass; SELECT - shardid, shardstate, shardlength, nodename, nodeport + shardid, count(distinct nodeport) = :active_primaries FROM pg_dist_shard_placement WHERE shardid 
IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass) -ORDER BY nodeport; +GROUP BY shardid +ORDER BY shardid; \c - - - :master_port DROP TABLE upgrade_reference_table_mx; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); - +RESET client_min_messages; diff --git a/src/test/regress/sql/remove_coordinator.sql b/src/test/regress/sql/remove_coordinator.sql new file mode 100644 index 000000000..2db26d4d7 --- /dev/null +++ b/src/test/regress/sql/remove_coordinator.sql @@ -0,0 +1,2 @@ +-- removing coordinator from pg_dist_node should update pg_dist_colocation +SELECT master_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql new file mode 100644 index 000000000..945cf3dfe --- /dev/null +++ b/src/test/regress/sql/replicate_reference_tables_to_coordinator.sql @@ -0,0 +1,52 @@ +-- +-- REPLICATE_REF_TABLES_ON_COORDINATOR +-- + +CREATE SCHEMA replicate_ref_to_coordinator; +SET search_path TO 'replicate_ref_to_coordinator'; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 8000000; +SET citus.next_placement_id TO 8000000; + +--- enable logging to see which tasks are executed locally +SET client_min_messages TO LOG; +SET citus.log_local_commands TO ON; + +CREATE TABLE squares(a int, b int); +SELECT create_reference_table('squares'); +INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i; + +-- should be executed locally +SELECT count(*) FROM squares; + +-- create a second reference table +CREATE TABLE numbers(a int); +SELECT create_reference_table('numbers'); +INSERT INTO numbers VALUES (20), (21); + +-- INSERT ... SELECT between reference tables +BEGIN; +EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers; +INSERT INTO squares SELECT a, a*a FROM numbers; +SELECT * FROM squares WHERE a >= 20 ORDER BY a; +ROLLBACK; + +BEGIN; +EXPLAIN INSERT INTO numbers SELECT a FROM squares WHERE a < 3; +INSERT INTO numbers SELECT a FROM squares WHERE a < 3; +SELECT * FROM numbers ORDER BY a; +ROLLBACK; + +-- Make sure we hide shard tables ... + SELECT citus_table_is_visible('numbers_8000001'::regclass::oid); + +-- clean-up +SET client_min_messages TO ERROR; +DROP SCHEMA replicate_ref_to_coordinator CASCADE; + +-- Make sure the shard was dropped + SELECT 'numbers_8000001'::regclass::oid; + +SET search_path TO DEFAULT; +RESET client_min_messages;
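Reviewer note, not part of the patch: the regression tests above exercise coordinator replication through the test harness. As a minimal standalone sketch of the same check, assuming a psql session on the coordinator with the usual :master_port variable and a hypothetical throwaway table named ref_check, one could confirm that a reference table created while the coordinator is registered in pg_dist_node gets a placement in group 0 as well as in each worker group:

-- register the coordinator in the metadata (group 0), as the new tests do
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);

-- create a throwaway reference table (ref_check is a hypothetical name)
CREATE TABLE ref_check (a int);
SELECT create_reference_table('ref_check');

-- a reference table has a single shard, so expect one row per node group,
-- including groupid 0 for the coordinator
SELECT groupid, count(*) AS placements
FROM pg_dist_placement JOIN pg_dist_shard USING (shardid)
WHERE logicalrelid = 'ref_check'::regclass
GROUP BY groupid
ORDER BY groupid;

-- clean up; removing the coordinator assumes no other reference tables
-- still have placements there
DROP TABLE ref_check;
SELECT master_remove_node('localhost', :master_port);

The groupid = 0 row is what this commit adds; before the change a reference table only received placements in the worker groups.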