Cleanup for shard moves (#6472)

DESCRIPTION: Extend cleanup process for replication artifacts

This PR adds new cleanup record types for:
* Subscriptions
* Replication slots
* Publications
* Users created for subscriptions

We add records for these object types to `pg_dist_cleanup` during the
creation phase. Once the operation is done, whether it succeeded or
failed, we iterate over those records and drop the objects. With this PR
we no longer drop any of these objects during the operation itself; in
short, the drop is always deferred.
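
For illustration, here is a minimal sketch of the record-then-defer flow, using the helpers this diff adds or touches. The wrapping function and the operation-name string are hypothetical; the real call sites live in the shard-transfer and logical-replication code paths shown in the diff below.

```c
#include "distributed/shard_cleaner.h" /* assumed home of the cleanup helpers */

/* hypothetical wrapper, only to show the shape of the flow */
static void
MoveShardWithDeferredCleanup(char *publicationName, int32 groupId)
{
	/* tag every record created below with a fresh CurrentOperationId */
	RegisterOperationNeedingCleanup();

	/*
	 * Write the cleanup record in a subtransaction *before* creating the
	 * object, so the record survives even if the move later rolls back.
	 */
	InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_PUBLICATION,
										publicationName,
										groupId,
										CLEANUP_ALWAYS);

	/* ... create the publication and perform the shard move ... */

	/*
	 * Nothing is dropped while the operation runs. On completion the
	 * recorded artifacts are dropped; anything that cannot be dropped now
	 * is picked up later by the maintenance daemon via
	 * TryDropOrphanedResources().
	 */
	FinalizeOperationNeedingCleanupOnSuccess("citus_move_shard_placement");
}
```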

One thing worth mentioning is that we sort cleanup records before
processing (dropping) them because of the dependency relations among
these objects, e.g. a subscription might depend on a publication.
Therefore, we always drop subscriptions before publications.
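
Concretely, the ordering falls out of the numeric values of the `CleanupObject` enum added in this PR: the drop loop sorts records ascending by object type before processing them. Condensed from the diff below:

```c
/* from shard_cleaner.h in this PR; lower values are dropped first */
typedef enum CleanupObject
{
	CLEANUP_OBJECT_INVALID = 0,
	CLEANUP_OBJECT_SHARD_PLACEMENT = 1,
	CLEANUP_OBJECT_SUBSCRIPTION = 2,
	CLEANUP_OBJECT_REPLICATION_SLOT = 3,
	CLEANUP_OBJECT_PUBLICATION = 4,
	CLEANUP_OBJECT_USER = 5
} CleanupObject;

/*
 * In the drop path: sort by object type so that, e.g., subscriptions (2)
 * are dropped before the publications (4) they may depend on.
 */
cleanupRecordList = SortList(cleanupRecordList,
							 CompareCleanupRecordsByObjectType);
```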

We have some renames in this PR:
* `TryDropOrphanedShards` -> `TryDropOrphanedResources`
* `DropOrphanedShardsForCleanup` -> `DropOrphanedResourcesForCleanup`
* `run_try_drop_marked_shards` -> `run_try_drop_marked_resources`
as these functions now process replication artifacts as well.

This PR drops the function `DropAllLogicalReplicationLeftovers` and all
of its usages, since we now rely on the deferred-drop mechanism.
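
In particular, the error path of `LogicallyReplicateShards` no longer does any ad-hoc dropping; condensed from the diff below, its `PG_CATCH` block now just finalizes the cleanup records registered for the current operation and re-throws:

```c
PG_CATCH();
{
	/*
	 * Drop the temporary objects recorded with CLEANUP_ON_FAILURE or
	 * CLEANUP_ALWAYS for CurrentOperationId. The records themselves are
	 * deleted over a separate localhost connection, because the current
	 * transaction is about to abort.
	 */
	FinalizeOperationNeedingCleanupOnFailure("citus_[move/copy]_shard_placement");

	PG_RE_THROW();
}
PG_END_TRY();
```
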
Ahmet Gedemenli 2022-11-30 15:38:05 +03:00 committed by GitHub
parent 1f8675da43
commit 0e92244bfe
40 changed files with 1021 additions and 932 deletions

View File

@ -61,6 +61,7 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shard_split.h"
#include "distributed/shard_transfer.h"
@ -355,6 +356,8 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
"citus.shard_replication_factor > 1")));
}
DropOrphanedResourcesInSeparateTransaction();
EnsureCitusTableCanBeCreated(relationId);
EnsureValidDistributionColumn(relationId, distributionColumnName);

View File

@ -33,6 +33,10 @@
#include "distributed/worker_transaction.h"
#include "distributed/pg_dist_cleanup.h"
#define REPLICATION_SLOT_CATALOG_TABLE_NAME "pg_replication_slots"
#define STR_ERRCODE_OBJECT_IN_USE "55006"
#define STR_ERRCODE_UNDEFINED_OBJECT "42704"
/* GUC configuration for shard cleaner */
int NextOperationId = 0;
int NextCleanupRecordId = 0;
@ -72,10 +76,22 @@ PG_FUNCTION_INFO_V1(isolation_cleanup_orphaned_shards);
PG_FUNCTION_INFO_V1(citus_cleanup_orphaned_resources);
static int DropOrphanedShardsForMove(bool waitForLocks);
static bool TryDropShardOutsideTransaction(OperationId operationId,
char *qualifiedTableName,
static bool TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record,
char *nodeName,
int nodePort);
static bool TryDropShardOutsideTransaction(char *qualifiedTableName,
char *nodeName,
int nodePort);
static bool TryDropSubscriptionOutsideTransaction(char *subscriptionName,
char *nodeName,
int nodePort);
static bool TryDropPublicationOutsideTransaction(char *publicationName,
char *nodeName,
int nodePort);
static bool TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
char *nodeName,
int nodePort);
static bool TryDropUserOutsideTransaction(char *username, char *nodeName, int nodePort);
static bool TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode);
/* Functions for cleanup infrastructure */
@ -91,7 +107,9 @@ static void DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId);
static bool CleanupRecordExists(uint64 recordId);
static List * ListCleanupRecords(void);
static List * ListCleanupRecordsForCurrentOperation(void);
static int DropOrphanedShardsForCleanup(void);
static int DropOrphanedResourcesForCleanup(void);
static int CompareCleanupRecordsByObjectType(const void *leftElement,
const void *rightElement);
/*
* citus_cleanup_orphaned_shards implements a user-facing UDF to delete
@ -154,8 +172,6 @@ isolation_cleanup_orphaned_shards(PG_FUNCTION_ARGS)
* orphaned resources that are present in the system. These resources are
* orphaned by previous actions that either failed or marked the resources
* for deferred cleanup.
* The UDF only supports dropping shards at the moment but will be extended in
* near future to clean any type of resource.
*
* The function takes no arguments and runs on co-ordinator. It cannot be run in a
* transaction, because holding the locks it takes for a long time is not good.
@ -169,7 +185,7 @@ citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS)
EnsureCoordinator();
PreventInTransactionBlock(true, "citus_cleanup_orphaned_resources");
int droppedCount = DropOrphanedShardsForCleanup();
int droppedCount = DropOrphanedResourcesForCleanup();
if (droppedCount > 0)
{
ereport(NOTICE, (errmsg("cleaned up %d orphaned resources", droppedCount)));
@ -180,33 +196,34 @@ citus_cleanup_orphaned_resources(PG_FUNCTION_ARGS)
/*
* DropOrphanedShardsInSeparateTransaction cleans up orphaned shards by
* DropOrphanedResourcesInSeparateTransaction cleans up orphaned resources by
* connecting to localhost. This is done, so that the locks that
* DropOrphanedShardsForMove takes are only held for a short time.
*/
void
DropOrphanedShardsInSeparateTransaction(void)
DropOrphanedResourcesInSeparateTransaction(void)
{
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_resources()");
ExecuteCriticalRemoteCommand(connection, "CALL citus_cleanup_orphaned_shards()");
CloseConnection(connection);
}
/*
* TryDropOrphanedShards is a wrapper around DropOrphanedShardsForMove and
* DropOrphanedShardsForCleanup that catches any errors to make it safe to
* TryDropOrphanedResources is a wrapper around DropOrphanedShardsForMove and
* DropOrphanedResourcesForCleanup that catches any errors to make it safe to
* use in the maintenance daemon.
*
* If dropping any of the shards failed this function returns -1, otherwise it
* returns the number of dropped shards.
* returns the number of dropped resources.
*/
int
TryDropOrphanedShards(bool waitForLocks)
TryDropOrphanedResources(bool waitForLocks)
{
int droppedShardCount = 0;
int droppedResourceCount = 0;
MemoryContext savedContext = CurrentMemoryContext;
/*
@ -217,8 +234,8 @@ TryDropOrphanedShards(bool waitForLocks)
PG_TRY();
{
droppedShardCount = DropOrphanedShardsForMove(waitForLocks);
droppedShardCount += DropOrphanedShardsForCleanup();
droppedResourceCount = DropOrphanedShardsForMove(waitForLocks);
droppedResourceCount += DropOrphanedResourcesForCleanup();
/*
* Releasing a subtransaction doesn't free its memory context, since the
@ -241,19 +258,19 @@ TryDropOrphanedShards(bool waitForLocks)
}
PG_END_TRY();
return droppedShardCount;
return droppedResourceCount;
}
/*
* DropOrphanedShardsForCleanup removes resources that were marked for cleanup by operation.
* DropOrphanedResourcesForCleanup removes resources that were marked for cleanup by operation.
* It does so by trying to take an exclusive lock on the resources. If the lock cannot be
* obtained it skips the resource and continues with others.
* The resource that has been skipped will be removed at a later iteration when there are no
* locks held anymore.
*/
static int
DropOrphanedShardsForCleanup()
DropOrphanedResourcesForCleanup()
{
/* Only runs on Coordinator */
if (!IsCoordinator())
@ -263,20 +280,19 @@ DropOrphanedShardsForCleanup()
List *cleanupRecordList = ListCleanupRecords();
int removedShardCountForCleanup = 0;
int failedShardCountForCleanup = 0;
/*
* We sort the records before cleaning up by their types, because of dependencies.
* For example, a subscription might depend on a publication.
*/
cleanupRecordList = SortList(cleanupRecordList,
CompareCleanupRecordsByObjectType);
int removedResourceCountForCleanup = 0;
int failedResourceCountForCleanup = 0;
CleanupRecord *record = NULL;
foreach_ptr(record, cleanupRecordList)
{
/* We only support one resource type at the moment */
if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT)
{
ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ",
record->objectType)));
continue;
}
if (!PrimaryNodeForGroup(record->nodeGroupId, NULL))
{
continue;
@ -289,7 +305,7 @@ DropOrphanedShardsForCleanup()
continue;
}
char *qualifiedTableName = record->objectName;
char *resourceName = record->objectName;
WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);
/*
@ -302,29 +318,28 @@ DropOrphanedShardsForCleanup()
continue;
}
if (TryDropShardOutsideTransaction(record->operationId,
qualifiedTableName,
workerNode->workerName,
workerNode->workerPort))
if (TryDropResourceByCleanupRecordOutsideTransaction(record,
workerNode->workerName,
workerNode->workerPort))
{
if (record->policy == CLEANUP_DEFERRED_ON_SUCCESS)
{
ereport(LOG, (errmsg("deferred drop of orphaned shard %s on %s:%d "
ereport(LOG, (errmsg("deferred drop of orphaned resource %s on %s:%d "
"completed",
qualifiedTableName,
resourceName,
workerNode->workerName, workerNode->workerPort)));
}
else
{
ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d which "
ereport(LOG, (errmsg("cleaned up orphaned resource %s on %s:%d which "
"was left behind after a failed operation",
qualifiedTableName,
resourceName,
workerNode->workerName, workerNode->workerPort)));
}
/* delete the cleanup record */
DeleteCleanupRecordByRecordId(record->recordId);
removedShardCountForCleanup++;
removedResourceCountForCleanup++;
}
else
{
@ -332,18 +347,18 @@ DropOrphanedShardsForCleanup()
* We log failures at the end, since they occur repeatedly
* for a large number of objects.
*/
failedShardCountForCleanup++;
failedResourceCountForCleanup++;
}
}
if (failedShardCountForCleanup > 0)
if (failedResourceCountForCleanup > 0)
{
ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d",
failedShardCountForCleanup,
ereport(WARNING, (errmsg("failed to clean up %d orphaned resources out of %d",
failedResourceCountForCleanup,
list_length(cleanupRecordList))));
}
return removedShardCountForCleanup;
return removedResourceCountForCleanup;
}
@ -414,8 +429,7 @@ DropOrphanedShardsForMove(bool waitForLocks)
ShardInterval *shardInterval = LoadShardInterval(placement->shardId);
char *qualifiedTableName = ConstructQualifiedShardName(shardInterval);
if (TryDropShardOutsideTransaction(INVALID_OPERATION_ID,
qualifiedTableName,
if (TryDropShardOutsideTransaction(qualifiedTableName,
shardPlacement->nodeName,
shardPlacement->nodePort))
{
@ -478,50 +492,38 @@ FinalizeOperationNeedingCleanupOnFailure(const char *operationName)
List *currentOperationRecordList = ListCleanupRecordsForCurrentOperation();
int removedShardCountOnComplete = 0;
/*
* We sort the records before cleaning up by their types, because of dependencies.
* For example, a subscription might depend on a publication.
*/
currentOperationRecordList = SortList(currentOperationRecordList,
CompareCleanupRecordsByObjectType);
int failedShardCountOnComplete = 0;
CleanupRecord *record = NULL;
foreach_ptr(record, currentOperationRecordList)
{
/* We only supporting cleaning shards right now */
if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT)
{
ereport(WARNING, (errmsg(
"Invalid object type %d on failed operation cleanup",
record->objectType)));
continue;
}
if (record->policy == CLEANUP_ALWAYS || record->policy == CLEANUP_ON_FAILURE)
{
char *qualifiedTableName = record->objectName;
WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);
/*
* For all resources of CurrentOperationId that are marked as 'CLEANUP_ALWAYS' or
* 'CLEANUP_ON_FAILURE', drop resource and cleanup records.
*/
if (TryDropShardOutsideTransaction(CurrentOperationId,
qualifiedTableName,
workerNode->workerName,
workerNode->workerPort))
if (TryDropResourceByCleanupRecordOutsideTransaction(record,
workerNode->workerName,
workerNode->workerPort))
{
ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d after a "
"%s operation failed",
qualifiedTableName,
workerNode->workerName, workerNode->workerPort,
operationName)));
/*
* Given the operation is failing and we will abort its transaction, we cannot delete
* records in the current transaction. Delete these records outside of the
* current transaction via a localhost connection.
*/
DeleteCleanupRecordByRecordIdOutsideTransaction(record->recordId);
removedShardCountOnComplete++;
}
else
else if (record->objectType == CLEANUP_OBJECT_SHARD_PLACEMENT)
{
/*
* We log failures at the end, since they occur repeatedly
@ -557,50 +559,38 @@ FinalizeOperationNeedingCleanupOnSuccess(const char *operationName)
List *currentOperationRecordList = ListCleanupRecordsForCurrentOperation();
int removedShardCountOnComplete = 0;
/*
* We sort the records before cleaning up by their types, because of dependencies.
* For example, a subscription might depend on a publication.
*/
currentOperationRecordList = SortList(currentOperationRecordList,
CompareCleanupRecordsByObjectType);
int failedShardCountOnComplete = 0;
CleanupRecord *record = NULL;
foreach_ptr(record, currentOperationRecordList)
{
/* We only supporting cleaning shards right now */
if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT)
{
ereport(WARNING, (errmsg(
"Invalid object type %d on operation cleanup",
record->objectType)));
continue;
}
if (record->policy == CLEANUP_ALWAYS)
{
char *qualifiedTableName = record->objectName;
WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);
/*
* For all resources of CurrentOperationId that are marked as 'CLEANUP_ALWAYS'
* drop resource and cleanup records.
*/
if (TryDropShardOutsideTransaction(CurrentOperationId,
qualifiedTableName,
workerNode->workerName,
workerNode->workerPort))
if (TryDropResourceByCleanupRecordOutsideTransaction(record,
workerNode->workerName,
workerNode->workerPort))
{
ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d after a "
"%s operation completed",
qualifiedTableName,
workerNode->workerName, workerNode->workerPort,
operationName)));
/*
* Delete cleanup records outside transaction as:
* The resources are marked as 'CLEANUP_ALWAYS' and should be cleaned no matter
* the operation succeeded or failed.
*/
DeleteCleanupRecordByRecordIdOutsideTransaction(record->recordId);
removedShardCountOnComplete++;
}
else
else if (record->objectType == CLEANUP_OBJECT_SHARD_PLACEMENT)
{
/*
* We log failures at the end, since they occur repeatedly
@ -632,6 +622,30 @@ FinalizeOperationNeedingCleanupOnSuccess(const char *operationName)
}
/*
* CompareCleanupRecordsByObjectType is a comparison function to sort
* cleanup records by their object type.
*/
static int
CompareCleanupRecordsByObjectType(const void *leftElement, const void *rightElement)
{
CleanupRecord *leftRecord = *((CleanupRecord **) leftElement);
CleanupRecord *rightRecord = *((CleanupRecord **) rightElement);
/* we compare 64-bit integers, instead of casting their difference to int */
if (leftRecord->objectType > rightRecord->objectType)
{
return 1;
}
else if (leftRecord->objectType < rightRecord->objectType)
{
return -1;
}
return 0;
}
/*
* InsertCleanupRecordInCurrentTransaction inserts a new pg_dist_cleanup_record entry
* as part of the current transaction. This is primarily useful for deferred drop scenarios,
@ -680,7 +694,7 @@ InsertCleanupRecordInCurrentTransaction(CleanupObject objectType,
/*
* InsertCleanupRecordInSeparateTransaction inserts a new pg_dist_cleanup_record entry
* InsertCleanupRecordInSubtransaction inserts a new pg_dist_cleanup_record entry
* in a separate transaction to ensure the record persists after rollback. We should
* delete these records if the operation completes successfully.
*
@ -769,12 +783,64 @@ TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode)
/*
* TryDropShard tries to drop the given shard placement and returns
* TryDropResourceByCleanupRecordOutsideTransaction tries to drop the given resource
* and returns true on success.
*/
static bool
TryDropResourceByCleanupRecordOutsideTransaction(CleanupRecord *record,
char *nodeName,
int nodePort)
{
switch (record->objectType)
{
case CLEANUP_OBJECT_SHARD_PLACEMENT:
{
return TryDropShardOutsideTransaction(record->objectName,
nodeName, nodePort);
}
case CLEANUP_OBJECT_SUBSCRIPTION:
{
return TryDropSubscriptionOutsideTransaction(record->objectName,
nodeName, nodePort);
}
case CLEANUP_OBJECT_PUBLICATION:
{
return TryDropPublicationOutsideTransaction(record->objectName,
nodeName, nodePort);
}
case CLEANUP_OBJECT_REPLICATION_SLOT:
{
return TryDropReplicationSlotOutsideTransaction(record->objectName,
nodeName, nodePort);
}
case CLEANUP_OBJECT_USER:
{
return TryDropUserOutsideTransaction(record->objectName, nodeName, nodePort);
}
default:
{
ereport(WARNING, (errmsg(
"Invalid object type %d on failed operation cleanup",
record->objectType)));
return false;
}
}
return false;
}
/*
* TryDropShardOutsideTransaction tries to drop the given shard placement and returns
* true on success.
*/
static bool
TryDropShardOutsideTransaction(OperationId operationId,
char *qualifiedTableName,
TryDropShardOutsideTransaction(char *qualifiedTableName,
char *nodeName,
int nodePort)
{
@ -808,6 +874,234 @@ TryDropShardOutsideTransaction(OperationId operationId,
}
/*
* TryDropSubscriptionOutsideTransaction drops subscription with the given name on the
* subscriber node if it exists. Note that this doesn't drop the replication slot on the
* publisher node. The reason is that sometimes this is not possible. Two known
* cases where this is not possible are:
* 1. Due to the node with the replication slot being down.
* 2. Due to a deadlock when the replication is on the same node as the
* subscription, which is the case for shard splits to the local node.
*
* So instead of directly dropping the subscription, including the attached
* replication slot, the subscription is first disconnected from the
* replication slot before dropping it. The replication slot itself should be
* dropped using DropReplicationSlot on the source connection.
*/
static bool
TryDropSubscriptionOutsideTransaction(char *subscriptionName,
char *nodeName,
int nodePort)
{
int connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
CitusExtensionOwnerName(),
NULL);
RemoteTransactionBegin(connection);
if (ExecuteOptionalRemoteCommand(connection,
"SET LOCAL lock_timeout TO '1s'", NULL) != 0)
{
RemoteTransactionAbort(connection);
ResetRemoteTransaction(connection);
return false;
}
int querySent = SendRemoteCommand(
connection,
psprintf("ALTER SUBSCRIPTION %s DISABLE", quote_identifier(subscriptionName)));
if (querySent == 0)
{
ReportConnectionError(connection, WARNING);
RemoteTransactionAbort(connection);
ResetRemoteTransaction(connection);
return false;
}
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE);
if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_UNDEFINED_OBJECT) == 0)
{
/*
* The subscription doesn't exist, so we can return right away.
* This DropSubscription call is effectively a no-op.
*/
PQclear(result);
ForgetResults(connection);
RemoteTransactionAbort(connection);
ResetRemoteTransaction(connection);
return true;
}
else
{
ReportResultError(connection, result, WARNING);
PQclear(result);
ForgetResults(connection);
RemoteTransactionAbort(connection);
ResetRemoteTransaction(connection);
return false;
}
}
PQclear(result);
ForgetResults(connection);
RemoteTransactionCommit(connection);
ResetRemoteTransaction(connection);
StringInfo alterQuery = makeStringInfo();
appendStringInfo(alterQuery,
"ALTER SUBSCRIPTION %s SET (slot_name = NONE)",
quote_identifier(subscriptionName));
StringInfo dropQuery = makeStringInfo();
appendStringInfo(dropQuery,
"DROP SUBSCRIPTION %s",
quote_identifier(subscriptionName));
List *dropCommandList = list_make3("SET LOCAL lock_timeout TO '1s'",
alterQuery->data, dropQuery->data);
bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
connection,
dropCommandList);
return success;
}
/*
* TryDropPublicationOutsideTransaction drops the publication with the given name if it
* exists.
*/
static bool
TryDropPublicationOutsideTransaction(char *publicationName,
char *nodeName,
int nodePort)
{
int connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
CitusExtensionOwnerName(),
NULL);
StringInfo dropQuery = makeStringInfo();
appendStringInfo(dropQuery,
"DROP PUBLICATION IF EXISTS %s",
quote_identifier(publicationName));
List *dropCommandList = list_make2("SET LOCAL lock_timeout TO '1s'",
dropQuery->data);
bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
connection,
dropCommandList);
return success;
}
/*
* TryDropReplicationSlotOutsideTransaction drops the replication slot with the given
* name if it exists.
*/
static bool
TryDropReplicationSlotOutsideTransaction(char *replicationSlotName,
char *nodeName,
int nodePort)
{
int connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
CitusExtensionOwnerName(),
NULL);
RemoteTransactionBegin(connection);
if (ExecuteOptionalRemoteCommand(connection,
"SET LOCAL lock_timeout TO '1s'", NULL) != 0)
{
RemoteTransactionAbort(connection);
ResetRemoteTransaction(connection);
return false;
}
int querySent = SendRemoteCommand(
connection,
psprintf(
"select pg_drop_replication_slot(slot_name) from "
REPLICATION_SLOT_CATALOG_TABLE_NAME
" where slot_name = %s",
quote_literal_cstr(replicationSlotName))
);
if (querySent == 0)
{
ReportConnectionError(connection, WARNING);
RemoteTransactionAbort(connection);
ResetRemoteTransaction(connection);
return false;
}
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (IsResponseOK(result))
{
PQclear(result);
ForgetResults(connection);
RemoteTransactionCommit(connection);
ResetRemoteTransaction(connection);
return true;
}
char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE);
if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_OBJECT_IN_USE) != 0)
{
/* throw a warning unless object is in use */
ReportResultError(connection, result, WARNING);
}
PQclear(result);
ForgetResults(connection);
RemoteTransactionAbort(connection);
ResetRemoteTransaction(connection);
return false;
}
/*
* TryDropUserOutsideTransaction drops the user with the given name if it exists.
*/
static bool
TryDropUserOutsideTransaction(char *username,
char *nodeName, int nodePort)
{
int connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
CitusExtensionOwnerName(),
NULL);
/*
* The DROP USER command should not propagate, so we temporarily disable
* DDL propagation.
*/
bool success = SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
connection,
list_make3(
"SET LOCAL lock_timeout TO '1s'",
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf("DROP USER IF EXISTS %s;",
quote_identifier(username))));
return success;
}
/*
* GetNextOperationId allocates and returns a unique operationId for an operation
* requiring potential cleanup. This allocation occurs both in shared memory and

View File

@ -772,7 +772,7 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
ListCell *placementUpdateCell = NULL;
DropOrphanedShardsInSeparateTransaction();
DropOrphanedResourcesInSeparateTransaction();
foreach(placementUpdateCell, placementUpdateList)
{
@ -1901,7 +1901,7 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
return 0;
}
DropOrphanedShardsInSeparateTransaction();
DropOrphanedResourcesInSeparateTransaction();
/* find the name of the shard transfer mode to interpolate in the scheduled command */
Datum shardTranferModeLabelDatum =

View File

@ -522,6 +522,8 @@ SplitShard(SplitMode splitMode,
sourceColocatedShardIntervalList = colocatedShardIntervalList;
}
DropOrphanedResourcesInSeparateTransaction();
/* use the user-specified shard ID as the split workflow ID */
uint64 splitWorkflowId = shardIntervalToSplit->shardId;
@ -771,12 +773,11 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
workerPlacementNode->workerPort)));
}
CleanupPolicy policy = CLEANUP_ON_FAILURE;
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
workerPlacementNode->groupId,
policy);
CLEANUP_ON_FAILURE);
/* Create new split child shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList,
@ -1407,11 +1408,10 @@ InsertDeferredDropCleanupRecordsForShards(List *shardIntervalList)
* We also log cleanup record in the current transaction. If the current transaction rolls back,
* we do not generate a record at all.
*/
CleanupPolicy policy = CLEANUP_DEFERRED_ON_SUCCESS;
InsertCleanupRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
qualifiedShardName,
placement->groupId,
policy);
CLEANUP_DEFERRED_ON_SUCCESS);
}
}
}
@ -1494,8 +1494,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
shardGroupSplitIntervalListList,
workersForPlacementList);
DropAllLogicalReplicationLeftovers(SHARD_SPLIT);
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(
connectionFlags,
@ -1722,12 +1720,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* end ongoing transactions to enable us to clean up */
ShutdownAllConnections();
/* Do a best effort cleanup of shards created on workers in the above block
* TODO(niupre): We don't need to do this once shard cleaner can clean replication
* artifacts.
*/
DropAllLogicalReplicationLeftovers(SHARD_SPLIT);
/*
* Drop temporary objects that were marked as CLEANUP_ON_FAILURE
* or CLEANUP_ALWAYS.
@ -1820,12 +1812,11 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
* we want to cleanup irrespective of operation success or failure.
*/
CleanupPolicy policy = CLEANUP_ALWAYS;
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
workerPlacementNode->groupId,
policy);
CLEANUP_ALWAYS);
/* Create dummy source shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList,
@ -1883,12 +1874,11 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
* we want to cleanup irrespective of operation success or failure.
*/
CleanupPolicy policy = CLEANUP_ALWAYS;
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
ConstructQualifiedShardName(
shardInterval),
sourceWorkerNode->groupId,
policy);
CLEANUP_ALWAYS);
/* Create dummy split child shard on source worker node */
CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);

View File

@ -1157,6 +1157,8 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
return;
}
DropOrphanedResourcesInSeparateTransaction();
if (useLogicalReplication)
{
CopyShardTablesViaLogicalReplication(shardIntervalList, sourceNodeName,
@ -1209,9 +1211,17 @@ CopyShardTablesViaLogicalReplication(List *shardIntervalList, char *sourceNodeNa
MemoryContextSwitchTo(oldContext);
/* Start operation to prepare for generating cleanup records */
RegisterOperationNeedingCleanup();
/* data copy is done separately when logical replication is used */
LogicallyReplicateShards(shardIntervalList, sourceNodeName,
sourceNodePort, targetNodeName, targetNodePort);
/*
* Drop temporary objects that were marked as CLEANUP_ALWAYS.
*/
FinalizeOperationNeedingCleanupOnSuccess("citus_[move/copy]_shard_placement");
}
@ -1493,10 +1503,10 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo
if (targetPlacement->shardState == SHARD_STATE_TO_DELETE)
{
/*
* Trigger deletion of orphaned shards and hope that this removes
* Trigger deletion of orphaned resources and hope that this removes
* the shard.
*/
DropOrphanedShardsInSeparateTransaction();
DropOrphanedResourcesInSeparateTransaction();
shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
targetPlacement = SearchShardPlacementInList(shardPlacementList,
targetNodeName,

View File

@ -45,6 +45,7 @@
#include "distributed/distributed_planner.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shard_transfer.h"
#include "distributed/version_compat.h"
@ -65,11 +66,6 @@
#include "utils/ruleutils.h"
#include "utils/syscache.h"
#define STR_ERRCODE_UNDEFINED_OBJECT "42704"
#define STR_ERRCODE_OBJECT_IN_USE "55006"
#define REPLICATION_SLOT_CATALOG_TABLE_NAME "pg_replication_slots"
#define CURRENT_LOG_POSITION_COMMAND "SELECT pg_current_wal_lsn()"
/* decimal representation of Adler-16 hash value of citus_shard_move_publication */
@ -135,17 +131,7 @@ static void WaitForMiliseconds(long timeout);
static XLogRecPtr GetSubscriptionPosition(
GroupedLogicalRepTargets *groupedLogicalRepTargets);
static void AcquireLogicalReplicationLock(void);
static void DropSubscription(MultiConnection *connection,
char *subscriptionName);
static void DropPublication(MultiConnection *connection, char *publicationName);
static void DropUser(MultiConnection *connection, char *username);
static void DropReplicationSlot(MultiConnection *connection,
char *publicationName);
static void DropAllSubscriptions(MultiConnection *connection, LogicalRepType type);
static void DropAllReplicationSlots(MultiConnection *connection, LogicalRepType type);
static void DropAllPublications(MultiConnection *connection, LogicalRepType type);
static void DropAllUsers(MultiConnection *connection, LogicalRepType type);
static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode,
List *shardIntervals);
static List * CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash,
@ -153,10 +139,6 @@ static List * CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash,
static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition,
GroupedLogicalRepTargets *
groupedLogicalRepTargets);
static void RecreateGroupedLogicalRepTargetsConnections(
HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName);
/*
* LogicallyReplicateShards replicates a list of shards from one node to another
@ -184,8 +166,6 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
return;
}
DropAllLogicalReplicationLeftovers(SHARD_MOVE);
MultiConnection *sourceConnection =
GetNodeUserDatabaseConnection(connectionFlags, sourceNodeName, sourceNodePort,
superUser, databaseName);
@ -291,34 +271,14 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
}
PG_CATCH();
{
/*
* Try our best not to leave any left-over subscription or publication.
*
* Although it is not very advisable to use code-paths that could throw
* new errors, we prefer to do it here since we expect the cost of leaving
* left-overs not be very low.
*/
/* reconnect if the connection failed or is waiting for a command */
RecreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash,
superUser, databaseName);
DropSubscriptions(logicalRepTargetList);
/* reconnect if the connection failed or is waiting for a command */
if (PQstatus(sourceConnection->pgConn) != CONNECTION_OK ||
PQisBusy(sourceConnection->pgConn))
{
sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
sourceNodeName,
sourceNodePort, superUser,
databaseName);
}
DropReplicationSlots(sourceConnection, logicalRepTargetList);
DropPublications(sourceConnection, publicationInfoHash);
/* We don't need to UnclaimConnections since we're already erroring out */
/*
* Drop temporary objects that were marked as CLEANUP_ON_FAILURE
* or CLEANUP_ALWAYS.
*/
FinalizeOperationNeedingCleanupOnFailure("citus_[move/copy]_shard_placement");
PG_RE_THROW();
}
PG_END_TRY();
@ -452,11 +412,6 @@ CompleteNonBlockingShardTransfer(List *shardList,
sourceConnection->hostname,
sourceConnection->port,
PLACEMENT_UPDATE_STATUS_COMPLETING);
/* we're done, cleanup the publication and subscription */
DropSubscriptions(logicalRepTargetList);
DropReplicationSlots(sourceConnection, logicalRepTargetList);
DropPublications(sourceConnection, publicationInfoHash);
}
@ -573,74 +528,6 @@ AcquireLogicalReplicationLock(void)
}
/*
* DropAllLogicalReplicationLeftovers drops all subscriptions, publications,
* roles and replication slots on all nodes that were related to this
* LogicalRepType. These might have been left there after the coordinator
* crashed during a shard move/split. It's important to delete them for two
* reasons:
* 1. Starting new shard moves will fail when they exist, because it cannot
* create them.
* 2. Leftover replication slots that are not consumed from anymore make it
* impossible for WAL to be dropped. This can cause out-of-disk issues.
*/
void
DropAllLogicalReplicationLeftovers(LogicalRepType type)
{
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
/*
* We need connections that are not currently inside a transaction. The
* reason for this is that operations on subscriptions, publications and
* replication slots cannot be run in a transaction. By forcing a new
* connection we make sure no transaction is active on the connection.
*/
int connectionFlags = FORCE_NEW_CONNECTION;
List *workerNodeList = ActivePrimaryNodeList(AccessShareLock);
List *cleanupConnectionList = NIL;
WorkerNode *workerNode = NULL;
/*
* First we try to remove the subscription, everywhere and only after
* having done that we try to remove the publication everywhere. This is
* needed, because the publication can only be removed if there's no active
* subscription on it.
*/
foreach_ptr(workerNode, workerNodeList)
{
MultiConnection *cleanupConnection = GetNodeUserDatabaseConnection(
connectionFlags, workerNode->workerName, workerNode->workerPort,
superUser, databaseName);
cleanupConnectionList = lappend(cleanupConnectionList, cleanupConnection);
DropAllSubscriptions(cleanupConnection, type);
DropAllUsers(cleanupConnection, type);
}
MultiConnection *cleanupConnection = NULL;
foreach_ptr(cleanupConnection, cleanupConnectionList)
{
/*
* If replication slot could not be dropped while dropping the
* subscriber, drop it here.
*/
DropAllReplicationSlots(cleanupConnection, type);
DropAllPublications(cleanupConnection, type);
/*
* We close all connections that we opened for the dropping here. That
* way we don't keep these connections open unnecessarily during the
* 'LogicalRepType' operation (which can take a long time). We might
* need to reopen a few later on, but that seems better than keeping
* many open for no reason for a long time.
*/
CloseConnection(cleanupConnection);
}
}
/*
* PrepareReplicationSubscriptionList returns list of shards to be logically
* replicated from given shard list. This is needed because Postgres does not
@ -1314,115 +1201,6 @@ ConflictWithIsolationTestingAfterCopy(void)
}
/*
* DropReplicationSlots drops the replication slots used for shard moves/splits
* over the given connection (if they exist).
*/
void
DropReplicationSlots(MultiConnection *sourceConnection, List *logicalRepTargetList)
{
LogicalRepTarget *target = NULL;
foreach_ptr(target, logicalRepTargetList)
{
DropReplicationSlot(sourceConnection, target->replicationSlot->name);
}
}
/*
* DropPublications drops the publications used for shard moves/splits over the
* given connection (if they exist).
*/
void
DropPublications(MultiConnection *sourceConnection, HTAB *publicationInfoHash)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, publicationInfoHash);
PublicationInfo *entry = NULL;
while ((entry = (PublicationInfo *) hash_seq_search(&status)) != NULL)
{
DropPublication(sourceConnection, entry->name);
}
}
/*
* DropReplicationSlot drops the replication slot with the given name
* if it exists. It retries if the command fails with an OBJECT_IN_USE error.
*/
static void
DropReplicationSlot(MultiConnection *connection, char *replicationSlotName)
{
int maxSecondsToTryDropping = 20;
bool raiseInterrupts = true;
PGresult *result = NULL;
/* we'll retry in case of an OBJECT_IN_USE error */
while (maxSecondsToTryDropping >= 0)
{
int querySent = SendRemoteCommand(
connection,
psprintf(
"select pg_drop_replication_slot(slot_name) from "
REPLICATION_SLOT_CATALOG_TABLE_NAME
" where slot_name = %s",
quote_literal_cstr(replicationSlotName))
);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (IsResponseOK(result))
{
/* no error, we are good to go */
break;
}
char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE);
if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_OBJECT_IN_USE) == 0 &&
maxSecondsToTryDropping > 0)
{
/* retry dropping the replication slot after sleeping for one sec */
maxSecondsToTryDropping--;
pg_usleep(1000);
}
else
{
/*
* Report error if:
* - Error code is not 55006 (Object In Use)
* - Or, we have made enough number of retries (currently 20), but didn't work
*/
ReportResultError(connection, result, ERROR);
}
PQclear(result);
ForgetResults(connection);
}
PQclear(result);
ForgetResults(connection);
}
/*
* DropPublication drops the publication with the given name if it
* exists.
*/
static void
DropPublication(MultiConnection *connection, char *publicationName)
{
ExecuteCriticalRemoteCommand(connection, psprintf(
"DROP PUBLICATION IF EXISTS %s",
quote_identifier(publicationName)));
}
/*
* PublicationName returns the name of the publication for the given node and
* table owner.
@ -1530,187 +1308,6 @@ GetQueryResultStringList(MultiConnection *connection, char *query)
}
/*
* DropAllSubscriptions drops all the existing subscriptions that
* match our shard move naming scheme on the node that the connection points
* to.
*/
static void
DropAllSubscriptions(MultiConnection *connection, LogicalRepType type)
{
char *query = psprintf(
"SELECT subname FROM pg_subscription "
"WHERE subname LIKE %s || '%%'",
quote_literal_cstr(subscriptionPrefix[type]));
List *subscriptionNameList = GetQueryResultStringList(connection, query);
char *subscriptionName;
foreach_ptr(subscriptionName, subscriptionNameList)
{
DropSubscription(connection, subscriptionName);
}
}
/*
* DropAllUsers drops all the users that match our shard move naming
* scheme for temporary shard move users on the node that the connection points
* to.
*/
static void
DropAllUsers(MultiConnection *connection, LogicalRepType type)
{
char *query = psprintf(
"SELECT rolname FROM pg_roles "
"WHERE rolname LIKE %s || '%%'",
quote_literal_cstr(subscriptionRolePrefix[type]));
List *usernameList = GetQueryResultStringList(connection, query);
char *username;
foreach_ptr(username, usernameList)
{
DropUser(connection, username);
}
}
/*
* DropAllReplicationSlots drops all the existing replication slots
* that match our shard move naming scheme on the node that the connection
* points to.
*/
static void
DropAllReplicationSlots(MultiConnection *connection, LogicalRepType type)
{
char *query = psprintf(
"SELECT slot_name FROM pg_replication_slots "
"WHERE slot_name LIKE %s || '%%'",
quote_literal_cstr(replicationSlotPrefix[type]));
List *slotNameList = GetQueryResultStringList(connection, query);
char *slotName;
foreach_ptr(slotName, slotNameList)
{
DropReplicationSlot(connection, slotName);
}
}
/*
* DropAllPublications drops all the existing publications that
* match our shard move naming scheme on the node that the connection points
* to.
*/
static void
DropAllPublications(MultiConnection *connection, LogicalRepType type)
{
char *query = psprintf(
"SELECT pubname FROM pg_publication "
"WHERE pubname LIKE %s || '%%'",
quote_literal_cstr(publicationPrefix[type]));
List *publicationNameList = GetQueryResultStringList(connection, query);
char *publicationName;
foreach_ptr(publicationName, publicationNameList)
{
DropPublication(connection, publicationName);
}
}
/*
* DropSubscriptions drops all the subscriptions in the logicalRepTargetList
* from the subscriber node. It also drops the temporary users that are used as
* owners of the subscription. This doesn't drop the replication slots on
* the publisher, these should be dropped using DropReplicationSlots.
*/
void
DropSubscriptions(List *logicalRepTargetList)
{
LogicalRepTarget *target = NULL;
foreach_ptr(target, logicalRepTargetList)
{
DropSubscription(target->superuserConnection,
target->subscriptionName);
DropUser(target->superuserConnection, target->subscriptionOwnerName);
}
}
/*
* DropSubscription drops subscription with the given name on the subscriber
* node if it exists. Note that this doesn't drop the replication slot on the
* publisher node. The reason is that sometimes this is not possible. Two known
* cases where this is not possible are:
* 1. Due to the node with the replication slot being down.
* 2. Due to a deadlock when the replication is on the same node as the
* subscription, which is the case for shard splits to the local node.
*
* So instead of directly dropping the subscription, including the attached
* replication slot, the subscription is first disconnected from the
* replication slot before dropping it. The replication slot itself should be
* dropped using DropReplicationSlot on the source connection.
*/
static void
DropSubscription(MultiConnection *connection, char *subscriptionName)
{
int querySent = SendRemoteCommand(
connection,
psprintf("ALTER SUBSCRIPTION %s DISABLE", quote_identifier(subscriptionName)));
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE);
if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_UNDEFINED_OBJECT) == 0)
{
/*
* The subscription doesn't exist, so we can return right away.
* This DropSubscription call is effectively a no-op.
*/
return;
}
else
{
ReportResultError(connection, result, ERROR);
PQclear(result);
ForgetResults(connection);
}
}
PQclear(result);
ForgetResults(connection);
ExecuteCriticalRemoteCommand(connection, psprintf(
"ALTER SUBSCRIPTION %s SET (slot_name = NONE)",
quote_identifier(subscriptionName)));
ExecuteCriticalRemoteCommand(connection, psprintf(
"DROP SUBSCRIPTION %s",
quote_identifier(subscriptionName)));
}
/*
* DropUser drops the user with the given name if it exists.
*/
static void
DropUser(MultiConnection *connection, char *username)
{
/*
* The DROP USER command should not propagate, so we temporarily disable
* DDL propagation.
*/
SendCommandListToWorkerOutsideTransactionWithConnection(
connection,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf("DROP USER IF EXISTS %s;",
quote_identifier(username))));
}
/*
* CreatePublications creates the publications defined in the
* publicationInfoHash over the given connection.
@ -1744,6 +1341,13 @@ CreatePublications(MultiConnection *connection,
prefixWithComma = true;
}
WorkerNode *worker = FindWorkerNode(connection->hostname,
connection->port);
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_PUBLICATION,
entry->name,
worker->groupId,
CLEANUP_ALWAYS);
ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data);
pfree(createPublicationCommand->data);
pfree(createPublicationCommand);
@ -1835,6 +1439,13 @@ CreateReplicationSlots(MultiConnection *sourceConnection,
{
ReplicationSlotInfo *replicationSlot = target->replicationSlot;
WorkerNode *worker = FindWorkerNode(sourceConnection->hostname,
sourceConnection->port);
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_REPLICATION_SLOT,
replicationSlot->name,
worker->groupId,
CLEANUP_ALWAYS);
if (!firstReplicationSlot)
{
firstReplicationSlot = replicationSlot;
@ -1880,6 +1491,9 @@ CreateSubscriptions(MultiConnection *sourceConnection,
{
int ownerId = target->tableOwnerId;
WorkerNode *worker = FindWorkerNode(target->superuserConnection->hostname,
target->superuserConnection->port);
/*
* The CREATE USER command should not propagate, so we temporarily
* disable DDL propagation.
@ -1898,6 +1512,11 @@ CreateSubscriptions(MultiConnection *sourceConnection,
GetUserNameFromId(ownerId, false)
)));
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_USER,
target->subscriptionOwnerName,
worker->groupId,
CLEANUP_ALWAYS);
StringInfo conninfo = makeStringInfo();
appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' "
"connect_timeout=20",
@ -1937,6 +1556,12 @@ CreateSubscriptions(MultiConnection *sourceConnection,
createSubscriptionCommand->data);
pfree(createSubscriptionCommand->data);
pfree(createSubscriptionCommand);
InsertCleanupRecordInSubtransaction(CLEANUP_OBJECT_SUBSCRIPTION,
target->subscriptionName,
worker->groupId,
CLEANUP_ALWAYS);
ExecuteCriticalRemoteCommand(target->superuserConnection, psprintf(
"ALTER SUBSCRIPTION %s OWNER TO %s",
target->subscriptionName,
@ -2110,62 +1735,6 @@ CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
}
/*
* RecreateGroupedLogicalRepTargetsConnections recreates connections for all of the
* nodes in the groupedLogicalRepTargetsHash where the old connection is broken or
* currently running a query.
*
* IMPORTANT: When it recreates the connection, it doesn't close the existing
* connection. This means that this function should only be called when we know
* we'll throw an error afterwards, otherwise we would leak these connections.
*/
static void
RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
char *user,
char *databaseName)
{
int connectionFlags = FORCE_NEW_CONNECTION;
HASH_SEQ_STATUS status;
GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL;
foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash)
{
MultiConnection *superuserConnection =
groupedLogicalRepTargets->superuserConnection;
if (superuserConnection &&
PQstatus(superuserConnection->pgConn) == CONNECTION_OK &&
!PQisBusy(superuserConnection->pgConn)
)
{
continue;
}
WorkerNode *targetWorkerNode = FindNodeWithNodeId(
groupedLogicalRepTargets->nodeId,
false);
superuserConnection = GetNodeUserDatabaseConnection(
connectionFlags,
targetWorkerNode->workerName,
targetWorkerNode->workerPort,
user,
databaseName);
/*
* Operations on subscriptions cannot run in a transaction block. We
* claim the connections exclusively to ensure they do not get used for
* metadata syncing, which does open a transaction block.
*/
ClaimConnectionExclusively(superuserConnection);
groupedLogicalRepTargets->superuserConnection = superuserConnection;
LogicalRepTarget *target = NULL;
foreach_ptr(target, groupedLogicalRepTargets->logicalRepTargetList)
{
target->superuserConnection = superuserConnection;
}
}
}
/*
* CloseGroupedLogicalRepTargetsConnections closes the connections for all of the
* nodes in the groupedLogicalRepTargetsHash.

View File

@ -52,7 +52,7 @@ static ShardCost GetShardCost(uint64 shardId, void *context);
PG_FUNCTION_INFO_V1(shard_placement_rebalance_array);
PG_FUNCTION_INFO_V1(shard_placement_replication_array);
PG_FUNCTION_INFO_V1(worker_node_responsive);
PG_FUNCTION_INFO_V1(run_try_drop_marked_shards);
PG_FUNCTION_INFO_V1(run_try_drop_marked_resources);
typedef struct ShardPlacementTestInfo
{
@ -75,13 +75,13 @@ typedef struct RebalancePlanContext
} RebalancePlacementContext;
/*
* run_try_drop_marked_shards is a wrapper to run TryDropOrphanedShards.
* run_try_drop_marked_resources is a wrapper to run TryDropOrphanedResources.
*/
Datum
run_try_drop_marked_shards(PG_FUNCTION_ARGS)
run_try_drop_marked_resources(PG_FUNCTION_ARGS)
{
bool waitForLocks = false;
TryDropOrphanedShards(waitForLocks);
TryDropOrphanedResources(waitForLocks);
PG_RETURN_VOID();
}

View File

@ -627,7 +627,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
TimestampDifferenceExceeds(lastShardCleanTime, GetCurrentTimestamp(),
DeferShardDeleteInterval))
{
int numberOfDroppedShards = 0;
int numberOfDroppedResources = 0;
InvalidateMetadataSystemCache();
StartTransactionCommand();
@ -646,16 +646,16 @@ CitusMaintenanceDaemonMain(Datum main_arg)
lastShardCleanTime = GetCurrentTimestamp();
bool waitForLocks = false;
numberOfDroppedShards = TryDropOrphanedShards(waitForLocks);
numberOfDroppedResources = TryDropOrphanedResources(waitForLocks);
}
CommitTransactionCommand();
if (numberOfDroppedShards > 0)
if (numberOfDroppedResources > 0)
{
ereport(LOG, (errmsg("maintenance daemon dropped %d distributed "
"shards previously marked to be removed",
numberOfDroppedShards)));
ereport(LOG, (errmsg("maintenance daemon dropped %d "
"resources previously marked to be removed",
numberOfDroppedResources)));
}
/* make sure we don't wait too long */

View File

@ -150,12 +150,6 @@ extern char * CreateReplicationSlots(MultiConnection *sourceConnection,
List *subscriptionInfoList,
char *outputPlugin);
extern void EnableSubscriptions(List *subscriptionInfoList);
extern void DropSubscriptions(List *subscriptionInfoList);
extern void DropReplicationSlots(MultiConnection *sourceConnection,
List *subscriptionInfoList);
extern void DropPublications(MultiConnection *sourceConnection,
HTAB *publicationInfoHash);
extern void DropAllLogicalReplicationLeftovers(LogicalRepType type);
extern char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId);
extern char * ReplicationSlotNameForNodeAndOwner(LogicalRepType type, uint32_t nodeId, Oid

View File

@ -23,8 +23,8 @@ extern bool CheckAvailableSpaceBeforeMove;
extern int NextOperationId;
extern int NextCleanupRecordId;
extern int TryDropOrphanedShards(bool waitForLocks);
extern void DropOrphanedShardsInSeparateTransaction(void);
extern int TryDropOrphanedResources(bool waitForLocks);
extern void DropOrphanedResourcesInSeparateTransaction(void);
/* Members for cleanup infrastructure */
typedef uint64 OperationId;
@ -36,7 +36,11 @@ extern OperationId CurrentOperationId;
typedef enum CleanupObject
{
CLEANUP_OBJECT_INVALID = 0,
CLEANUP_OBJECT_SHARD_PLACEMENT = 1
CLEANUP_OBJECT_SHARD_PLACEMENT = 1,
CLEANUP_OBJECT_SUBSCRIPTION = 2,
CLEANUP_OBJECT_REPLICATION_SLOT = 3,
CLEANUP_OBJECT_PUBLICATION = 4,
CLEANUP_OBJECT_USER = 5
} CleanupObject;
/*

View File

@ -275,7 +275,7 @@ SELECT pg_reload_conf();
-- END: Split a shard along its co-located shards
-- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 13 orphaned resources
NOTICE: cleaned up 11 orphaned resources
-- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport

View File

@ -234,7 +234,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
-- BEGIN: Perform deferred cleanup.
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
NOTICE: cleaned up 3 orphaned resources
-- END: Perform deferred cleanup.
-- Perform 3 way split
SELECT pg_catalog.citus_split_shard_by_split_points(

View File

@ -189,8 +189,8 @@ SELECT create_distributed_table_concurrently('table_1', 'id');
SELECT * FROM pg_dist_shard WHERE logicalrelid = 'table_1'::regclass;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
table_1 | 1880084 | t | -2147483648 | -1
table_1 | 1880085 | t | 0 | 2147483647
table_1 | 1880080 | t | -2147483648 | -1
table_1 | 1880081 | t | 0 | 2147483647
(2 rows)
DROP SCHEMA create_dist_tbl_con CASCADE;

View File

@ -54,8 +54,29 @@ SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()');
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 1 orphaned resources
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
-- Verify that the shard is not moved and the number of rows are still 100k
SELECT * FROM shards_in_workers;
shardid | worker
@ -73,12 +94,15 @@ SELECT count(*) FROM t;
(1 row)
-- Verify that shard can be moved after a temporary failure
-- cleanup leftovers, as it can cause flakiness in the following test files
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
master_move_shard_placement
---------------------------------------------------------------------

View File

@ -122,15 +122,43 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISAB
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: role "citus_shard_move_subscription_role_10" cannot be dropped because some objects depend on it
DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- try again
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 2 orphaned resources
-- cancel on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
-- try again
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
master_move_shard_placement
---------------------------------------------------------------------
@ -170,8 +198,85 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kil
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: connection not open
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: role "citus_shard_move_subscription_role_10" cannot be dropped because some objects depend on it
DETAIL: owner of subscription citus_shard_move_subscription_xxxxxxx
CONTEXT: while executing command on localhost:xxxxx
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- should succeed with warnings (subscription not dropped)
-- move the shard back
SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port);
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 2 orphaned resources
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 1 orphaned shards
-- failure on setting lock_timeout (right before dropping subscriptions & replication slots)
SELECT citus.mitmproxy('conn.onQuery(query="^SET LOCAL lock_timeout").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- should succeed with warnings (objects not dropped)
-- move the shard back
SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port);
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
master_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 1 orphaned shards
-- cancellation on disabling subscription (right before dropping it)
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").cancel(' || :pid || ')');
mitmproxy
@ -181,16 +286,47 @@ SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").can
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").killall()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
-- first, manually drop the subscription object. But the record for it will remain in pg_dist_cleanup
SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_xxxxxxx$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,9060,t,"DROP SUBSCRIPTION")
(localhost,57637,t,"DROP SUBSCRIPTION")
(2 rows)
-- cleanup leftovers
-- verify we don't see any error for the already dropped subscription
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 3 orphaned resources
-- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
mitmproxy
@ -227,14 +363,18 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
NOTICE: cleaned up 3 orphaned resources
-- let's create a few more indexes and fail with both
-- parallel mode and sequential mode
CREATE INDEX index_failure_2 ON t(id);
@ -256,14 +396,9 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- failure on parallel create index
ALTER SYSTEM RESET citus.max_adaptive_executor_pool_size;
SELECT pg_reload_conf();
@ -279,8 +414,9 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
(1 row)
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Verify that the shard is not moved and the number of rows are still 100k
SELECT citus.mitmproxy('conn.allow()');
mitmproxy

View File

@ -48,16 +48,17 @@ SELECT create_distributed_table('table_to_split', 'id');
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: failed to clean up 2 orphaned shards out of 5 after a citus_split_shard_by_split_points operation failed
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
(3 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@ -98,7 +99,7 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
NOTICE: cleaned up 3 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
@ -144,24 +145,23 @@ NOTICE: cleaned up 4 orphaned resources
(1 row)
-- set log level to prevent flakiness
SET client_min_messages TO ERROR;
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
ERROR: Failed to run worker_split_shard_replication_setup UDF. It should successfully execute for splitting a shard in a non-blocking way. Please retry.
RESET client_min_messages;
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
(4 rows)
-- we need to allow connection so that we can connect to proxy
@ -224,11 +224,9 @@ NOTICE: cleaned up 4 orphaned resources
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
(0 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
@ -262,16 +260,21 @@ WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: failed to clean up 2 orphaned shards out of 7 after a citus_split_shard_by_split_points operation failed
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
(5 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@ -314,7 +317,7 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
NOTICE: cleaned up 5 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
@ -333,11 +336,9 @@ NOTICE: cleaned up 4 orphaned resources
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
(0 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
@ -367,16 +368,28 @@ NOTICE: cleaned up 4 orphaned resources
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_shard_by_split_points operation failed
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0
(8 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@ -422,7 +435,7 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
NOTICE: cleaned up 8 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
@ -441,26 +454,21 @@ NOTICE: cleaned up 4 orphaned resources
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
(0 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
(0 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
(0 rows)
--5. Failure on polling last write-ahead log location reported to origin WAL sender
\c - postgres - :master_port
@ -478,16 +486,28 @@ NOTICE: cleaned up 4 orphaned resources
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_shard_by_split_points operation failed
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0
(8 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@ -533,7 +553,7 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
NOTICE: cleaned up 8 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
@ -552,26 +572,21 @@ NOTICE: cleaned up 4 orphaned resources
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
(0 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
(0 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
(0 rows)
--6. Failure on dropping subscription
\c - postgres - :master_port
@ -589,15 +604,29 @@ NOTICE: cleaned up 4 orphaned resources
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
WARNING: connection to the remote node localhost:xxxxx failed with the following error: connection not open
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- NO records expected as we fail at 'DropAllLogicalReplicationLeftovers' before creating
-- any resources.
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0
(8 rows)
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;
relname
@ -619,7 +648,8 @@ CONTEXT: while executing command on localhost:xxxxx
relname
---------------------------------------------------------------------
table_to_split_8981000
(1 row)
table_to_split_8981003
(2 rows)
-- Left over publications
SELECT pubname FROM pg_publication;
@ -646,6 +676,7 @@ CONTEXT: while executing command on localhost:xxxxx
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 8 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
operation_id | object_type | object_name | node_group_id | policy_type
@ -664,26 +695,21 @@ CONTEXT: while executing command on localhost:xxxxx
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
(0 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
(0 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
(0 rows)
-- Cleanup
\c - postgres - :master_port

View File

@ -18,6 +18,8 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
-- cleanup leftovers if any
CALL citus_cleanup_orphaned_resources();
CREATE TABLE table_1 (id int PRIMARY KEY);
CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int);
SELECT create_distributed_table('table_1', 'id');
@ -255,7 +257,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_current_wal_lsn").cancel('
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
ERROR: canceling statement due to user request
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").killall()');
mitmproxy
---------------------------------------------------------------------
@ -264,6 +266,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").kill()');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
-- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')');
mitmproxy
@ -273,8 +283,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
ERROR: canceling statement due to user request
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
-- failure on dropping publication
SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()');
mitmproxy
---------------------------------------------------------------------
@ -283,6 +301,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").kill()');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
-- cancellation on dropping publication
SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')');
mitmproxy
@ -292,8 +318,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
ERROR: canceling statement due to user request
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
-- failure on dropping replication slot
SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()');
mitmproxy
---------------------------------------------------------------------
@ -302,6 +336,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").ki
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
-- cancellation on dropping replication slot
SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')');
mitmproxy
@ -311,6 +353,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").ca
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
ERROR: canceling statement due to user request
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
CALL citus_cleanup_orphaned_resources();
-- failure on foreign key creation
SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()');
mitmproxy

View File

@ -212,9 +212,6 @@ SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER B
rep2
(2 rows)
-- cannot copy from an orphaned shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
ERROR: source placement must be in active state
-- Make sure we don't send a query to the orphaned shard
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;
@ -288,7 +285,7 @@ SELECT citus_move_shard_placement(92448500, 'localhost', :worker_2_port, 'localh
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 3 orphaned shards
NOTICE: cleaned up 1 orphaned shards
SELECT shardid, shardstate, nodeport FROM pg_dist_shard_placement WHERE shardid = 92448300 ORDER BY placementid;
shardid | shardstate | nodeport
---------------------------------------------------------------------

View File

@ -1131,9 +1131,9 @@ step s2-blocking-shard-split:
'block_writes');
<waiting ...>
step s1-run-cleaner:
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
run_try_drop_marked_shards
run_try_drop_marked_resources
---------------------------------------------------------------------
(1 row)
@ -1162,9 +1162,9 @@ citus_split_shard_by_split_points
(1 row)
step s1-run-cleaner:
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
run_try_drop_marked_shards
run_try_drop_marked_resources
---------------------------------------------------------------------
(1 row)
@ -1199,9 +1199,9 @@ step s2-blocking-shard-split:
'block_writes');
<waiting ...>
step s1-run-cleaner:
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
run_try_drop_marked_shards
run_try_drop_marked_resources
---------------------------------------------------------------------
(1 row)
@ -1230,9 +1230,9 @@ citus_split_shard_by_split_points
(1 row)
step s1-run-cleaner:
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
run_try_drop_marked_shards
run_try_drop_marked_resources
---------------------------------------------------------------------
(1 row)

View File

@ -972,9 +972,9 @@ step s2-non-blocking-shard-split:
'force_logical');
<waiting ...>
step s1-run-cleaner:
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
run_try_drop_marked_shards
run_try_drop_marked_resources
---------------------------------------------------------------------
(1 row)
@ -982,13 +982,21 @@ run_try_drop_marked_shards
step s1-show-pg_dist_cleanup:
SELECT object_name, object_type, policy_type FROM pg_dist_cleanup;
object_name |object_type|policy_type
object_name |object_type|policy_type
---------------------------------------------------------------------
public.to_split_table_1500002| 1| 1
public.to_split_table_1500003| 1| 1
public.to_split_table_1500001| 1| 0
public.to_split_table_1500003| 1| 0
(4 rows)
public.to_split_table_1500002 | 1| 1
public.to_split_table_1500003 | 1| 1
public.to_split_table_1500001 | 1| 0
public.to_split_table_1500003 | 1| 0
citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0
citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0
citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0
citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0
citus_shard_split_subscription_role_10| 5| 0
citus_shard_split_subscription_xxxxxxx | 2| 0
citus_shard_split_subscription_role_10| 5| 0
citus_shard_split_subscription_xxxxxxx | 2| 0
(12 rows)
step s1-release-split-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
@ -1005,9 +1013,9 @@ citus_split_shard_by_split_points
(1 row)
step s1-run-cleaner:
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
run_try_drop_marked_shards
run_try_drop_marked_resources
---------------------------------------------------------------------
(1 row)
@ -1042,9 +1050,9 @@ step s2-non-blocking-shard-split:
'force_logical');
<waiting ...>
step s1-run-cleaner:
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
run_try_drop_marked_shards
run_try_drop_marked_resources
---------------------------------------------------------------------
(1 row)
@ -1052,13 +1060,21 @@ run_try_drop_marked_shards
step s1-show-pg_dist_cleanup:
SELECT object_name, object_type, policy_type FROM pg_dist_cleanup;
object_name |object_type|policy_type
object_name |object_type|policy_type
---------------------------------------------------------------------
public.to_split_table_1500002| 1| 1
public.to_split_table_1500003| 1| 1
public.to_split_table_1500001| 1| 0
public.to_split_table_1500003| 1| 0
(4 rows)
public.to_split_table_1500002 | 1| 1
public.to_split_table_1500003 | 1| 1
public.to_split_table_1500001 | 1| 0
public.to_split_table_1500003 | 1| 0
citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0
citus_shard_split_publication_xxxxxxx_xxxxxxx | 4| 0
citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0
citus_shard_split_slot_xxxxxxx_xxxxxxx | 3| 0
citus_shard_split_subscription_role_10| 5| 0
citus_shard_split_subscription_xxxxxxx | 2| 0
citus_shard_split_subscription_role_10| 5| 0
citus_shard_split_subscription_xxxxxxx | 2| 0
(12 rows)
step s1-release-split-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
@ -1075,9 +1091,9 @@ citus_split_shard_by_split_points
(1 row)
step s1-run-cleaner:
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
run_try_drop_marked_shards
run_try_drop_marked_resources
---------------------------------------------------------------------
(1 row)

View File

@ -177,9 +177,9 @@ step s1-lock-pg-dist-placement:
s2: DEBUG: could not acquire shard lock to cleanup placements
step s2-drop-old-shards:
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
run_try_drop_marked_shards
run_try_drop_marked_resources
---------------------------------------------------------------------
(1 row)

View File

@ -101,10 +101,12 @@ SELECT citus_remove_node('localhost', :master_port);
(1 row)
-- the subscription is still there, as there is no cleanup record for it
-- we have created it manually
SELECT count(*) from pg_subscription;
count
---------------------------------------------------------------------
0
1
(1 row)
SELECT count(*) from pg_publication;
@ -127,6 +129,8 @@ SELECT count(*) from dist;
\c - - - :worker_1_port
SET search_path TO logical_replication;
-- the publication and repslot are still there, as there are no cleanup records for them
-- we have created them manually
SELECT count(*) from pg_subscription;
count
---------------------------------------------------------------------
@ -136,13 +140,13 @@ SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
count
---------------------------------------------------------------------
0
1
(1 row)
SELECT count(*) from pg_replication_slots;
count
---------------------------------------------------------------------
0
1
(1 row)
SELECT count(*) from dist;
@ -180,4 +184,7 @@ SELECT count(*) from dist;
\c - - - :master_port
SET search_path TO logical_replication;
SET client_min_messages TO WARNING;
ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid DISABLE;
ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid SET (slot_name = NONE);
DROP SUBSCRIPTION citus_shard_move_subscription_:postgres_oid;
DROP SCHEMA logical_replication CASCADE;

View File

@ -565,7 +565,7 @@ SELECT master_move_shard_placement(13000034, 'localhost', :worker_1_port, 'local
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 5 orphaned shards
NOTICE: cleaned up 1 orphaned shards
-- confirm the successful move
SELECT * FROM run_command_on_placements('serial_move_test', 'SELECT DISTINCT key FROM %s WHERE key = 15') WHERE result = '15' AND shardid = 13000034;
nodename | nodeport | shardid | success | result

View File

@ -450,7 +450,7 @@ ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign
DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046".
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 24 orphaned resources
NOTICE: cleaned up 2 orphaned resources
-- connect to the worker node with metadata
\c - mx_isolation_role_ent - :worker_1_port
SET search_path to "Tenant Isolation";
@ -1018,7 +1018,7 @@ SELECT count(*) FROM test_colocated_table_2;
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
NOTICE: cleaned up 3 orphaned resources
\c - postgres - :worker_1_port
-- show the foreign keys of the main table & its colocated shard on other tables
SELECT tbl.relname, fk."Constraint", fk."Definition"

View File

@ -260,6 +260,8 @@ SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE', shard_t
ERROR: table lineitem_streaming has already been isolated for the given value
SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE', shard_transfer_mode => 'force_logical');
ERROR: table orders_streaming has already been isolated for the given value
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 2 orphaned resources
-- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647
SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE', shard_transfer_mode => 'force_logical');
isolate_tenant_to_new_shard
@ -273,6 +275,8 @@ SELECT isolate_tenant_to_new_shard('orders_streaming', -1686493264, 'CASCADE', s
1230047
(1 row)
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 2 orphaned resources
SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554;
count
---------------------------------------------------------------------
@ -450,7 +454,6 @@ ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign
DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046".
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 30 orphaned resources
-- connect to the worker node with metadata
\c - mx_isolation_role_ent - :worker_1_port
SET search_path to "Tenant Isolation";
@ -821,93 +824,6 @@ SET citus.override_table_visibility TO false;
(18 rows)
DROP EVENT TRIGGER abort_ddl;
-- create a trigger for drops
SET citus.enable_metadata_sync TO OFF;
CREATE OR REPLACE FUNCTION abort_drop_command()
RETURNS event_trigger
LANGUAGE plpgsql
AS $$
BEGIN
RAISE EXCEPTION 'command % is disabled', tg_tag;
END;
$$;
RESET citus.enable_metadata_sync;
CREATE EVENT TRIGGER abort_drop ON sql_drop
EXECUTE PROCEDURE abort_drop_command();
\c - mx_isolation_role_ent - :master_port
SET search_path to "Tenant Isolation";
\set VERBOSITY terse
SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'force_logical');
ERROR: command DROP SUBSCRIPTION is disabled
\set VERBOSITY default
-- check if metadata is changed
SELECT * FROM pg_dist_shard
WHERE logicalrelid = 'lineitem_streaming'::regclass OR logicalrelid = 'orders_streaming'::regclass
ORDER BY shardminvalue::BIGINT, logicalrelid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
lineitem_streaming | 1230040 | t | -2147483648 | -2147483648
orders_streaming | 1230042 | t | -2147483648 | -2147483648
lineitem_streaming | 1230041 | t | -2147483647 | -136164586
orders_streaming | 1230043 | t | -2147483647 | -136164586
lineitem_streaming | 1230035 | t | -136164585 | -136164585
orders_streaming | 1230038 | t | -136164585 | -136164585
lineitem_streaming | 1230036 | t | -136164584 | -85071815
orders_streaming | 1230039 | t | -136164584 | -85071815
lineitem_streaming | 1230011 | t | -85071814 | -85071814
orders_streaming | 1230014 | t | -85071814 | -85071814
lineitem_streaming | 1230012 | t | -85071813 | -1
orders_streaming | 1230015 | t | -85071813 | -1
lineitem_streaming | 1230004 | t | 0 | 108199380
orders_streaming | 1230007 | t | 0 | 108199380
lineitem_streaming | 1230005 | t | 108199381 | 108199381
orders_streaming | 1230008 | t | 108199381 | 108199381
lineitem_streaming | 1230028 | t | 108199382 | 412880111
orders_streaming | 1230031 | t | 108199382 | 412880111
lineitem_streaming | 1230029 | t | 412880112 | 412880112
orders_streaming | 1230032 | t | 412880112 | 412880112
lineitem_streaming | 1230044 | t | 412880113 | 2147483646
orders_streaming | 1230046 | t | 412880113 | 2147483646
lineitem_streaming | 1230045 | t | 2147483647 | 2147483647
orders_streaming | 1230047 | t | 2147483647 | 2147483647
(24 rows)
\c - - - :worker_1_port
SET search_path to "Tenant Isolation";
-- however, new tables are already created
SET citus.override_table_visibility TO false;
\d
List of relations
Schema | Name | Type | Owner
---------------------------------------------------------------------
Tenant Isolation | lineitem_streaming | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230011 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230012 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230035 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230036 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230040 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230041 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230061 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230062 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230063 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230014 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230015 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230038 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230039 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230042 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230043 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230064 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230065 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230066 | table | mx_isolation_role_ent
Tenant Isolation | text_column | table | mx_isolation_role_ent
Tenant Isolation | text_column_1230052 | table | mx_isolation_role_ent
Tenant Isolation | text_column_1230053 | table | mx_isolation_role_ent
Tenant Isolation | text_column_1230054 | table | mx_isolation_role_ent
(24 rows)
\c - postgres - :worker_1_port
DROP EVENT TRIGGER abort_drop;
\c - mx_isolation_role_ent - :master_port
SET search_path to "Tenant Isolation";
-- tests for cluster health
@ -1130,7 +1046,7 @@ DROP TABLE test_colocated_table_no_rep_identity;
SELECT isolate_tenant_to_new_shard('test_colocated_table_2', 1, 'CASCADE', shard_transfer_mode => 'auto');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230108
1230102
(1 row)
SELECT count(*) FROM test_colocated_table_2;
@ -1141,7 +1057,7 @@ SELECT count(*) FROM test_colocated_table_2;
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 10 orphaned resources
NOTICE: cleaned up 3 orphaned resources
\c - postgres - :worker_1_port
-- show the foreign keys of the main table & its colocated shard on other tables
SELECT tbl.relname, fk."Constraint", fk."Definition"
@ -1152,47 +1068,47 @@ ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
test_colocated_table_1 | test_colocated_table_1_id_fkey | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1(id)
test_colocated_table_1_1230074 | test_colocated_table_1_id_fkey_1230074 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230074(id)
test_colocated_table_1_1230076 | test_colocated_table_1_id_fkey_1230076 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230076(id)
test_colocated_table_1_1230078 | test_colocated_table_1_id_fkey_1230078 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230078(id)
test_colocated_table_1_1230104 | test_colocated_table_1_id_fkey_1230104 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230104(id)
test_colocated_table_1_1230105 | test_colocated_table_1_id_fkey_1230105 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230105(id)
test_colocated_table_1_1230106 | test_colocated_table_1_id_fkey_1230106 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230106(id)
test_colocated_table_1_1230068 | test_colocated_table_1_id_fkey_1230068 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230068(id)
test_colocated_table_1_1230070 | test_colocated_table_1_id_fkey_1230070 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230070(id)
test_colocated_table_1_1230072 | test_colocated_table_1_id_fkey_1230072 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230072(id)
test_colocated_table_1_1230098 | test_colocated_table_1_id_fkey_1230098 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230098(id)
test_colocated_table_1_1230099 | test_colocated_table_1_id_fkey_1230099 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230099(id)
test_colocated_table_1_1230100 | test_colocated_table_1_id_fkey_1230100 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230100(id)
test_colocated_table_2 | test_colocated_table_2_id_fkey | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1(id)
test_colocated_table_2 | test_colocated_table_2_value_1_fkey | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey(id)
test_colocated_table_2_1230082 | test_colocated_table_2_id_fkey_1230082 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230074(id)
test_colocated_table_2_1230082 | test_colocated_table_2_value_1_fkey_1230082 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_2_1230084 | test_colocated_table_2_id_fkey_1230084 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230076(id)
test_colocated_table_2_1230084 | test_colocated_table_2_value_1_fkey_1230084 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_2_1230086 | test_colocated_table_2_id_fkey_1230086 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230078(id)
test_colocated_table_2_1230086 | test_colocated_table_2_value_1_fkey_1230086 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_2_1230107 | test_colocated_table_2_id_fkey_1230107 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230104(id)
test_colocated_table_2_1230107 | test_colocated_table_2_value_1_fkey_1230107 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_2_1230108 | test_colocated_table_2_id_fkey_1230108 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230105(id)
test_colocated_table_2_1230108 | test_colocated_table_2_value_1_fkey_1230108 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_2_1230109 | test_colocated_table_2_id_fkey_1230109 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230106(id)
test_colocated_table_2_1230109 | test_colocated_table_2_value_1_fkey_1230109 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_2_1230076 | test_colocated_table_2_id_fkey_1230076 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230068(id)
test_colocated_table_2_1230076 | test_colocated_table_2_value_1_fkey_1230076 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_2_1230078 | test_colocated_table_2_id_fkey_1230078 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230070(id)
test_colocated_table_2_1230078 | test_colocated_table_2_value_1_fkey_1230078 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_2_1230080 | test_colocated_table_2_id_fkey_1230080 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230072(id)
test_colocated_table_2_1230080 | test_colocated_table_2_value_1_fkey_1230080 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_2_1230101 | test_colocated_table_2_id_fkey_1230101 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230098(id)
test_colocated_table_2_1230101 | test_colocated_table_2_value_1_fkey_1230101 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_2_1230102 | test_colocated_table_2_id_fkey_1230102 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230099(id)
test_colocated_table_2_1230102 | test_colocated_table_2_value_1_fkey_1230102 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_2_1230103 | test_colocated_table_2_id_fkey_1230103 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230100(id)
test_colocated_table_2_1230103 | test_colocated_table_2_value_1_fkey_1230103 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_3 | test_colocated_table_3_id_fkey | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1(id)
test_colocated_table_3 | test_colocated_table_3_id_fkey1 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2(id)
test_colocated_table_3 | test_colocated_table_3_value_1_fkey | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey(id)
test_colocated_table_3_1230090 | test_colocated_table_3_id_fkey1_1230090 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230082(id)
test_colocated_table_3_1230090 | test_colocated_table_3_id_fkey_1230090 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230074(id)
test_colocated_table_3_1230090 | test_colocated_table_3_value_1_fkey_1230090 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_3_1230092 | test_colocated_table_3_id_fkey1_1230092 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230084(id)
test_colocated_table_3_1230092 | test_colocated_table_3_id_fkey_1230092 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230076(id)
test_colocated_table_3_1230092 | test_colocated_table_3_value_1_fkey_1230092 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_3_1230094 | test_colocated_table_3_id_fkey1_1230094 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230086(id)
test_colocated_table_3_1230094 | test_colocated_table_3_id_fkey_1230094 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230078(id)
test_colocated_table_3_1230094 | test_colocated_table_3_value_1_fkey_1230094 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_3_1230110 | test_colocated_table_3_id_fkey1_1230110 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230107(id)
test_colocated_table_3_1230110 | test_colocated_table_3_id_fkey_1230110 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230104(id)
test_colocated_table_3_1230110 | test_colocated_table_3_value_1_fkey_1230110 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_3_1230111 | test_colocated_table_3_id_fkey1_1230111 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230108(id)
test_colocated_table_3_1230111 | test_colocated_table_3_id_fkey_1230111 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230105(id)
test_colocated_table_3_1230111 | test_colocated_table_3_value_1_fkey_1230111 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_3_1230112 | test_colocated_table_3_id_fkey1_1230112 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230109(id)
test_colocated_table_3_1230112 | test_colocated_table_3_id_fkey_1230112 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230106(id)
test_colocated_table_3_1230112 | test_colocated_table_3_value_1_fkey_1230112 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230071(id)
test_colocated_table_3_1230084 | test_colocated_table_3_id_fkey1_1230084 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230076(id)
test_colocated_table_3_1230084 | test_colocated_table_3_id_fkey_1230084 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230068(id)
test_colocated_table_3_1230084 | test_colocated_table_3_value_1_fkey_1230084 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_3_1230086 | test_colocated_table_3_id_fkey1_1230086 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230078(id)
test_colocated_table_3_1230086 | test_colocated_table_3_id_fkey_1230086 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230070(id)
test_colocated_table_3_1230086 | test_colocated_table_3_value_1_fkey_1230086 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_3_1230088 | test_colocated_table_3_id_fkey1_1230088 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230080(id)
test_colocated_table_3_1230088 | test_colocated_table_3_id_fkey_1230088 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230072(id)
test_colocated_table_3_1230088 | test_colocated_table_3_value_1_fkey_1230088 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_3_1230104 | test_colocated_table_3_id_fkey1_1230104 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230101(id)
test_colocated_table_3_1230104 | test_colocated_table_3_id_fkey_1230104 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230098(id)
test_colocated_table_3_1230104 | test_colocated_table_3_value_1_fkey_1230104 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_3_1230105 | test_colocated_table_3_id_fkey1_1230105 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230102(id)
test_colocated_table_3_1230105 | test_colocated_table_3_id_fkey_1230105 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230099(id)
test_colocated_table_3_1230105 | test_colocated_table_3_value_1_fkey_1230105 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
test_colocated_table_3_1230106 | test_colocated_table_3_id_fkey1_1230106 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230103(id)
test_colocated_table_3_1230106 | test_colocated_table_3_id_fkey_1230106 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230100(id)
test_colocated_table_3_1230106 | test_colocated_table_3_value_1_fkey_1230106 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230065(id)
(42 rows)
\c - mx_isolation_role_ent - :master_port

View File

@ -196,7 +196,7 @@ $cmd$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,1)
(localhost,57638,t,1)
(localhost,57638,t,0)
(2 rows)
-- override the function for testing purposes

View File

@ -200,6 +200,8 @@ BEGIN;
SET citus.log_remote_commands TO ON;
SET SESSION citus.max_adaptive_executor_pool_size TO 5;
SELECT replicate_table_shards('dist_table_test_2', max_shard_copies := 4, shard_transfer_mode:='block_writes');
NOTICE: issuing CALL citus_cleanup_orphaned_resources()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CALL citus_cleanup_orphaned_shards()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: Copying shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
@ -1754,7 +1756,7 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 4 orphaned shards
NOTICE: cleaned up 2 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1853,7 +1855,7 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 3 orphaned shards
NOTICE: cleaned up 1 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------
@ -1928,7 +1930,7 @@ NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
(1 row)
CALL citus_cleanup_orphaned_shards();
NOTICE: cleaned up 4 orphaned shards
NOTICE: cleaned up 1 orphaned shards
SELECT * FROM public.table_placements_per_node;
nodeport | logicalrelid | count
---------------------------------------------------------------------

View File

@ -7,7 +7,7 @@ setup
SELECT setval('pg_dist_shardid_seq', 1500000);
-- Clean up any orphan shards that might be left over from a previous run.
CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
CREATE OR REPLACE FUNCTION run_try_drop_marked_resources()
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT VOLATILE;
@ -18,7 +18,7 @@ setup
teardown
{
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
DROP TABLE to_split_table;
}
@ -104,7 +104,7 @@ step "s1-release-split-advisory-lock"
step "s1-run-cleaner"
{
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
}
step "s1-show-pg_dist_cleanup"

View File

@ -14,11 +14,11 @@ setup
teardown
{
-- Clean up any orphan shards that might be left over from a previous run.
CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
CREATE OR REPLACE FUNCTION run_try_drop_marked_resources()
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT VOLATILE;
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
DROP TABLE table_to_split CASCADE;
DROP TABLE reference_table CASCADE;

View File

@ -13,7 +13,7 @@ setup
SELECT setval('pg_dist_shardid_seq', 1500000);
-- Clean up any orphan shards that might be left over from a previous run.
CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
CREATE OR REPLACE FUNCTION run_try_drop_marked_resources()
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT VOLATILE;
@ -24,7 +24,7 @@ setup
teardown
{
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
DROP TABLE to_split_table;
}
@ -64,7 +64,7 @@ step "s1-release-split-advisory-lock"
step "s1-run-cleaner"
{
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
}
step "s1-start-connection"

View File

@ -20,11 +20,11 @@ setup
teardown
{
-- Clean up any orphan shards that might be left over from a previous run.
CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
CREATE OR REPLACE FUNCTION run_try_drop_marked_resources()
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT VOLATILE;
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
DROP TABLE table_to_split CASCADE;
DROP TABLE reference_table CASCADE;

View File

@ -19,11 +19,11 @@ setup
teardown
{
-- Clean up any orphan shards that might be left over from a previous run.
CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
CREATE OR REPLACE FUNCTION run_try_drop_marked_resources()
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT VOLATILE;
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
DROP TABLE to_split_table CASCADE;
}

View File

@ -3,7 +3,7 @@
setup
{
SET citus.enable_metadata_sync TO off;
CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
CREATE OR REPLACE FUNCTION run_try_drop_marked_resources()
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT VOLATILE;
@ -83,7 +83,7 @@ step "s1-commit"
session "s2"
step "s2-drop-old-shards" {
SELECT run_try_drop_marked_shards();
SELECT run_try_drop_marked_resources();
}
step "s2-start-session-level-connection"

View File

@ -42,12 +42,21 @@ SELECT * FROM shards_in_workers;
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION").cancel(' || :pid || ')');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- Verify that the shard is not moved and the number of rows is still 100k
SELECT * FROM shards_in_workers;
SELECT count(*) FROM t;
-- Verify that the shard can be moved after a temporary failure
-- cleanup leftovers, as leftover replication artifacts can cause flakiness in the following test files
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
SELECT * FROM shards_in_workers;
SELECT count(*) FROM t;

View File

@ -68,8 +68,20 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- try again
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
-- cancel on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* (ENABLE|DISABLE)").cancel(' || :pid || ')');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
-- try again
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port);
@ -84,20 +96,50 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
-- failure on disabling subscription (right before dropping it)
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- should succeed with warnings (subscription not dropped)
-- move the shard back
SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port);
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
CALL citus_cleanup_orphaned_shards();
-- failure on setting lock_timeout (right before dropping subscriptions & replication slots)
SELECT citus.mitmproxy('conn.onQuery(query="^SET LOCAL lock_timeout").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- should succeed with warnings (objects not dropped)
-- move the shard back
SELECT master_move_shard_placement(101, 'localhost', :worker_2_proxy_port, 'localhost', :worker_1_port);
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
CALL citus_cleanup_orphaned_shards();
-- cancellation on disabling subscription (right before dropping it)
SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").cancel(' || :pid || ')');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").killall()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.allow()');
-- first, manually drop the subscription object; the record for it will remain in pg_dist_cleanup
SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10$$);
-- cleanup leftovers
-- verify we don't see any error for the already dropped subscription
CALL citus_cleanup_orphaned_resources();
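-- illustrative sketch, not part of the original test: after the cleanup call above,
-- no record should remain in pg_dist_cleanup for the manually dropped subscription;
-- the LIKE pattern below is an assumption based on the subscription name used above
SELECT object_name, node_group_id, policy_type FROM pg_dist_cleanup
WHERE object_name LIKE 'citus_shard_move_subscription%';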
-- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- failure on creating the primary key
SELECT citus.mitmproxy('conn.onQuery(query="t_pkey").kill()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
@ -110,7 +152,10 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost'
SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
-- let's create a few more indexes and fail with both
-- parallel mode and sequential mode
CREATE INDEX index_failure_2 ON t(id);
@ -125,8 +170,6 @@ SELECT pg_reload_conf();
SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()');
SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.allow()');
-- failure on parallel create index
ALTER SYSTEM RESET citus.max_adaptive_executor_pool_size;
SELECT pg_reload_conf();

View File

@ -81,11 +81,16 @@ SELECT create_distributed_table('table_to_split', 'id');
SET citus.next_cleanup_record_id TO 11;
SELECT citus.mitmproxy('conn.onQuery(query="SELECT \* FROM pg_catalog.worker_split_shard_replication_setup\(.*").killall()');
-- set log level to prevent flakiness
SET client_min_messages TO ERROR;
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
RESET client_min_messages;
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
-- we need to allow connections so that we can connect to the proxy
@ -270,8 +275,7 @@ SELECT create_distributed_table('table_to_split', 'id');
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
-- NO records expected as we fail at 'DropAllLogicalReplicationLeftovers' before creating
-- any resources.
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777;
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;

View File

@ -16,6 +16,9 @@ SET citus.max_adaptive_executor_pool_size TO 1;
SELECT pg_backend_pid() as pid \gset
SELECT citus.mitmproxy('conn.allow()');
-- cleanup leftovers if any
CALL citus_cleanup_orphaned_resources();
CREATE TABLE table_1 (id int PRIMARY KEY);
CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int);
@ -126,29 +129,53 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_current_wal_lsn").cancel('
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-- failure on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").killall()');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
-- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
-- failure on dropping publication
SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
-- cancellation on dropping publication
SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
-- failure on dropping replication slot
SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
-- cancellation on dropping replication slot
SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');
-- cleanup leftovers
SELECT citus.mitmproxy('conn.allow()');
CALL citus_cleanup_orphaned_resources();
-- failure on foreign key creation
SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical');

View File

@ -82,9 +82,6 @@ SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER B
SELECT update_distributed_table_colocation('rep2', 'rep1');
SELECT logicalrelid FROM pg_dist_partition WHERE colocationid = 92448300 ORDER BY 1;
-- cannot copy from an orphaned shard
SELECT * FROM citus_copy_shard_placement(92448300, 'localhost', :worker_1_port, 'localhost', :master_port);
-- Make sure we don't send a query to the orphaned shard
BEGIN;
SET LOCAL citus.log_remote_commands TO ON;

View File

@ -55,6 +55,8 @@ select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localho
SELECT citus_remove_node('localhost', :master_port);
-- the subscription is still there, as there is no cleanup record for it
-- we have created it manually
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
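-- illustrative sketch, not part of the original test: what decides whether cleanup
-- drops an object is the presence of a row in pg_dist_cleanup, which could be
-- double-checked with a query like the one below; the LIKE pattern is an assumption
SELECT count(*) FROM pg_dist_cleanup
WHERE object_name LIKE 'citus_shard_move_subscription%';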
@ -63,7 +65,8 @@ SELECT count(*) from dist;
\c - - - :worker_1_port
SET search_path TO logical_replication;
-- the publication and repslot are still there, as there are no cleanup records for them
-- we have created them manually
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
@ -81,4 +84,7 @@ SELECT count(*) from dist;
SET search_path TO logical_replication;
SET client_min_messages TO WARNING;
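-- editorial note, not part of the original test file: disabling the subscription and
-- detaching its slot with SET (slot_name = NONE) first ensures DROP SUBSCRIPTION does
-- not try to drop a remote replication slot, the usual teardown for a subscription
-- whose slot has already been removed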
ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid DISABLE;
ALTER SUBSCRIPTION citus_shard_move_subscription_:postgres_oid SET (slot_name = NONE);
DROP SUBSCRIPTION citus_shard_move_subscription_:postgres_oid;
DROP SCHEMA logical_replication CASCADE;

View File

@ -175,10 +175,14 @@ SELECT isolate_tenant_to_new_shard('orders_streaming', 103, 'CASCADE', shard_tra
SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE', shard_transfer_mode => 'force_logical');
SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE', shard_transfer_mode => 'force_logical');
CALL pg_catalog.citus_cleanup_orphaned_resources();
-- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647
SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE', shard_transfer_mode => 'force_logical');
SELECT isolate_tenant_to_new_shard('orders_streaming', -1686493264, 'CASCADE', shard_transfer_mode => 'force_logical');
CALL pg_catalog.citus_cleanup_orphaned_resources();
SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554;
SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1686493264;
@ -393,45 +397,6 @@ SET citus.override_table_visibility TO false;
DROP EVENT TRIGGER abort_ddl;
-- create a trigger for drops
SET citus.enable_metadata_sync TO OFF;
CREATE OR REPLACE FUNCTION abort_drop_command()
RETURNS event_trigger
LANGUAGE plpgsql
AS $$
BEGIN
RAISE EXCEPTION 'command % is disabled', tg_tag;
END;
$$;
RESET citus.enable_metadata_sync;
CREATE EVENT TRIGGER abort_drop ON sql_drop
EXECUTE PROCEDURE abort_drop_command();
\c - mx_isolation_role_ent - :master_port
SET search_path to "Tenant Isolation";
\set VERBOSITY terse
SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'force_logical');
\set VERBOSITY default
-- check if metadata is changed
SELECT * FROM pg_dist_shard
WHERE logicalrelid = 'lineitem_streaming'::regclass OR logicalrelid = 'orders_streaming'::regclass
ORDER BY shardminvalue::BIGINT, logicalrelid;
\c - - - :worker_1_port
SET search_path to "Tenant Isolation";
-- however, new tables are already created
SET citus.override_table_visibility TO false;
\d
\c - postgres - :worker_1_port
DROP EVENT TRIGGER abort_drop;
\c - mx_isolation_role_ent - :master_port
SET search_path to "Tenant Isolation";