Drop cleanup on failure (#6584)

DESCRIPTION: Defers cleanup after a failure in shard move or split

We no longer need to run an explicit cleanup when a shard transfer or
split fails, because:
* The maintenance daemon will clean up the leftover resources anyway.
* We already trigger a cleanup at the beginning of shard transfers/splits.
* The on-failure cleanup logic can itself fail; when it does, we throw
the error raised by the cleanup procedure instead of the original error,
which causes confusion.
A rough sketch of how leftover resources can be inspected after a failure
is included below.
Ahmet Gedemenli 2022-12-28 15:48:44 +03:00 committed by GitHub
parent cfc17385e9
commit 1b1e737e51
13 changed files with 347 additions and 493 deletions
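As a rough operator-side sketch (not part of this commit) of what deferred cleanup looks like: after a failed split or transfer, the leftover objects stay registered in pg_dist_cleanup until the maintenance daemon drops them. The column list matches the regression output further down; citus_cleanup_orphaned_resources() is assumed here to be the procedure that forces that cleanup to run immediately, while the regression tests below use the helper public.wait_for_resource_cleanup() to wait for it instead.

-- Inspect what a failed split/move left behind (records only; nothing is dropped yet).
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup
ORDER BY operation_id, object_name;

-- The maintenance daemon reclaims these in the background. To force it now
-- (assuming the citus_cleanup_orphaned_resources() procedure is available):
CALL citus_cleanup_orphaned_resources();

Because the failing operation no longer attempts its own cleanup, the error reported to the user is always the original failure, not an error raised while cleaning up.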

View File

@@ -478,73 +478,6 @@ RegisterOperationNeedingCleanup(void)
}
/*
* FinalizeOperationNeedingCleanupOnFailure is called by an operation to signal
* completion with failure. This will trigger cleanup of appropriate resources.
*/
void
FinalizeOperationNeedingCleanupOnFailure(const char *operationName)
{
/* We must have a valid OperationId. Any operation requiring cleanup
* will call RegisterOperationNeedingCleanup.
*/
Assert(CurrentOperationId != INVALID_OPERATION_ID);
List *currentOperationRecordList = ListCleanupRecordsForCurrentOperation();
/*
* We sort the records before cleaning up by their types, because of dependencies.
* For example, a subscription might depend on a publication.
*/
currentOperationRecordList = SortList(currentOperationRecordList,
CompareCleanupRecordsByObjectType);
int failedShardCountOnComplete = 0;
CleanupRecord *record = NULL;
foreach_ptr(record, currentOperationRecordList)
{
if (record->policy == CLEANUP_ALWAYS || record->policy == CLEANUP_ON_FAILURE)
{
WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);
/*
* For all resources of CurrentOperationId that are marked as 'CLEANUP_ALWAYS' or
* 'CLEANUP_ON_FAILURE', drop resource and cleanup records.
*/
if (TryDropResourceByCleanupRecordOutsideTransaction(record,
workerNode->workerName,
workerNode->workerPort))
{
/*
* Given the operation is failing and we will abort its transaction, we cannot delete
* records in the current transaction. Delete these records outside of the
* current transaction via a localhost connection.
*/
DeleteCleanupRecordByRecordIdOutsideTransaction(record->recordId);
}
else if (record->objectType == CLEANUP_OBJECT_SHARD_PLACEMENT)
{
/*
* We log failures at the end, since they occur repeatedly
* for a large number of objects.
*/
failedShardCountOnComplete++;
}
}
}
if (failedShardCountOnComplete > 0)
{
ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d after "
"a %s operation failed",
failedShardCountOnComplete,
list_length(currentOperationRecordList),
operationName)));
}
}
/*
* FinalizeOperationNeedingCleanupOnSuccess is called by an operation to signal
* completion with success. This will trigger cleanup of appropriate resources.

View File

@@ -596,85 +596,70 @@ BlockingShardSplit(SplitOperation splitOperation,
WorkerNode *sourceShardNode =
ActiveShardPlacementWorkerNode(firstShard->shardId);
PG_TRY();
{
ereport(LOG, (errmsg("creating child shards for %s", operationName)));
ereport(LOG, (errmsg("creating child shards for %s", operationName)));
/* Physically create split children. */
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
/* Physically create split children. */
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
ereport(LOG, (errmsg("performing copy for %s", operationName)));
ereport(LOG, (errmsg("performing copy for %s", operationName)));
/* For Blocking split, copy isn't snapshotted */
char *snapshotName = NULL;
ConflictWithIsolationTestingBeforeCopy();
DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList,
snapshotName, distributionColumnOverrides);
ConflictWithIsolationTestingAfterCopy();
/* For Blocking split, copy isn't snapshotted */
char *snapshotName = NULL;
ConflictWithIsolationTestingBeforeCopy();
DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList,
snapshotName, distributionColumnOverrides);
ConflictWithIsolationTestingAfterCopy();
ereport(LOG, (errmsg(
"creating auxillary structures (indexes, stats, replicaindentities, triggers) for %s",
operationName)));
ereport(LOG, (errmsg(
"creating auxillary structures (indexes, stats, replicaindentities, triggers) for %s",
operationName)));
/* Create auxiliary structures (indexes, stats, replicaindentities, triggers) */
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList,
true /* includeReplicaIdentity*/);
/* Create auxiliary structures (indexes, stats, replicaindentities, triggers) */
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList,
true /* includeReplicaIdentity*/);
/*
* Up to this point, we performed various subtransactions that may
* require additional clean-up in case of failure. The remaining operations
* going forward are part of the same distributed transaction.
*/
/*
* Up to this point, we performed various subtransactions that may
* require additional clean-up in case of failure. The remaining operations
* going forward are part of the same distributed transaction.
*/
/*
* Delete old shards metadata and mark the shards as to be deferred drop.
* Have to do that before creating the new shard metadata,
* because there's cross-checks preventing inconsistent metadata
* (like overlapping shards).
*/
ereport(LOG, (errmsg("marking deferred cleanup of source shard(s) for %s",
operationName)));
/*
* Delete old shards metadata and mark the shards as to be deferred drop.
* Have to do that before creating the new shard metadata,
* because there's cross-checks preventing inconsistent metadata
* (like overlapping shards).
*/
ereport(LOG, (errmsg("marking deferred cleanup of source shard(s) for %s",
operationName)));
InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList);
InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList);
DropShardListMetadata(sourceColocatedShardIntervalList);
DropShardListMetadata(sourceColocatedShardIntervalList);
/* Insert new shard and placement metdata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
/* Insert new shard and placement metdata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
/* create partitioning hierarchy, if any */
CreatePartitioningHierarchyForBlockingSplit(
shardGroupSplitIntervalListList,
workersForPlacementList);
/* create partitioning hierarchy, if any */
CreatePartitioningHierarchyForBlockingSplit(
shardGroupSplitIntervalListList,
workersForPlacementList);
ereport(LOG, (errmsg("creating foreign key constraints (if any) for %s",
operationName)));
/*
* Create foreign keys if exists after the metadata changes happening in
* InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata.
*/
CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
workersForPlacementList);
}
PG_CATCH();
{
/* end ongoing transactions to enable us to clean up */
ShutdownAllConnections();
/* Do a best effort cleanup of shards created on workers in the above block */
FinalizeOperationNeedingCleanupOnFailure(operationName);
PG_RE_THROW();
}
PG_END_TRY();
ereport(LOG, (errmsg("creating foreign key constraints (if any) for %s",
operationName)));
/*
* Create foreign keys if exists after the metadata changes happening in
* InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata.
*/
CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
workersForPlacementList);
CitusInvalidateRelcacheByRelid(DistShardRelationId());
}
@@ -1508,227 +1493,211 @@ NonBlockingShardSplit(SplitOperation splitOperation,
sourceShardToCopyNode->workerPort);
/* Non-Blocking shard split workflow starts here */
PG_TRY();
ereport(LOG, (errmsg("creating child shards for %s",
operationName)));
/* 1) Physically create split children. */
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
/*
* 2) Create dummy shards due to PG logical replication constraints.
* Refer to the comment section of 'CreateDummyShardsForShardGroup' for indepth
* information.
*/
HTAB *mapOfPlacementToDummyShardList = CreateSimpleHash(NodeAndOwner,
GroupedShardSplitInfos);
CreateDummyShardsForShardGroup(
mapOfPlacementToDummyShardList,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
sourceShardToCopyNode,
workersForPlacementList);
/*
* 3) Create replica identities on dummy shards. This needs to be done
* before the subscriptions are created. Otherwise the subscription
* creation will get stuck waiting for the publication to send a
* replica identity. Since we never actually write data into these
* dummy shards there's no point in creating these indexes after the
* initial COPY phase, like we do for the replica identities on the
* target shards.
*/
CreateReplicaIdentitiesForDummyShards(mapOfPlacementToDummyShardList);
ereport(LOG, (errmsg(
"creating replication artifacts (publications, replication slots, subscriptions for %s",
operationName)));
/* 4) Create Publications. */
CreatePublications(sourceConnection, publicationInfoHash);
/* 5) Execute 'worker_split_shard_replication_setup UDF */
List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList,
distributionColumnOverrides);
/*
* Subscriber flow starts from here.
* Populate 'ShardSplitSubscriberMetadata' for subscription management.
*/
List *logicalRepTargetList =
PopulateShardSplitSubscriptionsMetadataList(
publicationInfoHash, replicationSlotInfoList,
shardGroupSplitIntervalListList, workersForPlacementList);
HTAB *groupedLogicalRepTargetsHash = CreateGroupedLogicalRepTargetsHash(
logicalRepTargetList);
/* Create connections to the target nodes */
CreateGroupedLogicalRepTargetsConnections(
groupedLogicalRepTargetsHash,
superUser, databaseName);
char *logicalRepDecoderPlugin = "citus";
/*
* 6) Create replication slots and keep track of their snapshot.
*/
char *snapshot = CreateReplicationSlots(
sourceConnection,
sourceReplicationConnection,
logicalRepTargetList,
logicalRepDecoderPlugin);
/*
* 7) Create subscriptions. This isn't strictly needed yet at this
* stage, but this way we error out quickly if it fails.
*/
CreateSubscriptions(
sourceConnection,
databaseName,
logicalRepTargetList);
/*
* We have to create the primary key (or any other replica identity)
* before the update/delete operations that are queued will be
* replicated. Because if the replica identity does not exist on the
* target, the replication would fail.
*
* So the latest possible moment we could do this is right after the
* initial data COPY, but before enabling the susbcriptions. It might
* seem like a good idea to it after the initial data COPY, since
* it's generally the rule that it's cheaper to build an index at once
* than to create it incrementally. This general rule, is why we create
* all the regular indexes as late during the move as possible.
*
* But as it turns out in practice it's not as clear cut, and we saw a
* speed degradation in the time it takes to move shards when doing the
* replica identity creation after the initial COPY. So, instead we
* keep it before the COPY.
*/
CreateReplicaIdentities(logicalRepTargetList);
ereport(LOG, (errmsg("performing copy for %s", operationName)));
/* 8) Do snapshotted Copy */
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList,
snapshot, distributionColumnOverrides);
ereport(LOG, (errmsg("replicating changes for %s", operationName)));
/*
* 9) Logically replicate all the changes and do most of the table DDL,
* like index and foreign key creation.
*/
CompleteNonBlockingShardTransfer(sourceColocatedShardIntervalList,
sourceConnection,
publicationInfoHash,
logicalRepTargetList,
groupedLogicalRepTargetsHash,
SHARD_SPLIT);
/*
* 10) Delete old shards metadata and mark the shards as to be deferred drop.
* Have to do that before creating the new shard metadata,
* because there's cross-checks preventing inconsistent metadata
* (like overlapping shards).
*/
ereport(LOG, (errmsg("marking deferred cleanup of source shard(s) for %s",
operationName)));
InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList);
DropShardListMetadata(sourceColocatedShardIntervalList);
/*
* 11) In case of create_distributed_table_concurrently, which converts
* a Citus local table to a distributed table, update the distributed
* table metadata now.
*
* We would rather have this be outside of the scope of NonBlockingShardSplit,
* but we cannot make metadata changes before replication slot creation, and
* we cannot create the replication slot before creating new shards and
* corresponding publications, because the decoder uses a catalog snapshot
* from the time of the slot creation, which means it would not be able to see
* the shards or publications when replication starts if it was created before.
*
* We also cannot easily move metadata changes to be after this function,
* because CreateForeignKeyConstraints relies on accurate metadata and
* we also want to perform the clean-up logic in PG_CATCH in case of
* failure.
*
* Hence, this appears to be the only suitable spot for updating
* pg_dist_partition and pg_dist_colocation.
*/
if (splitOperation == CREATE_DISTRIBUTED_TABLE)
{
ereport(LOG, (errmsg("creating child shards for %s",
operationName)));
/* we currently only use split for hash-distributed tables */
char distributionMethod = DISTRIBUTE_BY_HASH;
int shardCount = list_length(shardSplitPointsList) + 1;
/* 1) Physically create split children. */
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
/*
* 2) Create dummy shards due to PG logical replication constraints.
* Refer to the comment section of 'CreateDummyShardsForShardGroup' for indepth
* information.
*/
HTAB *mapOfPlacementToDummyShardList = CreateSimpleHash(NodeAndOwner,
GroupedShardSplitInfos);
CreateDummyShardsForShardGroup(
mapOfPlacementToDummyShardList,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
sourceShardToCopyNode,
workersForPlacementList);
/*
* 3) Create replica identities on dummy shards. This needs to be done
* before the subscriptions are created. Otherwise the subscription
* creation will get stuck waiting for the publication to send a
* replica identity. Since we never actually write data into these
* dummy shards there's no point in creating these indexes after the
* initial COPY phase, like we do for the replica identities on the
* target shards.
*/
CreateReplicaIdentitiesForDummyShards(mapOfPlacementToDummyShardList);
ereport(LOG, (errmsg(
"creating replication artifacts (publications, replication slots, subscriptions for %s",
operationName)));
/* 4) Create Publications. */
CreatePublications(sourceConnection, publicationInfoHash);
/* 5) Execute 'worker_split_shard_replication_setup UDF */
List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList,
distributionColumnOverrides);
/*
* Subscriber flow starts from here.
* Populate 'ShardSplitSubscriberMetadata' for subscription management.
*/
List *logicalRepTargetList =
PopulateShardSplitSubscriptionsMetadataList(
publicationInfoHash, replicationSlotInfoList,
shardGroupSplitIntervalListList, workersForPlacementList);
HTAB *groupedLogicalRepTargetsHash = CreateGroupedLogicalRepTargetsHash(
logicalRepTargetList);
/* Create connections to the target nodes */
CreateGroupedLogicalRepTargetsConnections(
groupedLogicalRepTargetsHash,
superUser, databaseName);
char *logicalRepDecoderPlugin = "citus";
/*
* 6) Create replication slots and keep track of their snapshot.
*/
char *snapshot = CreateReplicationSlots(
sourceConnection,
sourceReplicationConnection,
logicalRepTargetList,
logicalRepDecoderPlugin);
/*
* 7) Create subscriptions. This isn't strictly needed yet at this
* stage, but this way we error out quickly if it fails.
*/
CreateSubscriptions(
sourceConnection,
databaseName,
logicalRepTargetList);
/*
* We have to create the primary key (or any other replica identity)
* before the update/delete operations that are queued will be
* replicated. Because if the replica identity does not exist on the
* target, the replication would fail.
*
* So the latest possible moment we could do this is right after the
* initial data COPY, but before enabling the susbcriptions. It might
* seem like a good idea to it after the initial data COPY, since
* it's generally the rule that it's cheaper to build an index at once
* than to create it incrementally. This general rule, is why we create
* all the regular indexes as late during the move as possible.
*
* But as it turns out in practice it's not as clear cut, and we saw a
* speed degradation in the time it takes to move shards when doing the
* replica identity creation after the initial COPY. So, instead we
* keep it before the COPY.
*/
CreateReplicaIdentities(logicalRepTargetList);
ereport(LOG, (errmsg("performing copy for %s", operationName)));
/* 8) Do snapshotted Copy */
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList,
snapshot, distributionColumnOverrides);
ereport(LOG, (errmsg("replicating changes for %s", operationName)));
/*
* 9) Logically replicate all the changes and do most of the table DDL,
* like index and foreign key creation.
*/
CompleteNonBlockingShardTransfer(sourceColocatedShardIntervalList,
sourceConnection,
publicationInfoHash,
logicalRepTargetList,
groupedLogicalRepTargetsHash,
SHARD_SPLIT);
/*
* 10) Delete old shards metadata and mark the shards as to be deferred drop.
* Have to do that before creating the new shard metadata,
* because there's cross-checks preventing inconsistent metadata
* (like overlapping shards).
*/
ereport(LOG, (errmsg("marking deferred cleanup of source shard(s) for %s",
operationName)));
InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList);
DropShardListMetadata(sourceColocatedShardIntervalList);
/*
* 11) In case of create_distributed_table_concurrently, which converts
* a Citus local table to a distributed table, update the distributed
* table metadata now.
*
* We would rather have this be outside of the scope of NonBlockingShardSplit,
* but we cannot make metadata changes before replication slot creation, and
* we cannot create the replication slot before creating new shards and
* corresponding publications, because the decoder uses a catalog snapshot
* from the time of the slot creation, which means it would not be able to see
* the shards or publications when replication starts if it was created before.
*
* We also cannot easily move metadata changes to be after this function,
* because CreateForeignKeyConstraints relies on accurate metadata and
* we also want to perform the clean-up logic in PG_CATCH in case of
* failure.
*
* Hence, this appears to be the only suitable spot for updating
* pg_dist_partition and pg_dist_colocation.
*/
if (splitOperation == CREATE_DISTRIBUTED_TABLE)
{
/* we currently only use split for hash-distributed tables */
char distributionMethod = DISTRIBUTE_BY_HASH;
int shardCount = list_length(shardSplitPointsList) + 1;
UpdateDistributionColumnsForShardGroup(sourceColocatedShardIntervalList,
distributionColumnOverrides,
distributionMethod,
shardCount,
targetColocationId);
}
/* 12) Insert new shard and placement metdata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
/* 13) create partitioning hierarchy, if any, this needs to be done
* after the metadata is correct, because it fails for some
* uninvestigated reason otherwise.
*/
CreatePartitioningHierarchy(logicalRepTargetList);
ereport(LOG, (errmsg("creating foreign key constraints (if any) for %s",
operationName)));
/*
* 14) Create foreign keys if exists after the metadata changes happening in
* InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata.
*/
CreateUncheckedForeignKeyConstraints(logicalRepTargetList);
/*
* 15) Release shared memory allocated by worker_split_shard_replication_setup udf
* at source node.
*/
ExecuteSplitShardReleaseSharedMemory(sourceConnection);
/* 16) Close source connection */
CloseConnection(sourceConnection);
/* 17) Close all subscriber connections */
CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
/* 18) Close connection of template replication slot */
CloseConnection(sourceReplicationConnection);
UpdateDistributionColumnsForShardGroup(sourceColocatedShardIntervalList,
distributionColumnOverrides,
distributionMethod,
shardCount,
targetColocationId);
}
PG_CATCH();
{
/* end ongoing transactions to enable us to clean up */
ShutdownAllConnections();
/*
* Drop temporary objects that were marked as CLEANUP_ON_FAILURE
* or CLEANUP_ALWAYS.
*/
FinalizeOperationNeedingCleanupOnFailure(operationName);
/* 12) Insert new shard and placement metdata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
PG_RE_THROW();
}
PG_END_TRY();
/* 13) create partitioning hierarchy, if any, this needs to be done
* after the metadata is correct, because it fails for some
* uninvestigated reason otherwise.
*/
CreatePartitioningHierarchy(logicalRepTargetList);
ereport(LOG, (errmsg("creating foreign key constraints (if any) for %s",
operationName)));
/*
* 14) Create foreign keys if exists after the metadata changes happening in
* InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata.
*/
CreateUncheckedForeignKeyConstraints(logicalRepTargetList);
/*
* 15) Release shared memory allocated by worker_split_shard_replication_setup udf
* at source node.
*/
ExecuteSplitShardReleaseSharedMemory(sourceConnection);
/* 16) Close source connection */
CloseConnection(sourceConnection);
/* 17) Close all subscriber connections */
CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
/* 18) Close connection of template replication slot */
CloseConnection(sourceReplicationConnection);
}

View File

@@ -90,8 +90,7 @@ static void CopyShardTablesViaLogicalReplication(List *shardIntervalList,
char *sourceNodeName,
int32 sourceNodePort,
char *targetNodeName,
int32 targetNodePort,
char *operationName);
int32 targetNodePort);
static void CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort,
@@ -1170,7 +1169,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
{
CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName,
sourceNodePort, targetNodeName,
targetNodePort, operationName);
targetNodePort);
}
else
{
@@ -1192,7 +1191,7 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
static void
CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort, char *operationName)
int32 targetNodePort)
{
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CopyShardTablesViaLogicalReplication",
@@ -1231,8 +1230,7 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
/* data copy is done seperately when logical replication is used */
LogicallyReplicateShards(shardIntervalList, sourceNodeName,
sourceNodePort, targetNodeName, targetNodePort,
operationName);
sourceNodePort, targetNodeName, targetNodePort);
}

View File

@@ -152,7 +152,7 @@ static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition,
*/
void
LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort,
char *targetNodeName, int targetNodePort, char *operationName)
char *targetNodeName, int targetNodePort)
{
AcquireLogicalReplicationLock();
char *superUser = CitusExtensionOwnerName();
@@ -193,95 +193,79 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
CreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash, superUser,
databaseName);
PG_TRY();
{
MultiConnection *sourceReplicationConnection =
GetReplicationConnection(sourceConnection->hostname, sourceConnection->port);
MultiConnection *sourceReplicationConnection =
GetReplicationConnection(sourceConnection->hostname, sourceConnection->port);
/* set up the publication on the source and subscription on the target */
CreatePublications(sourceConnection, publicationInfoHash);
char *snapshot = CreateReplicationSlots(
sourceConnection,
sourceReplicationConnection,
logicalRepTargetList,
"pgoutput");
/* set up the publication on the source and subscription on the target */
CreatePublications(sourceConnection, publicationInfoHash);
char *snapshot = CreateReplicationSlots(
sourceConnection,
sourceReplicationConnection,
logicalRepTargetList,
"pgoutput");
CreateSubscriptions(
sourceConnection,
sourceConnection->database,
logicalRepTargetList);
CreateSubscriptions(
sourceConnection,
sourceConnection->database,
logicalRepTargetList);
/* only useful for isolation testing, see the function comment for the details */
ConflictWithIsolationTestingBeforeCopy();
/* only useful for isolation testing, see the function comment for the details */
ConflictWithIsolationTestingBeforeCopy();
/*
* We have to create the primary key (or any other replica identity)
* before the update/delete operations that are queued will be
* replicated. Because if the replica identity does not exist on the
* target, the replication would fail.
*
* So the latest possible moment we could do this is right after the
* initial data COPY, but before enabling the susbcriptions. It might
* seem like a good idea to it after the initial data COPY, since
* it's generally the rule that it's cheaper to build an index at once
* than to create it incrementally. This general rule, is why we create
* all the regular indexes as late during the move as possible.
*
* But as it turns out in practice it's not as clear cut, and we saw a
* speed degradation in the time it takes to move shards when doing the
* replica identity creation after the initial COPY. So, instead we
* keep it before the COPY.
*/
CreateReplicaIdentities(logicalRepTargetList);
/*
* We have to create the primary key (or any other replica identity)
* before the update/delete operations that are queued will be
* replicated. Because if the replica identity does not exist on the
* target, the replication would fail.
*
* So the latest possible moment we could do this is right after the
* initial data COPY, but before enabling the susbcriptions. It might
* seem like a good idea to it after the initial data COPY, since
* it's generally the rule that it's cheaper to build an index at once
* than to create it incrementally. This general rule, is why we create
* all the regular indexes as late during the move as possible.
*
* But as it turns out in practice it's not as clear cut, and we saw a
* speed degradation in the time it takes to move shards when doing the
* replica identity creation after the initial COPY. So, instead we
* keep it before the COPY.
*/
CreateReplicaIdentities(logicalRepTargetList);
UpdatePlacementUpdateStatusForShardIntervalList(
shardList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COPYING_DATA);
UpdatePlacementUpdateStatusForShardIntervalList(
shardList,
sourceNodeName,
sourceNodePort,
PLACEMENT_UPDATE_STATUS_COPYING_DATA);
CopyShardsToNode(sourceNode, targetNode, shardList, snapshot);
CopyShardsToNode(sourceNode, targetNode, shardList, snapshot);
/*
* We can close this connection now, because we're done copying the
* data and thus don't need access to the snapshot anymore. The
* replication slot will still be at the same LSN, because the
* subscriptions have not been enabled yet.
*/
CloseConnection(sourceReplicationConnection);
/*
* We can close this connection now, because we're done copying the
* data and thus don't need access to the snapshot anymore. The
* replication slot will still be at the same LSN, because the
* subscriptions have not been enabled yet.
*/
CloseConnection(sourceReplicationConnection);
/*
* Start the replication and copy all data
*/
CompleteNonBlockingShardTransfer(shardList,
sourceConnection,
publicationInfoHash,
logicalRepTargetList,
groupedLogicalRepTargetsHash,
SHARD_MOVE);
/*
* Start the replication and copy all data
*/
CompleteNonBlockingShardTransfer(shardList,
sourceConnection,
publicationInfoHash,
logicalRepTargetList,
groupedLogicalRepTargetsHash,
SHARD_MOVE);
/*
* We use these connections exclusively for subscription management,
* because otherwise subsequent metadata changes may inadvertedly use
* these connections instead of the connections that were used to
* grab locks in BlockWritesToShardList.
*/
CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
CloseConnection(sourceConnection);
}
PG_CATCH();
{
/* We don't need to UnclaimConnections since we're already erroring out */
/*
* Drop temporary objects that were marked as CLEANUP_ON_FAILURE
* or CLEANUP_ALWAYS.
*/
FinalizeOperationNeedingCleanupOnFailure(operationName);
PG_RE_THROW();
}
PG_END_TRY();
/*
* We use these connections exclusively for subscription management,
* because otherwise subsequent metadata changes may inadvertedly use
* these connections instead of the connections that were used to
* grab locks in BlockWritesToShardList.
*/
CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
CloseConnection(sourceConnection);
}

View File

@@ -130,7 +130,7 @@ typedef enum LogicalRepType
extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
int sourceNodePort, char *targetNodeName,
int targetNodePort, char *operationName);
int targetNodePort);
extern void ConflictWithIsolationTestingBeforeCopy(void);
extern void ConflictWithIsolationTestingAfterCopy(void);

View File

@@ -103,13 +103,6 @@ extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType,
int nodeGroupId,
CleanupPolicy policy);
/*
* FinalizeOperationNeedingCleanupOnFailure is called by an operation to signal
* completion on failure. This will trigger cleanup of appropriate resources
* and cleanup records.
*/
extern void FinalizeOperationNeedingCleanupOnFailure(const char *operationName);
/*
* FinalizeOperationNeedingCleanupOnSuccess is called by an operation to signal
* completion on success. This will trigger cleanup of appropriate resources

View File

@@ -74,6 +74,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
'block_writes');
ERROR: relation citus_split_failure_test_schema.sensors_8981002 already exists on worker localhost:xxxxx
-- BEGIN : Split Shard, which is expected to fail.
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- BEGIN : Ensure tables were cleaned from worker
\c - - - :worker_1_port
SET search_path TO "citus_split_failure_test_schema";

View File

@@ -54,10 +54,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()');
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cleanup leftovers

View File

@@ -122,13 +122,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISAB
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: role "citus_shard_move_subscription_role_xxxxxxx_xxxxxxx" cannot be dropped because some objects depend on it
DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx_xxxxxxx
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cleanup leftovers
@@ -397,9 +390,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="t_pkey").cancel(' || :pid || ')');
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: canceling statement due to lock timeout
CONTEXT: while executing command on localhost:xxxxx
WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed
ERROR: canceling statement due to user request
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
@@ -422,9 +412,6 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
@@ -460,9 +447,6 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- failure on parallel create index
ALTER SYSTEM RESET citus.max_adaptive_executor_pool_size;
@@ -479,9 +463,6 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: failed to clean up 1 orphaned shards out of 5 after a citus_move_shard_placement operation failed
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Verify that the shard is not moved and the number of rows are still 100k
SELECT citus.mitmproxy('conn.allow()');

View File

@@ -51,17 +51,18 @@ SELECT create_distributed_table('table_to_split', 'id');
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: failed to clean up 2 orphaned shards out of 5 after a citus_split_shard_by_split_points operation failed
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
(3 rows)
(5 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@@ -165,11 +166,13 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
(4 rows)
(6 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@@ -271,21 +274,20 @@ WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: failed to clean up 2 orphaned shards out of 7 after a citus_split_shard_by_split_points operation failed
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
(5 rows)
(7 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@@ -383,28 +385,25 @@ CONTEXT: while executing command on localhost:xxxxx
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_shard_by_split_points operation failed
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 1 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0
777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 1 | 0
777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0
(8 rows)
(12 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@@ -505,28 +504,25 @@ CONTEXT: while executing command on localhost:xxxxx
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_shard_by_split_points operation failed
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 1 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx_xxxxxxx | 2 | 0
777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 1 | 0
777 | 5 | citus_shard_split_subscription_role_xxxxxxx_xxxxxxx | 2 | 0
(8 rows)
(12 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');

View File

@@ -766,8 +766,6 @@ SET citus.override_table_visibility TO false;
SET search_path to "Tenant Isolation";
\set VERBOSITY terse
SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'block_writes');
WARNING: command DROP TABLE is disabled
WARNING: failed to clean up 1 orphaned shards out of 1 after a isolate_tenant_to_new_shard operation failed
ERROR: command CREATE TABLE is disabled
\set VERBOSITY default
\c - postgres - :worker_1_port

View File

@@ -814,8 +814,6 @@ SET citus.override_table_visibility TO false;
SET search_path to "Tenant Isolation";
\set VERBOSITY terse
SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'force_logical');
WARNING: command DROP TABLE is disabled
WARNING: failed to clean up 1 orphaned shards out of 1 after a isolate_tenant_to_new_shard operation failed
ERROR: command CREATE TABLE is disabled
\set VERBOSITY default
\c - postgres - :worker_1_port

View File

@@ -64,6 +64,8 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
'block_writes');
-- BEGIN : Split Shard, which is expected to fail.
SELECT public.wait_for_resource_cleanup();
-- BEGIN : Ensure tables were cleaned from worker
\c - - - :worker_1_port
SET search_path TO "citus_split_failure_test_schema";