Shard Split : Add / Update logging (#6336)

DESCRIPTION: Improve logging during shard split and resource cleanup

### DESCRIPTION

This PR makes the following logging improvements to shard split:

1. Updates confusing log messages (fixes #6312).
2. Adds new `ereport(LOG, ...)` calls to make debugging easier, as part of the telemetry review.
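
For context, every message this PR adds follows PostgreSQL's standard `ereport(LOG, ...)` pattern, keyed by the user-facing API name of the operation. A minimal sketch of the shape used throughout the diff below (`LogSplitPhase` is a hypothetical wrapper for illustration; the PR inlines these calls at each split phase):

```c
#include "postgres.h"   /* ereport / errmsg */

static void
LogSplitPhase(SplitOperation splitOperation)
{
	/* operationName comes from the SplitOperationAPIName lookup table
	 * added in this PR, e.g. "citus_split_shard_by_split_points". */
	const char *operationName = SplitOperationAPIName[splitOperation];

	ereport(LOG, (errmsg("performing copy for %s", operationName)));
}
```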
Nitish Upreti 2022-09-16 09:39:08 -07:00 committed by GitHub
parent 8b5cdaf0e9
commit e9508b2603
8 changed files with 201 additions and 54 deletions


@@ -303,21 +303,40 @@ DropOrphanedShardsForCleanup()
workerNode->workerName,
workerNode->workerPort))
{
if (record->policy == CLEANUP_DEFERRED_ON_SUCCESS)
{
ereport(LOG, (errmsg("deferred drop of orphaned shard %s on %s:%d "
"completed",
qualifiedTableName,
workerNode->workerName, workerNode->workerPort)));
}
else
{
ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d which "
"was left behind after a failed operation",
qualifiedTableName,
workerNode->workerName, workerNode->workerPort)));
}
/* delete the cleanup record */
DeleteCleanupRecordByRecordId(record->recordId);
removedShardCountForCleanup++;
}
else
{
/*
* We log failures at the end, since they occur repeatedly
* for a large number of objects.
*/
failedShardCountForCleanup++;
}
}
if (failedShardCountForCleanup > 0)
{
ereport(WARNING, (errmsg("Failed to cleanup %d shards out of %d",
failedShardCountForCleanup, list_length(
cleanupRecordList))));
ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d",
failedShardCountForCleanup,
list_length(cleanupRecordList))));
}
return removedShardCountForCleanup;
@@ -396,19 +415,29 @@ DropOrphanedShardsForMove(bool waitForLocks)
shardPlacement->nodeName,
shardPlacement->nodePort))
{
ereport(LOG, (errmsg("deferred drop of orphaned shard %s on %s:%d "
"after a move completed",
qualifiedTableName,
shardPlacement->nodeName,
shardPlacement->nodePort)));
/* delete the actual placement */
DeleteShardPlacementRow(placement->placementId);
removedShardCount++;
}
else
{
/*
* We log failures at the end, since they occur repeatedly
* for a large number of objects.
*/
failedShardDropCount++;
}
}
if (failedShardDropCount > 0)
{
ereport(WARNING, (errmsg("Failed to drop %d orphaned shards out of %d",
ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d",
failedShardDropCount, list_length(shardPlacementList))));
}
@@ -436,7 +465,7 @@ RegisterOperationNeedingCleanup(void)
* completion with failure. This will trigger cleanup of appropriate resources.
*/
void
FinalizeOperationNeedingCleanupOnFailure()
FinalizeOperationNeedingCleanupOnFailure(const char *operationName)
{
/* We must have a valid OperationId. Any operation requiring cleanup
* will call RegisterOperationNeedingCleanup.
@@ -454,8 +483,9 @@ FinalizeOperationNeedingCleanupOnFailure()
/* We only support cleaning shards right now */
if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT)
{
ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ",
record->objectType)));
ereport(WARNING, (errmsg(
"Invalid object type %d on failed operation cleanup",
record->objectType)));
continue;
}
@@ -473,6 +503,12 @@ FinalizeOperationNeedingCleanupOnFailure()
workerNode->workerName,
workerNode->workerPort))
{
ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d after a "
"%s operation failed",
qualifiedTableName,
workerNode->workerName, workerNode->workerPort,
operationName)));
/*
* Given the operation is failing and we will abort its transaction, we cannot delete
* records in the current transaction. Delete these records outside of the
@@ -483,23 +519,22 @@ FinalizeOperationNeedingCleanupOnFailure()
}
else
{
/*
* We log failures at the end, since they occur repeatedly
* for a large number of objects.
*/
failedShardCountOnComplete++;
}
}
}
if (list_length(currentOperationRecordList) > 0)
if (failedShardCountOnComplete > 0)
{
ereport(LOG, (errmsg("Removed %d orphaned shards out of %d",
removedShardCountOnComplete, list_length(
currentOperationRecordList))));
if (failedShardCountOnComplete > 0)
{
ereport(WARNING, (errmsg("Failed to cleanup %d shards out of %d",
failedShardCountOnComplete, list_length(
currentOperationRecordList))));
}
ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d after "
"a %s operation failed",
failedShardCountOnComplete,
list_length(currentOperationRecordList),
operationName)));
}
}
@@ -509,7 +544,7 @@ FinalizeOperationNeedingCleanupOnFailure()
* completion with success. This will trigger cleanup of appropriate resources.
*/
void
FinalizeOperationNeedingCleanupOnSuccess()
FinalizeOperationNeedingCleanupOnSuccess(const char *operationName)
{
/* We must have a valid OperationId. Any operation requiring cleanup
* will call RegisterOperationNeedingCleanup.
@@ -527,8 +562,9 @@ FinalizeOperationNeedingCleanupOnSuccess()
/* We only support cleaning shards right now */
if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT)
{
ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ",
record->objectType)));
ereport(WARNING, (errmsg(
"Invalid object type %d on operation cleanup",
record->objectType)));
continue;
}
@@ -546,6 +582,12 @@ FinalizeOperationNeedingCleanupOnSuccess()
workerNode->workerName,
workerNode->workerPort))
{
ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d after a "
"%s operation completed",
qualifiedTableName,
workerNode->workerName, workerNode->workerPort,
operationName)));
/*
* Delete cleanup records outside transaction as:
* The resources are marked as 'CLEANUP_ALWAYS' and should be cleaned no matter
@@ -556,6 +598,10 @@ FinalizeOperationNeedingCleanupOnSuccess()
}
else
{
/*
* We log failures at the end, since they occur repeatedly
* for a large number of objects.
*/
failedShardCountOnComplete++;
}
}
@@ -570,18 +616,14 @@ FinalizeOperationNeedingCleanupOnSuccess()
}
}
if (list_length(currentOperationRecordList) > 0)
if (failedShardCountOnComplete > 0)
{
ereport(LOG, (errmsg("Removed %d orphaned shards out of %d",
removedShardCountOnComplete, list_length(
currentOperationRecordList))));
if (failedShardCountOnComplete > 0)
{
ereport(WARNING, (errmsg("Failed to cleanup %d shards out of %d",
failedShardCountOnComplete, list_length(
currentOperationRecordList))));
}
ereport(WARNING, (errmsg(
"failed to clean up %d orphaned shards out of %d after "
"a %s operation completed",
failedShardCountOnComplete,
list_length(currentOperationRecordList),
operationName)));
}
}
@@ -727,18 +769,11 @@ TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode)
* true on success.
*/
static bool
TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName,
char *nodeName, int nodePort)
TryDropShardOutsideTransaction(OperationId operationId,
char *qualifiedTableName,
char *nodeName,
int nodePort)
{
char *operation = (operationId == INVALID_OPERATION_ID) ? "move" : "cleanup";
ereport(LOG, (errmsg("cleaning up %s on %s:%d which was left "
"after a %s",
qualifiedTableName,
nodeName,
nodePort,
operation)));
/* prepare sql query to execute to drop the shard */
StringInfo dropQuery = makeStringInfo();
appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName);


@@ -169,6 +169,12 @@ static const char *const SplitOperationName[] =
[ISOLATE_TENANT_TO_NEW_SHARD] = "isolate",
[CREATE_DISTRIBUTED_TABLE] = "create"
};
static const char *const SplitOperationAPIName[] =
{
[SHARD_SPLIT_API] = "citus_split_shard_by_split_points",
[ISOLATE_TENANT_TO_NEW_SHARD] = "isolate_tenant_to_new_shard",
[CREATE_DISTRIBUTED_TABLE] = "create_distributed_table_concurrently"
};
static const char *const SplitTargetName[] =
{
[SHARD_SPLIT_API] = "shard",
@@ -469,6 +475,8 @@ SplitShard(SplitMode splitMode,
List *colocatedShardIntervalList,
uint32 targetColocationId)
{
const char *operationName = SplitOperationAPIName[splitOperation];
ErrorIfModificationAndSplitInTheSameTransaction(splitOperation);
ShardInterval *shardIntervalToSplit = LoadShardInterval(shardIdToSplit);
@@ -526,6 +534,8 @@ SplitShard(SplitMode splitMode,
if (splitMode == BLOCKING_SPLIT)
{
ereport(LOG, (errmsg("performing blocking %s ", operationName)));
BlockingShardSplit(
splitOperation,
splitWorkflowId,
@@ -536,6 +546,8 @@ SplitShard(SplitMode splitMode,
}
else
{
ereport(LOG, (errmsg("performing non-blocking %s ", operationName)));
NonBlockingShardSplit(
splitOperation,
splitWorkflowId,
@@ -548,7 +560,10 @@ SplitShard(SplitMode splitMode,
PlacementMovedUsingLogicalReplicationInTX = true;
}
FinalizeOperationNeedingCleanupOnSuccess();
/*
* Drop temporary objects that were marked as CLEANUP_ALWAYS.
*/
FinalizeOperationNeedingCleanupOnSuccess(operationName);
}
@@ -569,6 +584,8 @@ BlockingShardSplit(SplitOperation splitOperation,
List *workersForPlacementList,
DistributionColumnMap *distributionColumnOverrides)
{
const char *operationName = SplitOperationAPIName[splitOperation];
BlockWritesToShardList(sourceColocatedShardIntervalList);
/* First create shard interval metadata for split children */
@@ -583,10 +600,14 @@ BlockingShardSplit(SplitOperation splitOperation,
PG_TRY();
{
ereport(LOG, (errmsg("creating child shards for %s", operationName)));
/* Physically create split children. */
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
ereport(LOG, (errmsg("performing copy for %s", operationName)));
/* For Blocking split, copy isn't snapshotted */
char *snapshotName = NULL;
DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
@@ -596,6 +617,10 @@ BlockingShardSplit(SplitOperation splitOperation,
/* Used for testing */
ConflictOnlyWithIsolationTesting();
ereport(LOG, (errmsg(
"creating auxillary structures (indexes, stats, replicaindentities, triggers) for %s",
operationName)));
/* Create auxiliary structures (indexes, stats, replicaindentities, triggers) */
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList,
@@ -617,10 +642,16 @@ BlockingShardSplit(SplitOperation splitOperation,
*/
if (DeferShardDeleteOnSplit)
{
ereport(LOG, (errmsg("marking deferred cleanup of source shard(s) for %s",
operationName)));
InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList);
}
else
{
ereport(LOG, (errmsg("performing cleanup of source shard(s) for %s",
operationName)));
DropShardList(sourceColocatedShardIntervalList);
}
@@ -635,6 +666,9 @@ BlockingShardSplit(SplitOperation splitOperation,
shardGroupSplitIntervalListList,
workersForPlacementList);
ereport(LOG, (errmsg("creating foreign key constraints (if any) for %s",
operationName)));
/*
* Create foreign keys if exists after the metadata changes happening in
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
@@ -649,7 +683,7 @@ BlockingShardSplit(SplitOperation splitOperation,
ShutdownAllConnections();
/* Do a best effort cleanup of shards created on workers in the above block */
FinalizeOperationNeedingCleanupOnFailure();
FinalizeOperationNeedingCleanupOnFailure(operationName);
PG_RE_THROW();
}
@@ -1494,6 +1528,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
DistributionColumnMap *distributionColumnOverrides,
uint32 targetColocationId)
{
const char *operationName = SplitOperationAPIName[splitOperation];
ErrorIfMultipleNonblockingMoveSplitInTheSameTransaction();
char *superUser = CitusExtensionOwnerName();
@@ -1536,6 +1572,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* Non-Blocking shard split workflow starts here */
PG_TRY();
{
ereport(LOG, (errmsg("creating child shards for %s",
operationName)));
/* 1) Physically create split children. */
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
@@ -1565,6 +1604,10 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/
CreateReplicaIdentitiesForDummyShards(mapOfPlacementToDummyShardList);
ereport(LOG, (errmsg(
"creating replication artifacts (publications, replication slots, subscriptions for %s",
operationName)));
/* 4) Create Publications. */
CreatePublications(sourceConnection, publicationInfoHash);
@@ -1613,11 +1656,15 @@ NonBlockingShardSplit(SplitOperation splitOperation,
databaseName,
logicalRepTargetList);
ereport(LOG, (errmsg("performing copy for %s", operationName)));
/* 8) Do snapshotted Copy */
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList,
snapshot, distributionColumnOverrides);
ereport(LOG, (errmsg("replicating changes for %s", operationName)));
/*
* 9) Logically replicate all the changes and do most of the table DDL,
* like index and foreign key creation.
@@ -1638,10 +1685,16 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/
if (DeferShardDeleteOnSplit)
{
ereport(LOG, (errmsg("marking deferred cleanup of source shard(s) for %s",
operationName)));
InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList);
}
else
{
ereport(LOG, (errmsg("performing cleanup of source shard(s) for %s",
operationName)));
DropShardList(sourceColocatedShardIntervalList);
}
@@ -1690,6 +1743,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/
CreatePartitioningHierarchy(logicalRepTargetList);
ereport(LOG, (errmsg("creating foreign key constraints (if any) for %s",
operationName)));
/*
* 14) Create foreign keys if exists after the metadata changes happening in
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
@@ -1723,7 +1779,11 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/
DropAllLogicalReplicationLeftovers(SHARD_SPLIT);
FinalizeOperationNeedingCleanupOnFailure();
/*
* Drop temporary objects that were marked as CLEANUP_ON_FAILURE
* or CLEANUP_ALWAYS.
*/
FinalizeOperationNeedingCleanupOnFailure(operationName);
PG_RE_THROW();
}


@@ -43,6 +43,10 @@ static DestReceiver * CreatePartitionedSplitCopyDestReceiver(EState *executor,
List *splitCopyInfoList);
static void BuildMinMaxRangeArrays(List *splitCopyInfoList, ArrayType **minValueArray,
ArrayType **maxValueArray);
static char * TraceWorkerSplitCopyUdf(char *sourceShardToCopySchemaName,
char *sourceShardToCopyPrefix,
char *sourceShardToCopyQualifiedName,
List *splitCopyInfoList);
/*
* worker_split_copy(source_shard_id bigint, splitCopyInfo pg_catalog.split_copy_info[])
@@ -93,12 +97,18 @@ worker_split_copy(PG_FUNCTION_ARGS)
Oid sourceShardToCopySchemaOId = get_rel_namespace(
shardIntervalToSplitCopy->relationId);
char *sourceShardToCopySchemaName = get_namespace_name(sourceShardToCopySchemaOId);
char *sourceShardToCopyName = get_rel_name(shardIntervalToSplitCopy->relationId);
char *sourceShardPrefix = get_rel_name(shardIntervalToSplitCopy->relationId);
char *sourceShardToCopyName = pstrdup(sourceShardPrefix);
AppendShardIdToName(&sourceShardToCopyName, shardIdToSplitCopy);
char *sourceShardToCopyQualifiedName = quote_qualified_identifier(
sourceShardToCopySchemaName,
sourceShardToCopyName);
ereport(LOG, (errmsg("%s", TraceWorkerSplitCopyUdf(sourceShardToCopySchemaName,
sourceShardPrefix,
sourceShardToCopyQualifiedName,
splitCopyInfoList))));
StringInfo selectShardQueryForCopy = makeStringInfo();
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;", sourceShardToCopyQualifiedName);
@@ -113,6 +123,48 @@
}
/* Build a trace message describing a worker_split_copy invocation */
static char *
TraceWorkerSplitCopyUdf(char *sourceShardToCopySchemaName,
char *sourceShardToCopyPrefix,
char *sourceShardToCopyQualifiedName,
List *splitCopyInfoList)
{
StringInfo splitCopyTrace = makeStringInfo();
appendStringInfo(splitCopyTrace, "performing copy from shard %s to [",
sourceShardToCopyQualifiedName);
/* split copy always has at least two destinations */
int index = 1;
int splitWayCount = list_length(splitCopyInfoList);
SplitCopyInfo *splitCopyInfo = NULL;
foreach_ptr(splitCopyInfo, splitCopyInfoList)
{
char *shardNameCopy = pstrdup(sourceShardToCopyPrefix);
AppendShardIdToName(&shardNameCopy, splitCopyInfo->destinationShardId);
char *shardNameCopyQualifiedName = quote_qualified_identifier(
sourceShardToCopySchemaName,
shardNameCopy);
appendStringInfo(splitCopyTrace, "%s (nodeId: %u)", shardNameCopyQualifiedName,
splitCopyInfo->destinationShardNodeId);
pfree(shardNameCopy);
if (index < splitWayCount)
{
appendStringInfo(splitCopyTrace, ", ");
}
index++;
}
appendStringInfo(splitCopyTrace, "]");
return splitCopyTrace->data;
}
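(For illustration only, with hypothetical shard names: a two-way split would log a trace like `performing copy from shard public.sensors_102008 to [public.sensors_102011 (nodeId: 16), public.sensors_102012 (nodeId: 18)]`.)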
/* Parse a single SplitCopyInfo Tuple */
static void
ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo)


@@ -103,13 +103,13 @@ extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType,
* completion on failure. This will trigger cleanup of appropriate resources
* and cleanup records.
*/
extern void FinalizeOperationNeedingCleanupOnFailure(void);
extern void FinalizeOperationNeedingCleanupOnFailure(const char *operationName);
/*
* FinalizeOperationNeedingCleanupOnSuccess is called by an operation to signal
* completion on success. This will trigger cleanup of appropriate resources
* and cleanup records.
*/
extern void FinalizeOperationNeedingCleanupOnSuccess(void);
extern void FinalizeOperationNeedingCleanupOnSuccess(const char *operationName);
#endif /*CITUS_SHARD_CLEANER_H */


@@ -91,7 +91,7 @@ step s1-drop-marked-shards:
<waiting ...>
s1: WARNING: canceling statement due to lock timeout
step s1-drop-marked-shards: <... completed>
s1: WARNING: Failed to drop 1 orphaned shards out of 1
s1: WARNING: failed to clean up 1 orphaned shards out of 1
step s1-commit:
COMMIT;


@@ -740,7 +740,7 @@ DETAIL: from localhost:xxxxx
(1 row)
CALL citus_cleanup_orphaned_shards();
LOG: cleaning up public.test_with_pkey_13000042 on localhost:xxxxx which was left after a move
LOG: deferred drop of orphaned shard public.test_with_pkey_13000042 on localhost:xxxxx after a move completed
NOTICE: cleaned up 1 orphaned shards
SET client_min_messages TO DEFAULT;
-- we don't support multiple shard moves in a single transaction


@@ -758,7 +758,7 @@ SET search_path to "Tenant Isolation";
\set VERBOSITY terse
SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'block_writes');
WARNING: command DROP TABLE is disabled
WARNING: Failed to cleanup 1 shards out of 1
WARNING: failed to clean up 1 orphaned shards out of 1 after a isolate_tenant_to_new_shard operation failed
ERROR: command CREATE TABLE is disabled
\set VERBOSITY default
\c - postgres - :worker_1_port
@@ -811,7 +811,7 @@ WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
WARNING: Failed to cleanup 6 shards out of 6
WARNING: failed to clean up 6 orphaned shards out of 6 after a isolate_tenant_to_new_shard operation failed
ERROR: command DROP TABLE is disabled
\set VERBOSITY default
-- check if metadata is changed


@@ -790,7 +790,7 @@ SET search_path to "Tenant Isolation";
\set VERBOSITY terse
SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'force_logical');
WARNING: command DROP TABLE is disabled
WARNING: Failed to cleanup 1 shards out of 1
WARNING: failed to clean up 1 orphaned shards out of 1 after a isolate_tenant_to_new_shard operation failed
ERROR: command CREATE TABLE is disabled
\set VERBOSITY default
\c - postgres - :worker_1_port