Fix dummy shard logging bug and update test

niupre/TestDeferredDropAndCleanup
Nitish Upreti 2022-08-26 13:50:32 -07:00
parent 206690925e
commit fa1456d14f
7 changed files with 942 additions and 109 deletions

null.d Normal file

@ -0,0 +1 @@
null.o: /dev/null


@ -42,22 +42,22 @@ int NextCleanupRecordId = 0;
  */
 typedef struct CleanupRecord
 {
-    /* unique identifier of the record (for deletes) */
+    /* unique identifier of the record */
     uint64 recordId;
-    /* identifier of the operation that generated the record (must not be in progress) */
+    /* identifier of the operation that generated the record */
     OperationId operationId;
-    /* type of the object (e.g. shard placement) */
+    /* type of the object (e.g. shard) */
     CleanupObject objectType;
     /* fully qualified name of the object */
     char *objectName;
-    /* node grou ID on which the object is located */
+    /* node group ID on which the object is located */
     int nodeGroupId;
-    /* cleanup policy */
+    /* cleanup policy that determines when object is cleaned */
     CleanupPolicy policy;
 } CleanupRecord;
@ -157,8 +157,9 @@ DropOrphanedShardsInSeparateTransaction(void)
 /*
- * TryDropOrphanedShards is a wrapper around DropOrphanedShardsForMove that catches
- * any errors to make it safe to use in the maintenance daemon.
+ * TryDropOrphanedShards is a wrapper around DropOrphanedShardsForMove and
+ * DropOrphanedShardsForCleanup that catches any errors to make it safe to
+ * use in the maintenance daemon.
  *
  * If dropping any of the shards failed this function returns -1, otherwise it
  * returns the number of dropped shards.
@ -206,7 +207,7 @@ DropOrphanedShardsForCleanup()
     foreach_ptr(record, cleanupRecordList)
     {
-        /* We only supporting cleaning shards right now */
+        /* We only support one resource type at the moment */
         if (record->objectType != CLEANUP_SHARD_PLACEMENT)
         {
             ereport(WARNING, (errmsg("Invalid object type %d for cleanup record ",
@ -214,6 +215,7 @@ DropOrphanedShardsForCleanup()
             continue;
         }

+        /* Advisory locks are reentrant */
         if (!TryLockOperationId(record->operationId))
         {
             /* operation that the cleanup record is part of is still running */
@ -222,12 +224,14 @@ DropOrphanedShardsForCleanup()
         char *qualifiedTableName = record->objectName;
         WorkerNode *workerNode = LookupNodeForGroup(record->nodeGroupId);

-        /*
-         * Now that we have the lock, check if record exists.
-         * The operation could have completed successfully just after we called
-         * ListCleanupRecords in which case the record will be now gone.
-         */
         if (!CleanupRecordExists(record->recordId))
         {
+            /*
+             * The operation completed successfully just after we called
+             * ListCleanupRecords in which case the record is now gone.
+             */
             continue;
         }
@ -345,6 +349,10 @@ DropOrphanedShardsForMove(bool waitForLocks)
 }


+/*
+ * StartNewOperationNeedingCleanup is called by an operation to register
+ * for cleanup.
+ */
 OperationId
 StartNewOperationNeedingCleanup(void)
 {
@ -356,6 +364,10 @@ StartNewOperationNeedingCleanup(void)
 }


+/*
+ * CompleteNewOperationNeedingCleanup is called by an operation to signal
+ * completion. This will trigger cleanup of appropriate resources.
+ */
 void
 CompleteNewOperationNeedingCleanup(bool isSuccess)
 {
@ -517,6 +529,9 @@ InsertCleanupRecordInSubtransaction(CleanupObject objectType,
 }


+/*
+ * DeleteCleanupRecordByRecordId deletes a cleanup record by record id.
+ */
 static void
 DeleteCleanupRecordByRecordIdOutsideTransaction(uint64 recordId)
 {
@ -669,17 +684,23 @@ ListCleanupRecords(void)
 /*
- * ListCleanupRecordsForCurrentOperation lists all the current cleanup records.
+ * ListCleanupRecordsForCurrentOperation lists all the cleanup records for
+ * current operation.
  */
 static List *
 ListCleanupRecordsForCurrentOperation(void)
 {
+    /* We must have a valid OperationId. Any operation requiring cleanup
+     * will call StartNewOperationNeedingCleanup.
+     */
+    Assert(CurrentOperationId != INVALID_OPERATION_ID);
+
     Relation pgDistCleanup = table_open(DistCleanupRelationId(), AccessShareLock);
     TupleDesc tupleDescriptor = RelationGetDescr(pgDistCleanup);

     ScanKeyData scanKey[1];
     ScanKeyInit(&scanKey[0], Anum_pg_dist_cleanup_operation_id, BTEqualStrategyNumber,
-                F_OIDEQ, CurrentOperationId);
+                F_INT8EQ, UInt64GetDatum(CurrentOperationId));

     int scanKeyCount = 1;
     Oid scanIndexId = InvalidOid;
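
For context on the scan-key change above: pg_dist_cleanup.operation_id is a bigint column, so an equality scan key for it needs the int8 equality procedure and a key value wrapped as a Datum; F_OIDEQ compares 32-bit Oids and would ignore the high bits of a 64-bit operation id. A minimal sketch of the corrected pattern, assuming the usual PostgreSQL headers (the attribute-number argument is a stand-in for Anum_pg_dist_cleanup_operation_id):

#include "postgres.h"

#include "access/skey.h"
#include "access/stratnum.h"
#include "utils/fmgroids.h"

/*
 * Illustrative sketch only: build an equality ScanKey for a bigint (int8)
 * catalog column such as pg_dist_cleanup.operation_id.
 */
static void
InitOperationIdScanKey(ScanKey scanKey, AttrNumber attno, uint64 operationId)
{
    /* int8 equality, with the 64-bit key value wrapped as a Datum */
    ScanKeyInit(scanKey, attno, BTEqualStrategyNumber,
                F_INT8EQ, UInt64GetDatum(operationId));
}
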
@ -841,6 +862,10 @@ GetNextCleanupRecordId(void)
 }


+/*
+ * LockOperationId takes an exclusive lock to ensure that only one process
+ * can cleanup operationId resources at the same time.
+ */
 static void
 LockOperationId(OperationId operationId)
 {
@ -852,6 +877,10 @@ LockOperationId(OperationId operationId)
 }


+/*
+ * TryLockOperationId takes an exclusive lock (with dontWait = true) to ensure that
+ * only one process can cleanup operationId resources at the same time.
+ */
 static bool
 TryLockOperationId(OperationId operationId)
 {
@ -859,6 +888,7 @@ TryLockOperationId(OperationId operationId)
     const bool sessionLock = false;
     const bool dontWait = true;
     SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId);
-    bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
-    return lockAcquired;
+    LockAcquireResult lockResult = LockAcquire(&tag, ExclusiveLock, sessionLock,
+                                               dontWait);
+    return (lockResult != LOCKACQUIRE_NOT_AVAIL);
 }
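
For reference on the TryLockOperationId change: PostgreSQL's LockAcquire() returns a LockAcquireResult enum rather than a bool, so the rewritten code compares the result against LOCKACQUIRE_NOT_AVAIL explicitly instead of relying on an implicit enum-to-bool conversion. A minimal sketch of that pattern, with a hypothetical helper name and the standard storage/lock.h declarations:

#include "postgres.h"

#include "storage/lock.h"

/*
 * Hypothetical helper showing the pattern: with dontWait = true, LockAcquire
 * returns LOCKACQUIRE_NOT_AVAIL when the lock is held elsewhere; any other
 * result (LOCKACQUIRE_OK, LOCKACQUIRE_ALREADY_HELD, ...) means we hold it.
 */
static bool
TryAcquireExclusiveTagLock(const LOCKTAG *tag)
{
    const bool sessionLock = false;
    const bool dontWait = true;

    LockAcquireResult result = LockAcquire(tag, ExclusiveLock, sessionLock,
                                           dontWait);

    return result != LOCKACQUIRE_NOT_AVAIL;
}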


@ -1348,7 +1348,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
     /* 19) Drop Publications */
     DropPublications(sourceConnection, publicationInfoHash);

-
     /*
      * 20) Drop old shards and delete related metadata. Have to do that before
      * creating the new shard metadata, because there's cross-checks
@ -1512,11 +1511,16 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
             InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
                                                 ConstructQualifiedShardName(
                                                     shardInterval),
-                                                workerPlacementNode->groupId,
+                                                sourceWorkerNode->groupId,
                                                 policy);

             /* Create dummy split child shard on source worker node */
             CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
+
+            /* Add dummy split child shard entry created on source node */
+            AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
+                                    sourceWorkerNode->nodeId,
+                                    shardInterval);
         }
     }
 }


@ -579,7 +579,7 @@ DropAllLogicalReplicationLeftovers(LogicalRepType type)
     /*
      * We close all connections that we opened for the dropping here. That
      * way we don't keep these connections open unnecessarily during the
-     * shard move (which can take a long time).
+     * 'LogicalRepType' operation (which can take a long time).
      */
     CloseConnection(cleanupConnection);
 }


@ -99,11 +99,7 @@ extern void InsertCleanupRecordInSubtransaction(CleanupObject objectType,
 /*
  * CompleteNewOperationNeedingCleanup is called by an operation to signal
- * completion. This will trigger cleanup of resources that were registered for:
- *
- * 1) CLEANUP_ALWAYS: resources that are transient and always need clean up.
- * 2) CLEANUP_ON_FAILURE: resources that are cleanup only on failure, if
- *    isSuccess is false.
+ * completion. This will trigger cleanup of appropriate resources.
  */
 extern void CompleteNewOperationNeedingCleanup(bool isSuccess);


@ -0,0 +1,822 @@
-- The test exercises the failure scenarios below
--1. Failure while creating publications
--2. Failure while creating shared memory segment
--3. Failure while creating replication slots
--4. Failure while enabling subscription
--5. Failure on polling subscription state
--6. Failure on polling last write-ahead log location reported to origin WAL sender
--7. Failure on dropping subscription
CREATE SCHEMA "citus_failure_split_cleanup_schema";
-- Create a method to execute TryDropOrphanedShards
CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
RETURNS VOID
AS 'citus'
LANGUAGE C STRICT VOLATILE;
SET search_path TO "citus_failure_split_cleanup_schema";
SET citus.next_shard_id TO 8981000;
SET citus.next_placement_id TO 8610000;
SET citus.next_operation_id TO 777;
SET citus.next_cleanup_record_id TO 11;
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
SELECT pg_backend_pid() as pid \gset
-- Set a very long(10mins) time interval to stop auto cleanup for test purposes.
ALTER SYSTEM SET citus.defer_shard_delete_interval TO 600000;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
-- Connections on the proxy port(worker_2) are monitored
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port \gset
CREATE TABLE table_to_split(id int PRIMARY KEY, int_data int, data text);
SELECT create_distributed_table('table_to_split', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
--1. Failure while creating publications
SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION .* FOR TABLE").killall()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
1 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
2 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
3 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
4 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Left over child shards
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
table_to_split_8981003
table_to_split_8981002
(3 rows)
-- Left over publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
(0 rows)
-- Left over replication slots
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
(0 rows)
-- Left over subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
(0 rows)
\c - postgres - :master_port
SELECT run_try_drop_marked_shards();
run_try_drop_marked_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Empty child shards after cleanup
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
(1 row)
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
(0 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
(0 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
(0 rows)
--2. Failure while creating shared memory segment
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;
SET citus.next_cleanup_record_id TO 11;
SELECT citus.mitmproxy('conn.onQuery(query="SELECT \* FROM pg_catalog.worker_split_shard_replication_setup\(.*").killall()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
invalid socket
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
5 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
6 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
7 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
8 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Left over child shards
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
table_to_split_8981003
table_to_split_8981002
(3 rows)
-- Left over publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Left over replication slots
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
(0 rows)
-- Left over subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
(0 rows)
\c - postgres - :master_port
SELECT run_try_drop_marked_shards();
run_try_drop_marked_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Empty child shards after cleanup
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
(1 row)
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
(0 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
(0 rows)
--3. Failure while executing 'CREATE_REPLICATION_SLOT' for Snapshot.
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;
SET citus.next_cleanup_record_id TO 11;
SELECT citus.mitmproxy('conn.onQuery(query="CREATE_REPLICATION_SLOT .* LOGICAL .* EXPORT_SNAPSHOT.*").killall()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection not open
invalid socket
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
9 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
10 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
11 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
12 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Left over child shards
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
table_to_split_8981003
table_to_split_8981002
(3 rows)
-- Left over publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Left over replication slots
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
(0 rows)
-- Left over subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
(0 rows)
\c - postgres - :master_port
SELECT run_try_drop_marked_shards();
run_try_drop_marked_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Empty child shards after cleanup
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
(1 row)
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
(0 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
(0 rows)
--4. Failure while enabling subscription
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;
SET citus.next_cleanup_record_id TO 11;
SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION .* ENABLE").killall()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
13 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
14 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
15 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
16 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Left over child shards
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
table_to_split_8981003
table_to_split_8981002
(3 rows)
-- Left over publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Left over replication slots
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
-- Left over subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
\c - postgres - :master_port
SELECT run_try_drop_marked_shards();
run_try_drop_marked_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Empty child shards after cleanup
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
(1 row)
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
--5. Failure on polling subscription state
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;
SET citus.next_cleanup_record_id TO 11;
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").killall()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
17 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
18 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
19 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
20 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Left over child shards
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
table_to_split_8981003
table_to_split_8981002
(3 rows)
-- Left over publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Left over replication slots
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
-- Left over subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
\c - postgres - :master_port
SELECT run_try_drop_marked_shards();
run_try_drop_marked_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Empty child shards after cleanup
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
(1 row)
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
--6. Failure on polling last write-ahead log location reported to origin WAL sender
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;
SET citus.next_cleanup_record_id TO 11;
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").killall()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
21 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
22 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
23 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
24 | 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
(4 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Left over child shards
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
table_to_split_8981003
table_to_split_8981002
(3 rows)
-- Left over publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Left over replication slots
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
-- Left over subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
\c - postgres - :master_port
SELECT run_try_drop_marked_shards();
run_try_drop_marked_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Empty child shards after cleanup
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
(1 row)
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
--7. Failure on dropping subscription
\c - postgres - :master_port
SET citus.next_shard_id TO 8981002;
SET citus.next_operation_id TO 777;
SET citus.next_cleanup_record_id TO 11;
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").killall()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT pg_catalog.citus_split_shard_by_split_points(
8981000,
ARRAY['-100000'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- NO records expected as we fail at 'DropAllLogicalReplicationLeftovers' before creating
-- any resources.
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
(0 rows)
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Left over child shards
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
(1 row)
-- Left over publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Left over replication slots
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
-- Left over subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
\c - postgres - :master_port
SELECT run_try_drop_marked_shards();
run_try_drop_marked_shards
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_cleanup;
record_id | operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
SET citus.show_shards_for_app_name_prefixes = '*';
-- Empty child shards after cleanup
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
relname
---------------------------------------------------------------------
table_to_split_8981000
(1 row)
-- Empty publications
SELECT pubname FROM pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_split_publication_xxxxxxx_xxxxxxx
citus_shard_split_publication_xxxxxxx_xxxxxxx
(2 rows)
-- Empty replication slot table
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_slot_xxxxxxx_xxxxxxx
citus_shard_split_slot_xxxxxxx_xxxxxxx
(2 rows)
-- Empty subscriptions
SELECT subname FROM pg_subscription;
subname
---------------------------------------------------------------------
citus_shard_split_subscription_xxxxxxx
(1 row)
-- Cleanup
\c - postgres - :master_port
DROP SCHEMA "citus_failure_split_cleanup_schema" CASCADE;
NOTICE: drop cascades to table citus_failure_split_cleanup_schema.table_to_split
-- Cleanup


@ -5,15 +5,16 @@
 --4. Failure while enabling subscription
 --5. Failure on polling subscription state
 --6. Failure on polling last write-ahead log location reported to origin WAL sender
---7. Failure on disabling subscription (right before dropping it)
---8. Failure on dropping subscription
-CREATE SCHEMA "citus_split_test_schema";
-CREATE ROLE test_split_role WITH LOGIN;
-GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role;
-SET ROLE test_split_role;
-SET search_path TO "citus_split_test_schema";
+--7. Failure on dropping subscription
+CREATE SCHEMA "citus_failure_split_cleanup_schema";
+-- Create a method to execute TryDropOrphanedShards
+CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
+RETURNS VOID
+AS 'citus'
+LANGUAGE C STRICT VOLATILE;
+SET search_path TO "citus_failure_split_cleanup_schema";
 SET citus.next_shard_id TO 8981000;
 SET citus.next_placement_id TO 8610000;
@ -23,6 +24,10 @@ SET citus.shard_count TO 2;
 SET citus.shard_replication_factor TO 1;
 SELECT pg_backend_pid() as pid \gset
+-- Set a very long(10mins) time interval to stop auto cleanup for test purposes.
+ALTER SYSTEM SET citus.defer_shard_delete_interval TO 600000;
+SELECT pg_reload_conf();
 -- Connections on the proxy port(worker_2) are monitored
 SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
 SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port \gset
@ -30,19 +35,12 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_proxy_
 CREATE TABLE table_to_split(id int PRIMARY KEY, int_data int, data text);
 SELECT create_distributed_table('table_to_split', 'id');
-\c - postgres - :master_port
--- Create a method to execute TryDropOrphanShards
-CREATE OR REPLACE FUNCTION run_try_drop_marked_shards()
-RETURNS VOID
-AS 'citus'
-LANGUAGE C STRICT VOLATILE;
 --1. Failure while creating publications
 SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION .* FOR TABLE").killall()');
 SELECT pg_catalog.citus_split_shard_by_split_points(
     8981000,
     ARRAY['-100000'],
-    ARRAY[:worker_2_node, :worker_2_node],
+    ARRAY[:worker_1_node, :worker_2_node],
     'force_logical');
 SELECT * FROM pg_dist_cleanup;
@ -50,7 +48,7 @@ LANGUAGE C STRICT VOLATILE;
 SELECT citus.mitmproxy('conn.allow()');
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Left over child shards
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -66,7 +64,7 @@ LANGUAGE C STRICT VOLATILE;
 SELECT * FROM pg_dist_cleanup;
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Empty child shards after cleanup
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -76,22 +74,25 @@ LANGUAGE C STRICT VOLATILE;
 SELECT slot_name FROM pg_replication_slots;
 -- Empty subscriptions
 SELECT subname FROM pg_subscription;
---1. Failure while creating publications
 --2. Failure while creating shared memory segment
 \c - postgres - :master_port
+SET citus.next_shard_id TO 8981002;
+SET citus.next_operation_id TO 777;
+SET citus.next_cleanup_record_id TO 11;
 SELECT citus.mitmproxy('conn.onQuery(query="SELECT \* FROM pg_catalog.worker_split_shard_replication_setup\(.*").killall()');
 SELECT pg_catalog.citus_split_shard_by_split_points(
     8981000,
     ARRAY['-100000'],
-    ARRAY[:worker_2_node, :worker_2_node],
+    ARRAY[:worker_1_node, :worker_2_node],
     'force_logical');
 SELECT * FROM pg_dist_cleanup;
 -- we need to allow connection so that we can connect to proxy
 SELECT citus.mitmproxy('conn.allow()');
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Left over child shards
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -107,7 +108,7 @@ LANGUAGE C STRICT VOLATILE;
 SELECT * FROM pg_dist_cleanup;
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Empty child shards after cleanup
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -117,22 +118,25 @@ LANGUAGE C STRICT VOLATILE;
 SELECT slot_name FROM pg_replication_slots;
 -- Empty subscriptions
 SELECT subname FROM pg_subscription;
---2. Failure while creating shared memory segment
 --3. Failure while executing 'CREATE_REPLICATION_SLOT' for Snapshot.
 \c - postgres - :master_port
+SET citus.next_shard_id TO 8981002;
+SET citus.next_operation_id TO 777;
+SET citus.next_cleanup_record_id TO 11;
 SELECT citus.mitmproxy('conn.onQuery(query="CREATE_REPLICATION_SLOT .* LOGICAL .* EXPORT_SNAPSHOT.*").killall()');
 SELECT pg_catalog.citus_split_shard_by_split_points(
     8981000,
     ARRAY['-100000'],
-    ARRAY[:worker_2_node, :worker_2_node],
+    ARRAY[:worker_1_node, :worker_2_node],
     'force_logical');
 SELECT * FROM pg_dist_cleanup;
 -- we need to allow connection so that we can connect to proxy
 SELECT citus.mitmproxy('conn.allow()');
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Left over child shards
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -148,7 +152,7 @@ LANGUAGE C STRICT VOLATILE;
 SELECT * FROM pg_dist_cleanup;
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Empty child shards after cleanup
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -158,23 +162,25 @@ LANGUAGE C STRICT VOLATILE;
 SELECT slot_name FROM pg_replication_slots;
 -- Empty subscriptions
 SELECT subname FROM pg_subscription;
---3. Failure while executing 'CREATE_REPLICATION_SLOT' for Snapshot.
 --4. Failure while enabling subscription
 \c - postgres - :master_port
+SET citus.next_shard_id TO 8981002;
+SET citus.next_operation_id TO 777;
+SET citus.next_cleanup_record_id TO 11;
 SELECT citus.mitmproxy('conn.onQuery(query="ALTER SUBSCRIPTION .* ENABLE").killall()');
 SELECT pg_catalog.citus_split_shard_by_split_points(
     8981000,
     ARRAY['-100000'],
-    ARRAY[:worker_2_node, :worker_2_node],
+    ARRAY[:worker_1_node, :worker_2_node],
     'force_logical');
 SELECT * FROM pg_dist_cleanup;
 -- we need to allow connection so that we can connect to proxy
 SELECT citus.mitmproxy('conn.allow()');
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Left over child shards
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -190,7 +196,7 @@ LANGUAGE C STRICT VOLATILE;
 SELECT * FROM pg_dist_cleanup;
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Empty child shards after cleanup
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -200,22 +206,25 @@ LANGUAGE C STRICT VOLATILE;
 SELECT slot_name FROM pg_replication_slots;
 -- Empty subscriptions
 SELECT subname FROM pg_subscription;
---4. Failure while enabling subscription
 --5. Failure on polling subscription state
 \c - postgres - :master_port
+SET citus.next_shard_id TO 8981002;
+SET citus.next_operation_id TO 777;
+SET citus.next_cleanup_record_id TO 11;
 SELECT citus.mitmproxy('conn.onQuery(query="^SELECT count\(\*\) FROM pg_subscription_rel").killall()');
 SELECT pg_catalog.citus_split_shard_by_split_points(
     8981000,
     ARRAY['-100000'],
-    ARRAY[:worker_2_node, :worker_2_node],
+    ARRAY[:worker_1_node, :worker_2_node],
     'force_logical');
 SELECT * FROM pg_dist_cleanup;
 -- we need to allow connection so that we can connect to proxy
 SELECT citus.mitmproxy('conn.allow()');
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Left over child shards
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -231,7 +240,7 @@ LANGUAGE C STRICT VOLATILE;
 SELECT * FROM pg_dist_cleanup;
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Empty child shards after cleanup
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -241,22 +250,25 @@ LANGUAGE C STRICT VOLATILE;
 SELECT slot_name FROM pg_replication_slots;
 -- Empty subscriptions
 SELECT subname FROM pg_subscription;
---5. Failure on polling subscription state
 --6. Failure on polling last write-ahead log location reported to origin WAL sender
 \c - postgres - :master_port
+SET citus.next_shard_id TO 8981002;
+SET citus.next_operation_id TO 777;
+SET citus.next_cleanup_record_id TO 11;
 SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").killall()');
 SELECT pg_catalog.citus_split_shard_by_split_points(
     8981000,
     ARRAY['-100000'],
-    ARRAY[:worker_2_node, :worker_2_node],
+    ARRAY[:worker_1_node, :worker_2_node],
     'force_logical');
 SELECT * FROM pg_dist_cleanup;
 -- we need to allow connection so that we can connect to proxy
 SELECT citus.mitmproxy('conn.allow()');
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Left over child shards
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -272,7 +284,7 @@ LANGUAGE C STRICT VOLATILE;
 SELECT * FROM pg_dist_cleanup;
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Empty child shards after cleanup
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -282,64 +294,28 @@ LANGUAGE C STRICT VOLATILE;
 SELECT slot_name FROM pg_replication_slots;
 -- Empty subscriptions
 SELECT subname FROM pg_subscription;
---6. Failure on polling last write-ahead log location reported to origin WAL sender
---7. Failure on disabling subscription (right before dropping it)
+--7. Failure on dropping subscription
 \c - postgres - :master_port
-SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").killall()');
-SELECT pg_catalog.citus_split_shard_by_split_points(
-    8981000,
-    ARRAY['-100000'],
-    ARRAY[:worker_2_node, :worker_2_node],
-    'force_logical');
-SELECT * FROM pg_dist_cleanup;
--- we need to allow connection so that we can connect to proxy
-SELECT citus.mitmproxy('conn.allow()');
-\c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
-SET citus.show_shards_for_app_name_prefixes = '*';
--- Left over child shards
-SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
--- Left over publications
-SELECT pubname FROM pg_publication;
--- Left over replication slots
-SELECT slot_name FROM pg_replication_slots;
--- Left over subscriptions
-SELECT subname FROM pg_subscription;
-\c - postgres - :master_port
-SELECT run_try_drop_marked_shards();
-SELECT * FROM pg_dist_cleanup;
-\c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
-SET citus.show_shards_for_app_name_prefixes = '*';
--- Empty child shards after cleanup
-SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
--- Empty publications
-SELECT pubname FROM pg_publication;
--- Empty replication slot table
-SELECT slot_name FROM pg_replication_slots;
--- Empty subscriptions
-SELECT subname FROM pg_subscription;
---7. Failure on disabling subscription (right before dropping it)
---8. Failure on dropping subscription
-\c - postgres - :master_port
+SET citus.next_shard_id TO 8981002;
+SET citus.next_operation_id TO 777;
+SET citus.next_cleanup_record_id TO 11;
 SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").killall()');
 SELECT pg_catalog.citus_split_shard_by_split_points(
     8981000,
     ARRAY['-100000'],
-    ARRAY[:worker_2_node, :worker_2_node],
+    ARRAY[:worker_1_node, :worker_2_node],
     'force_logical');
+-- NO records expected as we fail at 'DropAllLogicalReplicationLeftovers' before creating
+-- any resources.
 SELECT * FROM pg_dist_cleanup;
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
 -- we need to allow connection so that we can connect to proxy
 SELECT citus.mitmproxy('conn.allow()');
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Left over child shards
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -355,7 +331,7 @@ LANGUAGE C STRICT VOLATILE;
 SELECT * FROM pg_dist_cleanup;
 \c - - - :worker_2_proxy_port
-SET search_path TO "citus_split_test_schema", public, pg_catalog;
+SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
 SET citus.show_shards_for_app_name_prefixes = '*';
 -- Empty child shards after cleanup
 SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r';
@ -365,4 +341,8 @@ LANGUAGE C STRICT VOLATILE;
 SELECT slot_name FROM pg_replication_slots;
 -- Empty subscriptions
 SELECT subname FROM pg_subscription;
---8. Failure on dropping subscription
+-- Cleanup
+\c - postgres - :master_port
+DROP SCHEMA "citus_failure_split_cleanup_schema" CASCADE;
+-- Cleanup