diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 55f1b67d6..f2cb9ce43 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -61,6 +61,7 @@ #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" +#include "distributed/shard_cleaner.h" #include "distributed/shard_rebalancer.h" #include "distributed/shard_split.h" #include "distributed/shard_transfer.h" @@ -355,6 +356,8 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName, "citus.shard_replication_factor > 1"))); } + DropOrphanedResourcesInSeparateTransaction(); + EnsureCitusTableCanBeCreated(relationId); EnsureValidDistributionColumn(relationId, distributionColumnName); diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index c74b86071..4a9f64188 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -33,6 +33,10 @@ #include "distributed/worker_transaction.h" #include "distributed/pg_dist_cleanup.h" +#define REPLICATION_SLOT_CATALOG_TABLE_NAME "pg_replication_slots" +#define STR_ERRCODE_OBJECT_IN_USE "55006" +#define STR_ERRCODE_UNDEFINED_OBJECT "42704" + /* GUC configuration for shard cleaner */ int NextOperationId = 0; int NextCleanupRecordId = 0; @@ -72,10 +76,22 @@ PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards); PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_resources); static int DropOrphanedShardsForMove(bool waitForLocks); -static bool TryDropShardOutsideTransaction(OperationId operationId, - char *qualifiedTableName, +static bool TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record, + char *nodeName, + int nodePort); +static bool TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int nodePort); +static bool TryDropSubscriptionOutsideTransaction(char *subscriptionName, + char *nodeName, + int nodePort); +static bool TryDropPublicationOutsideTransaction(char *publicationName, + char *nodeName, + int nodePort); +static bool TryDropReplicationSlotOutsideTransaction(char *replicationSlotName, + char *nodeName, + int nodePort); +static bool TryDropUserOutsideTransaction(char *username, char *nodeName, int nodePort); static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode); /* Functions for cleanup infrastructure */ @@ -91,7 +107,9 @@ static void DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId); static bool CleanupRecordExists(uint64 recordId); static List * ListCleanupRecords(void); static List * ListCleanupRecordsForCurrentOperation(void); -static int DropOrphanedShardsForCleanup(void); +static int DropOrphanedResourcesForCleanup(void); +static int CompareCleanupRecordsByObjectType(const void *leftElement, + const void *rightElement); /* * citus_cleanup_orphaned_shards implements a user-facing UDF to delete @@ -154,8 +172,6 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS) * orphaned resources that are present in the system. These resources are * orphaned by previous actions that either failed or marked the resources * for deferred cleanup. - * The UDF only supports dropping shards at the moment but will be extended in - * near future to clean any type of resource. * * The function takes no arguments and runs on co-ordinator. 
It cannot be run in a * transaction, because holding the locks it takes for a long time is not good. @@ -169,7 +185,7 @@ citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS) EnsureCoordinator(); PreventInTransactionBlock(true, "citus_cleanup_orphaned_resources"); - int droppedCount = DropOrphanedShardsForCleanup(); + int droppedCount = DropOrphanedResourcesForCleanup(); if (droppedCount > 0) { ereport(NOTICE, (errmsg("cleaned up %d orphaned resources", droppedCount))); @@ -180,33 +196,34 @@ citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS) /* - * DropOrphanedShardsInSeparateTransaction cleans up orphaned shards by + * DropOrphanedResourcesInSeparateTransaction cleans up orphaned resources by * connecting to localhost. This is done, so that the locks that * DropOrphanedShardsForMove takes are only held for a short time. */ void -DropOrphanedShardsInSeparateTransaction(void) +DropOrphanedResourcesInSeparateTransaction(void) { int connectionFlag = FORCE_NEW_CONNECTION; MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, PostPortNumber); + ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_resources()"); ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_shards()"); CloseConnection(connection); } /* - * TryDropOrphanedShards is a wrapper around DropOrphanedShardsForMove and - * DropOrphanedShardsForCleanup that catches any errors to make it safe to + * TryDropOrphanedResources is a wrapper around DropOrphanedShardsForMove and + * DropOrphanedResourcesForCleanup that catches any errors to make it safe to * use in the maintenance daemon. * * If dropping any of the shards failed this function returns -1, otherwise it - * returns the number of dropped shards. + * returns the number of dropped resources. */ int -TryDropOrphanedShards(bool waitForLocks) +TryDropOrphanedResources(bool waitForLocks) { - int droppedShardCount = 0; + int droppedResourceCount = 0; MemoryContext savedContext = CurrentMemoryContext; /* @@ -217,8 +234,8 @@ TryDropOrphanedShards(bool waitForLocks) PG_TRY(); { - droppedShardCount = DropOrphanedShardsForMove(waitForLocks); - droppedShardCount += DropOrphanedShardsForCleanup(); + droppedResourceCount = DropOrphanedShardsForMove(waitForLocks); + droppedResourceCount += DropOrphanedResourcesForCleanup(); /* * Releasing a subtransaction doesn't free its memory context, since the @@ -241,19 +258,19 @@ TryDropOrphanedShards(bool waitForLocks) } PG_END_TRY(); - return droppedShardCount; + return droppedResourceCount; } /* - * DropOrphanedShardsForCleanup removes resources that were marked for cleanup by operation. + * DropOrphanedResourcesForCleanup removes resources that were marked for cleanup by operation. * It does so by trying to take an exclusive lock on the resources. If the lock cannot be * obtained it skips the resource and continues with others. * The resource that has been skipped will be removed at a later iteration when there are no * locks held anymore. */ static int -DropOrphanedShardsForCleanup() +DropOrphanedResourcesForCleanup() { /* Only runs on Coordinator */ if (!IsCoordinator()) @@ -263,20 +280,19 @@ DropOrphanedShardsForCleanup() List *cleanupRecordList = ListCleanupRecords(); - int removedShardCountForCleanup = 0; - int failedShardCountForCleanup = 0; + /* + * We sort the records before cleaning up by their types, because of dependencies. + * For example, a subscription might depend on a publication. 
+ */ + cleanupRecordList = SortList(cleanupRecordList, + CompareCleanupRecordsByObjectType); + + int removedResourceCountForCleanup = 0; + int failedResourceCountForCleanup = 0; CleanupRecord *record = NULL; foreach_ptr(record, cleanupRecordList) { - /* We only support one resource type at the moment */ - if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT) - { - ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ", - record->objectType))); - continue; - } - if (!PrimaryNodeForGroup(record->nodeGroupId, NULL)) { continue; @@ -289,7 +305,7 @@ DropOrphanedShardsForCleanup() continue; } - char *qualifiedTableName = record->objectName; + char *resourceName = record->objectName; WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId); /* @@ -302,29 +318,28 @@ DropOrphanedShardsForCleanup() continue; } - if (TryDropShardOutsideTransaction(record->operationId, - qualifiedTableName, - workerNode->workerName, - workerNode->workerPort)) + if (TryDropResourceByCleanupRecordOutsideTransaction(record, + workerNode->workerName, + workerNode->workerPort)) { if (record->policy == CLEANUP_DEFERRED_ON_SUCCESS) { - ereport(LOG, (errmsg("deferred drop of orphaned shard %s on %s:%d " + ereport(LOG, (errmsg("deferred drop of orphaned resource %s on %s:%d " "completed", - qualifiedTableName, + resourceName, workerNode->workerName, workerNode->workerPort))); } else { - ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d which " + ereport(LOG, (errmsg("cleaned up orphaned resource %s on %s:%d which " "was left behind after a failed operation", - qualifiedTableName, + resourceName, workerNode->workerName, workerNode->workerPort))); } /* delete the cleanup record */ DeleteCleanupRecordByRecordId(record->recordId); - removedShardCountForCleanup++; + removedResourceCountForCleanup++; } else { @@ -332,18 +347,18 @@ DropOrphanedShardsForCleanup() * We log failures at the end, since they occur repeatedly * for a large number of objects. */ - failedShardCountForCleanup++; + failedResourceCountForCleanup++; } } - if (failedShardCountForCleanup > 0) + if (failedResourceCountForCleanup > 0) { - ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d", - failedShardCountForCleanup, + ereport(WARNING, (errmsg("failed to clean up %d orphaned resources out of %d", + failedResourceCountForCleanup, list_length(cleanupRecordList)))); } - return removedShardCountForCleanup; + return removedResourceCountForCleanup; } @@ -414,8 +429,7 @@ DropOrphanedShardsForMove(bool waitForLocks) ShardInterval *shardInterval = LoadShardInterval(placement->shardId); char *qualifiedTableName = ConstructQualifiedShardName(shardInterval); - if (TryDropShardOutsideTransaction(INVALID_OPERATION_ID, - qualifiedTableName, + if (TryDropShardOutsideTransaction(qualifiedTableName, shardPlacement->nodeName, shardPlacement->nodePort)) { @@ -478,50 +492,38 @@ FinalizeOperationNeedingCleanupOnFailure(const char *operationName) List *currentOperationRecordList = ListCleanupRecordsForCurrentOperation(); - int removedShardCountOnComplete = 0; + /* + * We sort the records before cleaning up by their types, because of dependencies. + * For example, a subscription might depend on a publication. 
+ */ + currentOperationRecordList = SortList(currentOperationRecordList, + CompareCleanupRecordsByObjectType); + int failedShardCountOnComplete = 0; CleanupRecord *record = NULL; foreach_ptr(record, currentOperationRecordList) { - /* We only supporting cleaning shards right now */ - if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT) - { - ereport(WARNING, (errmsg( - "Invalid object type %d on failed operation cleanup", - record->objectType))); - continue; - } - if (record->policy == CLEANUP_ALWAYS || record->policy == CLEANUP_ON_FAILURE) { - char *qualifiedTableName = record->objectName; WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId); /* * For all resources of CurrentOperationId that are marked as 'CLEANUP_ALWAYS' or * 'CLEANUP_ON_FAILURE', drop resource and cleanup records. */ - if (TryDropShardOutsideTransaction(CurrentOperationId, - qualifiedTableName, - workerNode->workerName, - workerNode->workerPort)) + if (TryDropResourceByCleanupRecordOutsideTransaction(record, + workerNode->workerName, + workerNode->workerPort)) { - ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d after a " - "%s operation failed", - qualifiedTableName, - workerNode->workerName, workerNode->workerPort, - operationName))); - /* * Given the operation is failing and we will abort its transaction, we cannot delete * records in the current transaction. Delete these records outside of the * current transaction via a localhost connection. */ DeleteCleanupRecordByRecordIdOutsideTransaction(record->recordId); - removedShardCountOnComplete++; } - else + else if (record->objectType == CLEANUP_OBJECT_SHARD_PLACEMENT) { /* * We log failures at the end, since they occur repeatedly @@ -557,50 +559,38 @@ FinalizeOperationNeedingCleanupOnSuccess(const char *operationName) List *currentOperationRecordList = ListCleanupRecordsForCurrentOperation(); - int removedShardCountOnComplete = 0; + /* + * We sort the records before cleaning up by their types, because of dependencies. + * For example, a subscription might depend on a publication. + */ + currentOperationRecordList = SortList(currentOperationRecordList, + CompareCleanupRecordsByObjectType); + int failedShardCountOnComplete = 0; CleanupRecord *record = NULL; foreach_ptr(record, currentOperationRecordList) { - /* We only supporting cleaning shards right now */ - if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT) - { - ereport(WARNING, (errmsg( - "Invalid object type %d on operation cleanup", - record->objectType))); - continue; - } - if (record->policy == CLEANUP_ALWAYS) { - char *qualifiedTableName = record->objectName; WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId); /* * For all resources of CurrentOperationId that are marked as 'CLEANUP_ALWAYS' * drop resource and cleanup records. */ - if (TryDropShardOutsideTransaction(CurrentOperationId, - qualifiedTableName, - workerNode->workerName, - workerNode->workerPort)) + if (TryDropResourceByCleanupRecordOutsideTransaction(record, + workerNode->workerName, + workerNode->workerPort)) { - ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d after a " - "%s operation completed", - qualifiedTableName, - workerNode->workerName, workerNode->workerPort, - operationName))); - /* * Delete cleanup records outside transaction as: * The resources are marked as 'CLEANUP_ALWAYS' and should be cleaned no matter * the operation succeeded or failed. 
*/ DeleteCleanupRecordByRecordIdOutsideTransaction(record->recordId); - removedShardCountOnComplete++; } - else + else if (record->objectType == CLEANUP_OBJECT_SHARD_PLACEMENT) { /* * We log failures at the end, since they occur repeatedly @@ -632,6 +622,30 @@ FinalizeOperationNeedingCleanupOnSuccess(const char *operationName) } +/* + * CompareCleanupRecordsByObjectType is a comparison function to sort + * cleanup records by their object type. + */ +static int +CompareCleanupRecordsByObjectType(const void *leftElement, const void *rightElement) +{ + CleanupRecord *leftRecord = *((CleanupRecord **) leftElement); + CleanupRecord *rightRecord = *((CleanupRecord **) rightElement); + + /* we compare 64-bit integers, instead of casting their difference to int */ + if (leftRecord->objectType > rightRecord->objectType) + { + return 1; + } + else if (leftRecord->objectType < rightRecord->objectType) + { + return -1; + } + + return 0; +} + + /* * InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup_record entry * as part of the current transaction. This is primarily useful for deferred drop scenarios, @@ -680,7 +694,7 @@ InsertCleanupRecordInCurrentTransaction(CleanupObject objectType, /* - * InsertCleanupRecordInSeparateTransaction inserts a new pg_dist_cleanup_record entry + * InsertCleanupRecordInSubtransaction inserts a new pg_dist_cleanup_record entry * in a separate transaction to ensure the record persists after rollback. We should * delete these records if the operation completes successfully. * @@ -769,12 +783,64 @@ TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode) /* - * TryDropShard tries to drop the given shard placement and returns + * TryDropResourceByCleanupRecordOutsideTransaction tries to drop the given resource + * and returns true on success. + */ +static bool +TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record, + char *nodeName, + int nodePort) +{ + switch (record->objectType) + { + case CLEANUP_OBJECT_SHARD_PLACEMENT: + { + return TryDropShardOutsideTransaction(record->objectName, + nodeName, nodePort); + } + + case CLEANUP_OBJECT_SUBSCRIPTION: + { + return TryDropSubscriptionOutsideTransaction(record->objectName, + nodeName, nodePort); + } + + case CLEANUP_OBJECT_PUBLICATION: + { + return TryDropPublicationOutsideTransaction(record->objectName, + nodeName, nodePort); + } + + case CLEANUP_OBJECT_REPLICATION_SLOT: + { + return TryDropReplicationSlotOutsideTransaction(record->objectName, + nodeName, nodePort); + } + + case CLEANUP_OBJECT_USER: + { + return TryDropUserOutsideTransaction(record->objectName, nodeName, nodePort); + } + + default: + { + ereport(WARNING, (errmsg( + "Invalid object type %d for cleanup record", + record->objectType))); + return false; + } + } + + return false; +} + + +/* + * TryDropShardOutsideTransaction tries to drop the given shard placement and returns * true on success. */ static bool -TryDropShardOutsideTransaction(OperationId operationId, - char *qualifiedTableName, +TryDropShardOutsideTransaction(char *qualifiedTableName, char *nodeName, int nodePort) { @@ -808,6 +874,234 @@ TryDropShardOutsideTransaction(OperationId operationId, } +/* + * TryDropSubscriptionOutsideTransaction drops the subscription with the given name on the + * subscriber node if it exists. Note that this doesn't drop the replication slot on the + * publisher node. The reason is that sometimes this is not possible. Two known + * cases where this is not possible are: + * 1.
Due to the node with the replication slot being down. + * 2. Due to a deadlock when the replication is on the same node as the + * subscription, which is the case for shard splits to the local node. + * + * So instead of directly dropping the subscription, including the attached + * replication slot, the subscription is first disconnected from the + * replication slot before dropping it. The replication slot itself should be + * dropped using DropReplicationSlot on the source connection. + */ +static bool +TryDropSubscriptionOutsideTransaction(char *subscriptionName, + char *nodeName, + int nodePort) +{ + int connectionFlags = OUTSIDE_TRANSACTION; + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + CitusExtensionOwnerName(), + NULL); + + RemoteTransactionBegin(connection); + + if (ExecuteOptionalRemoteCommand(connection, + "SET LOCAL lock_timeout TO '1s'", NULL) != 0) + { + RemoteTransactionAbort(connection); + ResetRemoteTransaction(connection); + return false; + } + + int querySent = SendRemoteCommand( + connection, + psprintf("ALTER SUBSCRIPTION %s DISABLE", quote_identifier(subscriptionName))); + if (querySent == 0) + { + ReportConnectionError(connection, WARNING); + RemoteTransactionAbort(connection); + ResetRemoteTransaction(connection); + return false; + } + + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + + if (!IsResponseOK(result)) + { + char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE); + if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_UNDEFINED_OBJECT) == 0) + { + /* + * The subscription doesn't exist, so we can return right away. + * This DropSubscription call is effectively a no-op. + */ + PQclear(result); + ForgetResults(connection); + RemoteTransactionAbort(connection); + ResetRemoteTransaction(connection); + return true; + } + else + { + ReportResultError(connection, result, WARNING); + PQclear(result); + ForgetResults(connection); + RemoteTransactionAbort(connection); + ResetRemoteTransaction(connection); + return false; + } + } + + PQclear(result); + ForgetResults(connection); + RemoteTransactionCommit(connection); + ResetRemoteTransaction(connection); + + StringInfo alterQuery = makeStringInfo(); + appendStringInfo(alterQuery, + "ALTER SUBSCRIPTION %s SET (slot_name = NONE)", + quote_identifier(subscriptionName)); + + StringInfo dropQuery = makeStringInfo(); + appendStringInfo(dropQuery, + "DROP SUBSCRIPTION %s", + quote_identifier(subscriptionName)); + + List *dropCommandList = list_make3("SET LOCAL lock_timeout TO '1s'", + alterQuery->data, dropQuery->data); + bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + connection, + dropCommandList); + + return success; +} + + +/* + * TryDropPublicationOutsideTransaction drops the publication with the given name if it + * exists. 
+ */ +static bool +TryDropPublicationOutsideTransaction(char *publicationName, + char *nodeName, + int nodePort) +{ + int connectionFlags = OUTSIDE_TRANSACTION; + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + CitusExtensionOwnerName(), + NULL); + StringInfo dropQuery = makeStringInfo(); + appendStringInfo(dropQuery, + "DROP PUBLICATION IF EXISTS %s", + quote_identifier(publicationName)); + + List *dropCommandList = list_make2("SET LOCAL lock_timeout TO '1s'", + dropQuery->data); + bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + connection, + dropCommandList); + + return success; +} + + +/* + * TryDropReplicationSlotOutsideTransaction drops the replication slot with the given + * name if it exists. + */ +static bool +TryDropReplicationSlotOutsideTransaction(char *replicationSlotName, + char *nodeName, + int nodePort) +{ + int connectionFlags = OUTSIDE_TRANSACTION; + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + CitusExtensionOwnerName(), + NULL); + + RemoteTransactionBegin(connection); + + if (ExecuteOptionalRemoteCommand(connection, + "SET LOCAL lock_timeout TO '1s'", NULL) != 0) + { + RemoteTransactionAbort(connection); + ResetRemoteTransaction(connection); + return false; + } + + int querySent = SendRemoteCommand( + connection, + psprintf( + "select pg_drop_replication_slot(slot_name) from " + REPLICATION_SLOT_CATALOG_TABLE_NAME + " where slot_name = %s", + quote_literal_cstr(replicationSlotName)) + ); + + if (querySent == 0) + { + ReportConnectionError(connection, WARNING); + RemoteTransactionAbort(connection); + ResetRemoteTransaction(connection); + return false; + } + + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + + if (IsResponseOK(result)) + { + PQclear(result); + ForgetResults(connection); + RemoteTransactionCommit(connection); + ResetRemoteTransaction(connection); + return true; + } + + char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE); + if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_OBJECT_IN_USE) != 0) + { + /* throw a warning unless object is in use */ + ReportResultError(connection, result, WARNING); + } + + PQclear(result); + ForgetResults(connection); + RemoteTransactionAbort(connection); + ResetRemoteTransaction(connection); + + return false; +} + + +/* + * TryDropUserOutsideTransaction drops the user with the given name if it exists. + */ +static bool +TryDropUserOutsideTransaction(char *username, + char *nodeName, int nodePort) +{ + int connectionFlags = OUTSIDE_TRANSACTION; + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + CitusExtensionOwnerName(), + NULL); + + /* + * The DROP USER command should not propagate, so we temporarily disable + * DDL propagation. + */ + bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection( + connection, + list_make3( + "SET LOCAL lock_timeout TO '1s'", + "SET LOCAL citus.enable_ddl_propagation TO OFF;", + psprintf("DROP USER IF EXISTS %s;", + quote_identifier(username)))); + + return success; +} + + /* * GetNextOperationId allocates and returns a unique operationId for an operation * requiring potential cleanup. 
This allocation occurs both in shared memory and diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index bebb23b48..2687eaa6b 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -772,7 +772,7 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, ListCell *placementUpdateCell = NULL; - DropOrphanedShardsInSeparateTransaction(); + DropOrphanedResourcesInSeparateTransaction(); foreach(placementUpdateCell, placementUpdateList) { @@ -1901,7 +1901,7 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo return 0; } - DropOrphanedShardsInSeparateTransaction(); + DropOrphanedResourcesInSeparateTransaction(); /* find the name of the shard transfer mode to interpolate in the scheduled command */ Datum shardTranferModeLabelDatum = diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index e62412c7c..4185576a6 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -522,6 +522,8 @@ SplitShard(SplitMode splitMode, sourceColocatedShardIntervalList = colocatedShardIntervalList; } + DropOrphanedResourcesInSeparateTransaction(); + /* use the user-specified shard ID as the split workflow ID */ uint64 splitWorkflowId = shardIntervalToSplit->shardId; @@ -771,12 +773,11 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, workerPlacementNode->workerPort))); } - CleanupPolicy policy = CLEANUP_ON_FAILURE; InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, ConstructQualifiedShardName( shardInterval), workerPlacementNode->groupId, - policy); + CLEANUP_ON_FAILURE); /* Create new split child shard on the specified placement list */ CreateObjectOnPlacement(splitShardCreationCommandList, @@ -1407,11 +1408,10 @@ InsertDeferredDropCleanupRecordsForShards(List *shardIntervalList) * We also log cleanup record in the current transaction. If the current transaction rolls back, * we do not generate a record at all. */ - CleanupPolicy policy = CLEANUP_DEFERRED_ON_SUCCESS; InsertCleanupRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, qualifiedShardName, placement->groupId, - policy); + CLEANUP_DEFERRED_ON_SUCCESS); } } } @@ -1494,8 +1494,6 @@ NonBlockingShardSplit(SplitOperation splitOperation, shardGroupSplitIntervalListList, workersForPlacementList); - DropAllLogicalReplicationLeftovers(SHARD_SPLIT); - int connectionFlags = FORCE_NEW_CONNECTION; MultiConnection *sourceConnection = GetNodeUserDatabaseConnection( connectionFlags, @@ -1722,12 +1720,6 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* end ongoing transactions to enable us to clean up */ ShutdownAllConnections(); - /* Do a best effort cleanup of shards created on workers in the above block - * TODO(niupre): We don't need to do this once shard cleaner can clean replication - * artifacts. - */ - DropAllLogicalReplicationLeftovers(SHARD_SPLIT); - /* * Drop temporary objects that were marked as CLEANUP_ON_FAILURE * or CLEANUP_ALWAYS. @@ -1820,12 +1812,11 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList, /* Log shard in pg_dist_cleanup. Given dummy shards are transient resources, * we want to cleanup irrespective of operation success or failure. 
*/ - CleanupPolicy policy = CLEANUP_ALWAYS; InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, ConstructQualifiedShardName( shardInterval), workerPlacementNode->groupId, - policy); + CLEANUP_ALWAYS); /* Create dummy source shard on the specified placement list */ CreateObjectOnPlacement(splitShardCreationCommandList, @@ -1883,12 +1874,11 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList, /* Log shard in pg_dist_cleanup. Given dummy shards are transient resources, * we want to cleanup irrespective of operation success or failure. */ - CleanupPolicy policy = CLEANUP_ALWAYS; InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT, ConstructQualifiedShardName( shardInterval), sourceWorkerNode->groupId, - policy); + CLEANUP_ALWAYS); /* Create dummy split child shard on source worker node */ CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode); diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 81bef791a..9e777f2a1 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -1157,6 +1157,8 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP return; } + DropOrphanedResourcesInSeparateTransaction(); + if (useLogicalReplication) { CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName, @@ -1209,9 +1211,17 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa MemoryContextSwitchTo(oldContext); + /* Start operation to prepare for generating cleanup records */ + RegisterOperationNeedingCleanup(); + /* data copy is done seperately when logical replication is used */ LogicallyReplicateShards(shardIntervalList, sourceNodeName, sourceNodePort, targetNodeName, targetNodePort); + + /* + * Drop temporary objects that were marked as CLEANUP_ALWAYS. + */ + FinalizeOperationNeedingCleanupOnSuccess("citus_[move/copy]_shard_placement"); } @@ -1493,10 +1503,10 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo if (targetPlacement->shardState == SHARD_STATE_TO_DELETE) { /* - * Trigger deletion of orphaned shards and hope that this removes + * Trigger deletion of orphaned resources and hope that this removes * the shard. 
*/ - DropOrphanedShardsInSeparateTransaction(); + DropOrphanedResourcesInSeparateTransaction(); shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); targetPlacement = SearchShardPlacementInList(shardPlacementList, targetNodeName, diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 1edd48c48..051c88735 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -45,6 +45,7 @@ #include "distributed/distributed_planner.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" +#include "distributed/shard_cleaner.h" #include "distributed/shard_rebalancer.h" #include "distributed/shard_transfer.h" #include "distributed/version_compat.h" @@ -65,11 +66,6 @@ #include "utils/ruleutils.h" #include "utils/syscache.h" -#define STR_ERRCODE_UNDEFINED_OBJECT "42704" -#define STR_ERRCODE_OBJECT_IN_USE "55006" - - -#define REPLICATION_SLOT_CATALOG_TABLE_NAME "pg_replication_slots" #define CURRENT_LOG_POSITION_COMMAND "SELECT pg_current_wal_lsn()" /* decimal representation of Adler-16 hash value of citus_shard_move_publication */ @@ -135,17 +131,7 @@ static void WaitForMiliseconds(long timeout); static XLogRecPtr GetSubscriptionPosition( GroupedLogicalRepTargets *groupedLogicalRepTargets); static void AcquireLogicalReplicationLock(void); -static void DropSubscription(MultiConnection *connection, - char *subscriptionName); -static void DropPublication(MultiConnection *connection, char *publicationName); -static void DropUser(MultiConnection *connection, char *username); -static void DropReplicationSlot(MultiConnection *connection, - char *publicationName); -static void DropAllSubscriptions(MultiConnection *connection, LogicalRepType type); -static void DropAllReplicationSlots(MultiConnection *connection, LogicalRepType type); -static void DropAllPublications(MultiConnection *connection, LogicalRepType type); -static void DropAllUsers(MultiConnection *connection, LogicalRepType type); static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode, List *shardIntervals); static List * CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, @@ -153,10 +139,6 @@ static List * CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition, GroupedLogicalRepTargets * groupedLogicalRepTargets); -static void RecreateGroupedLogicalRepTargetsConnections( - HTAB *groupedLogicalRepTargetsHash, - char *user, - char *databaseName); /* * LogicallyReplicateShards replicates a list of shards from one node to another @@ -184,8 +166,6 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo return; } - DropAllLogicalReplicationLeftovers(SHARD_MOVE); - MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, sourceNodeName, sourceNodePort, superUser, databaseName); @@ -291,34 +271,14 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo } PG_CATCH(); { - /* - * Try our best not to leave any left-over subscription or publication. - * - * Although it is not very advisable to use code-paths that could throw - * new errors, we prefer to do it here since we expect the cost of leaving - * left-overs not be very low. 
- */ - - /* reconnect if the connection failed or is waiting for a command */ - RecreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash, - superUser, databaseName); - - DropSubscriptions(logicalRepTargetList); - - /* reconnect if the connection failed or is waiting for a command */ - if (PQstatus(sourceConnection->pgConn) != CONNECTION_OK || - PQisBusy(sourceConnection->pgConn)) - { - sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, - sourceNodeName, - sourceNodePort, superUser, - databaseName); - } - DropReplicationSlots(sourceConnection, logicalRepTargetList); - DropPublications(sourceConnection, publicationInfoHash); - /* We don't need to UnclaimConnections since we're already erroring out */ + /* + * Drop temporary objects that were marked as CLEANUP_ON_FAILURE + * or CLEANUP_ALWAYS. + */ + FinalizeOperationNeedingCleanupOnFailure("citus_[move/copy]_shard_placement"); + PG_RE_THROW(); } PG_END_TRY(); @@ -452,11 +412,6 @@ CompleteNonBlockingShardTransfer(List *shardList, sourceConnection->hostname, sourceConnection->port, PLACEMENT_UPDATE_STATUS_COMPLETING); - - /* we're done, cleanup the publication and subscription */ - DropSubscriptions(logicalRepTargetList); - DropReplicationSlots(sourceConnection, logicalRepTargetList); - DropPublications(sourceConnection, publicationInfoHash); } @@ -573,74 +528,6 @@ AcquireLogicalReplicationLock(void) } -/* - * DropAllLogicalReplicationLeftovers drops all subscriptions, publications, - * roles and replication slots on all nodes that were related to this - * LogicalRepType. These might have been left there after the coordinator - * crashed during a shard move/split. It's important to delete them for two - * reasons: - * 1. Starting new shard moves will fail when they exist, because it cannot - * create them. - * 2. Leftover replication slots that are not consumed from anymore make it - * impossible for WAL to be dropped. This can cause out-of-disk issues. - */ -void -DropAllLogicalReplicationLeftovers(LogicalRepType type) -{ - char *superUser = CitusExtensionOwnerName(); - char *databaseName = get_database_name(MyDatabaseId); - - /* - * We need connections that are not currently inside a transaction. The - * reason for this is that operations on subscriptions, publications and - * replication slots cannot be run in a transaction. By forcing a new - * connection we make sure no transaction is active on the connection. - */ - int connectionFlags = FORCE_NEW_CONNECTION; - - List *workerNodeList = ActivePrimaryNodeList(AccessShareLock); - List *cleanupConnectionList = NIL; - WorkerNode *workerNode = NULL; - - /* - * First we try to remove the subscription, everywhere and only after - * having done that we try to remove the publication everywhere. This is - * needed, because the publication can only be removed if there's no active - * subscription on it. - */ - foreach_ptr(workerNode, workerNodeList) - { - MultiConnection *cleanupConnection = GetNodeUserDatabaseConnection( - connectionFlags, workerNode->workerName, workerNode->workerPort, - superUser, databaseName); - cleanupConnectionList = lappend(cleanupConnectionList, cleanupConnection); - - DropAllSubscriptions(cleanupConnection, type); - DropAllUsers(cleanupConnection, type); - } - - MultiConnection *cleanupConnection = NULL; - foreach_ptr(cleanupConnection, cleanupConnectionList) - { - /* - * If replication slot could not be dropped while dropping the - * subscriber, drop it here. 
- */ - DropAllReplicationSlots(cleanupConnection, type); - DropAllPublications(cleanupConnection, type); - - /* - * We close all connections that we opened for the dropping here. That - * way we don't keep these connections open unnecessarily during the - * 'LogicalRepType' operation (which can take a long time). We might - * need to reopen a few later on, but that seems better than keeping - * many open for no reason for a long time. - */ - CloseConnection(cleanupConnection); - } -} - - /* * PrepareReplicationSubscriptionList returns list of shards to be logically * replicated from given shard list. This is needed because Postgres does not @@ -1314,115 +1201,6 @@ ConflictWithIsolationTestingAfterCopy(void) } -/* - * DropReplicationSlots drops the replication slots used for shard moves/splits - * over the given connection (if they exist). - */ -void -DropReplicationSlots(MultiConnection *sourceConnection, List *logicalRepTargetList) -{ - LogicalRepTarget *target = NULL; - foreach_ptr(target, logicalRepTargetList) - { - DropReplicationSlot(sourceConnection, target->replicationSlot->name); - } -} - - -/* - * DropPublications drops the publications used for shard moves/splits over the - * given connection (if they exist). - */ -void -DropPublications(MultiConnection *sourceConnection, HTAB *publicationInfoHash) -{ - HASH_SEQ_STATUS status; - hash_seq_init(&status, publicationInfoHash); - - PublicationInfo *entry = NULL; - while ((entry = (PublicationInfo *) hash_seq_search(&status)) != NULL) - { - DropPublication(sourceConnection, entry->name); - } -} - - -/* - * DropReplicationSlot drops the replication slot with the given name - * if it exists. It retries if the command fails with an OBJECT_IN_USE error. - */ -static void -DropReplicationSlot(MultiConnection *connection, char *replicationSlotName) -{ - int maxSecondsToTryDropping = 20; - bool raiseInterrupts = true; - PGresult *result = NULL; - - /* we'll retry in case of an OBJECT_IN_USE error */ - while (maxSecondsToTryDropping >= 0) - { - int querySent = SendRemoteCommand( - connection, - psprintf( - "select pg_drop_replication_slot(slot_name) from " - REPLICATION_SLOT_CATALOG_TABLE_NAME - " where slot_name = %s", - quote_literal_cstr(replicationSlotName)) - ); - - if (querySent == 0) - { - ReportConnectionError(connection, ERROR); - } - - result = GetRemoteCommandResult(connection, raiseInterrupts); - - if (IsResponseOK(result)) - { - /* no error, we are good to go */ - break; - } - - char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE); - if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_OBJECT_IN_USE) == 0 && - maxSecondsToTryDropping > 0) - { - /* retry dropping the replication slot after sleeping for one sec */ - maxSecondsToTryDropping--; - pg_usleep(1000); - } - else - { - /* - * Report error if: - * - Error code is not 55006 (Object In Use) - * - Or, we have made enough number of retries (currently 20), but didn't work - */ - ReportResultError(connection, result, ERROR); - } - - PQclear(result); - ForgetResults(connection); - } - - PQclear(result); - ForgetResults(connection); -} - - -/* - * DropPublication drops the publication with the given name if it - * exists. - */ -static void -DropPublication(MultiConnection *connection, char *publicationName) -{ - ExecuteCriticalRemoteCommand(connection, psprintf( - "DROP PUBLICATION IF EXISTS %s", - quote_identifier(publicationName))); -} - - /* * PublicationName returns the name of the publication for the given node and * table owner. 
@@ -1530,187 +1308,6 @@ GetQueryResultStringList(MultiConnection *connection, char *query) } -/* - * DropAllSubscriptions drops all the existing subscriptions that - * match our shard move naming scheme on the node that the connection points - * to. - */ -static void -DropAllSubscriptions(MultiConnection *connection, LogicalRepType type) -{ - char *query = psprintf( - "SELECT subname FROM pg_subscription " - "WHERE subname LIKE %s || '%%'", - quote_literal_cstr(subscriptionPrefix[type])); - List *subscriptionNameList = GetQueryResultStringList(connection, query); - char *subscriptionName; - foreach_ptr(subscriptionName, subscriptionNameList) - { - DropSubscription(connection, subscriptionName); - } -} - - -/* - * DropAllUsers drops all the users that match our shard move naming - * scheme for temporary shard move users on the node that the connection points - * to. - */ -static void -DropAllUsers(MultiConnection *connection, LogicalRepType type) -{ - char *query = psprintf( - "SELECT rolname FROM pg_roles " - "WHERE rolname LIKE %s || '%%'", - quote_literal_cstr(subscriptionRolePrefix[type])); - List *usernameList = GetQueryResultStringList(connection, query); - char *username; - foreach_ptr(username, usernameList) - { - DropUser(connection, username); - } -} - - -/* - * DropAllReplicationSlots drops all the existing replication slots - * that match our shard move naming scheme on the node that the connection - * points to. - */ -static void -DropAllReplicationSlots(MultiConnection *connection, LogicalRepType type) -{ - char *query = psprintf( - "SELECT slot_name FROM pg_replication_slots " - "WHERE slot_name LIKE %s || '%%'", - quote_literal_cstr(replicationSlotPrefix[type])); - List *slotNameList = GetQueryResultStringList(connection, query); - char *slotName; - foreach_ptr(slotName, slotNameList) - { - DropReplicationSlot(connection, slotName); - } -} - - -/* - * DropAllPublications drops all the existing publications that - * match our shard move naming scheme on the node that the connection points - * to. - */ -static void -DropAllPublications(MultiConnection *connection, LogicalRepType type) -{ - char *query = psprintf( - "SELECT pubname FROM pg_publication " - "WHERE pubname LIKE %s || '%%'", - quote_literal_cstr(publicationPrefix[type])); - List *publicationNameList = GetQueryResultStringList(connection, query); - char *publicationName; - foreach_ptr(publicationName, publicationNameList) - { - DropPublication(connection, publicationName); - } -} - - -/* - * DropSubscriptions drops all the subscriptions in the logicalRepTargetList - * from the subscriber node. It also drops the temporary users that are used as - * owners for of the subscription. This doesn't drop the replication slots on - * the publisher, these should be dropped using DropReplicationSlots. - */ -void -DropSubscriptions(List *logicalRepTargetList) -{ - LogicalRepTarget *target = NULL; - foreach_ptr(target, logicalRepTargetList) - { - DropSubscription(target->superuserConnection, - target->subscriptionName); - DropUser(target->superuserConnection, target->subscriptionOwnerName); - } -} - - -/* - * DropSubscription drops subscription with the given name on the subscriber - * node if it exists. Note that this doesn't drop the replication slot on the - * publisher node. The reason is that sometimes this is not possible. To known - * cases where this is not possible are: - * 1. Due to the node with the replication slot being down. - * 2. 
Due to a deadlock when the replication is on the same node as the - * subscription, which is the case for shard splits to the local node. - * - * So instead of directly dropping the subscription, including the attached - * replication slot, the subscription is first disconnected from the - * replication slot before dropping it. The replication slot itself should be - * dropped using DropReplicationSlot on the source connection. - */ -static void -DropSubscription(MultiConnection *connection, char *subscriptionName) -{ - int querySent = SendRemoteCommand( - connection, - psprintf("ALTER SUBSCRIPTION %s DISABLE", quote_identifier(subscriptionName))); - if (querySent == 0) - { - ReportConnectionError(connection, ERROR); - } - - bool raiseInterrupts = true; - PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); - if (!IsResponseOK(result)) - { - char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE); - if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_UNDEFINED_OBJECT) == 0) - { - /* - * The subscription doesn't exist, so we can return right away. - * This DropSubscription call is effectively a no-op. - */ - return; - } - else - { - ReportResultError(connection, result, ERROR); - PQclear(result); - ForgetResults(connection); - } - } - - PQclear(result); - ForgetResults(connection); - - ExecuteCriticalRemoteCommand(connection, psprintf( - "ALTER SUBSCRIPTION %s SET (slot_name = NONE)", - quote_identifier(subscriptionName))); - - ExecuteCriticalRemoteCommand(connection, psprintf( - "DROP SUBSCRIPTION %s", - quote_identifier(subscriptionName))); -} - - -/* - * DropUser drops the user with the given name if it exists. - */ -static void -DropUser(MultiConnection *connection, char *username) -{ - /* - * The DROP USER command should not propagate, so we temporarily disable - * DDL propagation. - */ - SendCommandListToWorkerOutsideTransactionWithConnection( - connection, - list_make2( - "SET LOCAL citus.enable_ddl_propagation TO OFF;", - psprintf("DROP USER IF EXISTS %s;", - quote_identifier(username)))); -} - - /* * CreatePublications creates a the publications defined in the * publicationInfoHash over the given connection. @@ -1744,6 +1341,13 @@ CreatePublications(MultiConnection *connection, prefixWithComma = true; } + WorkerNode *worker = FindWorkerNode(connection->hostname, + connection->port); + InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_PUBLICATION, + entry->name, + worker->groupId, + CLEANUP_ALWAYS); + ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data); pfree(createPublicationCommand->data); pfree(createPublicationCommand); @@ -1835,6 +1439,13 @@ CreateReplicationSlots(MultiConnection *sourceConnection, { ReplicationSlotInfo *replicationSlot = target->replicationSlot; + WorkerNode *worker = FindWorkerNode(sourceConnection->hostname, + sourceConnection->port); + InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_REPLICATION_SLOT, + replicationSlot->name, + worker->groupId, + CLEANUP_ALWAYS); + if (!firstReplicationSlot) { firstReplicationSlot = replicationSlot; @@ -1880,6 +1491,9 @@ CreateSubscriptions(MultiConnection *sourceConnection, { int ownerId = target->tableOwnerId; + WorkerNode *worker = FindWorkerNode(target->superuserConnection->hostname, + target->superuserConnection->port); + /* * The CREATE USER command should not propagate, so we temporarily * disable DDL propagation. 
@@ -1898,6 +1512,11 @@ CreateSubscriptions(MultiConnection *sourceConnection, GetUserNameFromId(ownerId, false) ))); + InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_USER, + target->subscriptionOwnerName, + worker->groupId, + CLEANUP_ALWAYS); + StringInfo conninfo = makeStringInfo(); appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' " "connect_timeout=20", @@ -1937,6 +1556,12 @@ CreateSubscriptions(MultiConnection *sourceConnection, createSubscriptionCommand->data); pfree(createSubscriptionCommand->data); pfree(createSubscriptionCommand); + + InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SUBSCRIPTION, + target->subscriptionName, + worker->groupId, + CLEANUP_ALWAYS); + ExecuteCriticalRemoteCommand(target->superuserConnection, psprintf( "ALTER SUBSCRIPTION %s OWNER TO %s", target->subscriptionName, @@ -2110,62 +1735,6 @@ CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash, } -/* - * RecreateGroupedLogicalRepTargetsConnections recreates connections for all of the - * nodes in the groupedLogicalRepTargetsHash where the old connection is broken or - * currently running a query. - * - * IMPORTANT: When it recreates the connection, it doesn't close the existing - * connection. This means that this function should only be called when we know - * we'll throw an error afterwards, otherwise we would leak these connections. - */ -static void -RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash, - char *user, - char *databaseName) -{ - int connectionFlags = FORCE_NEW_CONNECTION; - HASH_SEQ_STATUS status; - GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL; - foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash) - { - MultiConnection *superuserConnection = - groupedLogicalRepTargets->superuserConnection; - if (superuserConnection && - PQstatus(superuserConnection->pgConn) == CONNECTION_OK && - !PQisBusy(superuserConnection->pgConn) - ) - { - continue; - } - WorkerNode *targetWorkerNode = FindNodeWithNodeId( - groupedLogicalRepTargets->nodeId, - false); - superuserConnection = GetNodeUserDatabaseConnection( - connectionFlags, - targetWorkerNode->workerName, - targetWorkerNode->workerPort, - user, - databaseName); - - /* - * Operations on subscriptions cannot run in a transaction block. We - * claim the connections exclusively to ensure they do not get used for - * metadata syncing, which does open a transaction block. - */ - ClaimConnectionExclusively(superuserConnection); - - groupedLogicalRepTargets->superuserConnection = superuserConnection; - - LogicalRepTarget *target = NULL; - foreach_ptr(target, groupedLogicalRepTargets->logicalRepTargetList) - { - target->superuserConnection = superuserConnection; - } - } -} - - /* * CreateGroupedLogicalRepTargetsConnections closes the connections for all of the * nodes in the groupedLogicalRepTargetsHash. 
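With the changes above, every publication, replication slot, subscription owner, and subscription created for logical replication is registered as a CLEANUP_ALWAYS cleanup record. As a rough sketch of how an operator can act on those records (assuming the catalog declared in pg_dist_cleanup.h is exposed as pg_dist_cleanup; only the procedure call below appears verbatim in this patch):

-- inspect pending cleanup records (catalog name assumed from pg_dist_cleanup.h)
SELECT * FROM pg_dist_cleanup;
-- drop the recorded leftovers; must run outside a transaction block
CALL citus_cleanup_orphaned_resources();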
diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c index 9836c132a..5cfc4c59f 100644 --- a/src/backend/distributed/test/shard_rebalancer.c +++ b/src/backend/distributed/test/shard_rebalancer.c @@ -52,7 +52,7 @@ static ShardCost GetShardCost(uint64 shardId, void *context); PG_FUNCTION_INFO_V1(shard_placement_rebalance_array); PG_FUNCTION_INFO_V1(shard_placement_replication_array); PG_FUNCTION_INFO_V1(worker_node_responsive); -PG_FUNCTION_INFO_V1(run_try_drop_marked_shards); +PG_FUNCTION_INFO_V1(run_try_drop_marked_resources); typedef struct ShardPlacementTestInfo { @@ -75,13 +75,13 @@ typedef struct RebalancePlanContext } RebalancePlacementContext; /* - * run_try_drop_marked_shards is a wrapper to run TryDropOrphanedShards. + * run_try_drop_marked_resources is a wrapper to run TryDropOrphanedResources. */ Datum -run_try_drop_marked_shards(PG_FUNCTION_ARGS) +run_try_drop_marked_resources(PG_FUNCTION_ARGS) { bool waitForLocks = false; - TryDropOrphanedShards(waitForLocks); + TryDropOrphanedResources(waitForLocks); PG_RETURN_VOID(); } diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 6cc0330a9..444dd9377 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -627,7 +627,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) TimestampDifferenceExceeds(lastShardCleanTime, GetCurrentTimestamp(), DeferShardDeleteInterval)) { - int numberOfDroppedShards = 0; + int numberOfDroppedResources = 0; InvalidateMetadataSystemCache(); StartTransactionCommand(); @@ -646,16 +646,16 @@ CitusMaintenanceDaemonMain(Datum main_arg) lastShardCleanTime = GetCurrentTimestamp(); bool waitForLocks = false; - numberOfDroppedShards = TryDropOrphanedShards(waitForLocks); + numberOfDroppedResources = TryDropOrphanedResources(waitForLocks); } CommitTransactionCommand(); - if (numberOfDroppedShards > 0) + if (numberOfDroppedResources > 0) { - ereport(LOG, (errmsg("maintenance daemon dropped %d distributed " - "shards previously marked to be removed", - numberOfDroppedShards))); + ereport(LOG, (errmsg("maintenance daemon dropped %d " + "resources previously marked to be removed", + numberOfDroppedResources))); } /* make sure we don't wait too long */ diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 1db36402b..838236c79 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -150,12 +150,6 @@ extern char * CreateReplicationSlots(MultiConnection *sourceConnection, List *subscriptionInfoList, char *outputPlugin); extern void EnableSubscriptions(List *subscriptionInfoList); -extern void DropSubscriptions(List *subscriptionInfoList); -extern void DropReplicationSlots(MultiConnection *sourceConnection, - List *subscriptionInfoList); -extern void DropPublications(MultiConnection *sourceConnection, - HTAB *publicationInfoHash); -extern void DropAllLogicalReplicationLeftovers(LogicalRepType type); extern char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId); extern char * ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index af983c428..2412d26e9 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -23,8 +23,8 @@ extern bool 
CheckAvailableSpaceBeforeMove; extern int NextOperationId; extern int NextCleanupRecordId; -extern int TryDropOrphanedShards(bool waitForLocks); -extern void DropOrphanedShardsInSeparateTransaction(void); +extern int TryDropOrphanedResources(bool waitForLocks); +extern void DropOrphanedResourcesInSeparateTransaction(void); /* Members for cleanup infrastructure */ typedef uint64 OperationId; @@ -36,7 +36,11 @@ extern OperationId CurrentOperationId; typedef enum CleanupObject { CLEANUP_OBJECT_INVALID = 0, - CLEANUP_OBJECT_SHARD_PLACEMENT = 1 + CLEANUP_OBJECT_SHARD_PLACEMENT = 1, + CLEANUP_OBJECT_SUBSCRIPTION = 2, + CLEANUP_OBJECT_REPLICATION_SLOT = 3, + CLEANUP_OBJECT_PUBLICATION = 4, + CLEANUP_OBJECT_USER = 5 } CleanupObject; /* diff --git a/src/test/regress/expected/citus_non_blocking_split_columnar.out b/src/test/regress/expected/citus_non_blocking_split_columnar.out index c58fec967..3098b98c1 100644 --- a/src/test/regress/expected/citus_non_blocking_split_columnar.out +++ b/src/test/regress/expected/citus_non_blocking_split_columnar.out @@ -275,7 +275,7 @@ SELECT pg_reload_conf(); -- END: Split a shard along its co-located shards -- BEGIN: Perform deferred cleanup. CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 13 orphaned resources +NOTICE: cleaned up 11 orphaned resources -- END: Perform deferred cleanup. -- BEGIN: Validate Shard Info and Data SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport diff --git a/src/test/regress/expected/citus_split_shard_by_split_points.out b/src/test/regress/expected/citus_split_shard_by_split_points.out index 737533c4d..5e4b24190 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points.out @@ -234,7 +234,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( -- BEGIN: Perform deferred cleanup. CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 4 orphaned resources +NOTICE: cleaned up 3 orphaned resources -- END: Perform deferred cleanup. 
-- Perform 3 way split SELECT pg_catalog.citus_split_shard_by_split_points( diff --git a/src/test/regress/expected/failure_create_distributed_table_concurrently.out b/src/test/regress/expected/failure_create_distributed_table_concurrently.out index 27007f598..bd3382256 100644 --- a/src/test/regress/expected/failure_create_distributed_table_concurrently.out +++ b/src/test/regress/expected/failure_create_distributed_table_concurrently.out @@ -189,8 +189,8 @@ SELECT create_distributed_table_concurrently('table_1', 'id'); SELECT * FROM pg_dist_shard WHERE logicalrelid = 'table_1'::regclass; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue --------------------------------------------------------------------- - table_1 | 1880084 | t | -2147483648 | -1 - table_1 | 1880085 | t | 0 | 2147483647 + table_1 | 1880080 | t | -2147483648 | -1 + table_1 | 1880081 | t | 0 | 2147483647 (2 rows) DROP SCHEMA create_dist_tbl_con CASCADE; diff --git a/src/test/regress/expected/failure_on_create_subscription.out b/src/test/regress/expected/failure_on_create_subscription.out index cdb9e822e..ffc327e26 100644 --- a/src/test/regress/expected/failure_on_create_subscription.out +++ b/src/test/regress/expected/failure_on_create_subscription.out @@ -54,8 +54,29 @@ SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()'); (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 1 orphaned resources +SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").cancel(' || :pid || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +ERROR: canceling statement due to user request -- Verify that the shard is not moved and the number of rows are still 100k SELECT * FROM shards_in_workers; shardid | worker @@ -73,12 +94,15 @@ SELECT count(*) FROM t; (1 row) -- Verify that shard can be moved after a temporary failure +-- cleanup leftovers, as it can cause flakiness in the following test files SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 4 orphaned resources SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); master_move_shard_placement --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out index 280936067..15684a3b1 100644 --- a/src/test/regress/expected/failure_online_move_shard_placement.out +++ b/src/test/regress/expected/failure_online_move_shard_placement.out @@ -122,15 +122,43 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISAB (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', 
:worker_2_proxy_port); +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: role "citus_shard_move_subscription_role_10" cannot be dropped because some objects depend on it +DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx +CONTEXT: while executing command on localhost:xxxxx ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx --- try again +-- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 2 orphaned resources +-- cancel on dropping subscription +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").cancel(' || :pid || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +ERROR: canceling statement due to user request +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 4 orphaned resources +-- try again SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); master_move_shard_placement --------------------------------------------------------------------- @@ -170,8 +198,85 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kil (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -ERROR: connection not open +WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: role "citus_shard_move_subscription_role_10" cannot be dropped because some objects depend on it +DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx +CONTEXT: while executing command on localhost:xxxxx + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- should succeed with warnings (subscription not dropped) +-- move the shard back +SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port); + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 2 orphaned resources +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 1 orphaned shards +-- failure on setting lock_timeout (right before dropping subscriptions & replication slots) +SELECT citus.mitmproxy('conn.onQuery(query="^SET LOCAL lock_timeout").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open 
+CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- should succeed with warnings (objects not dropped) +-- move the shard back +SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port); +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx + master_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 4 orphaned resources +CALL citus_cleanup_orphaned_shards(); +NOTICE: cleaned up 1 orphaned shards -- cancellation on disabling subscription SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").cancel(' || :pid || ')'); mitmproxy @@ -181,16 +286,47 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").can SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 4 orphaned resources -- failure on dropping subscription -SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").killall()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +-- first, manually drop the subscription object.
But the record for it will remain on pg_dist_cleanup +SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_xxxxxxx$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,9060,t,"DROP SUBSCRIPTION") + (localhost,57637,t,"DROP SUBSCRIPTION") +(2 rows) + +-- cleanup leftovers +-- verify we don't see any error for already dropped subscription +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 3 orphaned resources -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); mitmproxy @@ -227,14 +363,18 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -ERROR: connection not open -CONTEXT: while executing command on localhost:xxxxx +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open +-- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) +CALL citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 3 orphaned resources -- lets create few more indexes and fail with both -- parallel mode and sequential mode CREATE INDEX index_failure_2 ON t(id); @@ -256,14 +396,9 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -ERROR: connection not open -CONTEXT: while executing command on localhost:xxxxx -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- failure on parallel create index ALTER SYSTEM RESET citus.max_adaptive_executor_pool_size; SELECT pg_reload_conf(); @@ -279,8 +414,9 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -ERROR: connection not open -CONTEXT: while executing command on localhost:xxxxx +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- Verify that the shard is not moved and the number of rows are still 100k SELECT citus.mitmproxy('conn.allow()'); mitmproxy diff --git a/src/test/regress/expected/failure_split_cleanup.out b/src/test/regress/expected/failure_split_cleanup.out index f86a1de1f..5765d3e4f 100644 --- a/src/test/regress/expected/failure_split_cleanup.out +++ b/src/test/regress/expected/failure_split_cleanup.out @@ -48,16 +48,17 @@ SELECT 
create_distributed_table('table_to_split', 'id'); ARRAY['-100000'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: failed to clean up 2 orphaned shards out of 5 after a citus_split_shard_by_split_points operation failed +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 -(4 rows) + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 +(3 rows) -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -98,7 +99,7 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 4 orphaned resources +NOTICE: cleaned up 3 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; operation_id | object_type | object_name | node_group_id | policy_type @@ -144,24 +145,23 @@ NOTICE: cleaned up 4 orphaned resources (1 row) + -- set log level to prevent flakiness + SET client_min_messages TO ERROR; SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-100000'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); -WARNING: connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: connection not open -CONTEXT: while executing command on localhost:xxxxx -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open +ERROR: Failed to run worker_split_shard_replication_setup UDF. It should successfully execute for splitting a shard in a non-blocking way. Please retry. 
+ RESET client_min_messages; SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 (4 rows) -- we need to allow connection so that we can connect to proxy @@ -224,11 +224,9 @@ NOTICE: cleaned up 4 orphaned resources -- Empty publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx -(2 rows) +(0 rows) -- Empty replication slot table SELECT slot_name FROM pg_replication_slots; @@ -262,16 +260,21 @@ WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: failed to clean up 2 orphaned shards out of 7 after a citus_split_shard_by_split_points operation failed +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 -(4 rows) + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 +(5 rows) -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -314,7 +317,7 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 4 orphaned resources +NOTICE: cleaned up 5 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; operation_id | object_type | object_name | node_group_id | policy_type @@ -333,11 +336,9 @@ NOTICE: cleaned up 4 orphaned resources -- Empty publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx 
-(2 rows) +(0 rows) -- Empty replication slot table SELECT slot_name FROM pg_replication_slots; @@ -367,16 +368,28 @@ NOTICE: cleaned up 4 orphaned resources ARRAY['-100000'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_shard_by_split_points operation failed +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 -(4 rows) + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 + 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 +(8 rows) -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -422,7 +435,7 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 4 orphaned resources +NOTICE: cleaned up 8 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; operation_id | object_type | object_name | node_group_id | policy_type @@ -441,26 +454,21 @@ NOTICE: cleaned up 4 orphaned resources -- Empty publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx -(2 rows) +(0 rows) -- Empty replication slot table SELECT slot_name FROM pg_replication_slots; - slot_name + slot_name --------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx -(2 rows) +(0 rows) -- Empty subscriptions SELECT subname FROM pg_subscription; - subname + subname --------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx -(1 row) +(0 rows) --5. 
Failure on polling last write-ahead log location reported to origin WAL sender \c - postgres - :master_port @@ -478,16 +486,28 @@ NOTICE: cleaned up 4 orphaned resources ARRAY['-100000'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_shard_by_split_points operation failed +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 -(4 rows) + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 + 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 +(8 rows) -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -533,7 +553,7 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 4 orphaned resources +NOTICE: cleaned up 8 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; operation_id | object_type | object_name | node_group_id | policy_type @@ -552,26 +572,21 @@ NOTICE: cleaned up 4 orphaned resources -- Empty publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx -(2 rows) +(0 rows) -- Empty replication slot table SELECT slot_name FROM pg_replication_slots; - slot_name + slot_name --------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx -(2 rows) +(0 rows) -- Empty subscriptions SELECT subname FROM pg_subscription; - subname + subname --------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx -(1 row) +(0 rows) --6. 
Failure on dropping subscription \c - postgres - :master_port @@ -589,15 +604,29 @@ NOTICE: cleaned up 4 orphaned resources ARRAY['-100000'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open +WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx - -- NO records expected as we fail at 'DropAllLogicalReplicationLeftovers' before creating - -- any resources. SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; - operation_id | object_type | object_name | node_group_id | policy_type + operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- -(0 rows) + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 + 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 +(8 rows) SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; relname @@ -619,7 +648,8 @@ CONTEXT: while executing command on localhost:xxxxx relname --------------------------------------------------------------------- table_to_split_8981000 -(1 row) + table_to_split_8981003 +(2 rows) -- Left over publications SELECT pubname FROM pg_publication; @@ -646,6 +676,7 @@ CONTEXT: while executing command on localhost:xxxxx \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 8 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; operation_id | object_type | object_name | node_group_id | policy_type @@ -664,26 +695,21 @@ CONTEXT: while executing command on localhost:xxxxx -- Empty publications SELECT pubname FROM pg_publication; - pubname + pubname --------------------------------------------------------------------- - citus_shard_split_publication_xxxxxxx_xxxxxxx - citus_shard_split_publication_xxxxxxx_xxxxxxx -(2 rows) +(0 rows) -- Empty replication slot table SELECT slot_name FROM pg_replication_slots; - slot_name + slot_name --------------------------------------------------------------------- - citus_shard_split_slot_xxxxxxx_xxxxxxx - citus_shard_split_slot_xxxxxxx_xxxxxxx -(2 rows) +(0 rows) -- Empty subscriptions SELECT subname FROM pg_subscription; - subname + subname --------------------------------------------------------------------- - citus_shard_split_subscription_xxxxxxx -(1 row) +(0 rows) -- Cleanup \c - postgres - :master_port diff --git 
a/src/test/regress/expected/failure_tenant_isolation_nonblocking.out b/src/test/regress/expected/failure_tenant_isolation_nonblocking.out index b36ce1f08..a5f2cf5fd 100644 --- a/src/test/regress/expected/failure_tenant_isolation_nonblocking.out +++ b/src/test/regress/expected/failure_tenant_isolation_nonblocking.out @@ -18,6 +18,8 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) +-- cleanup leftovers if any +CALL citus_cleanup_orphaned_resources(); CREATE TABLE table_1 (id int PRIMARY KEY); CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int); SELECT create_distributed_table('table_1', 'id'); @@ -255,7 +257,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_current_wal_lsn").cancel(' SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on dropping subscription -SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").killall()'); mitmproxy --------------------------------------------------------------------- @@ -264,6 +266,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").kill()'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')'); mitmproxy @@ -273,8 +283,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); -- failure on dropping publication -SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()'); mitmproxy --------------------------------------------------------------------- @@ -283,6 +301,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").kill()'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); -- cancellation on dropping publication SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')'); mitmproxy @@ -292,8 +318,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); -- failure on 
dropping replication slot -SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()'); mitmproxy --------------------------------------------------------------------- @@ -302,6 +336,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").ki SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); -- cancellation on dropping replication slot SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')'); mitmproxy @@ -311,6 +353,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").ca SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +CALL citus_cleanup_orphaned_resources(); -- failure on foreign key creation SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()'); mitmproxy diff --git a/src/test/regress/expected/ignoring_orphaned_shards.out b/src/test/regress/expected/ignoring_orphaned_shards.out index 2da99e43f..a43dd9d59 100644 --- a/src/test/regress/expected/ignoring_orphaned_shards.out +++ b/src/test/regress/expected/ignoring_orphaned_shards.out @@ -212,9 +212,6 @@ SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER B rep2 (2 rows) --- cannot copy from an orphaned shard -SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port); -ERROR: source placement must be in active state -- Make sure we don't send a query to the orphaned shard BEGIN; SET LOCAL citus.log_remote_commands TO ON; @@ -288,7 +285,7 @@ SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localh (1 row) CALL citus_cleanup_orphaned_shards(); -NOTICE: cleaned up 3 orphaned shards +NOTICE: cleaned up 1 orphaned shards SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid; shardid | shardstate | nodeport --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_blocking_shard_split.out b/src/test/regress/expected/isolation_blocking_shard_split.out index 86d50e3e3..5b7ff549b 100644 --- a/src/test/regress/expected/isolation_blocking_shard_split.out +++ b/src/test/regress/expected/isolation_blocking_shard_split.out @@ -1131,9 +1131,9 @@ step s2-blocking-shard-split: 'block_writes'); step s1-run-cleaner: - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); -run_try_drop_marked_shards +run_try_drop_marked_resources --------------------------------------------------------------------- (1 row) @@ -1162,9 +1162,9 @@ citus_split_shard_by_split_points (1 row) step s1-run-cleaner: - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); -run_try_drop_marked_shards +run_try_drop_marked_resources 
--------------------------------------------------------------------- (1 row) @@ -1199,9 +1199,9 @@ step s2-blocking-shard-split: 'block_writes'); step s1-run-cleaner: - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); -run_try_drop_marked_shards +run_try_drop_marked_resources --------------------------------------------------------------------- (1 row) @@ -1230,9 +1230,9 @@ citus_split_shard_by_split_points (1 row) step s1-run-cleaner: - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); -run_try_drop_marked_shards +run_try_drop_marked_resources --------------------------------------------------------------------- (1 row) diff --git a/src/test/regress/expected/isolation_non_blocking_shard_split.out b/src/test/regress/expected/isolation_non_blocking_shard_split.out index 65356f2d6..5c8b27213 100644 --- a/src/test/regress/expected/isolation_non_blocking_shard_split.out +++ b/src/test/regress/expected/isolation_non_blocking_shard_split.out @@ -972,9 +972,9 @@ step s2-non-blocking-shard-split: 'force_logical'); step s1-run-cleaner: - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); -run_try_drop_marked_shards +run_try_drop_marked_resources --------------------------------------------------------------------- (1 row) @@ -982,13 +982,21 @@ run_try_drop_marked_shards step s1-show-pg_dist_cleanup: SELECT object_name, object_type, policy_type FROM pg_dist_cleanup; -object_name |object_type|policy_type +object_name |object_type|policy_type --------------------------------------------------------------------- -public.to_split_table_1500002| 1| 1 -public.to_split_table_1500003| 1| 1 -public.to_split_table_1500001| 1| 0 -public.to_split_table_1500003| 1| 0 -(4 rows) +public.to_split_table_1500002 | 1| 1 +public.to_split_table_1500003 | 1| 1 +public.to_split_table_1500001 | 1| 0 +public.to_split_table_1500003 | 1| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_subscription_role_10| 5| 0 +citus_shard_split_subscription_xxxxxxx | 2| 0 +citus_shard_split_subscription_role_10| 5| 0 +citus_shard_split_subscription_xxxxxxx | 2| 0 +(12 rows) step s1-release-split-advisory-lock: SELECT pg_advisory_unlock(44000, 55152); @@ -1005,9 +1013,9 @@ citus_split_shard_by_split_points (1 row) step s1-run-cleaner: - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); -run_try_drop_marked_shards +run_try_drop_marked_resources --------------------------------------------------------------------- (1 row) @@ -1042,9 +1050,9 @@ step s2-non-blocking-shard-split: 'force_logical'); step s1-run-cleaner: - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); -run_try_drop_marked_shards +run_try_drop_marked_resources --------------------------------------------------------------------- (1 row) @@ -1052,13 +1060,21 @@ run_try_drop_marked_shards step s1-show-pg_dist_cleanup: SELECT object_name, object_type, policy_type FROM pg_dist_cleanup; -object_name |object_type|policy_type +object_name |object_type|policy_type --------------------------------------------------------------------- -public.to_split_table_1500002| 1| 1 -public.to_split_table_1500003| 1| 1 -public.to_split_table_1500001| 1| 0 -public.to_split_table_1500003| 1| 0 -(4 rows) +public.to_split_table_1500002 | 1| 1 +public.to_split_table_1500003 | 1| 1 
+public.to_split_table_1500001 | 1| 0 +public.to_split_table_1500003 | 1| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0 +citus_shard_split_subscription_role_10| 5| 0 +citus_shard_split_subscription_xxxxxxx | 2| 0 +citus_shard_split_subscription_role_10| 5| 0 +citus_shard_split_subscription_xxxxxxx | 2| 0 +(12 rows) step s1-release-split-advisory-lock: SELECT pg_advisory_unlock(44000, 55152); @@ -1075,9 +1091,9 @@ citus_split_shard_by_split_points (1 row) step s1-run-cleaner: - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); -run_try_drop_marked_shards +run_try_drop_marked_resources --------------------------------------------------------------------- (1 row) diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out index eb8659d72..d915703f7 100644 --- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out +++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out @@ -177,9 +177,9 @@ step s1-lock-pg-dist-placement: s2: DEBUG: could not acquire shard lock to cleanup placements step s2-drop-old-shards: - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); -run_try_drop_marked_shards +run_try_drop_marked_resources --------------------------------------------------------------------- (1 row) diff --git a/src/test/regress/expected/logical_replication.out b/src/test/regress/expected/logical_replication.out index 866df4037..0b2585bfb 100644 --- a/src/test/regress/expected/logical_replication.out +++ b/src/test/regress/expected/logical_replication.out @@ -101,10 +101,12 @@ SELECT citus_remove_node('localhost', :master_port); (1 row) +-- the subscription is still there, as there is no cleanup record for it +-- we have created it manually SELECT count(*) from pg_subscription; count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) from pg_publication; @@ -127,6 +129,8 @@ SELECT count(*) from dist; \c - - - :worker_1_port SET search_path TO logical_replication; +-- the publication and repslot are still there, as there are no cleanup records for them +-- we have created them manually SELECT count(*) from pg_subscription; count --------------------------------------------------------------------- @@ -136,13 +140,13 @@ SELECT count(*) from pg_subscription; SELECT count(*) from pg_publication; count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) from pg_replication_slots; count --------------------------------------------------------------------- - 0 + 1 (1 row) SELECT count(*) from dist; @@ -180,4 +184,7 @@ SELECT count(*) from dist; \c - - - :master_port SET search_path TO logical_replication; SET client_min_messages TO WARNING; +ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid DISABLE; +ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid SET (slot_name = NONE); +DROP SUBSCRIPTION citus_shard_move_subscription_:postgres_oid; DROP SCHEMA logical_replication CASCADE; diff --git a/src/test/regress/expected/multi_colocated_shard_rebalance.out b/src/test/regress/expected/multi_colocated_shard_rebalance.out index 613050fc8..749a565cf 100644 --- a/src/test/regress/expected/multi_colocated_shard_rebalance.out +++ 
b/src/test/regress/expected/multi_colocated_shard_rebalance.out @@ -565,7 +565,7 @@ SELECT master_move_shard_placement(13000034, 'localhost', :worker_1_port, 'local (1 row) CALL citus_cleanup_orphaned_shards(); -NOTICE: cleaned up 5 orphaned shards +NOTICE: cleaned up 1 orphaned shards -- confirm the successfull move SELECT * FROM run_command_on_placements('serial_move_test', 'SELECT DISTINCT key FROM %s WHERE key = 15') WHERE result = '15' AND shardid = 13000034; nodename | nodeport | shardid | success | result diff --git a/src/test/regress/expected/multi_tenant_isolation.out b/src/test/regress/expected/multi_tenant_isolation.out index 4063c0703..7277926be 100644 --- a/src/test/regress/expected/multi_tenant_isolation.out +++ b/src/test/regress/expected/multi_tenant_isolation.out @@ -450,7 +450,7 @@ ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046". \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 24 orphaned resources +NOTICE: cleaned up 2 orphaned resources -- connect to the worker node with metadata \c - mx_isolation_role_ent - :worker_1_port SET search_path to "Tenant Isolation"; @@ -1018,7 +1018,7 @@ SELECT count(*) FROM test_colocated_table_2; \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 4 orphaned resources +NOTICE: cleaned up 3 orphaned resources \c - postgres - :worker_1_port -- show the foreign keys of the main table & its colocated shard on other tables SELECT tbl.relname, fk."Constraint", fk."Definition" diff --git a/src/test/regress/expected/multi_tenant_isolation_nonblocking.out b/src/test/regress/expected/multi_tenant_isolation_nonblocking.out index 55e0741ce..b2c8d62af 100644 --- a/src/test/regress/expected/multi_tenant_isolation_nonblocking.out +++ b/src/test/regress/expected/multi_tenant_isolation_nonblocking.out @@ -260,6 +260,8 @@ SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE', shard_t ERROR: table lineitem_streaming has already been isolated for the given value SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE', shard_transfer_mode => 'force_logical'); ERROR: table orders_streaming has already been isolated for the given value +CALL pg_catalog.citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 2 orphaned resources -- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647 SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE', shard_transfer_mode => 'force_logical'); isolate_tenant_to_new_shard @@ -273,6 +275,8 @@ SELECT isolate_tenant_to_new_shard('orders_streaming', -1686493264, 'CASCADE', s 1230047 (1 row) +CALL pg_catalog.citus_cleanup_orphaned_resources(); +NOTICE: cleaned up 2 orphaned resources SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554; count --------------------------------------------------------------------- @@ -450,7 +454,6 @@ ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046". 
\c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 30 orphaned resources -- connect to the worker node with metadata \c - mx_isolation_role_ent - :worker_1_port SET search_path to "Tenant Isolation"; @@ -821,93 +824,6 @@ SET citus.override_table_visibility TO false; (18 rows) DROP EVENT TRIGGER abort_ddl; --- create a trigger for drops -SET citus.enable_metadata_sync TO OFF; -CREATE OR REPLACE FUNCTION abort_drop_command() - RETURNS event_trigger - LANGUAGE plpgsql - AS $$ -BEGIN - RAISE EXCEPTION 'command % is disabled', tg_tag; -END; -$$; -RESET citus.enable_metadata_sync; -CREATE EVENT TRIGGER abort_drop ON sql_drop - EXECUTE PROCEDURE abort_drop_command(); -\c - mx_isolation_role_ent - :master_port -SET search_path to "Tenant Isolation"; -\set VERBOSITY terse -SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'force_logical'); -ERROR: command DROP SUBSCRIPTION is disabled -\set VERBOSITY default --- check if metadata is changed -SELECT * FROM pg_dist_shard - WHERE logicalrelid = 'lineitem_streaming'::regclass OR logicalrelid = 'orders_streaming'::regclass - ORDER BY shardminvalue::BIGINT, logicalrelid; - logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue ---------------------------------------------------------------------- - lineitem_streaming | 1230040 | t | -2147483648 | -2147483648 - orders_streaming | 1230042 | t | -2147483648 | -2147483648 - lineitem_streaming | 1230041 | t | -2147483647 | -136164586 - orders_streaming | 1230043 | t | -2147483647 | -136164586 - lineitem_streaming | 1230035 | t | -136164585 | -136164585 - orders_streaming | 1230038 | t | -136164585 | -136164585 - lineitem_streaming | 1230036 | t | -136164584 | -85071815 - orders_streaming | 1230039 | t | -136164584 | -85071815 - lineitem_streaming | 1230011 | t | -85071814 | -85071814 - orders_streaming | 1230014 | t | -85071814 | -85071814 - lineitem_streaming | 1230012 | t | -85071813 | -1 - orders_streaming | 1230015 | t | -85071813 | -1 - lineitem_streaming | 1230004 | t | 0 | 108199380 - orders_streaming | 1230007 | t | 0 | 108199380 - lineitem_streaming | 1230005 | t | 108199381 | 108199381 - orders_streaming | 1230008 | t | 108199381 | 108199381 - lineitem_streaming | 1230028 | t | 108199382 | 412880111 - orders_streaming | 1230031 | t | 108199382 | 412880111 - lineitem_streaming | 1230029 | t | 412880112 | 412880112 - orders_streaming | 1230032 | t | 412880112 | 412880112 - lineitem_streaming | 1230044 | t | 412880113 | 2147483646 - orders_streaming | 1230046 | t | 412880113 | 2147483646 - lineitem_streaming | 1230045 | t | 2147483647 | 2147483647 - orders_streaming | 1230047 | t | 2147483647 | 2147483647 -(24 rows) - -\c - - - :worker_1_port -SET search_path to "Tenant Isolation"; --- however, new tables are already created -SET citus.override_table_visibility TO false; -\d - List of relations - Schema | Name | Type | Owner ---------------------------------------------------------------------- - Tenant Isolation | lineitem_streaming | table | mx_isolation_role_ent - Tenant Isolation | lineitem_streaming_1230011 | table | mx_isolation_role_ent - Tenant Isolation | lineitem_streaming_1230012 | table | mx_isolation_role_ent - Tenant Isolation | lineitem_streaming_1230035 | table | mx_isolation_role_ent - Tenant Isolation | lineitem_streaming_1230036 | table | mx_isolation_role_ent - Tenant Isolation | lineitem_streaming_1230040 | table | mx_isolation_role_ent - Tenant Isolation | 
lineitem_streaming_1230041 | table | mx_isolation_role_ent - Tenant Isolation | lineitem_streaming_1230061 | table | mx_isolation_role_ent - Tenant Isolation | lineitem_streaming_1230062 | table | mx_isolation_role_ent - Tenant Isolation | lineitem_streaming_1230063 | table | mx_isolation_role_ent - Tenant Isolation | orders_streaming | table | mx_isolation_role_ent - Tenant Isolation | orders_streaming_1230014 | table | mx_isolation_role_ent - Tenant Isolation | orders_streaming_1230015 | table | mx_isolation_role_ent - Tenant Isolation | orders_streaming_1230038 | table | mx_isolation_role_ent - Tenant Isolation | orders_streaming_1230039 | table | mx_isolation_role_ent - Tenant Isolation | orders_streaming_1230042 | table | mx_isolation_role_ent - Tenant Isolation | orders_streaming_1230043 | table | mx_isolation_role_ent - Tenant Isolation | orders_streaming_1230064 | table | mx_isolation_role_ent - Tenant Isolation | orders_streaming_1230065 | table | mx_isolation_role_ent - Tenant Isolation | orders_streaming_1230066 | table | mx_isolation_role_ent - Tenant Isolation | text_column | table | mx_isolation_role_ent - Tenant Isolation | text_column_1230052 | table | mx_isolation_role_ent - Tenant Isolation | text_column_1230053 | table | mx_isolation_role_ent - Tenant Isolation | text_column_1230054 | table | mx_isolation_role_ent -(24 rows) - -\c - postgres - :worker_1_port -DROP EVENT TRIGGER abort_drop; \c - mx_isolation_role_ent - :master_port SET search_path to "Tenant Isolation"; -- tests for cluster health @@ -1130,7 +1046,7 @@ DROP TABLE test_colocated_table_no_rep_identity; SELECT isolate_tenant_to_new_shard('test_colocated_table_2', 1, 'CASCADE', shard_transfer_mode => 'auto'); isolate_tenant_to_new_shard --------------------------------------------------------------------- - 1230108 + 1230102 (1 row) SELECT count(*) FROM test_colocated_table_2; @@ -1141,7 +1057,7 @@ SELECT count(*) FROM test_colocated_table_2; \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 10 orphaned resources +NOTICE: cleaned up 3 orphaned resources \c - postgres - :worker_1_port -- show the foreign keys of the main table & its colocated shard on other tables SELECT tbl.relname, fk."Constraint", fk."Definition" @@ -1152,47 +1068,47 @@ ORDER BY 1, 2; relname | Constraint | Definition --------------------------------------------------------------------- test_colocated_table_1 | test_colocated_table_1_id_fkey | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1(id) - test_colocated_table_1_1230074 | test_colocated_table_1_id_fkey_1230074 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230074(id) - test_colocated_table_1_1230076 | test_colocated_table_1_id_fkey_1230076 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230076(id) - test_colocated_table_1_1230078 | test_colocated_table_1_id_fkey_1230078 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230078(id) - test_colocated_table_1_1230104 | test_colocated_table_1_id_fkey_1230104 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230104(id) - test_colocated_table_1_1230105 | test_colocated_table_1_id_fkey_1230105 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230105(id) - test_colocated_table_1_1230106 | test_colocated_table_1_id_fkey_1230106 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230106(id) + test_colocated_table_1_1230068 | 
test_colocated_table_1_id_fkey_1230068 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230068(id) + test_colocated_table_1_1230070 | test_colocated_table_1_id_fkey_1230070 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230070(id) + test_colocated_table_1_1230072 | test_colocated_table_1_id_fkey_1230072 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230072(id) + test_colocated_table_1_1230098 | test_colocated_table_1_id_fkey_1230098 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230098(id) + test_colocated_table_1_1230099 | test_colocated_table_1_id_fkey_1230099 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230099(id) + test_colocated_table_1_1230100 | test_colocated_table_1_id_fkey_1230100 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230100(id) test_colocated_table_2 | test_colocated_table_2_id_fkey | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1(id) test_colocated_table_2 | test_colocated_table_2_value_1_fkey | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey(id) - test_colocated_table_2_1230082 | test_colocated_table_2_id_fkey_1230082 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230074(id) - test_colocated_table_2_1230082 | test_colocated_table_2_value_1_fkey_1230082 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) - test_colocated_table_2_1230084 | test_colocated_table_2_id_fkey_1230084 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230076(id) - test_colocated_table_2_1230084 | test_colocated_table_2_value_1_fkey_1230084 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) - test_colocated_table_2_1230086 | test_colocated_table_2_id_fkey_1230086 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230078(id) - test_colocated_table_2_1230086 | test_colocated_table_2_value_1_fkey_1230086 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) - test_colocated_table_2_1230107 | test_colocated_table_2_id_fkey_1230107 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230104(id) - test_colocated_table_2_1230107 | test_colocated_table_2_value_1_fkey_1230107 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) - test_colocated_table_2_1230108 | test_colocated_table_2_id_fkey_1230108 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230105(id) - test_colocated_table_2_1230108 | test_colocated_table_2_value_1_fkey_1230108 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) - test_colocated_table_2_1230109 | test_colocated_table_2_id_fkey_1230109 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230106(id) - test_colocated_table_2_1230109 | test_colocated_table_2_value_1_fkey_1230109 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) + test_colocated_table_2_1230076 | test_colocated_table_2_id_fkey_1230076 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230068(id) + test_colocated_table_2_1230076 | test_colocated_table_2_value_1_fkey_1230076 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) + test_colocated_table_2_1230078 | 
test_colocated_table_2_id_fkey_1230078 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230070(id) + test_colocated_table_2_1230078 | test_colocated_table_2_value_1_fkey_1230078 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) + test_colocated_table_2_1230080 | test_colocated_table_2_id_fkey_1230080 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230072(id) + test_colocated_table_2_1230080 | test_colocated_table_2_value_1_fkey_1230080 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) + test_colocated_table_2_1230101 | test_colocated_table_2_id_fkey_1230101 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230098(id) + test_colocated_table_2_1230101 | test_colocated_table_2_value_1_fkey_1230101 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) + test_colocated_table_2_1230102 | test_colocated_table_2_id_fkey_1230102 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230099(id) + test_colocated_table_2_1230102 | test_colocated_table_2_value_1_fkey_1230102 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) + test_colocated_table_2_1230103 | test_colocated_table_2_id_fkey_1230103 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230100(id) + test_colocated_table_2_1230103 | test_colocated_table_2_value_1_fkey_1230103 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) test_colocated_table_3 | test_colocated_table_3_id_fkey | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1(id) test_colocated_table_3 | test_colocated_table_3_id_fkey1 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2(id) test_colocated_table_3 | test_colocated_table_3_value_1_fkey | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey(id) - test_colocated_table_3_1230090 | test_colocated_table_3_id_fkey1_1230090 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230082(id) - test_colocated_table_3_1230090 | test_colocated_table_3_id_fkey_1230090 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230074(id) - test_colocated_table_3_1230090 | test_colocated_table_3_value_1_fkey_1230090 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) - test_colocated_table_3_1230092 | test_colocated_table_3_id_fkey1_1230092 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230084(id) - test_colocated_table_3_1230092 | test_colocated_table_3_id_fkey_1230092 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230076(id) - test_colocated_table_3_1230092 | test_colocated_table_3_value_1_fkey_1230092 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) - test_colocated_table_3_1230094 | test_colocated_table_3_id_fkey1_1230094 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230086(id) - test_colocated_table_3_1230094 | test_colocated_table_3_id_fkey_1230094 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230078(id) - test_colocated_table_3_1230094 | test_colocated_table_3_value_1_fkey_1230094 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) - test_colocated_table_3_1230110 | 
test_colocated_table_3_id_fkey1_1230110 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230107(id) - test_colocated_table_3_1230110 | test_colocated_table_3_id_fkey_1230110 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230104(id) - test_colocated_table_3_1230110 | test_colocated_table_3_value_1_fkey_1230110 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) - test_colocated_table_3_1230111 | test_colocated_table_3_id_fkey1_1230111 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230108(id) - test_colocated_table_3_1230111 | test_colocated_table_3_id_fkey_1230111 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230105(id) - test_colocated_table_3_1230111 | test_colocated_table_3_value_1_fkey_1230111 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) - test_colocated_table_3_1230112 | test_colocated_table_3_id_fkey1_1230112 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230109(id) - test_colocated_table_3_1230112 | test_colocated_table_3_id_fkey_1230112 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230106(id) - test_colocated_table_3_1230112 | test_colocated_table_3_value_1_fkey_1230112 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id) + test_colocated_table_3_1230084 | test_colocated_table_3_id_fkey1_1230084 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230076(id) + test_colocated_table_3_1230084 | test_colocated_table_3_id_fkey_1230084 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230068(id) + test_colocated_table_3_1230084 | test_colocated_table_3_value_1_fkey_1230084 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) + test_colocated_table_3_1230086 | test_colocated_table_3_id_fkey1_1230086 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230078(id) + test_colocated_table_3_1230086 | test_colocated_table_3_id_fkey_1230086 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230070(id) + test_colocated_table_3_1230086 | test_colocated_table_3_value_1_fkey_1230086 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) + test_colocated_table_3_1230088 | test_colocated_table_3_id_fkey1_1230088 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230080(id) + test_colocated_table_3_1230088 | test_colocated_table_3_id_fkey_1230088 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230072(id) + test_colocated_table_3_1230088 | test_colocated_table_3_value_1_fkey_1230088 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) + test_colocated_table_3_1230104 | test_colocated_table_3_id_fkey1_1230104 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230101(id) + test_colocated_table_3_1230104 | test_colocated_table_3_id_fkey_1230104 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230098(id) + test_colocated_table_3_1230104 | test_colocated_table_3_value_1_fkey_1230104 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) + test_colocated_table_3_1230105 | test_colocated_table_3_id_fkey1_1230105 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230102(id) + 
test_colocated_table_3_1230105 | test_colocated_table_3_id_fkey_1230105 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230099(id) + test_colocated_table_3_1230105 | test_colocated_table_3_value_1_fkey_1230105 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) + test_colocated_table_3_1230106 | test_colocated_table_3_id_fkey1_1230106 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230103(id) + test_colocated_table_3_1230106 | test_colocated_table_3_id_fkey_1230106 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230100(id) + test_colocated_table_3_1230106 | test_colocated_table_3_value_1_fkey_1230106 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id) (42 rows) \c - mx_isolation_role_ent - :master_port diff --git a/src/test/regress/expected/shard_move_deferred_delete.out b/src/test/regress/expected/shard_move_deferred_delete.out index d8be7068d..b0f2e32c0 100644 --- a/src/test/regress/expected/shard_move_deferred_delete.out +++ b/src/test/regress/expected/shard_move_deferred_delete.out @@ -196,7 +196,7 @@ $cmd$); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,1) - (localhost,57638,t,1) + (localhost,57638,t,0) (2 rows) -- override the function for testing purpose diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index baeee5fd7..88c13b958 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -200,6 +200,8 @@ BEGIN; SET citus.log_remote_commands TO ON; SET SESSION citus.max_adaptive_executor_pool_size TO 5; SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes'); +NOTICE: issuing CALL citus_cleanup_orphaned_resources() +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing CALL citus_cleanup_orphaned_shards() DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ... @@ -1754,7 +1756,7 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) CALL citus_cleanup_orphaned_shards(); -NOTICE: cleaned up 4 orphaned shards +NOTICE: cleaned up 2 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1853,7 +1855,7 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... (1 row) CALL citus_cleanup_orphaned_shards(); -NOTICE: cleaned up 3 orphaned shards +NOTICE: cleaned up 1 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- @@ -1928,7 +1930,7 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ... 
(1 row) CALL citus_cleanup_orphaned_shards(); -NOTICE: cleaned up 4 orphaned shards +NOTICE: cleaned up 1 orphaned shards SELECT * FROM public.table_placements_per_node; nodeport | logicalrelid | count --------------------------------------------------------------------- diff --git a/src/test/regress/spec/isolation_blocking_shard_split.spec b/src/test/regress/spec/isolation_blocking_shard_split.spec index 86611cc2a..57876a8d6 100644 --- a/src/test/regress/spec/isolation_blocking_shard_split.spec +++ b/src/test/regress/spec/isolation_blocking_shard_split.spec @@ -7,7 +7,7 @@ setup SELECT setval('pg_dist_shardid_seq', 1500000); -- Cleanup any orphan shards that might be left over from a previous run. - CREATE OR REPLACE FUNCTION run_try_drop_marked_shards() + CREATE OR REPLACE FUNCTION run_try_drop_marked_resources() RETURNS VOID AS 'citus' LANGUAGE C STRICT VOLATILE; @@ -18,7 +18,7 @@ setup teardown { - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); DROP TABLE to_split_table; } @@ -104,7 +104,7 @@ step "s1-release-split-advisory-lock" step "s1-run-cleaner" { - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); } step "s1-show-pg_dist_cleanup" diff --git a/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec b/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec index 7b4d75c46..b3baebd9a 100644 --- a/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec +++ b/src/test/regress/spec/isolation_blocking_shard_split_with_fkey_to_reference.spec @@ -14,11 +14,11 @@ setup teardown { -- Cleanup any orphan shards that might be left over from a previous run. - CREATE OR REPLACE FUNCTION run_try_drop_marked_shards() + CREATE OR REPLACE FUNCTION run_try_drop_marked_resources() RETURNS VOID AS 'citus' LANGUAGE C STRICT VOLATILE; - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); DROP TABLE table_to_split CASCADE; DROP TABLE reference_table CASCADE; diff --git a/src/test/regress/spec/isolation_non_blocking_shard_split.spec b/src/test/regress/spec/isolation_non_blocking_shard_split.spec index d5065d66b..a59c89f19 100644 --- a/src/test/regress/spec/isolation_non_blocking_shard_split.spec +++ b/src/test/regress/spec/isolation_non_blocking_shard_split.spec @@ -13,7 +13,7 @@ setup SELECT setval('pg_dist_shardid_seq', 1500000); -- Cleanup any orphan shards that might be left over from a previous run. - CREATE OR REPLACE FUNCTION run_try_drop_marked_shards() + CREATE OR REPLACE FUNCTION run_try_drop_marked_resources() RETURNS VOID AS 'citus' LANGUAGE C STRICT VOLATILE; @@ -24,7 +24,7 @@ setup teardown { - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); DROP TABLE to_split_table; } @@ -64,7 +64,7 @@ step "s1-release-split-advisory-lock" step "s1-run-cleaner" { - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); } step "s1-start-connection" diff --git a/src/test/regress/spec/isolation_non_blocking_shard_split_fkey.spec b/src/test/regress/spec/isolation_non_blocking_shard_split_fkey.spec index eba2b6f6a..588c086ab 100644 --- a/src/test/regress/spec/isolation_non_blocking_shard_split_fkey.spec +++ b/src/test/regress/spec/isolation_non_blocking_shard_split_fkey.spec @@ -20,11 +20,11 @@ setup teardown { -- Cleanup any orphan shards that might be left over from a previous run. 
- CREATE OR REPLACE FUNCTION run_try_drop_marked_shards() + CREATE OR REPLACE FUNCTION run_try_drop_marked_resources() RETURNS VOID AS 'citus' LANGUAGE C STRICT VOLATILE; - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); DROP TABLE table_to_split CASCADE; DROP TABLE reference_table CASCADE; diff --git a/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec b/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec index 79f49d6a5..a92bb6a13 100644 --- a/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec +++ b/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec @@ -19,11 +19,11 @@ setup teardown { -- Cleanup any orphan shards that might be left over from a previous run. - CREATE OR REPLACE FUNCTION run_try_drop_marked_shards() + CREATE OR REPLACE FUNCTION run_try_drop_marked_resources() RETURNS VOID AS 'citus' LANGUAGE C STRICT VOLATILE; - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); DROP TABLE to_split_table CASCADE; } diff --git a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec index f27378fe5..fa4c25ad1 100644 --- a/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec +++ b/src/test/regress/spec/isolation_rebalancer_deferred_drop.spec @@ -3,7 +3,7 @@ setup { SET citus.enable_metadata_sync TO off; - CREATE OR REPLACE FUNCTION run_try_drop_marked_shards() + CREATE OR REPLACE FUNCTION run_try_drop_marked_resources() RETURNS VOID AS 'citus' LANGUAGE C STRICT VOLATILE; @@ -83,7 +83,7 @@ step "s1-commit" session "s2" step "s2-drop-old-shards" { - SELECT run_try_drop_marked_shards(); + SELECT run_try_drop_marked_resources(); } step "s2-start-session-level-connection" diff --git a/src/test/regress/sql/failure_on_create_subscription.sql b/src/test/regress/sql/failure_on_create_subscription.sql index 0a464da17..a8c199775 100644 --- a/src/test/regress/sql/failure_on_create_subscription.sql +++ b/src/test/regress/sql/failure_on_create_subscription.sql @@ -42,12 +42,21 @@ SELECT * FROM shards_in_workers; SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + +SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").cancel(' || :pid || ')'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); + -- Verify that the shard is not moved and the number of rows are still 100k SELECT * FROM shards_in_workers; SELECT count(*) FROM t; -- Verify that shard can be moved after a temporary failure +-- cleanup leftovers, as it can cause flakiness in the following test files SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); SELECT * FROM shards_in_workers; SELECT count(*) FROM t; diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index a8bec48da..ce33ce6e0 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -68,8 +68,20 
@@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' -- failure on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); --- try again + +-- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + +-- cancel on dropping subscription +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").cancel(' || :pid || ')'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); + +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + +-- try again SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port); @@ -84,20 +96,50 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' -- failure on disabling subscription (right before dropping it) SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +-- should succeed with warnings (subscription not dropped) +-- move the shard back +SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port); + +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); +CALL citus_cleanup_orphaned_shards(); + +-- failure on setting lock_timeout (right before dropping subscriptions & replication slots) +SELECT citus.mitmproxy('conn.onQuery(query="^SET LOCAL lock_timeout").kill()'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +-- should succeed with warnings (objects not dropped) +-- move the shard back +SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port); + +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); +CALL citus_cleanup_orphaned_shards(); -- cancellation on disabling subscription (right before dropping it) SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").cancel(' || :pid || ')'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + -- failure on dropping subscription -SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").killall()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +SELECT citus.mitmproxy('conn.allow()'); +-- first, manually drop the subscription object. 
But the record for it will remain in pg_dist_cleanup +SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10$$); +-- cleanup leftovers +-- verify we don't see any error for an already dropped subscription +CALL citus_cleanup_orphaned_resources(); + -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); - -- failure on creating the primary key SELECT citus.mitmproxy('conn.onQuery(query="t_pkey").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); @@ -110,7 +152,10 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +-- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + -- lets create few more indexes and fail with both -- parallel mode and sequential mode CREATE INDEX index_failure_2 ON t(id); @@ -125,8 +170,6 @@ SELECT pg_reload_conf(); SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -SELECT citus.mitmproxy('conn.allow()'); - -- failure on parallel create index ALTER SYSTEM RESET citus.max_adaptive_executor_pool_size; SELECT pg_reload_conf(); diff --git a/src/test/regress/sql/failure_split_cleanup.sql b/src/test/regress/sql/failure_split_cleanup.sql index 21ed31885..03b6cb45c 100644 --- a/src/test/regress/sql/failure_split_cleanup.sql +++ b/src/test/regress/sql/failure_split_cleanup.sql @@ -81,11 +81,16 @@ SELECT create_distributed_table('table_to_split', 'id'); SET citus.next_cleanup_record_id TO 11; SELECT citus.mitmproxy('conn.onQuery(query="SELECT \* FROM pg_catalog.worker_split_shard_replication_setup\(.*").killall()'); + + -- set log level to prevent flakiness + SET client_min_messages TO ERROR; SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-100000'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); + RESET client_min_messages; + SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; -- we need to allow connection so that we can connect to proxy @@ -270,8 +275,7 @@ SELECT create_distributed_table('table_to_split', 'id'); ARRAY['-100000'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); - -- NO records expected as we fail at 'DropAllLogicalReplicationLeftovers' before creating - -- any resources. 
+ SELECT operation_id, object_type, object_name, node_group_id, policy_type FROM pg_dist_cleanup where operation_id = 777; SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; diff --git a/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql b/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql index 8b66d11a1..50cad7162 100644 --- a/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql +++ b/src/test/regress/sql/failure_tenant_isolation_nonblocking.sql @@ -16,6 +16,9 @@ SET citus.max_adaptive_executor_pool_size TO 1; SELECT pg_backend_pid() as pid \gset SELECT citus.mitmproxy('conn.allow()'); +-- cleanup leftovers if any +CALL citus_cleanup_orphaned_resources(); + CREATE TABLE table_1 (id int PRIMARY KEY); CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int); @@ -126,29 +129,53 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_current_wal_lsn").cancel(' SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); -- failure on dropping subscription -SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").killall()'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + -- failure on dropping publication -SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + -- cancellation on dropping publication SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + -- failure on dropping replication slot -SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + -- cancellation on dropping replication slot SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); +-- cleanup leftovers +SELECT citus.mitmproxy('conn.allow()'); +CALL citus_cleanup_orphaned_resources(); + -- failure on foreign key creation SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()'); SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 
'force_logical'); diff --git a/src/test/regress/sql/ignoring_orphaned_shards.sql b/src/test/regress/sql/ignoring_orphaned_shards.sql index fffd43f92..937587e2e 100644 --- a/src/test/regress/sql/ignoring_orphaned_shards.sql +++ b/src/test/regress/sql/ignoring_orphaned_shards.sql @@ -82,9 +82,6 @@ SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER B SELECT update_distributed_table_colocation('rep2', 'rep1'); SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1; --- cannot copy from an orphaned shard -SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port); - -- Make sure we don't send a query to the orphaned shard BEGIN; SET LOCAL citus.log_remote_commands TO ON; diff --git a/src/test/regress/sql/logical_replication.sql b/src/test/regress/sql/logical_replication.sql index 94b08a5d1..f155aaa49 100644 --- a/src/test/regress/sql/logical_replication.sql +++ b/src/test/regress/sql/logical_replication.sql @@ -55,6 +55,8 @@ select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localho SELECT citus_remove_node('localhost', :master_port); +-- the subscription is still there, as there is no cleanup record for it +-- we have created it manually SELECT count(*) from pg_subscription; SELECT count(*) from pg_publication; SELECT count(*) from pg_replication_slots; @@ -63,7 +65,8 @@ SELECT count(*) from dist; \c - - - :worker_1_port SET search_path TO logical_replication; - +-- the publication and repslot are still there, as there are no cleanup records for them +-- we have created them manually SELECT count(*) from pg_subscription; SELECT count(*) from pg_publication; SELECT count(*) from pg_replication_slots; @@ -81,4 +84,7 @@ SELECT count(*) from dist; SET search_path TO logical_replication; SET client_min_messages TO WARNING; +ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid DISABLE; +ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid SET (slot_name = NONE); +DROP SUBSCRIPTION citus_shard_move_subscription_:postgres_oid; DROP SCHEMA logical_replication CASCADE; diff --git a/src/test/regress/sql/multi_tenant_isolation_nonblocking.sql b/src/test/regress/sql/multi_tenant_isolation_nonblocking.sql index 400e78d33..f5f2997da 100644 --- a/src/test/regress/sql/multi_tenant_isolation_nonblocking.sql +++ b/src/test/regress/sql/multi_tenant_isolation_nonblocking.sql @@ -175,10 +175,14 @@ SELECT isolate_tenant_to_new_shard('orders_streaming', 103, 'CASCADE', shard_tra SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE', shard_transfer_mode => 'force_logical'); SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE', shard_transfer_mode => 'force_logical'); +CALL pg_catalog.citus_cleanup_orphaned_resources(); + -- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647 SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE', shard_transfer_mode => 'force_logical'); SELECT isolate_tenant_to_new_shard('orders_streaming', -1686493264, 'CASCADE', shard_transfer_mode => 'force_logical'); +CALL pg_catalog.citus_cleanup_orphaned_resources(); + SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554; SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1686493264; @@ -393,45 +397,6 @@ SET citus.override_table_visibility TO false; DROP EVENT TRIGGER abort_ddl; --- create a trigger for drops -SET citus.enable_metadata_sync TO OFF; -CREATE OR REPLACE FUNCTION 
abort_drop_command() - RETURNS event_trigger - LANGUAGE plpgsql - AS $$ -BEGIN - RAISE EXCEPTION 'command % is disabled', tg_tag; -END; -$$; -RESET citus.enable_metadata_sync; - -CREATE EVENT TRIGGER abort_drop ON sql_drop - EXECUTE PROCEDURE abort_drop_command(); - -\c - mx_isolation_role_ent - :master_port -SET search_path to "Tenant Isolation"; - -\set VERBOSITY terse -SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'force_logical'); - -\set VERBOSITY default - --- check if metadata is changed -SELECT * FROM pg_dist_shard - WHERE logicalrelid = 'lineitem_streaming'::regclass OR logicalrelid = 'orders_streaming'::regclass - ORDER BY shardminvalue::BIGINT, logicalrelid; - -\c - - - :worker_1_port -SET search_path to "Tenant Isolation"; - --- however, new tables are already created -SET citus.override_table_visibility TO false; -\d - -\c - postgres - :worker_1_port - -DROP EVENT TRIGGER abort_drop; - \c - mx_isolation_role_ent - :master_port SET search_path to "Tenant Isolation";
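The failure tests above all follow the same recovery pattern: re-allow connections after the injected failure, optionally look at the deferred-cleanup records, and run the new cleanup procedure. A minimal sketch of that sequence, assembled from the statements used in the hunks above and run on the coordinator (citus.mitmproxy() is the failure-injection helper that exists only in this regression suite; the records actually present depend on which operation failed):

-- let connections through again after the injected kill/cancel
SELECT citus.mitmproxy('conn.allow()');

-- inspect the deferred-cleanup records left behind by the failed operation
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup;

-- drop the orphaned subscriptions, publications, replication slots, users and shards
CALL citus_cleanup_orphaned_resources();
CALL citus_cleanup_orphaned_shards();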