diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c
index 5ed339bca..780e6f22e 100644
--- a/src/backend/distributed/operations/shard_cleaner.c
+++ b/src/backend/distributed/operations/shard_cleaner.c
@@ -42,22 +42,22 @@ int NextCleanupRecordId = 0;
  */
 typedef struct CleanupRecord
 {
-	/* unique identifier of the record (for deletes) */
+	/* unique identifier of the record */
 	uint64 recordId;
 
-	/* identifier of the operation that generated the record (must not be in progress) */
+	/* identifier of the operation that generated the record */
 	OperationId operationId;
 
-	/* type of the object (e.g. shard placement) */
+	/* type of the object (e.g. shard) */
 	CleanupObject objectType;
 
 	/* fully qualified name of the object */
 	char *objectName;
 
-	/* node grou ID on which the object is located */
+	/* node group ID on which the object is located */
 	int nodeGroupId;
 
-	/* cleanup policy */
+	/* cleanup policy that determines when the object is cleaned up */
 	CleanupPolicy policy;
 } CleanupRecord;
@@ -157,8 +157,9 @@ DropOrphanedShardsInSeparateTransaction(void)
 
 /*
- * TryDropOrphanedShards is a wrapper around DropOrphanedShardsForMove that catches
- * any errors to make it safe to use in the maintenance daemon.
+ * TryDropOrphanedShards is a wrapper around DropOrphanedShardsForMove and
+ * DropOrphanedShardsForCleanup that catches any errors to make it safe to
+ * use in the maintenance daemon.
  *
  * If dropping any of the shards failed this function returns -1, otherwise it
  * returns the number of dropped shards.
@@ -206,7 +207,7 @@ DropOrphanedShardsForCleanup()
 
 	foreach_ptr(record, cleanupRecordList)
 	{
-		/* We only supporting cleaning shards right now */
+		/* We only support one resource type at the moment */
 		if (record->objectType != CLEANUP_SHARD_PLACEMENT)
 		{
 			ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ",
@@ -214,6 +215,7 @@ DropOrphanedShardsForCleanup()
 			continue;
 		}
 
+		/* Advisory locks are reentrant */
 		if (!TryLockOperationId(record->operationId))
 		{
 			/* operation that the cleanup record is part of is still running */
@@ -222,12 +224,14 @@ DropOrphanedShardsForCleanup()
 
 		char *qualifiedTableName = record->objectName;
 		WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);
+
+		/*
+		 * Now that we have the lock, check if the record still exists.
+		 * The operation could have completed successfully just after we called
+		 * ListCleanupRecords, in which case the record will now be gone.
+		 */
 		if (!CleanupRecordExists(record->recordId))
 		{
-			/*
-			 * The operation completed successfully just after we called
-			 * ListCleanupRecords in which case the record is now gone.
-			 */
 			continue;
 		}
@@ -345,6 +349,10 @@ DropOrphanedShardsForMove(bool waitForLocks)
 }
 
 
+/*
+ * StartNewOperationNeedingCleanup is called by an operation to register
+ * for cleanup.
+ */
 OperationId
 StartNewOperationNeedingCleanup(void)
 {
@@ -356,6 +364,10 @@ StartNewOperationNeedingCleanup(void)
 }
 
 
+/*
+ * CompleteNewOperationNeedingCleanup is called by an operation to signal
+ * completion. This will trigger cleanup of appropriate resources.
+ */
 void
 CompleteNewOperationNeedingCleanup(bool isSuccess)
 {
@@ -517,6 +529,9 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType,
 }
 
 
+/*
+ * DeleteCleanupRecordByRecordIdOutsideTransaction deletes a cleanup record by record id.
+ */
 static void
 DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId)
 {
@@ -669,17 +684,23 @@ ListCleanupRecords(void)
 
 
 /*
- * ListCleanupRecordsForCurrentOperation lists all the current cleanup records.
+ * ListCleanupRecordsForCurrentOperation lists all the cleanup records for
+ * the current operation.
  */
 static List *
 ListCleanupRecordsForCurrentOperation(void)
 {
+	/* We must have a valid OperationId. Any operation requiring cleanup
+	 * will call StartNewOperationNeedingCleanup.
+	 */
+	Assert(CurrentOperationId != INVALID_OPERATION_ID);
+
 	Relation pgDistCleanup = table_open(DistCleanupRelationId(), AccessShareLock);
 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistCleanup);
 
 	ScanKeyData scanKey[1];
 	ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_operation_id, BTEqualStrategyNumber,
-				F_OIDEQ, CurrentOperationId);
+				F_INT8EQ, UInt64GetDatum(CurrentOperationId));
 
 	int scanKeyCount = 1;
 	Oid scanIndexId = InvalidOid;
@@ -841,6 +862,10 @@ GetNextCleanupRecordId(void)
 }
 
 
+/*
+ * LockOperationId takes an exclusive lock to ensure that only one process
+ * can clean up the resources of an operation at a time.
+ */
 static void
 LockOperationId(OperationId operationId)
 {
@@ -852,6 +877,10 @@ LockOperationId(OperationId operationId)
 }
 
 
+/*
+ * TryLockOperationId attempts to take the same exclusive lock without
+ * waiting, so that callers can skip operations that are still running.
+ */
 static bool
 TryLockOperationId(OperationId operationId)
 {
@@ -859,6 +888,7 @@ TryLockOperationId(OperationId operationId)
 	const bool sessionLock = false;
 	const bool dontWait = true;
 	SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId);
-	bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
-	return lockAcquired;
+	LockAcquireResult lockResult = LockAcquire(&tag, ExclusiveLock, sessionLock,
+											   dontWait);
+	return (lockResult != LOCKACQUIRE_NOT_AVAIL);
 }
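A note for reviewers on the TryLockOperationId change above: LockAcquire() returns a
LockAcquireResult enum rather than a bool, and two of its values indicate success. A
condensed sketch of the pattern, using the lock.h API the patch already relies on
(TryCleanupLockSketch is a hypothetical name, not part of this patch):

#include "storage/lock.h"		/* LockAcquire, LOCKTAG, LockAcquireResult */

/* hypothetical helper mirroring TryLockOperationId's return-value handling */
static bool
TryCleanupLockSketch(OperationId operationId)
{
	LOCKTAG tag;
	const bool sessionLock = false;	/* transaction-scoped, not session-scoped */
	const bool dontWait = true;		/* fail fast instead of queueing */

	SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId);

	LockAcquireResult lockResult = LockAcquire(&tag, ExclusiveLock, sessionLock,
											   dontWait);

	/*
	 * LOCKACQUIRE_OK and LOCKACQUIRE_ALREADY_HELD both count as success;
	 * ALREADY_HELD can happen because these advisory locks are reentrant.
	 * Only LOCKACQUIRE_NOT_AVAIL means another backend holds the lock.
	 */
	return lockResult != LOCKACQUIRE_NOT_AVAIL;
}

This is why the new code compares against LOCKACQUIRE_NOT_AVAIL instead of
LOCKACQUIRE_OK: a backend that already holds its own operation lock must not treat
re-acquisition as failure.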
diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index 49699ed75..39f746ff8 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -1348,7 +1348,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 	/* 19) Drop Publications */
 	DropPublications(sourceConnection, publicationInfoHash);
 
-
 	/*
 	 * 20) Drop old shards and delete related metadata. Have to do that before
 	 * creating the new shard metadata, because there's cross-checks
@@ -1512,11 +1511,16 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
 			InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
 												ConstructQualifiedShardName(
 													shardInterval),
-												workerPlacementNode->groupId,
+												sourceWorkerNode->groupId,
 												policy);
 
 			/* Create dummy split child shard on source worker node */
 			CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
+
+			/* Add an entry for the dummy split child shard created on the source node */
+			AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
+									sourceWorkerNode->nodeId,
+									shardInterval);
 		}
 	}
 }
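For context on the CreateDummyShardsForShardGroup() change above: a split registers
each resource it creates, including the dummy shards on the source node, in
pg_dist_cleanup before creating it. A minimal sketch of the registration/completion
protocol the new API expects, assuming StartNewOperationNeedingCleanup() records the
id in CurrentOperationId (as ListCleanupRecordsForCurrentOperation suggests);
RunOperationWithCleanup and ExecuteOperationWork are hypothetical names:

static void
RunOperationWithCleanup(char *qualifiedShardName, int nodeGroupId)
{
	/* allocate an operation id; also tracked as CurrentOperationId */
	OperationId operationId = StartNewOperationNeedingCleanup();

	/*
	 * Register the placement before creating it, so that even a hard
	 * crash leaves a record behind for the maintenance daemon.
	 */
	InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
										qualifiedShardName,
										nodeGroupId,
										CLEANUP_ON_FAILURE);

	bool isSuccess = ExecuteOperationWork(operationId);	/* hypothetical */

	/*
	 * Drops CLEANUP_ALWAYS resources and, when isSuccess is false,
	 * CLEANUP_ON_FAILURE resources such as the record above.
	 */
	CompleteNewOperationNeedingCleanup(isSuccess);
}

Registering first and creating second is what lets DropOrphanedShardsForCleanup()
safely drop anything whose record survives a failed or crashed operation.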
diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c
index fae6f7c39..64735cf19 100644
--- a/src/backend/distributed/replication/multi_logical_replication.c
+++ b/src/backend/distributed/replication/multi_logical_replication.c
@@ -579,7 +579,7 @@ DropAllLogicalReplicationLeftovers(LogicalRepType type)
 
 	/*
 	 * We close all connections that we opened for the dropping here. That
 	 * way we don't keep these connections open unnecessarily during the
-	 * shard move (which can take a long time).
+	 * 'LogicalRepType' operation (which can take a long time).
 	 */
 	CloseConnection(cleanupConnection);
 }
diff --git a/src/include/distributed/shard_cleaner.h b/src/include/distributed/shard_cleaner.h
index e051bf77e..b74b395ad 100644
--- a/src/include/distributed/shard_cleaner.h
+++ b/src/include/distributed/shard_cleaner.h
@@ -99,11 +99,7 @@ extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType,
 
 /*
  * CompleteNewOperationNeedingCleanup is be called by an operation to signal
- * completion. This will trigger cleanup of resources that were registered for:
- *
- * 1) CLEANUP_ALWAYS: resources that are transient and always need clean up.
- * 2) CLEANUP_ON_FAILURE: resources that are cleanup only on failure, if
- *    isSuccess is false.
+ * completion. This will trigger cleanup of appropriate resources.
  */
 extern void CompleteNewOperationNeedingCleanup(bool isSuccess);
diff --git a/src/test/regress/expected/failure_split_cleanup.out b/src/test/regress/expected/failure_split_cleanup.out
index e69de29bb..d0cd83fe9 100644
--- a/src/test/regress/expected/failure_split_cleanup.out
+++ b/src/test/regress/expected/failure_split_cleanup.out
@@ -0,0 +1,821 @@
+-- The test excercises below failure scenarios
+--1. Failure while creating publications
+--2. Failure while creating shared memory segment
+--3. Failure while creating replication slots
+--4. Failure while enabling subscription
+--5. Failure on polling subscription state
+--6. Failure on polling last write-ahead log location reported to origin WAL sender
+--7. Failure on dropping subscription
+CREATE SCHEMA "citus_failure_split_cleanup_schema";
+-- Create a function to execute TryDropOrphanedShards
+CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
+RETURNS VOID
+AS 'citus'
+LANGUAGE C STRICT VOLATILE;
+SET search_path TO "citus_failure_split_cleanup_schema";
+SET citus.next_shard_id TO 8981000;
+SET citus.next_placement_id TO 8610000;
+SET citus.next_operation_id TO 777;
+SET citus.next_cleanup_record_id TO 11;
+SET citus.shard_count TO 2;
+SET citus.shard_replication_factor TO 1;
+SELECT pg_backend_pid() as pid \gset
+-- Set a very long (10 mins) time interval to stop auto cleanup for test purposes.
+ALTER SYSTEM SET citus.defer_shard_delete_interval TO 600000;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- Connections on the proxy port(worker_2) are monitored
+SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
+SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port \gset
+CREATE TABLE table_to_split(id int PRIMARY KEY, int_data int, data text);
+SELECT create_distributed_table('table_to_split', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+--1.
Failure while creating publications + SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION .* FOR TABLE").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-100000'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); +ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- + 1 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 + 2 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 + 3 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 + 4 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 +(4 rows) + + -- we need to allow connection so that we can connect to proxy + SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Left over child shards + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 + table_to_split_8981003 + table_to_split_8981002 +(3 rows) + + -- Left over publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- +(0 rows) + + -- Left over replication slots + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- +(0 rows) + + -- Left over subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- +(0 rows) + + \c - postgres - :master_port + SELECT run_try_drop_marked_shards(); + run_try_drop_marked_shards +--------------------------------------------------------------------- + +(1 row) + + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- +(0 rows) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Empty child shards after cleanup + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 +(1 row) + + -- Empty publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- +(0 rows) + + -- Empty replication slot table + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- +(0 rows) + + -- Empty subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- +(0 rows) + +--2. 
Failure while creating shared memory segment + \c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="SELECT \* FROM pg_catalog.worker_split_shard_replication_setup\(.*").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-100000'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +invalid socket +CONTEXT: while executing command on localhost:xxxxx +ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- + 5 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 + 6 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 + 7 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 + 8 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 +(4 rows) + + -- we need to allow connection so that we can connect to proxy + SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Left over child shards + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 + table_to_split_8981003 + table_to_split_8981002 +(3 rows) + + -- Left over publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- Left over replication slots + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- +(0 rows) + + -- Left over subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- +(0 rows) + + \c - postgres - :master_port + SELECT run_try_drop_marked_shards(); + run_try_drop_marked_shards +--------------------------------------------------------------------- + +(1 row) + + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- +(0 rows) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Empty child shards after cleanup + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 +(1 row) + + -- Empty publications + SELECT pubname FROM pg_publication; + pubname 
+--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- Empty replication slot table + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- +(0 rows) + + -- Empty subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- +(0 rows) + +--3. Failure while executing 'CREATE_REPLICATION_SLOT' for Snapshot. + \c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="CREATE_REPLICATION_SLOT .* LOGICAL .* EXPORT_SNAPSHOT.*").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-100000'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +invalid socket +CONTEXT: while executing command on localhost:xxxxx +ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- + 9 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 + 10 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 + 11 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 + 12 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 +(4 rows) + + -- we need to allow connection so that we can connect to proxy + SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Left over child shards + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 + table_to_split_8981003 + table_to_split_8981002 +(3 rows) + + -- Left over publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- Left over replication slots + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- +(0 rows) + + -- Left over subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- +(0 rows) + + \c - postgres - :master_port + SELECT run_try_drop_marked_shards(); + run_try_drop_marked_shards +--------------------------------------------------------------------- + +(1 row) + + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- +(0 rows) + + \c - - - 
:worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Empty child shards after cleanup + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 +(1 row) + + -- Empty publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- Empty replication slot table + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- +(0 rows) + + -- Empty subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- +(0 rows) + +--4. Failure while enabling subscription + \c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION .* ENABLE").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-100000'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); +ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- + 13 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 + 14 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 + 15 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 + 16 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 +(4 rows) + + -- we need to allow connection so that we can connect to proxy + SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Left over child shards + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 + table_to_split_8981003 + table_to_split_8981002 +(3 rows) + + -- Left over publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- Left over replication slots + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx +(2 rows) + + -- Left over subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- + citus_shard_split_subscription_xxxxxxx +(1 row) + + \c - postgres - :master_port + SELECT 
run_try_drop_marked_shards(); + run_try_drop_marked_shards +--------------------------------------------------------------------- + +(1 row) + + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- +(0 rows) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Empty child shards after cleanup + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 +(1 row) + + -- Empty publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- Empty replication slot table + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx +(2 rows) + + -- Empty subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- + citus_shard_split_subscription_xxxxxxx +(1 row) + +--5. Failure on polling subscription state + \c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-100000'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); +ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- + 17 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 + 18 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 + 19 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 + 20 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 +(4 rows) + + -- we need to allow connection so that we can connect to proxy + SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Left over child shards + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 + table_to_split_8981003 + table_to_split_8981002 +(3 rows) + + -- Left over publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- 
Left over replication slots + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx +(2 rows) + + -- Left over subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- + citus_shard_split_subscription_xxxxxxx +(1 row) + + \c - postgres - :master_port + SELECT run_try_drop_marked_shards(); + run_try_drop_marked_shards +--------------------------------------------------------------------- + +(1 row) + + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- +(0 rows) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Empty child shards after cleanup + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 +(1 row) + + -- Empty publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- Empty replication slot table + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx +(2 rows) + + -- Empty subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- + citus_shard_split_subscription_xxxxxxx +(1 row) + +--6. 
Failure on polling last write-ahead log location reported to origin WAL sender + \c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-100000'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); +ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- + 21 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 + 22 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 + 23 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0 + 24 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 +(4 rows) + + -- we need to allow connection so that we can connect to proxy + SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Left over child shards + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 + table_to_split_8981003 + table_to_split_8981002 +(3 rows) + + -- Left over publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- Left over replication slots + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx +(2 rows) + + -- Left over subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- + citus_shard_split_subscription_xxxxxxx +(1 row) + + \c - postgres - :master_port + SELECT run_try_drop_marked_shards(); + run_try_drop_marked_shards +--------------------------------------------------------------------- + +(1 row) + + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- +(0 rows) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Empty child shards after cleanup + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 +(1 row) + + -- Empty publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + 
citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- Empty replication slot table + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx +(2 rows) + + -- Empty subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- + citus_shard_split_subscription_xxxxxxx +(1 row) + +--7. Failure on dropping subscription + \c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").killall()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-100000'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx + -- NO records expected as we fail at 'DropAllLogicalReplicationLeftovers' before creating + -- any resources. + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- +(0 rows) + + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- +(0 rows) + + -- we need to allow connection so that we can connect to proxy + SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Left over child shards + SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; + relname +--------------------------------------------------------------------- + table_to_split_8981000 +(1 row) + + -- Left over publications + SELECT pubname FROM pg_publication; + pubname +--------------------------------------------------------------------- + citus_shard_split_publication_xxxxxxx_xxxxxxx + citus_shard_split_publication_xxxxxxx_xxxxxxx +(2 rows) + + -- Left over replication slots + SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_shard_split_slot_xxxxxxx_xxxxxxx + citus_shard_split_slot_xxxxxxx_xxxxxxx +(2 rows) + + -- Left over subscriptions + SELECT subname FROM pg_subscription; + subname +--------------------------------------------------------------------- + citus_shard_split_subscription_xxxxxxx +(1 row) + + \c - postgres - :master_port + SELECT run_try_drop_marked_shards(); + run_try_drop_marked_shards +--------------------------------------------------------------------- + +(1 row) + + SELECT * FROM pg_dist_cleanup; + record_id | operation_id | object_type | object_name | node_group_id | policy_type +--------------------------------------------------------------------- +(0 rows) + + \c - - - :worker_2_proxy_port + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + -- Empty child shards after cleanup + SELECT relname FROM pg_class where relname LIKE 
'%table_to_split_%' AND relkind = 'r';
+ relname
+---------------------------------------------------------------------
+ table_to_split_8981000
+(1 row)
+
+    -- Empty publications
+    SELECT pubname FROM pg_publication;
+ pubname
+---------------------------------------------------------------------
+ citus_shard_split_publication_xxxxxxx_xxxxxxx
+ citus_shard_split_publication_xxxxxxx_xxxxxxx
+(2 rows)
+
+    -- Empty replication slot table
+    SELECT slot_name FROM pg_replication_slots;
+ slot_name
+---------------------------------------------------------------------
+ citus_shard_split_slot_xxxxxxx_xxxxxxx
+ citus_shard_split_slot_xxxxxxx_xxxxxxx
+(2 rows)
+
+    -- Empty subscriptions
+    SELECT subname FROM pg_subscription;
+ subname
+---------------------------------------------------------------------
+ citus_shard_split_subscription_xxxxxxx
+(1 row)
+
+-- Cleanup
+\c - postgres - :master_port
+DROP SCHEMA "citus_failure_split_cleanup_schema" CASCADE;
+NOTICE: drop cascades to table citus_failure_split_cleanup_schema.table_to_split
diff --git a/src/test/regress/sql/failure_split_cleanup.sql b/src/test/regress/sql/failure_split_cleanup.sql
index 91e253360..80126bccc 100644
--- a/src/test/regress/sql/failure_split_cleanup.sql
+++ b/src/test/regress/sql/failure_split_cleanup.sql
@@ -5,15 +5,16 @@
 --4. Failure while enabling subscription
 --5. Failure on polling subscription state
 --6. Failure on polling last write-ahead log location reported to origin WAL sender
---7. Failure on disabling subscription (right before dropping it)
---8. Failure on dropping subscription
-CREATE SCHEMA "citus_split_test_schema";
+--7. Failure on dropping subscription
+CREATE SCHEMA "citus_failure_split_cleanup_schema";
 
-CREATE ROLE test_split_role WITH LOGIN;
-GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role;
-SET ROLE test_split_role;
+-- Create a function to execute TryDropOrphanedShards
+CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
+RETURNS VOID
+AS 'citus'
+LANGUAGE C STRICT VOLATILE;
 
-SET search_path TO "citus_split_test_schema";
+SET search_path TO "citus_failure_split_cleanup_schema";
 
 SET citus.next_shard_id TO 8981000;
 SET citus.next_placement_id TO 8610000;
@@ -23,6 +24,10 @@ SET citus.shard_count TO 2;
 SET citus.shard_replication_factor TO 1;
 SELECT pg_backend_pid() as pid \gset
 
+-- Set a very long (10 mins) time interval to stop auto cleanup for test purposes.
+ALTER SYSTEM SET citus.defer_shard_delete_interval TO 600000;
+SELECT pg_reload_conf();
+
 -- Connections on the proxy port(worker_2) are monitored
 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
 SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port \gset
@@ -30,19 +35,12 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_proxy_
 CREATE TABLE table_to_split(id int PRIMARY KEY, int_data int, data text);
 SELECT create_distributed_table('table_to_split', 'id');
 
-\c - postgres - :master_port
--- Create a method to execute TryDropOrphanShards
-CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
-RETURNS VOID
-AS 'citus'
-LANGUAGE C STRICT VOLATILE;
-
 --1.
Failure while creating publications SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION .* FOR TABLE").killall()'); SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-100000'], - ARRAY[:worker_2_node, :worker_2_node], + ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT * FROM pg_dist_cleanup; @@ -50,7 +48,7 @@ LANGUAGE C STRICT VOLATILE; SELECT citus.mitmproxy('conn.allow()'); \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Left over child shards SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -66,7 +64,7 @@ LANGUAGE C STRICT VOLATILE; SELECT * FROM pg_dist_cleanup; \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Empty child shards after cleanup SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -76,22 +74,25 @@ LANGUAGE C STRICT VOLATILE; SELECT slot_name FROM pg_replication_slots; -- Empty subscriptions SELECT subname FROM pg_subscription; ---1. Failure while creating publications --2. Failure while creating shared memory segment -\c - postgres - :master_port + \c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="SELECT \* FROM pg_catalog.worker_split_shard_replication_setup\(.*").killall()'); SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-100000'], - ARRAY[:worker_2_node, :worker_2_node], + ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT * FROM pg_dist_cleanup; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Left over child shards SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -107,7 +108,7 @@ LANGUAGE C STRICT VOLATILE; SELECT * FROM pg_dist_cleanup; \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Empty child shards after cleanup SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -117,22 +118,25 @@ LANGUAGE C STRICT VOLATILE; SELECT slot_name FROM pg_replication_slots; -- Empty subscriptions SELECT subname FROM pg_subscription; ---2. Failure while creating shared memory segment --3. Failure while executing 'CREATE_REPLICATION_SLOT' for Snapshot. 
\c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="CREATE_REPLICATION_SLOT .* LOGICAL .* EXPORT_SNAPSHOT.*").killall()'); SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-100000'], - ARRAY[:worker_2_node, :worker_2_node], + ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT * FROM pg_dist_cleanup; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Left over child shards SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -148,7 +152,7 @@ LANGUAGE C STRICT VOLATILE; SELECT * FROM pg_dist_cleanup; \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Empty child shards after cleanup SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -158,23 +162,25 @@ LANGUAGE C STRICT VOLATILE; SELECT slot_name FROM pg_replication_slots; -- Empty subscriptions SELECT subname FROM pg_subscription; ---3. Failure while executing 'CREATE_REPLICATION_SLOT' for Snapshot. - --4. Failure while enabling subscription \c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION .* ENABLE").killall()'); SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-100000'], - ARRAY[:worker_2_node, :worker_2_node], + ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT * FROM pg_dist_cleanup; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Left over child shards SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -190,7 +196,7 @@ LANGUAGE C STRICT VOLATILE; SELECT * FROM pg_dist_cleanup; \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Empty child shards after cleanup SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -200,22 +206,25 @@ LANGUAGE C STRICT VOLATILE; SELECT slot_name FROM pg_replication_slots; -- Empty subscriptions SELECT subname FROM pg_subscription; ---4. Failure while enabling subscription --5. 
Failure on polling subscription state \c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").killall()'); SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-100000'], - ARRAY[:worker_2_node, :worker_2_node], + ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT * FROM pg_dist_cleanup; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Left over child shards SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -231,7 +240,7 @@ LANGUAGE C STRICT VOLATILE; SELECT * FROM pg_dist_cleanup; \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Empty child shards after cleanup SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -241,22 +250,25 @@ LANGUAGE C STRICT VOLATILE; SELECT slot_name FROM pg_replication_slots; -- Empty subscriptions SELECT subname FROM pg_subscription; ---5. Failure on polling subscription state --6. Failure on polling last write-ahead log location reported to origin WAL sender \c - postgres - :master_port + SET citus.next_shard_id TO 8981002; + SET citus.next_operation_id TO 777; + SET citus.next_cleanup_record_id TO 11; + SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").killall()'); SELECT pg_catalog.citus_split_shard_by_split_points( 8981000, ARRAY['-100000'], - ARRAY[:worker_2_node, :worker_2_node], + ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT * FROM pg_dist_cleanup; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Left over child shards SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -272,7 +284,7 @@ LANGUAGE C STRICT VOLATILE; SELECT * FROM pg_dist_cleanup; \c - - - :worker_2_proxy_port - SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; SET citus.show_shards_for_app_name_prefixes = '*'; -- Empty child shards after cleanup SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r'; @@ -282,64 +294,28 @@ LANGUAGE C STRICT VOLATILE; SELECT slot_name FROM pg_replication_slots; -- Empty subscriptions SELECT subname FROM pg_subscription; ---6. Failure on polling last write-ahead log location reported to origin WAL sender ---7. Failure on disabling subscription (right before dropping it) +--7. 
Failure on dropping subscription
 	\c - postgres - :master_port
-	SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").killall()');
-	SELECT pg_catalog.citus_split_shard_by_split_points(
-		8981000,
-		ARRAY['-100000'],
-		ARRAY[:worker_2_node, :worker_2_node],
-		'force_logical');
-	SELECT * FROM pg_dist_cleanup;
-	-- we need to allow connection so that we can connect to proxy
-	SELECT citus.mitmproxy('conn.allow()');
+	SET citus.next_shard_id TO 8981002;
+	SET citus.next_operation_id TO 777;
+	SET citus.next_cleanup_record_id TO 11;
 
-	\c - - - :worker_2_proxy_port
-	SET search_path TO "citus_split_test_schema", public, pg_catalog;
-	SET citus.show_shards_for_app_name_prefixes = '*';
-	-- Left over child shards
-	SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
-	-- Left over publications
-	SELECT pubname FROM pg_publication;
-	-- Left over replication slots
-	SELECT slot_name FROM pg_replication_slots;
-	-- Left over subscriptions
-	SELECT subname FROM pg_subscription;
-
-	\c - postgres - :master_port
-	SELECT run_try_drop_marked_shards();
-	SELECT * FROM pg_dist_cleanup;
-
-	\c - - - :worker_2_proxy_port
-	SET search_path TO "citus_split_test_schema", public, pg_catalog;
-	SET citus.show_shards_for_app_name_prefixes = '*';
-	-- Empty child shards after cleanup
-	SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
-	-- Empty publications
-	SELECT pubname FROM pg_publication;
-	-- Empty replication slot table
-	SELECT slot_name FROM pg_replication_slots;
-	-- Empty subscriptions
-	SELECT subname FROM pg_subscription;
---7. Failure on disabling subscription (right before dropping it)
-
---8. Failure on dropping subscription
-	\c - postgres - :master_port
 	SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").killall()');
 	SELECT pg_catalog.citus_split_shard_by_split_points(
 		8981000,
 		ARRAY['-100000'],
-		ARRAY[:worker_2_node, :worker_2_node],
+		ARRAY[:worker_1_node, :worker_2_node],
 		'force_logical');
+	-- NO records expected as we fail at 'DropAllLogicalReplicationLeftovers' before creating
+	-- any resources.
 	SELECT * FROM pg_dist_cleanup;
 	SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
 
 	-- we need to allow connection so that we can connect to proxy
 	SELECT citus.mitmproxy('conn.allow()');
 
 	\c - - - :worker_2_proxy_port
-	SET search_path TO "citus_split_test_schema", public, pg_catalog;
+	SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 	SET citus.show_shards_for_app_name_prefixes = '*';
 	-- Left over child shards
 	SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@@ -355,7 +331,7 @@ LANGUAGE C STRICT VOLATILE;
 	SELECT * FROM pg_dist_cleanup;
 
 	\c - - - :worker_2_proxy_port
-	SET search_path TO "citus_split_test_schema", public, pg_catalog;
+	SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 	SET citus.show_shards_for_app_name_prefixes = '*';
 	-- Empty child shards after cleanup
 	SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@@ -365,4 +341,7 @@ LANGUAGE C STRICT VOLATILE;
 	SELECT slot_name FROM pg_replication_slots;
 	-- Empty subscriptions
 	SELECT subname FROM pg_subscription;
---8. Failure on dropping subscription
+
+-- Cleanup
+\c - postgres - :master_port
+DROP SCHEMA "citus_failure_split_cleanup_schema" CASCADE;
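The tests above call run_try_drop_marked_shards() to drive cleanup on demand instead
of waiting for the maintenance daemon, whose interval was raised to 10 minutes. A
hedged sketch of the C function that could back this UDF, assuming
TryDropOrphanedShards(bool waitForLocks) is the wrapper described in shard_cleaner.c
(the actual test helper may differ):

#include "postgres.h"
#include "fmgr.h"

#include "distributed/shard_cleaner.h"	/* TryDropOrphanedShards, assumed */

PG_FUNCTION_INFO_V1(run_try_drop_marked_shards);

/* sketch of a test-only entry point that runs one cleanup pass */
Datum
run_try_drop_marked_shards(PG_FUNCTION_ARGS)
{
	bool waitForLocks = false;

	/* swallows errors, mirroring how the maintenance daemon calls it */
	TryDropOrphanedShards(waitForLocks);

	PG_RETURN_VOID();
}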