mirror of https://github.com/citusdata/citus.git
Use faster custom copy logic for non-blocking shard moves (#6119)
DESCRIPTION: Use faster custom copy logic for non-blocking shard moves Non-blocking shard moves consist of two main phases: 1. Initial data copy 2. Catchup phase This changes the first of these phases significantly. Previously we used the copy logic provided by postgres subscriptions. This meant we didn't have to implement it ourselves, but it came with the downside of little control. When implementing shard splits we needed more control to even make it work, so we implemented our own logic for copying data between nodes. This PR starts using that logic for non-blocking shard moves. Doing so has four main advantages: 1. It uses COPY in binary format when possible, which is cheaper to encode and decode. Furthermore it very often results in less data that needs to be sent over the network. 2. It allows us to create the primary key (or other replica identity) after doing the initial data copy. This should give some speed up over the total run, because creating an index is bulk is much faster than incrementally building it. 3. It doesn't require a replication slot per parallel copy. Increasing the maximum number of replication slots uses resources in postgres, even if they are not used. So reducing the number of replication slots that shard moves need is nice. 4. Logical replication table_sync workers are slow to start up, so if lots of shards need to be copied that can make it quite slow. This can happen easily when combining Postgres partitioning with Citus.pull/6142/head
parent
cc2afb4b63
commit
dd548ee3c7
|
@ -25,6 +25,7 @@
|
|||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/commands.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/deparse_shard_query.h"
|
||||
#include "distributed/distributed_planner.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/shard_cleaner.h"
|
||||
|
@ -1207,39 +1208,7 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
|
|||
tableOwner, ddlCommandList);
|
||||
}
|
||||
|
||||
int taskId = 0;
|
||||
List *copyTaskList = NIL;
|
||||
foreach_ptr(shardInterval, shardIntervalList)
|
||||
{
|
||||
/*
|
||||
* Skip copying data for partitioned tables, because they contain no
|
||||
* data themselves. Their partitions do contain data, but those are
|
||||
* different colocated shards that will be copied seperately.
|
||||
*/
|
||||
if (!PartitionedTable(shardInterval->relationId))
|
||||
{
|
||||
char *copyCommand = CreateShardCopyCommand(
|
||||
shardInterval, targetNode);
|
||||
|
||||
Task *copyTask = CreateBasicTask(
|
||||
INVALID_JOB_ID,
|
||||
taskId,
|
||||
READ_TASK,
|
||||
copyCommand);
|
||||
|
||||
ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
|
||||
SetPlacementNodeMetadata(taskPlacement, sourceNode);
|
||||
|
||||
copyTask->taskPlacementList = list_make1(taskPlacement);
|
||||
|
||||
copyTaskList = lappend(copyTaskList, copyTask);
|
||||
taskId++;
|
||||
}
|
||||
}
|
||||
|
||||
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
|
||||
MaxAdaptiveExecutorPoolSize,
|
||||
NULL /* jobIdList (ignored by API implementation) */);
|
||||
CopyShardsToNode(sourceNode, targetNode, shardIntervalList, NULL);
|
||||
|
||||
foreach_ptr(shardInterval, shardIntervalList)
|
||||
{
|
||||
|
@ -1309,6 +1278,85 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CopyShardsToNode copies the list of shards from the source to the target.
|
||||
* When snapshotName is not NULL it will do the COPY using this snapshot name.
|
||||
*/
|
||||
void
|
||||
CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardIntervalList,
|
||||
char *snapshotName)
|
||||
{
|
||||
int taskId = 0;
|
||||
List *copyTaskList = NIL;
|
||||
ShardInterval *shardInterval = NULL;
|
||||
foreach_ptr(shardInterval, shardIntervalList)
|
||||
{
|
||||
/*
|
||||
* Skip copying data for partitioned tables, because they contain no
|
||||
* data themselves. Their partitions do contain data, but those are
|
||||
* different colocated shards that will be copied seperately.
|
||||
*/
|
||||
if (PartitionedTable(shardInterval->relationId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
List *ddlCommandList = NIL;
|
||||
|
||||
/*
|
||||
* This uses repeatable read because we want to read the table in
|
||||
* the state exactly as it was when the snapshot was created. This
|
||||
* is needed when using this code for the initial data copy when
|
||||
* using logical replication. The logical replication catchup might
|
||||
* fail otherwise, because some of the updates that it needs to do
|
||||
* have already been applied on the target.
|
||||
*/
|
||||
StringInfo beginTransaction = makeStringInfo();
|
||||
appendStringInfo(beginTransaction,
|
||||
"BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;");
|
||||
ddlCommandList = lappend(ddlCommandList, beginTransaction->data);
|
||||
|
||||
/* Set snapshot for non-blocking shard split. */
|
||||
if (snapshotName != NULL)
|
||||
{
|
||||
StringInfo snapShotString = makeStringInfo();
|
||||
appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;",
|
||||
quote_literal_cstr(
|
||||
snapshotName));
|
||||
ddlCommandList = lappend(ddlCommandList, snapShotString->data);
|
||||
}
|
||||
|
||||
char *copyCommand = CreateShardCopyCommand(
|
||||
shardInterval, targetNode);
|
||||
|
||||
ddlCommandList = lappend(ddlCommandList, copyCommand);
|
||||
|
||||
StringInfo commitCommand = makeStringInfo();
|
||||
appendStringInfo(commitCommand, "COMMIT;");
|
||||
ddlCommandList = lappend(ddlCommandList, commitCommand->data);
|
||||
|
||||
Task *task = CitusMakeNode(Task);
|
||||
task->jobId = shardInterval->shardId;
|
||||
task->taskId = taskId;
|
||||
task->taskType = READ_TASK;
|
||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||
SetTaskQueryStringList(task, ddlCommandList);
|
||||
|
||||
ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
|
||||
SetPlacementNodeMetadata(taskPlacement, sourceNode);
|
||||
|
||||
task->taskPlacementList = list_make1(taskPlacement);
|
||||
|
||||
copyTaskList = lappend(copyTaskList, task);
|
||||
taskId++;
|
||||
}
|
||||
|
||||
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
|
||||
MaxAdaptiveExecutorPoolSize,
|
||||
NULL /* jobIdList (ignored by API implementation) */);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateShardCopyCommand constructs the command to copy a shard to another
|
||||
* worker node. This command needs to be run on the node wher you want to copy
|
||||
|
|
|
@ -41,6 +41,7 @@
|
|||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/distributed_planner.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/repair_shards.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/shard_rebalancer.h"
|
||||
#include "distributed/version_compat.h"
|
||||
|
@ -119,16 +120,16 @@ static void DropShardMoveSubscriptions(MultiConnection *connection,
|
|||
Bitmapset *tableOwnerIds);
|
||||
static void CreateShardMovePublications(MultiConnection *connection, List *shardList,
|
||||
Bitmapset *tableOwnerIds);
|
||||
static void CreateShardMoveSubscriptions(MultiConnection *connection,
|
||||
char *sourceNodeName,
|
||||
int sourceNodePort, char *userName,
|
||||
char *databaseName,
|
||||
static MultiConnection * GetReplicationConnection(char *nodeName, int nodePort);
|
||||
static char * CreateShardMoveSubscriptions(MultiConnection *sourceConnection,
|
||||
MultiConnection *targetConnection,
|
||||
MultiConnection *sourceReplicationConnection,
|
||||
char *databaseName,
|
||||
Bitmapset *tableOwnerIds);
|
||||
static void EnableShardMoveSubscriptions(MultiConnection *targetConnection,
|
||||
Bitmapset *tableOwnerIds);
|
||||
static char * escape_param_str(const char *str);
|
||||
static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
|
||||
|
||||
static uint64 TotalRelationSizeForSubscription(MultiConnection *connection,
|
||||
char *command);
|
||||
static bool RelationSubscriptionsAreReady(MultiConnection *targetConnection,
|
||||
Bitmapset *tableOwnerIds,
|
||||
char *operationPrefix);
|
||||
|
@ -183,6 +184,9 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
|
|||
GetNodeUserDatabaseConnection(connectionFlags, targetNodeName, targetNodePort,
|
||||
superUser, databaseName);
|
||||
|
||||
MultiConnection *sourceReplicationConnection =
|
||||
GetReplicationConnection(sourceNodeName, sourceNodePort);
|
||||
|
||||
/*
|
||||
* Operations on publications and subscriptions cannot run in a transaction
|
||||
* block. Claim the connections exclusively to ensure they do not get used
|
||||
|
@ -193,29 +197,57 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
|
|||
|
||||
PG_TRY();
|
||||
{
|
||||
/*
|
||||
* We have to create the primary key (or any other replica identity)
|
||||
* before the initial COPY is done. This is necessary because as soon
|
||||
* as the COPY command finishes, the update/delete operations that
|
||||
* are queued will be replicated. And, if the replica identity does not
|
||||
* exist on the target, the replication would fail.
|
||||
*/
|
||||
CreateReplicaIdentity(shardList, targetNodeName, targetNodePort);
|
||||
|
||||
/* set up the publication on the source and subscription on the target */
|
||||
CreateShardMovePublications(sourceConnection, replicationSubscriptionList,
|
||||
tableOwnerIds);
|
||||
CreateShardMoveSubscriptions(targetConnection, sourceNodeName, sourceNodePort,
|
||||
superUser, databaseName, tableOwnerIds);
|
||||
char *snapshot = CreateShardMoveSubscriptions(
|
||||
sourceConnection,
|
||||
targetConnection,
|
||||
sourceReplicationConnection,
|
||||
databaseName,
|
||||
tableOwnerIds);
|
||||
|
||||
/* only useful for isolation testing, see the function comment for the details */
|
||||
ConflictOnlyWithIsolationTesting();
|
||||
|
||||
WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort);
|
||||
WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort);
|
||||
CopyShardsToNode(sourceNode, targetNode, shardList, snapshot);
|
||||
|
||||
/*
|
||||
* Logical replication starts with copying the existing data for each table in
|
||||
* the publication. During the copy operation the state of the associated relation
|
||||
* subscription is not ready. There is no point of locking the shards before the
|
||||
* subscriptions for each relation becomes ready, so wait for it.
|
||||
* We can close this connection now, because we're done copying the
|
||||
* data and thus don't need access to the snapshot anymore. The
|
||||
* replication slot will still be at the same LSN, because the
|
||||
* subscriptions have not been enabled yet.
|
||||
*/
|
||||
CloseConnection(sourceReplicationConnection);
|
||||
|
||||
/*
|
||||
* We have to create the primary key (or any other replica identity)
|
||||
* before the update/delete operations that are queued will be
|
||||
* replicated. Because if the replica identity does not exist on the
|
||||
* target, the replication would fail.
|
||||
*
|
||||
* So we it right after the initial data COPY, but before enabling the
|
||||
* susbcriptions. We do it at this latest possible moment, because its
|
||||
* much cheaper to build an index at once than to create it
|
||||
* incrementally. So this way we create the primary key index in one go
|
||||
* for all data from the initial COPY.
|
||||
*/
|
||||
CreateReplicaIdentity(shardList, targetNodeName, targetNodePort);
|
||||
|
||||
/* Start applying the changes from the replication slots to catch up. */
|
||||
EnableShardMoveSubscriptions(targetConnection, tableOwnerIds);
|
||||
|
||||
/*
|
||||
* The following check is a leftover from when used subscriptions with
|
||||
* copy_data=true. It's probably not really necessary anymore, but it
|
||||
* seemed like a nice check to keep. At least for debugging issues it
|
||||
* seems nice to report differences between the subscription never
|
||||
* becoming ready and the subscriber not applying WAL. It's also not
|
||||
* entirely clear if the catchup check handles the case correctly where
|
||||
* the subscription is not in the ready state yet, because so far it
|
||||
* never had to.
|
||||
*/
|
||||
WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds,
|
||||
SHARD_MOVE_SUBSCRIPTION_PREFIX);
|
||||
|
@ -1129,26 +1161,12 @@ ShardMovePublicationName(Oid ownerId)
|
|||
|
||||
/*
|
||||
* ShardSubscriptionName returns the name of the subscription for the given
|
||||
* owner. If we're running the isolation tester the function also appends the
|
||||
* process id normal subscription name.
|
||||
*
|
||||
* When it contains the PID of the current process it is used for block detection
|
||||
* by the isolation test runner, since the replication process on the publishing
|
||||
* node uses the name of the subscription as the application_name of the SQL session.
|
||||
* This PID is then extracted from the application_name to find out which PID on the
|
||||
* coordinator is blocked by the blocked replication process.
|
||||
* owner.
|
||||
*/
|
||||
char *
|
||||
ShardSubscriptionName(Oid ownerId, char *operationPrefix)
|
||||
{
|
||||
if (RunningUnderIsolationTest)
|
||||
{
|
||||
return psprintf("%s%i_%i", operationPrefix, ownerId, MyProcPid);
|
||||
}
|
||||
else
|
||||
{
|
||||
return psprintf("%s%i", operationPrefix, ownerId);
|
||||
}
|
||||
return psprintf("%s%i", operationPrefix, ownerId);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1445,6 +1463,63 @@ CreateShardMovePublications(MultiConnection *connection, List *shardList,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetReplicationConnection opens a new replication connection to this node.
|
||||
* This connection can be used to send replication commands, such as
|
||||
* CREATE_REPLICATION_SLOT.
|
||||
*/
|
||||
static MultiConnection *
|
||||
GetReplicationConnection(char *nodeName, int nodePort)
|
||||
{
|
||||
int connectionFlags = FORCE_NEW_CONNECTION;
|
||||
connectionFlags |= REQUIRE_REPLICATION_CONNECTION_PARAM;
|
||||
|
||||
MultiConnection *connection = GetNodeUserDatabaseConnection(
|
||||
connectionFlags,
|
||||
nodeName,
|
||||
nodePort,
|
||||
CitusExtensionOwnerName(),
|
||||
get_database_name(MyDatabaseId));
|
||||
ClaimConnectionExclusively(connection);
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateReplicationSlot creates a replication slot with the given slot name
|
||||
* over the given connection. The given connection should be a replication
|
||||
* connection. This function returns the name of the snapshot that is used for
|
||||
* this replication slot. When using this snapshot name for other transactions
|
||||
* you need to keep the given replication connection open until you have used
|
||||
* the snapshot name.
|
||||
*/
|
||||
static char *
|
||||
CreateReplicationSlot(MultiConnection *connection, char *slotname)
|
||||
{
|
||||
StringInfo createReplicationSlotCommand = makeStringInfo();
|
||||
appendStringInfo(createReplicationSlotCommand,
|
||||
"CREATE_REPLICATION_SLOT %s LOGICAL pgoutput EXPORT_SNAPSHOT;",
|
||||
quote_identifier(slotname));
|
||||
|
||||
PGresult *result = NULL;
|
||||
int response = ExecuteOptionalRemoteCommand(connection,
|
||||
createReplicationSlotCommand->data,
|
||||
&result);
|
||||
|
||||
if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
|
||||
{
|
||||
ReportResultError(connection, result, ERROR);
|
||||
}
|
||||
|
||||
/*'snapshot_name' is second column where index starts from zero.
|
||||
* We're using the pstrdup to copy the data into the current memory context */
|
||||
char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columIndex */));
|
||||
PQclear(result);
|
||||
ForgetResults(connection);
|
||||
return snapShotName;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateShardMoveSubscriptions creates the subscriptions used for shard moves
|
||||
* over the given connection. One subscription is created for each of the table
|
||||
|
@ -1454,18 +1529,28 @@ CreateShardMovePublications(MultiConnection *connection, List *shardList,
|
|||
* names directly (rather than looking up any relevant pg_dist_poolinfo rows),
|
||||
* all such connections remain direct and will not route through any configured
|
||||
* poolers.
|
||||
*
|
||||
* The subscriptions created by this function are created in the disabled
|
||||
* state. This is done so a data copy can be done manually afterwards. To
|
||||
* enable the subscriptions you can use EnableShardMoveSubscriptions().
|
||||
*
|
||||
* This function returns the snapshot name of the replication slots that are
|
||||
* used by the subscription. When using this snapshot name for other
|
||||
* transactions you need to keep the given replication connection open until
|
||||
* you have used the snapshot name.
|
||||
*/
|
||||
static void
|
||||
CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
|
||||
int sourceNodePort, char *userName, char *databaseName,
|
||||
static char *
|
||||
CreateShardMoveSubscriptions(MultiConnection *sourceConnection,
|
||||
MultiConnection *targetConnection,
|
||||
MultiConnection *sourceReplicationConnection,
|
||||
char *databaseName,
|
||||
Bitmapset *tableOwnerIds)
|
||||
{
|
||||
int ownerId = -1;
|
||||
char *firstReplicationSlot = NULL;
|
||||
char *snapshot = NULL;
|
||||
while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0)
|
||||
{
|
||||
StringInfo createSubscriptionCommand = makeStringInfo();
|
||||
StringInfo conninfo = makeStringInfo();
|
||||
|
||||
/*
|
||||
* The CREATE USER command should not propagate, so we temporarily
|
||||
* disable DDL propagation.
|
||||
|
@ -1475,7 +1560,9 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
|
|||
* This prevents permission escalations.
|
||||
*/
|
||||
SendCommandListToWorkerOutsideTransaction(
|
||||
connection->hostname, connection->port, connection->user,
|
||||
targetConnection->hostname,
|
||||
targetConnection->port,
|
||||
targetConnection->user,
|
||||
list_make2(
|
||||
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
|
||||
psprintf(
|
||||
|
@ -1484,23 +1571,45 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
|
|||
GetUserNameFromId(ownerId, false)
|
||||
)));
|
||||
|
||||
if (!firstReplicationSlot)
|
||||
{
|
||||
firstReplicationSlot = ShardSubscriptionName(ownerId,
|
||||
SHARD_MOVE_SUBSCRIPTION_PREFIX);
|
||||
snapshot = CreateReplicationSlot(
|
||||
sourceReplicationConnection,
|
||||
firstReplicationSlot);
|
||||
}
|
||||
else
|
||||
{
|
||||
ExecuteCriticalRemoteCommand(
|
||||
sourceConnection,
|
||||
psprintf("SELECT pg_catalog.pg_copy_logical_replication_slot(%s, %s)",
|
||||
quote_literal_cstr(firstReplicationSlot),
|
||||
quote_literal_cstr(ShardSubscriptionName(ownerId,
|
||||
SHARD_MOVE_SUBSCRIPTION_PREFIX))));
|
||||
}
|
||||
|
||||
StringInfo conninfo = makeStringInfo();
|
||||
appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' "
|
||||
"connect_timeout=20",
|
||||
escape_param_str(sourceNodeName), sourceNodePort,
|
||||
escape_param_str(userName), escape_param_str(databaseName));
|
||||
escape_param_str(sourceConnection->hostname),
|
||||
sourceConnection->port,
|
||||
escape_param_str(sourceConnection->user), escape_param_str(
|
||||
databaseName));
|
||||
|
||||
StringInfo createSubscriptionCommand = makeStringInfo();
|
||||
appendStringInfo(createSubscriptionCommand,
|
||||
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
|
||||
"WITH (citus_use_authinfo=true, enabled=false)",
|
||||
"WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false)",
|
||||
quote_identifier(ShardSubscriptionName(ownerId,
|
||||
SHARD_MOVE_SUBSCRIPTION_PREFIX)),
|
||||
quote_literal_cstr(conninfo->data),
|
||||
quote_identifier(ShardMovePublicationName(ownerId)));
|
||||
|
||||
ExecuteCriticalRemoteCommand(connection, createSubscriptionCommand->data);
|
||||
ExecuteCriticalRemoteCommand(targetConnection, createSubscriptionCommand->data);
|
||||
pfree(createSubscriptionCommand->data);
|
||||
pfree(createSubscriptionCommand);
|
||||
ExecuteCriticalRemoteCommand(connection, psprintf(
|
||||
ExecuteCriticalRemoteCommand(targetConnection, psprintf(
|
||||
"ALTER SUBSCRIPTION %s OWNER TO %s",
|
||||
ShardSubscriptionName(ownerId,
|
||||
SHARD_MOVE_SUBSCRIPTION_PREFIX),
|
||||
|
@ -1513,15 +1622,31 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
|
|||
* disable DDL propagation.
|
||||
*/
|
||||
SendCommandListToWorkerOutsideTransaction(
|
||||
connection->hostname, connection->port, connection->user,
|
||||
targetConnection->hostname,
|
||||
targetConnection->port,
|
||||
targetConnection->user,
|
||||
list_make2(
|
||||
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
|
||||
psprintf(
|
||||
"ALTER ROLE %s NOSUPERUSER",
|
||||
ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX)
|
||||
)));
|
||||
}
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
ExecuteCriticalRemoteCommand(connection, psprintf(
|
||||
|
||||
/*
|
||||
* EnableShardMoveSubscriptions enables all the the shard move subscriptions
|
||||
* that belong to the given table owners.
|
||||
*/
|
||||
static void
|
||||
EnableShardMoveSubscriptions(MultiConnection *targetConnection, Bitmapset *tableOwnerIds)
|
||||
{
|
||||
int ownerId = -1;
|
||||
while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0)
|
||||
{
|
||||
ExecuteCriticalRemoteCommand(targetConnection, psprintf(
|
||||
"ALTER SUBSCRIPTION %s ENABLE",
|
||||
ShardSubscriptionName(ownerId,
|
||||
SHARD_MOVE_SUBSCRIPTION_PREFIX)
|
||||
|
@ -1622,25 +1747,21 @@ GetRemoteLSN(MultiConnection *connection, char *command)
|
|||
|
||||
|
||||
/*
|
||||
* WaitForRelationSubscriptionsBecomeReady waits until the states of the subsriptions
|
||||
* for each shard becomes ready. This indicates that the initial COPY is finished
|
||||
* on the shards.
|
||||
* WaitForRelationSubscriptionsBecomeReady waits until the states of the
|
||||
* subsriptions for each shard becomes ready. This should happen very quickly,
|
||||
* because we don't use the COPY logic from the subscriptions. So all that's
|
||||
* needed is to start reading from the replication slot.
|
||||
*
|
||||
* The function errors if the total size of the relations that belong to the subscription
|
||||
* on the target node doesn't change within LogicalReplicationErrorTimeout. The
|
||||
* function also reports its progress in every logicalReplicationProgressReportTimeout.
|
||||
* The function errors if the subscriptions don't become ready within
|
||||
* LogicalReplicationErrorTimeout. The function also reports its progress in
|
||||
* every logicalReplicationProgressReportTimeout.
|
||||
*/
|
||||
void
|
||||
WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
|
||||
Bitmapset *tableOwnerIds, char *operationPrefix)
|
||||
{
|
||||
uint64 previousTotalRelationSizeForSubscription = 0;
|
||||
TimestampTz previousSizeChangeTime = GetCurrentTimestamp();
|
||||
|
||||
/* report in the first iteration as well */
|
||||
TimestampTz previousReportTime = 0;
|
||||
|
||||
uint64 previousReportedTotalSize = 0;
|
||||
TimestampTz previousReportTime = GetCurrentTimestamp();
|
||||
|
||||
|
||||
/*
|
||||
|
@ -1671,94 +1792,37 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
|
|||
|
||||
break;
|
||||
}
|
||||
char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds,
|
||||
operationPrefix);
|
||||
|
||||
/* Get the current total size of tables belonging to the subscriber */
|
||||
uint64 currentTotalRelationSize =
|
||||
TotalRelationSizeForSubscription(targetConnection, psprintf(
|
||||
"SELECT sum(pg_total_relation_size(srrelid)) "
|
||||
"FROM pg_subscription_rel, pg_stat_subscription "
|
||||
"WHERE srsubid = subid AND subname IN %s",
|
||||
subscriptionValueList
|
||||
)
|
||||
);
|
||||
|
||||
/*
|
||||
* The size has not been changed within the last iteration. If necessary
|
||||
* log a messages. If size does not change over a given replication timeout
|
||||
* error out.
|
||||
*/
|
||||
if (currentTotalRelationSize == previousTotalRelationSizeForSubscription)
|
||||
/* log the progress if necessary */
|
||||
if (TimestampDifferenceExceeds(previousReportTime,
|
||||
GetCurrentTimestamp(),
|
||||
logicalReplicationProgressReportTimeout))
|
||||
{
|
||||
/* log the progress if necessary */
|
||||
if (TimestampDifferenceExceeds(previousReportTime,
|
||||
GetCurrentTimestamp(),
|
||||
logicalReplicationProgressReportTimeout))
|
||||
{
|
||||
ereport(LOG, (errmsg("Subscription size has been staying same for the "
|
||||
"last %d msec",
|
||||
logicalReplicationProgressReportTimeout)));
|
||||
ereport(LOG, (errmsg("Not all subscriptions for the shard move are "
|
||||
"READY yet")));
|
||||
|
||||
previousReportTime = GetCurrentTimestamp();
|
||||
}
|
||||
|
||||
/* Error out if the size does not change within the given time threshold */
|
||||
if (TimestampDifferenceExceeds(previousSizeChangeTime,
|
||||
GetCurrentTimestamp(),
|
||||
LogicalReplicationTimeout))
|
||||
{
|
||||
ereport(ERROR, (errmsg("The logical replication waiting timeout "
|
||||
"%d msec exceeded",
|
||||
LogicalReplicationTimeout),
|
||||
errdetail("The subscribed relations haven't become "
|
||||
"ready on the target node %s:%d",
|
||||
targetConnection->hostname,
|
||||
targetConnection->port),
|
||||
errhint(
|
||||
"There might have occurred problems on the target "
|
||||
"node. If not, consider using higher values for "
|
||||
"citus.logical_replication_timeout")));
|
||||
}
|
||||
previousReportTime = GetCurrentTimestamp();
|
||||
}
|
||||
else
|
||||
|
||||
/* Error out if the size does not change within the given time threshold */
|
||||
if (TimestampDifferenceExceeds(previousSizeChangeTime,
|
||||
GetCurrentTimestamp(),
|
||||
LogicalReplicationTimeout))
|
||||
{
|
||||
/* first, record that there is some change in the size */
|
||||
previousSizeChangeTime = GetCurrentTimestamp();
|
||||
|
||||
/*
|
||||
* Subscription size may decrease or increase.
|
||||
*
|
||||
* Subscription size may decrease in case of VACUUM operation, which
|
||||
* may get fired with autovacuum, on it.
|
||||
*
|
||||
* Increase of the relation's size belonging to subscriber means a successful
|
||||
* copy from publisher to subscriber.
|
||||
*/
|
||||
bool sizeIncreased = currentTotalRelationSize >
|
||||
previousTotalRelationSizeForSubscription;
|
||||
|
||||
if (TimestampDifferenceExceeds(previousReportTime,
|
||||
GetCurrentTimestamp(),
|
||||
logicalReplicationProgressReportTimeout))
|
||||
{
|
||||
ereport(LOG, ((errmsg("The total size of the relations belonging to "
|
||||
"subscriptions %s from %ld to %ld at %s "
|
||||
"on the target node %s:%d",
|
||||
sizeIncreased ? "increased" : "decreased",
|
||||
previousReportedTotalSize,
|
||||
currentTotalRelationSize,
|
||||
timestamptz_to_str(previousSizeChangeTime),
|
||||
ereport(ERROR, (errmsg("The logical replication waiting timeout "
|
||||
"of %d msec is exceeded",
|
||||
LogicalReplicationTimeout),
|
||||
errdetail("The subscribed relations haven't become "
|
||||
"ready on the target node %s:%d",
|
||||
targetConnection->hostname,
|
||||
targetConnection->port))));
|
||||
|
||||
previousReportedTotalSize = currentTotalRelationSize;
|
||||
previousReportTime = GetCurrentTimestamp();
|
||||
}
|
||||
targetConnection->port),
|
||||
errhint(
|
||||
"Logical replication has failed to initialize "
|
||||
"on the target node. If not, consider using "
|
||||
"higher values for "
|
||||
"citus.logical_replication_timeout")));
|
||||
}
|
||||
|
||||
previousTotalRelationSizeForSubscription = currentTotalRelationSize;
|
||||
|
||||
/* wait for 1 second (1000 miliseconds) and try again */
|
||||
WaitForMiliseconds(1000);
|
||||
|
||||
|
@ -1769,62 +1833,6 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* TotalRelationSizeForSubscription is a helper function which returns the total
|
||||
* size of the shards that are replicated via the subscription. Note that the
|
||||
* function returns the total size including indexes.
|
||||
*/
|
||||
static uint64
|
||||
TotalRelationSizeForSubscription(MultiConnection *connection, char *command)
|
||||
{
|
||||
bool raiseInterrupts = false;
|
||||
uint64 remoteTotalSize = 0;
|
||||
|
||||
int querySent = SendRemoteCommand(connection, command);
|
||||
if (querySent == 0)
|
||||
{
|
||||
ReportConnectionError(connection, ERROR);
|
||||
}
|
||||
|
||||
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||
if (!IsResponseOK(result))
|
||||
{
|
||||
ReportResultError(connection, result, ERROR);
|
||||
}
|
||||
|
||||
int rowCount = PQntuples(result);
|
||||
if (rowCount != 1)
|
||||
{
|
||||
ereport(ERROR, (errmsg("unexpected number of rows returned by: %s",
|
||||
command)));
|
||||
}
|
||||
|
||||
int colCount = PQnfields(result);
|
||||
if (colCount != 1)
|
||||
{
|
||||
ereport(ERROR, (errmsg("unexpected number of columns returned by: %s",
|
||||
command)));
|
||||
}
|
||||
|
||||
if (!PQgetisnull(result, 0, 0))
|
||||
{
|
||||
char *resultString = PQgetvalue(result, 0, 0);
|
||||
|
||||
remoteTotalSize = SafeStringToUint64(resultString);
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errmsg("unexpected value returned by: %s",
|
||||
command)));
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
ForgetResults(connection);
|
||||
|
||||
return remoteTotalSize;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShardSubscriptionNamesValueList returns a SQL value list containing the
|
||||
* subscription names for all of the given table owner ids. This value list can
|
||||
|
@ -1997,7 +2005,7 @@ WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr
|
|||
LogicalReplicationTimeout))
|
||||
{
|
||||
ereport(ERROR, (errmsg("The logical replication waiting timeout "
|
||||
"%d msec exceeded",
|
||||
"of %d msec is exceeded",
|
||||
LogicalReplicationTimeout),
|
||||
errdetail("The LSN on the target subscription hasn't "
|
||||
"caught up ready on the target node %s:%d",
|
||||
|
|
|
@ -2515,10 +2515,26 @@ CitusAuthHook(Port *port, int status)
|
|||
*
|
||||
* We do this so that this backend gets the chance to show
|
||||
* up in citus_lock_waits.
|
||||
*
|
||||
* We cannot assign a new global PID yet here, because that
|
||||
* would require reading from catalogs, but that's not allowed
|
||||
* this early in the connection startup (because no database
|
||||
* has been assigned yet).
|
||||
*/
|
||||
InitializeBackendData();
|
||||
SetBackendDataDistributedCommandOriginator(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* We set the global PID in the backend data here already to be able to
|
||||
* do blocked process detection on connections that are opened over a
|
||||
* replication connection. A replication connection backend will never
|
||||
* call StartupCitusBackend, which normally sets up the global PID.
|
||||
*/
|
||||
InitializeBackendData();
|
||||
SetBackendDataGlobalPID(gpid);
|
||||
}
|
||||
|
||||
/* let other authentication hooks to kick in first */
|
||||
if (original_client_auth_hook)
|
||||
|
|
|
@ -71,3 +71,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
|
|||
#include "udfs/worker_split_copy/11.1-1.sql"
|
||||
#include "udfs/worker_copy_table_to_node/11.1-1.sql"
|
||||
#include "udfs/worker_split_shard_replication_setup/11.1-1.sql"
|
||||
#include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
|
||||
|
|
33
src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.1-1.sql
generated
Normal file
33
src/backend/distributed/sql/udfs/citus_isolation_test_session_is_blocked/11.1-1.sql
generated
Normal file
|
@ -0,0 +1,33 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
|
||||
RETURNS boolean AS $$
|
||||
DECLARE
|
||||
mBlockedGlobalPid int8;
|
||||
workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
|
||||
coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
|
||||
BEGIN
|
||||
IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN
|
||||
RETURN true;
|
||||
END IF;
|
||||
|
||||
-- pg says we're not blocked locally; check whether we're blocked globally.
|
||||
-- Note that worker process may be blocked or waiting for a lock. So we need to
|
||||
-- get transaction number for both of them. Following IF provides the transaction
|
||||
-- number when the worker process waiting for other session.
|
||||
IF EXISTS (SELECT 1 FROM get_global_active_transactions()
|
||||
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN
|
||||
SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions()
|
||||
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;
|
||||
ELSE
|
||||
-- Check whether transactions initiated from the coordinator get locked
|
||||
SELECT global_pid INTO mBlockedGlobalPid
|
||||
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
|
||||
END IF;
|
||||
|
||||
RETURN EXISTS (
|
||||
SELECT 1 FROM citus_internal_global_blocked_processes()
|
||||
WHERE waiting_global_pid = mBlockedGlobalPid
|
||||
);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC;
|
|
@ -26,30 +26,7 @@ RETURNS boolean AS $$
|
|||
RETURN EXISTS (
|
||||
SELECT 1 FROM citus_internal_global_blocked_processes()
|
||||
WHERE waiting_global_pid = mBlockedGlobalPid
|
||||
) OR EXISTS (
|
||||
-- Check on the workers if any logical replication job spawned by the
|
||||
-- current PID is blocked, by checking it's application name
|
||||
-- Query is heavily based on: https://wiki.postgresql.org/wiki/Lock_Monitoring
|
||||
SELECT result FROM run_command_on_workers($two$
|
||||
SELECT blocked_activity.application_name AS blocked_application
|
||||
FROM pg_catalog.pg_locks blocked_locks
|
||||
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
|
||||
JOIN pg_catalog.pg_locks blocking_locks
|
||||
ON blocking_locks.locktype = blocked_locks.locktype
|
||||
AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
|
||||
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
|
||||
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
|
||||
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
|
||||
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
|
||||
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
|
||||
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
|
||||
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
|
||||
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
|
||||
AND blocking_locks.pid != blocked_locks.pid
|
||||
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
|
||||
WHERE NOT blocked_locks.GRANTED AND blocked_activity.application_name LIKE 'citus_shard_move_subscription_%'
|
||||
$two$) where result LIKE 'citus_shard_move_subscription_%_' || pBlockedPid);
|
||||
|
||||
);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
|
|
|
@ -893,6 +893,23 @@ AssignGlobalPID(void)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetBackendDataGlobalPID sets the global PID. This specifically does not read
|
||||
* from catalog tables, because it should be safe to run from our
|
||||
* authentication hook.
|
||||
*/
|
||||
void
|
||||
SetBackendDataGlobalPID(uint64 globalPID)
|
||||
{
|
||||
SpinLockAcquire(&MyBackendData->mutex);
|
||||
|
||||
MyBackendData->globalPID = globalPID;
|
||||
MyBackendData->distributedCommandOriginator = false;
|
||||
|
||||
SpinLockRelease(&MyBackendData->mutex);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetBackendDataDistributedCommandOriginator is used to set the distributedCommandOriginator
|
||||
* field on MyBackendData.
|
||||
|
|
|
@ -58,6 +58,7 @@ extern void UnSetGlobalPID(void);
|
|||
extern void SetActiveMyBackend(bool value);
|
||||
extern void AssignDistributedTransactionId(void);
|
||||
extern void AssignGlobalPID(void);
|
||||
extern void SetBackendDataGlobalPID(uint64 globalPID);
|
||||
extern uint64 GetGlobalPID(void);
|
||||
extern void SetBackendDataDistributedCommandOriginator(bool
|
||||
distributedCommandOriginator);
|
||||
|
|
|
@ -14,3 +14,5 @@
|
|||
extern uint64 ShardListSizeInBytes(List *colocatedShardList,
|
||||
char *workerNodeName, uint32 workerNodePort);
|
||||
extern void ErrorIfMoveUnsupportedTableType(Oid relationId);
|
||||
extern void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode,
|
||||
List *shardIntervalList, char *snapshotName);
|
||||
|
|
|
@ -77,6 +77,45 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard.t").cancel('
|
|||
|
||||
(1 row)
|
||||
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
ERROR: canceling statement due to user request
|
||||
-- failure during COPY phase
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
ERROR: connection not open
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
while executing command on localhost:xxxxx
|
||||
-- cancelation during COPY phase
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
ERROR: canceling statement due to user request
|
||||
-- failure when enabling the subscriptions
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").kill()');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
ERROR: connection not open
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
-- failure when enabling the subscriptions
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").cancel(' || :pid || ')');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
ERROR: canceling statement due to user request
|
||||
-- failure on polling subscription state
|
||||
|
@ -96,25 +135,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscrip
|
|||
|
||||
(1 row)
|
||||
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
ERROR: canceling statement due to user request
|
||||
-- failure on getting subscriber state
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT sum").kill()');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
ERROR: connection not open
|
||||
CONTEXT: while executing command on localhost:xxxxx
|
||||
-- cancellation on getting subscriber state
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT sum").cancel(' || :pid || ')');
|
||||
mitmproxy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
ERROR: canceling statement due to user request
|
||||
-- failure on polling last write-ahead log location reported to origin WAL sender
|
||||
|
|
|
@ -13,6 +13,7 @@ SELECT create_distributed_table('dist', 'id');
|
|||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist SELECT generate_series(1, 100);
|
||||
SELECT 1 from citus_add_node('localhost', :master_port, groupId := 0);
|
||||
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
|
||||
?column?
|
||||
|
@ -46,7 +47,14 @@ SELECT count(*) from pg_replication_slots;
|
|||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM dist;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO logical_replication;
|
||||
SELECT count(*) from pg_subscription;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
|
@ -65,7 +73,14 @@ SELECT count(*) from pg_replication_slots;
|
|||
1
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM dist;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO logical_replication;
|
||||
select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
||||
citus_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
@ -96,7 +111,14 @@ SELECT count(*) from pg_replication_slots;
|
|||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) from dist;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO logical_replication;
|
||||
SELECT count(*) from pg_subscription;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
|
@ -115,7 +137,14 @@ SELECT count(*) from pg_replication_slots;
|
|||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) from dist;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO logical_replication;
|
||||
SELECT count(*) from pg_subscription;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
|
@ -134,7 +163,13 @@ SELECT count(*) from pg_replication_slots;
|
|||
0
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) from dist;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO public;
|
||||
SET search_path TO logical_replication;
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA logical_replication CASCADE;
|
||||
NOTICE: drop cascades to table logical_replication.dist
|
||||
|
|
|
@ -47,6 +47,22 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
|
|||
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE move_shard.t").cancel(' || :pid || ')');
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
|
||||
-- failure during COPY phase
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
|
||||
-- cancelation during COPY phase
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="COPY").cancel(' || :pid || ')');
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
|
||||
-- failure when enabling the subscriptions
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").kill()');
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
|
||||
-- failure when enabling the subscriptions
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* ENABLE").cancel(' || :pid || ')');
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
|
||||
-- failure on polling subscription state
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").kill()');
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
|
@ -55,14 +71,6 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
|
|||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").cancel(' || :pid || ')');
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
|
||||
-- failure on getting subscriber state
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT sum").kill()');
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
|
||||
-- cancellation on getting subscriber state
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT sum").cancel(' || :pid || ')');
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
|
||||
-- failure on polling last write-ahead log location reported to origin WAL sender
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").kill()');
|
||||
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
|
||||
|
|
|
@ -13,6 +13,7 @@ CREATE TABLE dist (
|
|||
SELECT oid AS postgres_oid FROM pg_roles where rolname = 'postgres' \gset
|
||||
|
||||
SELECT create_distributed_table('dist', 'id');
|
||||
INSERT INTO dist SELECT generate_series(1, 100);
|
||||
|
||||
SELECT 1 from citus_add_node('localhost', :master_port, groupId := 0);
|
||||
|
||||
|
@ -28,14 +29,18 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid CONNECTION :conn
|
|||
SELECT count(*) from pg_subscription;
|
||||
SELECT count(*) from pg_publication;
|
||||
SELECT count(*) from pg_replication_slots;
|
||||
SELECT count(*) FROM dist;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO logical_replication;
|
||||
|
||||
SELECT count(*) from pg_subscription;
|
||||
SELECT count(*) from pg_publication;
|
||||
SELECT count(*) from pg_replication_slots;
|
||||
SELECT count(*) FROM dist;
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO logical_replication;
|
||||
|
||||
select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
||||
|
||||
|
@ -44,18 +49,27 @@ SELECT citus_remove_node('localhost', :master_port);
|
|||
SELECT count(*) from pg_subscription;
|
||||
SELECT count(*) from pg_publication;
|
||||
SELECT count(*) from pg_replication_slots;
|
||||
SELECT count(*) from dist;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO logical_replication;
|
||||
|
||||
|
||||
SELECT count(*) from pg_subscription;
|
||||
SELECT count(*) from pg_publication;
|
||||
SELECT count(*) from pg_replication_slots;
|
||||
SELECT count(*) from dist;
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO logical_replication;
|
||||
|
||||
SELECT count(*) from pg_subscription;
|
||||
SELECT count(*) from pg_publication;
|
||||
SELECT count(*) from pg_replication_slots;
|
||||
SELECT count(*) from dist;
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO public;
|
||||
SET search_path TO logical_replication;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA logical_replication CASCADE;
|
||||
|
|
Loading…
Reference in New Issue