From e9508b26035963ff6203d48e73fb0597b19cc274 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Fri, 16 Sep 2022 09:39:08 -0700 Subject: [PATCH] Shard Split : Add / Update logging (#6336) DESCRIPTION: Improve logging during shard split and resource cleanup ### DESCRIPTION This PR makes logging improvements to Shard Split : 1. Update confusing logging to fix #6312 2. Added new `ereport(LOG` to make debugging easier as part of telemetry review. --- .../distributed/operations/shard_cleaner.c | 121 +++++++++++------- .../distributed/operations/shard_split.c | 66 +++++++++- .../operations/worker_split_copy_udf.c | 54 +++++++- src/include/distributed/shard_cleaner.h | 4 +- .../isolation_rebalancer_deferred_drop.out | 2 +- .../multi_colocated_shard_rebalance.out | 2 +- .../expected/multi_tenant_isolation.out | 4 +- .../multi_tenant_isolation_nonblocking.out | 2 +- 8 files changed, 201 insertions(+), 54 deletions(-) diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index 9775099eb..98b6dd4c6 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -303,21 +303,40 @@ DropOrphanedShardsForCleanup() workerNode->workerName, workerNode->workerPort)) { + if (record->policy == CLEANUP_DEFERRED_ON_SUCCESS) + { + ereport(LOG, (errmsg("deferred drop of orphaned shard %s on %s:%d " + "completed", + qualifiedTableName, + workerNode->workerName, workerNode->workerPort))); + } + else + { + ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d which " + "was left behind after a failed operation", + qualifiedTableName, + workerNode->workerName, workerNode->workerPort))); + } + /* delete the cleanup record */ DeleteCleanupRecordByRecordId(record->recordId); removedShardCountForCleanup++; } else { + /* + * We log failures at the end, since they occur repeatedly + * for a large number of objects. + */ failedShardCountForCleanup++; } } if (failedShardCountForCleanup > 0) { - ereport(WARNING, (errmsg("Failed to cleanup %d shards out of %d", - failedShardCountForCleanup, list_length( - cleanupRecordList)))); + ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d", + failedShardCountForCleanup, + list_length(cleanupRecordList)))); } return removedShardCountForCleanup; @@ -396,19 +415,29 @@ DropOrphanedShardsForMove(bool waitForLocks) shardPlacement->nodeName, shardPlacement->nodePort)) { + ereport(LOG, (errmsg("deferred drop of orphaned shard %s on %s:%d " + "after a move completed", + qualifiedTableName, + shardPlacement->nodeName, + shardPlacement->nodePort))); + /* delete the actual placement */ DeleteShardPlacementRow(placement->placementId); removedShardCount++; } else { + /* + * We log failures at the end, since they occur repeatedly + * for a large number of objects. + */ failedShardDropCount++; } } if (failedShardDropCount > 0) { - ereport(WARNING, (errmsg("Failed to drop %d orphaned shards out of %d", + ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d", failedShardDropCount, list_length(shardPlacementList)))); } @@ -436,7 +465,7 @@ RegisterOperationNeedingCleanup(void) * completion with failure. This will trigger cleanup of appropriate resources. */ void -FinalizeOperationNeedingCleanupOnFailure() +FinalizeOperationNeedingCleanupOnFailure(const char *operationName) { /* We must have a valid OperationId. Any operation requring cleanup * will call RegisterOperationNeedingCleanup. @@ -454,8 +483,9 @@ FinalizeOperationNeedingCleanupOnFailure() /* We only supporting cleaning shards right now */ if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT) { - ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ", - record->objectType))); + ereport(WARNING, (errmsg( + "Invalid object type %d on failed operation cleanup", + record->objectType))); continue; } @@ -473,6 +503,12 @@ FinalizeOperationNeedingCleanupOnFailure() workerNode->workerName, workerNode->workerPort)) { + ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d after a " + "%s operation failed", + qualifiedTableName, + workerNode->workerName, workerNode->workerPort, + operationName))); + /* * Given the operation is failing and we will abort its transaction, we cannot delete * records in the current transaction. Delete these records outside of the @@ -483,23 +519,22 @@ FinalizeOperationNeedingCleanupOnFailure() } else { + /* + * We log failures at the end, since they occur repeatedly + * for a large number of objects. + */ failedShardCountOnComplete++; } } } - if (list_length(currentOperationRecordList) > 0) + if (failedShardCountOnComplete > 0) { - ereport(LOG, (errmsg("Removed %d orphaned shards out of %d", - removedShardCountOnComplete, list_length( - currentOperationRecordList)))); - - if (failedShardCountOnComplete > 0) - { - ereport(WARNING, (errmsg("Failed to cleanup %d shards out of %d", - failedShardCountOnComplete, list_length( - currentOperationRecordList)))); - } + ereport(WARNING, (errmsg("failed to clean up %d orphaned shards out of %d after " + "a %s operation failed", + failedShardCountOnComplete, + list_length(currentOperationRecordList), + operationName))); } } @@ -509,7 +544,7 @@ FinalizeOperationNeedingCleanupOnFailure() * completion with success. This will trigger cleanup of appropriate resources. */ void -FinalizeOperationNeedingCleanupOnSuccess() +FinalizeOperationNeedingCleanupOnSuccess(const char *operationName) { /* We must have a valid OperationId. Any operation requring cleanup * will call RegisterOperationNeedingCleanup. @@ -527,8 +562,9 @@ FinalizeOperationNeedingCleanupOnSuccess() /* We only supporting cleaning shards right now */ if (record->objectType != CLEANUP_OBJECT_SHARD_PLACEMENT) { - ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ", - record->objectType))); + ereport(WARNING, (errmsg( + "Invalid object type %d on operation cleanup", + record->objectType))); continue; } @@ -546,6 +582,12 @@ FinalizeOperationNeedingCleanupOnSuccess() workerNode->workerName, workerNode->workerPort)) { + ereport(LOG, (errmsg("cleaned up orphaned shard %s on %s:%d after a " + "%s operation completed", + qualifiedTableName, + workerNode->workerName, workerNode->workerPort, + operationName))); + /* * Delete cleanup records outside transaction as: * The resources are marked as 'CLEANUP_ALWAYS' and should be cleaned no matter @@ -556,6 +598,10 @@ FinalizeOperationNeedingCleanupOnSuccess() } else { + /* + * We log failures at the end, since they occur repeatedly + * for a large number of objects. + */ failedShardCountOnComplete++; } } @@ -570,18 +616,14 @@ FinalizeOperationNeedingCleanupOnSuccess() } } - if (list_length(currentOperationRecordList) > 0) + if (failedShardCountOnComplete > 0) { - ereport(LOG, (errmsg("Removed %d orphaned shards out of %d", - removedShardCountOnComplete, list_length( - currentOperationRecordList)))); - - if (failedShardCountOnComplete > 0) - { - ereport(WARNING, (errmsg("Failed to cleanup %d shards out of %d", - failedShardCountOnComplete, list_length( - currentOperationRecordList)))); - } + ereport(WARNING, (errmsg( + "failed to clean up %d orphaned shards out of %d after " + "a %s operation completed", + failedShardCountOnComplete, + list_length(currentOperationRecordList), + operationName))); } } @@ -727,18 +769,11 @@ TryLockRelationAndPlacementCleanup(Oid relationId, LOCKMODE lockmode) * true on success. */ static bool -TryDropShardOutsideTransaction(OperationId operationId, char *qualifiedTableName, - char *nodeName, int nodePort) +TryDropShardOutsideTransaction(OperationId operationId, + char *qualifiedTableName, + char *nodeName, + int nodePort) { - char *operation = (operationId == INVALID_OPERATION_ID) ? "move" : "cleanup"; - - ereport(LOG, (errmsg("cleaning up %s on %s:%d which was left " - "after a %s", - qualifiedTableName, - nodeName, - nodePort, - operation))); - /* prepare sql query to execute to drop the shard */ StringInfo dropQuery = makeStringInfo(); appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND, qualifiedTableName); diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 0c30d413b..d8dcb4cbc 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -169,6 +169,12 @@ static const char *const SplitOperationName[] = [ISOLATE_TENANT_TO_NEW_SHARD] = "isolate", [CREATE_DISTRIBUTED_TABLE] = "create" }; +static const char *const SplitOperationAPIName[] = +{ + [SHARD_SPLIT_API] = "citus_split_shard_by_split_points", + [ISOLATE_TENANT_TO_NEW_SHARD] = "isolate_tenant_to_new_shard", + [CREATE_DISTRIBUTED_TABLE] = "create_distributed_table_concurrently" +}; static const char *const SplitTargetName[] = { [SHARD_SPLIT_API] = "shard", @@ -469,6 +475,8 @@ SplitShard(SplitMode splitMode, List *colocatedShardIntervalList, uint32 targetColocationId) { + const char *operationName = SplitOperationAPIName[splitOperation]; + ErrorIfModificationAndSplitInTheSameTransaction(splitOperation); ShardInterval *shardIntervalToSplit = LoadShardInterval(shardIdToSplit); @@ -526,6 +534,8 @@ SplitShard(SplitMode splitMode, if (splitMode == BLOCKING_SPLIT) { + ereport(LOG, (errmsg("performing blocking %s ", operationName))); + BlockingShardSplit( splitOperation, splitWorkflowId, @@ -536,6 +546,8 @@ SplitShard(SplitMode splitMode, } else { + ereport(LOG, (errmsg("performing non-blocking %s ", operationName))); + NonBlockingShardSplit( splitOperation, splitWorkflowId, @@ -548,7 +560,10 @@ SplitShard(SplitMode splitMode, PlacementMovedUsingLogicalReplicationInTX = true; } - FinalizeOperationNeedingCleanupOnSuccess(); + /* + * Drop temporary objects that were marked as CLEANUP_ALWAYS. + */ + FinalizeOperationNeedingCleanupOnSuccess(operationName); } @@ -569,6 +584,8 @@ BlockingShardSplit(SplitOperation splitOperation, List *workersForPlacementList, DistributionColumnMap *distributionColumnOverrides) { + const char *operationName = SplitOperationAPIName[splitOperation]; + BlockWritesToShardList(sourceColocatedShardIntervalList); /* First create shard interval metadata for split children */ @@ -583,10 +600,14 @@ BlockingShardSplit(SplitOperation splitOperation, PG_TRY(); { + ereport(LOG, (errmsg("creating child shards for %s", operationName))); + /* Physically create split children. */ CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList, workersForPlacementList); + ereport(LOG, (errmsg("performing copy for %s", operationName))); + /* For Blocking split, copy isn't snapshotted */ char *snapshotName = NULL; DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, @@ -596,6 +617,10 @@ BlockingShardSplit(SplitOperation splitOperation, /* Used for testing */ ConflictOnlyWithIsolationTesting(); + ereport(LOG, (errmsg( + "creating auxillary structures (indexes, stats, replicaindentities, triggers) for %s", + operationName))); + /* Create auxiliary structures (indexes, stats, replicaindentities, triggers) */ CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, workersForPlacementList, @@ -617,10 +642,16 @@ BlockingShardSplit(SplitOperation splitOperation, */ if (DeferShardDeleteOnSplit) { + ereport(LOG, (errmsg("marking deferred cleanup of source shard(s) for %s", + operationName))); + InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList); } else { + ereport(LOG, (errmsg("performing cleanup of source shard(s) for %s", + operationName))); + DropShardList(sourceColocatedShardIntervalList); } @@ -635,6 +666,9 @@ BlockingShardSplit(SplitOperation splitOperation, shardGroupSplitIntervalListList, workersForPlacementList); + ereport(LOG, (errmsg("creating foreign key constraints (if any) for %s", + operationName))); + /* * Create foreign keys if exists after the metadata changes happening in * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign @@ -649,7 +683,7 @@ BlockingShardSplit(SplitOperation splitOperation, ShutdownAllConnections(); /* Do a best effort cleanup of shards created on workers in the above block */ - FinalizeOperationNeedingCleanupOnFailure(); + FinalizeOperationNeedingCleanupOnFailure(operationName); PG_RE_THROW(); } @@ -1494,6 +1528,8 @@ NonBlockingShardSplit(SplitOperation splitOperation, DistributionColumnMap *distributionColumnOverrides, uint32 targetColocationId) { + const char *operationName = SplitOperationAPIName[splitOperation]; + ErrorIfMultipleNonblockingMoveSplitInTheSameTransaction(); char *superUser = CitusExtensionOwnerName(); @@ -1536,6 +1572,9 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* Non-Blocking shard split workflow starts here */ PG_TRY(); { + ereport(LOG, (errmsg("creating child shards for %s", + operationName))); + /* 1) Physically create split children. */ CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList, workersForPlacementList); @@ -1565,6 +1604,10 @@ NonBlockingShardSplit(SplitOperation splitOperation, */ CreateReplicaIdentitiesForDummyShards(mapOfPlacementToDummyShardList); + ereport(LOG, (errmsg( + "creating replication artifacts (publications, replication slots, subscriptions for %s", + operationName))); + /* 4) Create Publications. */ CreatePublications(sourceConnection, publicationInfoHash); @@ -1613,11 +1656,15 @@ NonBlockingShardSplit(SplitOperation splitOperation, databaseName, logicalRepTargetList); + ereport(LOG, (errmsg("performing copy for %s", operationName))); + /* 8) Do snapshotted Copy */ DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList, snapshot, distributionColumnOverrides); + ereport(LOG, (errmsg("replicating changes for %s", operationName))); + /* * 9) Logically replicate all the changes and do most of the table DDL, * like index and foreign key creation. @@ -1638,10 +1685,16 @@ NonBlockingShardSplit(SplitOperation splitOperation, */ if (DeferShardDeleteOnSplit) { + ereport(LOG, (errmsg("marking deferred cleanup of source shard(s) for %s", + operationName))); + InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList); } else { + ereport(LOG, (errmsg("performing cleanup of source shard(s) for %s", + operationName))); + DropShardList(sourceColocatedShardIntervalList); } @@ -1690,6 +1743,9 @@ NonBlockingShardSplit(SplitOperation splitOperation, */ CreatePartitioningHierarchy(logicalRepTargetList); + ereport(LOG, (errmsg("creating foreign key constraints (if any) for %s", + operationName))); + /* * 14) Create foreign keys if exists after the metadata changes happening in * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign @@ -1723,7 +1779,11 @@ NonBlockingShardSplit(SplitOperation splitOperation, */ DropAllLogicalReplicationLeftovers(SHARD_SPLIT); - FinalizeOperationNeedingCleanupOnFailure(); + /* + * Drop temporary objects that were marked as CLEANUP_ON_FAILURE + * or CLEANUP_ALWAYS. + */ + FinalizeOperationNeedingCleanupOnFailure(operationName); PG_RE_THROW(); } diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c index fe20aeca2..b96475992 100644 --- a/src/backend/distributed/operations/worker_split_copy_udf.c +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -43,6 +43,10 @@ static DestReceiver * CreatePartitionedSplitCopyDestReceiver(EState *executor, List *splitCopyInfoList); static void BuildMinMaxRangeArrays(List *splitCopyInfoList, ArrayType **minValueArray, ArrayType **maxValueArray); +static char * TraceWorkerSplitCopyUdf(char *sourceShardToCopySchemaName, + char *sourceShardToCopyPrefix, + char *sourceShardToCopyQualifiedName, + List *splitCopyInfoList); /* * worker_split_copy(source_shard_id bigint, splitCopyInfo pg_catalog.split_copy_info[]) @@ -93,12 +97,18 @@ worker_split_copy(PG_FUNCTION_ARGS) Oid sourceShardToCopySchemaOId = get_rel_namespace( shardIntervalToSplitCopy->relationId); char *sourceShardToCopySchemaName = get_namespace_name(sourceShardToCopySchemaOId); - char *sourceShardToCopyName = get_rel_name(shardIntervalToSplitCopy->relationId); + char *sourceShardPrefix = get_rel_name(shardIntervalToSplitCopy->relationId); + char *sourceShardToCopyName = pstrdup(sourceShardPrefix); AppendShardIdToName(&sourceShardToCopyName, shardIdToSplitCopy); char *sourceShardToCopyQualifiedName = quote_qualified_identifier( sourceShardToCopySchemaName, sourceShardToCopyName); + ereport(LOG, (errmsg("%s", TraceWorkerSplitCopyUdf(sourceShardToCopySchemaName, + sourceShardPrefix, + sourceShardToCopyQualifiedName, + splitCopyInfoList)))); + StringInfo selectShardQueryForCopy = makeStringInfo(); appendStringInfo(selectShardQueryForCopy, "SELECT * FROM %s;", sourceShardToCopyQualifiedName); @@ -113,6 +123,48 @@ worker_split_copy(PG_FUNCTION_ARGS) } +/* Trace split copy udf */ +static char * +TraceWorkerSplitCopyUdf(char *sourceShardToCopySchemaName, + char *sourceShardToCopyPrefix, + char *sourceShardToCopyQualifiedName, + List *splitCopyInfoList) +{ + StringInfo splitCopyTrace = makeStringInfo(); + appendStringInfo(splitCopyTrace, "performing copy from shard %s to [", + sourceShardToCopyQualifiedName); + + /* split copy always has atleast two destinations */ + int index = 1; + int splitWayCount = list_length(splitCopyInfoList); + SplitCopyInfo *splitCopyInfo = NULL; + foreach_ptr(splitCopyInfo, splitCopyInfoList) + { + char *shardNameCopy = pstrdup(sourceShardToCopyPrefix); + AppendShardIdToName(&shardNameCopy, splitCopyInfo->destinationShardId); + + char *shardNameCopyQualifiedName = quote_qualified_identifier( + sourceShardToCopySchemaName, + shardNameCopy); + + appendStringInfo(splitCopyTrace, "%s (nodeId: %u)", shardNameCopyQualifiedName, + splitCopyInfo->destinationShardNodeId); + pfree(shardNameCopy); + + if (index < splitWayCount) + { + appendStringInfo(splitCopyTrace, ", "); + } + + index++; + } + + appendStringInfo(splitCopyTrace, "]"); + + return splitCopyTrace->data; +} + + /* Parse a single SplitCopyInfo Tuple */ static void ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo **splitCopyInfo) diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h index d5d2a3617..55633f653 100644 --- a/src/include/distributed/shard_cleaner.h +++ b/src/include/distributed/shard_cleaner.h @@ -103,13 +103,13 @@ extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType, * completion on failure. This will trigger cleanup of appropriate resources * and cleanup records. */ -extern void FinalizeOperationNeedingCleanupOnFailure(void); +extern void FinalizeOperationNeedingCleanupOnFailure(const char *operationName); /* * FinalizeOperationNeedingCleanupOnSuccess is be called by an operation to signal * completion on success. This will trigger cleanup of appropriate resources * and cleanup records. */ -extern void FinalizeOperationNeedingCleanupOnSuccess(void); +extern void FinalizeOperationNeedingCleanupOnSuccess(const char *operationName); #endif /*CITUS_SHARD_CLEANER_H */ diff --git a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out index 6963e9122..169138143 100644 --- a/src/test/regress/expected/isolation_rebalancer_deferred_drop.out +++ b/src/test/regress/expected/isolation_rebalancer_deferred_drop.out @@ -91,7 +91,7 @@ step s1-drop-marked-shards: s1: WARNING: canceling statement due to lock timeout step s1-drop-marked-shards: <... completed> -s1: WARNING: Failed to drop 1 orphaned shards out of 1 +s1: WARNING: failed to clean up 1 orphaned shards out of 1 step s1-commit: COMMIT; diff --git a/src/test/regress/expected/multi_colocated_shard_rebalance.out b/src/test/regress/expected/multi_colocated_shard_rebalance.out index da3bae484..48e7b46e7 100644 --- a/src/test/regress/expected/multi_colocated_shard_rebalance.out +++ b/src/test/regress/expected/multi_colocated_shard_rebalance.out @@ -740,7 +740,7 @@ DETAIL: from localhost:xxxxx (1 row) CALL citus_cleanup_orphaned_shards(); -LOG: cleaning up public.test_with_pkey_13000042 on localhost:xxxxx which was left after a move +LOG: deferred drop of orphaned shard public.test_with_pkey_13000042 on localhost:xxxxx after a move completed NOTICE: cleaned up 1 orphaned shards SET client_min_messages TO DEFAULT; -- we don't support multiple shard moves in a single transaction diff --git a/src/test/regress/expected/multi_tenant_isolation.out b/src/test/regress/expected/multi_tenant_isolation.out index 3f6537e79..021b43972 100644 --- a/src/test/regress/expected/multi_tenant_isolation.out +++ b/src/test/regress/expected/multi_tenant_isolation.out @@ -758,7 +758,7 @@ SET search_path to "Tenant Isolation"; \set VERBOSITY terse SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'block_writes'); WARNING: command DROP TABLE is disabled -WARNING: Failed to cleanup 1 shards out of 1 +WARNING: failed to clean up 1 orphaned shards out of 1 after a isolate_tenant_to_new_shard operation failed ERROR: command CREATE TABLE is disabled \set VERBOSITY default \c - postgres - :worker_1_port @@ -811,7 +811,7 @@ WARNING: command DROP TABLE is disabled WARNING: command DROP TABLE is disabled WARNING: command DROP TABLE is disabled WARNING: command DROP TABLE is disabled -WARNING: Failed to cleanup 6 shards out of 6 +WARNING: failed to clean up 6 orphaned shards out of 6 after a isolate_tenant_to_new_shard operation failed ERROR: command DROP TABLE is disabled \set VERBOSITY default -- check if metadata is changed diff --git a/src/test/regress/expected/multi_tenant_isolation_nonblocking.out b/src/test/regress/expected/multi_tenant_isolation_nonblocking.out index e8feb8f23..392da23a0 100644 --- a/src/test/regress/expected/multi_tenant_isolation_nonblocking.out +++ b/src/test/regress/expected/multi_tenant_isolation_nonblocking.out @@ -790,7 +790,7 @@ SET search_path to "Tenant Isolation"; \set VERBOSITY terse SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE', shard_transfer_mode => 'force_logical'); WARNING: command DROP TABLE is disabled -WARNING: Failed to cleanup 1 shards out of 1 +WARNING: failed to clean up 1 orphaned shards out of 1 after a isolate_tenant_to_new_shard operation failed ERROR: command CREATE TABLE is disabled \set VERBOSITY default \c - postgres - :worker_1_port