diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c
index 6f5443ba3..ec7db7da4 100644
--- a/src/backend/distributed/operations/repair_shards.c
+++ b/src/backend/distributed/operations/repair_shards.c
@@ -25,6 +25,7 @@
 #include "distributed/colocation_utils.h"
 #include "distributed/commands.h"
 #include "distributed/connection_management.h"
+#include "distributed/deparse_shard_query.h"
 #include "distributed/distributed_planner.h"
 #include "distributed/listutils.h"
 #include "distributed/shard_cleaner.h"
@@ -1207,39 +1208,7 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
                                       tableOwner, ddlCommandList);
     }
 
-    int taskId = 0;
-    List *copyTaskList = NIL;
-    foreach_ptr(shardInterval, shardIntervalList)
-    {
-        /*
-         * Skip copying data for partitioned tables, because they contain no
-         * data themselves. Their partitions do contain data, but those are
-         * different colocated shards that will be copied seperately.
-         */
-        if (!PartitionedTable(shardInterval->relationId))
-        {
-            char *copyCommand = CreateShardCopyCommand(
-                shardInterval, targetNode);
-
-            Task *copyTask = CreateBasicTask(
-                INVALID_JOB_ID,
-                taskId,
-                READ_TASK,
-                copyCommand);
-
-            ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
-            SetPlacementNodeMetadata(taskPlacement, sourceNode);
-
-            copyTask->taskPlacementList = list_make1(taskPlacement);
-
-            copyTaskList = lappend(copyTaskList, copyTask);
-            taskId++;
-        }
-    }
-
-    ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
-                                      MaxAdaptiveExecutorPoolSize,
-                                      NULL /* jobIdList (ignored by API implementation) */);
+    CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL);
 
     foreach_ptr(shardInterval, shardIntervalList)
     {
@@ -1309,6 +1278,85 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 }
 
 
+/*
+ * CopyShardsToNode copies the list of shards from the source to the target.
+ * When snapshotName is not NULL it will do the COPY using this snapshot name.
+ */
+void
+CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardIntervalList,
+                 char *snapshotName)
+{
+    int taskId = 0;
+    List *copyTaskList = NIL;
+    ShardInterval *shardInterval = NULL;
+    foreach_ptr(shardInterval, shardIntervalList)
+    {
+        /*
+         * Skip copying data for partitioned tables, because they contain no
+         * data themselves. Their partitions do contain data, but those are
+         * different colocated shards that will be copied separately.
+         */
+        if (PartitionedTable(shardInterval->relationId))
+        {
+            continue;
+        }
+
+        List *ddlCommandList = NIL;
+
+        /*
+         * This uses repeatable read because we want to read the table in
+         * the state exactly as it was when the snapshot was created. This
+         * is needed when using this code for the initial data copy when
+         * using logical replication. The logical replication catchup might
+         * fail otherwise, because some of the updates that it needs to do
+         * have already been applied on the target.
+         */
+        StringInfo beginTransaction = makeStringInfo();
+        appendStringInfo(beginTransaction,
+                         "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;");
+        ddlCommandList = lappend(ddlCommandList, beginTransaction->data);
+
+        /*
+         * Set snapshot for non-blocking shard split.
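+         * The snapshot is the one exported by CREATE_REPLICATION_SLOT on the
+         * source node, so the replication connection that exported it needs
+         * to stay open until the snapshot has been used here.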
+         */
+        if (snapshotName != NULL)
+        {
+            StringInfo snapShotString = makeStringInfo();
+            appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;",
+                             quote_literal_cstr(
+                                 snapshotName));
+            ddlCommandList = lappend(ddlCommandList, snapShotString->data);
+        }
+
+        char *copyCommand = CreateShardCopyCommand(
+            shardInterval, targetNode);
+
+        ddlCommandList = lappend(ddlCommandList, copyCommand);
+
+        StringInfo commitCommand = makeStringInfo();
+        appendStringInfo(commitCommand, "COMMIT;");
+        ddlCommandList = lappend(ddlCommandList, commitCommand->data);
+
+        Task *task = CitusMakeNode(Task);
+        task->jobId = shardInterval->shardId;
+        task->taskId = taskId;
+        task->taskType = READ_TASK;
+        task->replicationModel = REPLICATION_MODEL_INVALID;
+        SetTaskQueryStringList(task, ddlCommandList);
+
+        ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
+        SetPlacementNodeMetadata(taskPlacement, sourceNode);
+
+        task->taskPlacementList = list_make1(taskPlacement);
+
+        copyTaskList = lappend(copyTaskList, task);
+        taskId++;
+    }
+
+    ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
+                                      MaxAdaptiveExecutorPoolSize,
+                                      NULL /* jobIdList (ignored by API implementation) */);
+}
+
+
 /*
  * CreateShardCopyCommand constructs the command to copy a shard to another
  * worker node. This command needs to be run on the node wher you want to copy
diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c
index 472ca6c73..25754699b 100644
--- a/src/backend/distributed/replication/multi_logical_replication.c
+++ b/src/backend/distributed/replication/multi_logical_replication.c
@@ -41,6 +41,7 @@
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/distributed_planner.h"
 #include "distributed/remote_commands.h"
+#include "distributed/repair_shards.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shard_rebalancer.h"
 #include "distributed/version_compat.h"
@@ -119,16 +120,16 @@ static void DropShardMoveSubscriptions(MultiConnection *connection,
                                        Bitmapset *tableOwnerIds);
 static void CreateShardMovePublications(MultiConnection *connection, List *shardList,
                                         Bitmapset *tableOwnerIds);
-static void CreateShardMoveSubscriptions(MultiConnection *connection,
-                                         char *sourceNodeName,
-                                         int sourceNodePort, char *userName,
-                                         char *databaseName,
+static MultiConnection * GetReplicationConnection(char *nodeName, int nodePort);
+static char * CreateShardMoveSubscriptions(MultiConnection *sourceConnection,
+                                           MultiConnection *targetConnection,
+                                           MultiConnection *sourceReplicationConnection,
+                                           char *databaseName,
+                                           Bitmapset *tableOwnerIds);
+static void EnableShardMoveSubscriptions(MultiConnection *targetConnection,
                                          Bitmapset *tableOwnerIds);
 static char * escape_param_str(const char *str);
 static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
-
-static uint64 TotalRelationSizeForSubscription(MultiConnection *connection,
-                                               char *command);
 static bool RelationSubscriptionsAreReady(MultiConnection *targetConnection,
                                           Bitmapset *tableOwnerIds,
                                           char *operationPrefix);
@@ -183,6 +184,9 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
         GetNodeUserDatabaseConnection(connectionFlags, targetNodeName, targetNodePort,
                                       superUser, databaseName);
 
+    MultiConnection *sourceReplicationConnection =
+        GetReplicationConnection(sourceNodeName, sourceNodePort);
+
     /*
      * Operations on publications and subscriptions cannot run in a transaction
      * block. Claim the connections exclusively to ensure they do not get used
@@ -193,29 +197,57 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
     PG_TRY();
     {
-        /*
-         * We have to create the primary key (or any other replica identity)
-         * before the initial COPY is done. This is necessary because as soon
-         * as the COPY command finishes, the update/delete operations that
-         * are queued will be replicated. And, if the replica identity does not
-         * exist on the target, the replication would fail.
-         */
-        CreateReplicaIdentity(shardList, targetNodeName, targetNodePort);
-
         /* set up the publication on the source and subscription on the target */
         CreateShardMovePublications(sourceConnection, replicationSubscriptionList,
                                     tableOwnerIds);
-        CreateShardMoveSubscriptions(targetConnection, sourceNodeName, sourceNodePort,
-                                     superUser, databaseName, tableOwnerIds);
+        char *snapshot = CreateShardMoveSubscriptions(
+            sourceConnection,
+            targetConnection,
+            sourceReplicationConnection,
+            databaseName,
+            tableOwnerIds);
 
         /* only useful for isolation testing, see the function comment for the details */
         ConflictOnlyWithIsolationTesting();
 
+        WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
+        WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
+        CopyShardsToNode(sourceNode, targetNode, shardList, snapshot);
+
         /*
-         * Logical replication starts with copying the existing data for each table in
-         * the publication. During the copy operation the state of the associated relation
-         * subscription is not ready. There is no point of locking the shards before the
-         * subscriptions for each relation becomes ready, so wait for it.
+         * We can close this connection now, because we're done copying the
+         * data and thus don't need access to the snapshot anymore. The
+         * replication slot will still be at the same LSN, because the
+         * subscriptions have not been enabled yet.
+         */
+        CloseConnection(sourceReplicationConnection);
+
+        /*
+         * We have to create the primary key (or any other replica identity)
+         * before the update/delete operations that are queued will be
+         * replicated. Because if the replica identity does not exist on the
+         * target, the replication would fail.
+         *
+         * So we do it right after the initial data COPY, but before enabling
+         * the subscriptions. We do it at this latest possible moment, because
+         * it's much cheaper to build an index at once than to create it
+         * incrementally. So this way we create the primary key index in one go
+         * for all data from the initial COPY.
+         */
+        CreateReplicaIdentity(shardList, targetNodeName, targetNodePort);
+
+        /* Start applying the changes from the replication slots to catch up. */
+        EnableShardMoveSubscriptions(targetConnection, tableOwnerIds);
+
+        /*
+         * The following check is a leftover from when we used subscriptions
+         * with copy_data=true. It's probably not really necessary anymore, but
+         * it seemed like a nice check to keep. At least for debugging issues it
+         * seems nice to report differences between the subscription never
+         * becoming ready and the subscriber not applying WAL. It's also not
+         * entirely clear if the catchup check handles the case correctly where
+         * the subscription is not in the ready state yet, because so far it
+         * never had to.
         */
         WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds,
                                                 SHARD_MOVE_SUBSCRIPTION_PREFIX);
@@ -1129,26 +1161,12 @@ ShardMovePublicationName(Oid ownerId)
 
 /*
  * ShardSubscriptionName returns the name of the subscription for the given
- * owner. If we're running the isolation tester the function also appends the
- * process id normal subscription name.
- *
- * When it contains the PID of the current process it is used for block detection
- * by the isolation test runner, since the replication process on the publishing
- * node uses the name of the subscription as the application_name of the SQL session.
- * This PID is then extracted from the application_name to find out which PID on the
- * coordinator is blocked by the blocked replication process.
+ * owner.
  */
 char *
 ShardSubscriptionName(Oid ownerId, char *operationPrefix)
 {
-    if (RunningUnderIsolationTest)
-    {
-        return psprintf("%s%i_%i", operationPrefix, ownerId, MyProcPid);
-    }
-    else
-    {
-        return psprintf("%s%i", operationPrefix, ownerId);
-    }
+    return psprintf("%s%i", operationPrefix, ownerId);
 }
 
 
@@ -1445,6 +1463,63 @@ CreateShardMovePublications(MultiConnection *connection, List *shardList,
 }
 
 
+/*
+ * GetReplicationConnection opens a new replication connection to the given
+ * node. This connection can be used to send replication commands, such as
+ * CREATE_REPLICATION_SLOT.
+ */
+static MultiConnection *
+GetReplicationConnection(char *nodeName, int nodePort)
+{
+    int connectionFlags = FORCE_NEW_CONNECTION;
+    connectionFlags |= REQUIRE_REPLICATION_CONNECTION_PARAM;
+
+    MultiConnection *connection = GetNodeUserDatabaseConnection(
+        connectionFlags,
+        nodeName,
+        nodePort,
+        CitusExtensionOwnerName(),
+        get_database_name(MyDatabaseId));
+    ClaimConnectionExclusively(connection);
+    return connection;
+}
+
+
+/*
+ * CreateReplicationSlot creates a replication slot with the given slot name
+ * over the given connection. The given connection should be a replication
+ * connection. This function returns the name of the snapshot that is used for
+ * this replication slot. When using this snapshot name for other transactions
+ * you need to keep the given replication connection open until you have used
+ * the snapshot name.
+ */
+static char *
+CreateReplicationSlot(MultiConnection *connection, char *slotname)
+{
+    StringInfo createReplicationSlotCommand = makeStringInfo();
+    appendStringInfo(createReplicationSlotCommand,
+                     "CREATE_REPLICATION_SLOT %s LOGICAL pgoutput EXPORT_SNAPSHOT;",
+                     quote_identifier(slotname));
+
+    PGresult *result = NULL;
+    int response = ExecuteOptionalRemoteCommand(connection,
+                                                createReplicationSlotCommand->data,
+                                                &result);
+
+    if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
+    {
+        ReportResultError(connection, result, ERROR);
+    }
+
+    /*
+     * 'snapshot_name' is the third column of the result (index 2, since
+     * column indexes start at zero). We use pstrdup to copy the value into
+     * the current memory context.
+     */
+    char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columnIndex */));
+    PQclear(result);
+    ForgetResults(connection);
+    return snapShotName;
+}
+
+
 /*
  * CreateShardMoveSubscriptions creates the subscriptions used for shard moves
  * over the given connection. One subscription is created for each of the table
@@ -1454,18 +1529,28 @@ CreateShardMovePublications(MultiConnection *connection, List *shardList,
 * names directly (rather than looking up any relevant pg_dist_poolinfo rows),
 * all such connections remain direct and will not route through any configured
 * poolers.
+ *
+ * The subscriptions created by this function are created in the disabled
+ * state. This is done so a data copy can be done manually afterwards. To
+ * enable the subscriptions you can use EnableShardMoveSubscriptions().
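+ *
+ * Because the replication slots for these subscriptions are created
+ * separately over a replication connection (and copied with
+ * pg_copy_logical_replication_slot), the subscriptions are created with
+ * create_slot=false and copy_data=false.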
+ *
+ * This function returns the snapshot name of the replication slots that are
+ * used by the subscriptions. When using this snapshot name for other
+ * transactions you need to keep the given replication connection open until
+ * you have used the snapshot name.
  */
-static void
-CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
-                             int sourceNodePort, char *userName, char *databaseName,
+static char *
+CreateShardMoveSubscriptions(MultiConnection *sourceConnection,
+                             MultiConnection *targetConnection,
+                             MultiConnection *sourceReplicationConnection,
+                             char *databaseName,
                              Bitmapset *tableOwnerIds)
 {
     int ownerId = -1;
+    char *firstReplicationSlot = NULL;
+    char *snapshot = NULL;
 
     while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0)
     {
-        StringInfo createSubscriptionCommand = makeStringInfo();
-        StringInfo conninfo = makeStringInfo();
-
         /*
          * The CREATE USER command should not propagate, so we temporarily
          * disable DDL propagation.
@@ -1475,7 +1560,9 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
          * This prevents permission escalations.
          */
         SendCommandListToWorkerOutsideTransaction(
-            connection->hostname, connection->port, connection->user,
+            targetConnection->hostname,
+            targetConnection->port,
+            targetConnection->user,
             list_make2(
                 "SET LOCAL citus.enable_ddl_propagation TO OFF;",
                 psprintf(
@@ -1484,23 +1571,45 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
                     GetUserNameFromId(ownerId, false)
                     )));
 
+        if (!firstReplicationSlot)
+        {
+            firstReplicationSlot = ShardSubscriptionName(ownerId,
+                                                         SHARD_MOVE_SUBSCRIPTION_PREFIX);
+            snapshot = CreateReplicationSlot(
+                sourceReplicationConnection,
+                firstReplicationSlot);
+        }
+        else
+        {
+            ExecuteCriticalRemoteCommand(
+                sourceConnection,
+                psprintf("SELECT pg_catalog.pg_copy_logical_replication_slot(%s, %s)",
+                         quote_literal_cstr(firstReplicationSlot),
+                         quote_literal_cstr(ShardSubscriptionName(ownerId,
+                                                                  SHARD_MOVE_SUBSCRIPTION_PREFIX))));
+        }
+
+        StringInfo conninfo = makeStringInfo();
         appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' "
                          "connect_timeout=20",
-                         escape_param_str(sourceNodeName), sourceNodePort,
-                         escape_param_str(userName), escape_param_str(databaseName));
+                         escape_param_str(sourceConnection->hostname),
+                         sourceConnection->port,
+                         escape_param_str(sourceConnection->user), escape_param_str(
                             databaseName));
 
+        StringInfo createSubscriptionCommand = makeStringInfo();
         appendStringInfo(createSubscriptionCommand,
                          "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
-                         "WITH (citus_use_authinfo=true, enabled=false)",
+                         "WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false)",
                          quote_identifier(ShardSubscriptionName(ownerId,
                                                                 SHARD_MOVE_SUBSCRIPTION_PREFIX)),
                          quote_literal_cstr(conninfo->data),
                          quote_identifier(ShardMovePublicationName(ownerId)));
 
-        ExecuteCriticalRemoteCommand(connection, createSubscriptionCommand->data);
+        ExecuteCriticalRemoteCommand(targetConnection, createSubscriptionCommand->data);
 
         pfree(createSubscriptionCommand->data);
         pfree(createSubscriptionCommand);
 
-        ExecuteCriticalRemoteCommand(connection, psprintf(
+        ExecuteCriticalRemoteCommand(targetConnection, psprintf(
                                          "ALTER SUBSCRIPTION %s OWNER TO %s",
                                          ShardSubscriptionName(ownerId,
                                                                SHARD_MOVE_SUBSCRIPTION_PREFIX),
@@ -1513,15 +1622,31 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
          * disable DDL propagation.
*/ SendCommandListToWorkerOutsideTransaction( - connection->hostname, connection->port, connection->user, + targetConnection->hostname, + targetConnection->port, + targetConnection->user, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( "ALTER ROLE %s NOSUPERUSER", ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX) ))); + } + return snapshot; +} - ExecuteCriticalRemoteCommand(connection, psprintf( + +/* + * EnableShardMoveSubscriptions enables all the the shard move subscriptions + * that belong to the given table owners. + */ +static void +EnableShardMoveSubscriptions(MultiConnection *targetConnection, Bitmapset *tableOwnerIds) +{ + int ownerId = -1; + while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) + { + ExecuteCriticalRemoteCommand(targetConnection, psprintf( "ALTER SUBSCRIPTION %s ENABLE", ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX) @@ -1622,25 +1747,21 @@ GetRemoteLSN(MultiConnection *connection, char *command) /* - * WaitForRelationSubscriptionsBecomeReady waits until the states of the subsriptions - * for each shard becomes ready. This indicates that the initial COPY is finished - * on the shards. + * WaitForRelationSubscriptionsBecomeReady waits until the states of the + * subsriptions for each shard becomes ready. This should happen very quickly, + * because we don't use the COPY logic from the subscriptions. So all that's + * needed is to start reading from the replication slot. * - * The function errors if the total size of the relations that belong to the subscription - * on the target node doesn't change within LogicalReplicationErrorTimeout. The - * function also reports its progress in every logicalReplicationProgressReportTimeout. + * The function errors if the subscriptions don't become ready within + * LogicalReplicationErrorTimeout. The function also reports its progress in + * every logicalReplicationProgressReportTimeout. */ void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, Bitmapset *tableOwnerIds, char *operationPrefix) { - uint64 previousTotalRelationSizeForSubscription = 0; TimestampTz previousSizeChangeTime = GetCurrentTimestamp(); - - /* report in the first iteration as well */ - TimestampTz previousReportTime = 0; - - uint64 previousReportedTotalSize = 0; + TimestampTz previousReportTime = GetCurrentTimestamp(); /* @@ -1671,94 +1792,37 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, break; } - char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, - operationPrefix); - /* Get the current total size of tables belonging to the subscriber */ - uint64 currentTotalRelationSize = - TotalRelationSizeForSubscription(targetConnection, psprintf( - "SELECT sum(pg_total_relation_size(srrelid)) " - "FROM pg_subscription_rel, pg_stat_subscription " - "WHERE srsubid = subid AND subname IN %s", - subscriptionValueList - ) - ); - - /* - * The size has not been changed within the last iteration. If necessary - * log a messages. If size does not change over a given replication timeout - * error out. 
- */ - if (currentTotalRelationSize == previousTotalRelationSizeForSubscription) + /* log the progress if necessary */ + if (TimestampDifferenceExceeds(previousReportTime, + GetCurrentTimestamp(), + logicalReplicationProgressReportTimeout)) { - /* log the progress if necessary */ - if (TimestampDifferenceExceeds(previousReportTime, - GetCurrentTimestamp(), - logicalReplicationProgressReportTimeout)) - { - ereport(LOG, (errmsg("Subscription size has been staying same for the " - "last %d msec", - logicalReplicationProgressReportTimeout))); + ereport(LOG, (errmsg("Not all subscriptions for the shard move are " + "READY yet"))); - previousReportTime = GetCurrentTimestamp(); - } - - /* Error out if the size does not change within the given time threshold */ - if (TimestampDifferenceExceeds(previousSizeChangeTime, - GetCurrentTimestamp(), - LogicalReplicationTimeout)) - { - ereport(ERROR, (errmsg("The logical replication waiting timeout " - "%d msec exceeded", - LogicalReplicationTimeout), - errdetail("The subscribed relations haven't become " - "ready on the target node %s:%d", - targetConnection->hostname, - targetConnection->port), - errhint( - "There might have occurred problems on the target " - "node. If not, consider using higher values for " - "citus.logical_replication_timeout"))); - } + previousReportTime = GetCurrentTimestamp(); } - else + + /* Error out if the size does not change within the given time threshold */ + if (TimestampDifferenceExceeds(previousSizeChangeTime, + GetCurrentTimestamp(), + LogicalReplicationTimeout)) { - /* first, record that there is some change in the size */ - previousSizeChangeTime = GetCurrentTimestamp(); - - /* - * Subscription size may decrease or increase. - * - * Subscription size may decrease in case of VACUUM operation, which - * may get fired with autovacuum, on it. - * - * Increase of the relation's size belonging to subscriber means a successful - * copy from publisher to subscriber. - */ - bool sizeIncreased = currentTotalRelationSize > - previousTotalRelationSizeForSubscription; - - if (TimestampDifferenceExceeds(previousReportTime, - GetCurrentTimestamp(), - logicalReplicationProgressReportTimeout)) - { - ereport(LOG, ((errmsg("The total size of the relations belonging to " - "subscriptions %s from %ld to %ld at %s " - "on the target node %s:%d", - sizeIncreased ? "increased" : "decreased", - previousReportedTotalSize, - currentTotalRelationSize, - timestamptz_to_str(previousSizeChangeTime), + ereport(ERROR, (errmsg("The logical replication waiting timeout " + "of %d msec is exceeded", + LogicalReplicationTimeout), + errdetail("The subscribed relations haven't become " + "ready on the target node %s:%d", targetConnection->hostname, - targetConnection->port)))); - - previousReportedTotalSize = currentTotalRelationSize; - previousReportTime = GetCurrentTimestamp(); - } + targetConnection->port), + errhint( + "Logical replication has failed to initialize " + "on the target node. If not, consider using " + "higher values for " + "citus.logical_replication_timeout"))); } - previousTotalRelationSizeForSubscription = currentTotalRelationSize; - /* wait for 1 second (1000 miliseconds) and try again */ WaitForMiliseconds(1000); @@ -1769,62 +1833,6 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, } -/* - * TotalRelationSizeForSubscription is a helper function which returns the total - * size of the shards that are replicated via the subscription. Note that the - * function returns the total size including indexes. 
- */ -static uint64 -TotalRelationSizeForSubscription(MultiConnection *connection, char *command) -{ - bool raiseInterrupts = false; - uint64 remoteTotalSize = 0; - - int querySent = SendRemoteCommand(connection, command); - if (querySent == 0) - { - ReportConnectionError(connection, ERROR); - } - - PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); - if (!IsResponseOK(result)) - { - ReportResultError(connection, result, ERROR); - } - - int rowCount = PQntuples(result); - if (rowCount != 1) - { - ereport(ERROR, (errmsg("unexpected number of rows returned by: %s", - command))); - } - - int colCount = PQnfields(result); - if (colCount != 1) - { - ereport(ERROR, (errmsg("unexpected number of columns returned by: %s", - command))); - } - - if (!PQgetisnull(result, 0, 0)) - { - char *resultString = PQgetvalue(result, 0, 0); - - remoteTotalSize = SafeStringToUint64(resultString); - } - else - { - ereport(ERROR, (errmsg("unexpected value returned by: %s", - command))); - } - - PQclear(result); - ForgetResults(connection); - - return remoteTotalSize; -} - - /* * ShardSubscriptionNamesValueList returns a SQL value list containing the * subscription names for all of the given table owner ids. This value list can @@ -1997,7 +2005,7 @@ WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr LogicalReplicationTimeout)) { ereport(ERROR, (errmsg("The logical replication waiting timeout " - "%d msec exceeded", + "of %d msec is exceeded", LogicalReplicationTimeout), errdetail("The LSN on the target subscription hasn't " "caught up ready on the target node %s:%d", diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index a151dcd6b..7c04f9088 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -2515,10 +2515,26 @@ CitusAuthHook(Port *port, int status) * * We do this so that this backend gets the chance to show * up in citus_lock_waits. + * + * We cannot assign a new global PID yet here, because that + * would require reading from catalogs, but that's not allowed + * this early in the connection startup (because no database + * has been assigned yet). */ InitializeBackendData(); SetBackendDataDistributedCommandOriginator(true); } + else + { + /* + * We set the global PID in the backend data here already to be able to + * do blocked process detection on connections that are opened over a + * replication connection. A replication connection backend will never + * call StartupCitusBackend, which normally sets up the global PID. 
+ */ + InitializeBackendData(); + SetBackendDataGlobalPID(gpid); + } /* let other authentication hooks to kick in first */ if (original_client_auth_hook) diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 5d09a3aea..7160882b0 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -71,3 +71,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ #include "udfs/worker_split_copy/11.1-1.sql" #include "udfs/worker_copy_table_to_node/11.1-1.sql" #include "udfs/worker_split_shard_replication_setup/11.1-1.sql" +#include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.1-1.sql new file mode 100644 index 000000000..52174271b --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.1-1.sql @@ -0,0 +1,33 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]) +RETURNS boolean AS $$ + DECLARE + mBlockedGlobalPid int8; + workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); + coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); + BEGIN + IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN + RETURN true; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + -- Note that worker process may be blocked or waiting for a lock. So we need to + -- get transaction number for both of them. Following IF provides the transaction + -- number when the worker process waiting for other session. 
+ IF EXISTS (SELECT 1 FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN + SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; + ELSE + -- Check whether transactions initiated from the coordinator get locked + SELECT global_pid INTO mBlockedGlobalPid + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + END IF; + + RETURN EXISTS ( + SELECT 1 FROM citus_internal_global_blocked_processes() + WHERE waiting_global_pid = mBlockedGlobalPid + ); + END; +$$ LANGUAGE plpgsql; + +REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql index d329abf29..52174271b 100644 --- a/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/latest.sql @@ -26,30 +26,7 @@ RETURNS boolean AS $$ RETURN EXISTS ( SELECT 1 FROM citus_internal_global_blocked_processes() WHERE waiting_global_pid = mBlockedGlobalPid - ) OR EXISTS ( - -- Check on the workers if any logical replication job spawned by the - -- current PID is blocked, by checking it's application name - -- Query is heavily based on: https://wiki.postgresql.org/wiki/Lock_Monitoring - SELECT result FROM run_command_on_workers($two$ - SELECT blocked_activity.application_name AS blocked_application - FROM pg_catalog.pg_locks blocked_locks - JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid - JOIN pg_catalog.pg_locks blocking_locks - ON blocking_locks.locktype = blocked_locks.locktype - AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE - AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation - AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page - AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple - AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid - AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid - AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid - AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid - AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid - AND blocking_locks.pid != blocked_locks.pid - JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid - WHERE NOT blocked_locks.GRANTED AND blocked_activity.application_name LIKE 'citus_shard_move_subscription_%' - $two$) where result LIKE 'citus_shard_move_subscription_%_' || pBlockedPid); - + ); END; $$ LANGUAGE plpgsql; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 82f41bea5..5d16b92b7 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -893,6 +893,23 @@ AssignGlobalPID(void) } +/* + * SetBackendDataGlobalPID sets the global PID. This specifically does not read + * from catalog tables, because it should be safe to run from our + * authentication hook. 
+ */ +void +SetBackendDataGlobalPID(uint64 globalPID) +{ + SpinLockAcquire(&MyBackendData->mutex); + + MyBackendData->globalPID = globalPID; + MyBackendData->distributedCommandOriginator = false; + + SpinLockRelease(&MyBackendData->mutex); +} + + /* * SetBackendDataDistributedCommandOriginator is used to set the distributedCommandOriginator * field on MyBackendData. diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index 52f2f9c1b..6ba294604 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -58,6 +58,7 @@ extern void UnSetGlobalPID(void); extern void SetActiveMyBackend(bool value); extern void AssignDistributedTransactionId(void); extern void AssignGlobalPID(void); +extern void SetBackendDataGlobalPID(uint64 globalPID); extern uint64 GetGlobalPID(void); extern void SetBackendDataDistributedCommandOriginator(bool distributedCommandOriginator); diff --git a/src/include/distributed/repair_shards.h b/src/include/distributed/repair_shards.h index 930fb9a87..6a209a3b4 100644 --- a/src/include/distributed/repair_shards.h +++ b/src/include/distributed/repair_shards.h @@ -14,3 +14,5 @@ extern uint64 ShardListSizeInBytes(List *colocatedShardList, char *workerNodeName, uint32 workerNodePort); extern void ErrorIfMoveUnsupportedTableType(Oid relationId); +extern void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, + List *shardIntervalList, char *snapshotName); diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out index 269b3e33e..231470df0 100644 --- a/src/test/regress/expected/failure_online_move_shard_placement.out +++ b/src/test/regress/expected/failure_online_move_shard_placement.out @@ -77,6 +77,45 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard.t").cancel(' (1 row) +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +ERROR: canceling statement due to user request +-- failure during COPY phase +SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx +while executing command on localhost:xxxxx +-- cancelation during COPY phase +SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +ERROR: canceling statement due to user request +-- failure when enabling the subscriptions +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx +-- failure when enabling the subscriptions +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").cancel(' || :pid || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + SELECT master_move_shard_placement(101, 
'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request -- failure on polling subscription state @@ -96,25 +135,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscrip (1 row) -SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -ERROR: canceling statement due to user request --- failure on getting subscriber state -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT sum").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -ERROR: connection not open -CONTEXT: while executing command on localhost:xxxxx --- cancellation on getting subscriber state -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT sum").cancel(' || :pid || ')'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request -- failure on polling last write-ahead log location reported to origin WAL sender diff --git a/src/test/regress/expected/logical_replication.out b/src/test/regress/expected/logical_replication.out index 6440c172b..d8e91aa1e 100644 --- a/src/test/regress/expected/logical_replication.out +++ b/src/test/regress/expected/logical_replication.out @@ -13,6 +13,7 @@ SELECT create_distributed_table('dist', 'id'); (1 row) +INSERT INTO dist SELECT generate_series(1, 100); SELECT 1 from citus_add_node('localhost', :master_port, groupId := 0); NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata ?column? 
@@ -46,7 +47,14 @@ SELECT count(*) from pg_replication_slots; 0 (1 row) +SELECT count(*) FROM dist; + count +--------------------------------------------------------------------- + 100 +(1 row) + \c - - - :worker_1_port +SET search_path TO logical_replication; SELECT count(*) from pg_subscription; count --------------------------------------------------------------------- @@ -65,7 +73,14 @@ SELECT count(*) from pg_replication_slots; 1 (1 row) +SELECT count(*) FROM dist; + count +--------------------------------------------------------------------- + 100 +(1 row) + \c - - - :master_port +SET search_path TO logical_replication; select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); citus_move_shard_placement --------------------------------------------------------------------- @@ -96,7 +111,14 @@ SELECT count(*) from pg_replication_slots; 0 (1 row) +SELECT count(*) from dist; + count +--------------------------------------------------------------------- + 100 +(1 row) + \c - - - :worker_1_port +SET search_path TO logical_replication; SELECT count(*) from pg_subscription; count --------------------------------------------------------------------- @@ -115,7 +137,14 @@ SELECT count(*) from pg_replication_slots; 0 (1 row) +SELECT count(*) from dist; + count +--------------------------------------------------------------------- + 100 +(1 row) + \c - - - :worker_2_port +SET search_path TO logical_replication; SELECT count(*) from pg_subscription; count --------------------------------------------------------------------- @@ -134,7 +163,13 @@ SELECT count(*) from pg_replication_slots; 0 (1 row) +SELECT count(*) from dist; + count +--------------------------------------------------------------------- + 100 +(1 row) + \c - - - :master_port -SET search_path TO public; +SET search_path TO logical_replication; +SET client_min_messages TO WARNING; DROP SCHEMA logical_replication CASCADE; -NOTICE: drop cascades to table logical_replication.dist diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index 714c7b3b1..8028ccb24 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -47,6 +47,22 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard.t").cancel(' || :pid || ')'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +-- failure during COPY phase +SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); + +-- cancelation during COPY phase +SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); + +-- failure when enabling the subscriptions +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").kill()'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); + +-- failure when enabling the subscriptions +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").cancel(' || :pid || ')'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); + -- 
failure on polling subscription state SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); @@ -55,14 +71,6 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").cancel(' || :pid || ')'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); --- failure on getting subscriber state -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT sum").kill()'); -SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); - --- cancellation on getting subscriber state -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT sum").cancel(' || :pid || ')'); -SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); - -- failure on polling last write-ahead log location reported to origin WAL sender SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); diff --git a/src/test/regress/sql/logical_replication.sql b/src/test/regress/sql/logical_replication.sql index e379edf36..103295f5c 100644 --- a/src/test/regress/sql/logical_replication.sql +++ b/src/test/regress/sql/logical_replication.sql @@ -13,6 +13,7 @@ CREATE TABLE dist ( SELECT oid AS postgres_oid FROM pg_roles where rolname = 'postgres' \gset SELECT create_distributed_table('dist', 'id'); +INSERT INTO dist SELECT generate_series(1, 100); SELECT 1 from citus_add_node('localhost', :master_port, groupId := 0); @@ -28,14 +29,18 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid CONNECTION :conn SELECT count(*) from pg_subscription; SELECT count(*) from pg_publication; SELECT count(*) from pg_replication_slots; +SELECT count(*) FROM dist; \c - - - :worker_1_port +SET search_path TO logical_replication; SELECT count(*) from pg_subscription; SELECT count(*) from pg_publication; SELECT count(*) from pg_replication_slots; +SELECT count(*) FROM dist; \c - - - :master_port +SET search_path TO logical_replication; select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); @@ -44,18 +49,27 @@ SELECT citus_remove_node('localhost', :master_port); SELECT count(*) from pg_subscription; SELECT count(*) from pg_publication; SELECT count(*) from pg_replication_slots; +SELECT count(*) from dist; \c - - - :worker_1_port +SET search_path TO logical_replication; + SELECT count(*) from pg_subscription; SELECT count(*) from pg_publication; SELECT count(*) from pg_replication_slots; +SELECT count(*) from dist; + \c - - - :worker_2_port +SET search_path TO logical_replication; SELECT count(*) from pg_subscription; SELECT count(*) from pg_publication; SELECT count(*) from pg_replication_slots; +SELECT count(*) from dist; \c - - - :master_port -SET search_path TO public; +SET search_path TO logical_replication; + +SET client_min_messages TO WARNING; DROP SCHEMA logical_replication CASCADE;
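--
-- For reference, a rough sketch of the command sequence that the C changes
-- above issue during a non-blocking shard move. The slot, subscription,
-- publication, and snapshot names below are illustrative placeholders; the
-- real names are generated per table owner by ShardSubscriptionName() and
-- ShardMovePublicationName().
--
-- 1. On the source node, over a replication connection (the snapshot name is
--    the third column of the result):
--
--      CREATE_REPLICATION_SLOT citus_shard_move_subscription_10 LOGICAL pgoutput EXPORT_SNAPSHOT;
--
--    Slots for any additional table owners are cloned from the first one:
--
--      SELECT pg_catalog.pg_copy_logical_replication_slot(
--          'citus_shard_move_subscription_10', 'citus_shard_move_subscription_16390');
--
-- 2. On the target node, create the subscriptions without a slot, without the
--    built-in data copy, and still disabled:
--
--      CREATE SUBSCRIPTION citus_shard_move_subscription_10
--          CONNECTION 'host=... port=... user=... dbname=...'
--          PUBLICATION citus_shard_move_publication_10
--          WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false);
--
-- 3. On the source node, copy the existing data as of the exported snapshot:
--
--      BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
--      SET TRANSACTION SNAPSHOT '00000003-00000002-1';
--      -- the command built by CreateShardCopyCommand() runs here for each shard
--      COMMIT;
--
-- 4. On the target node, create the replica identities (CreateReplicaIdentity)
--    and only then start the catchup:
--
--      ALTER SUBSCRIPTION citus_shard_move_subscription_10 ENABLE;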