-- -- failure_tenant_isolation -- -- due to different libpq versions -- some warning messages differ -- between local and CI SET client_min_messages TO ERROR; CREATE SCHEMA IF NOT EXISTS tenant_isolation; SET SEARCH_PATH = tenant_isolation; SET citus.shard_count TO 2; SET citus.next_shard_id TO 300; SET citus.shard_replication_factor TO 1; SET citus.max_adaptive_executor_pool_size TO 1; SELECT pg_backend_pid() as pid \gset SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) -- cleanup leftovers if any SELECT public.wait_for_resource_cleanup(); wait_for_resource_cleanup --------------------------------------------------------------------- (1 row) CREATE TABLE table_1 (id int PRIMARY KEY); CREATE TABLE table_2 (ref_id int REFERENCES table_1(id) UNIQUE, data int); SELECT create_distributed_table('table_1', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('table_2', 'ref_id'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE VIEW shard_sizes AS SELECT shardid, result AS row_count FROM run_command_on_placements('table_1', 'SELECT count(*) FROM %s'); INSERT INTO table_1 SELECT x FROM generate_series(1, 100) AS f (x); INSERT INTO table_2 SELECT x, x FROM generate_series(1, 100) AS f (x); -- initial shard sizes SELECT * FROM shard_sizes ORDER BY 1; shardid | row_count --------------------------------------------------------------------- 300 | 49 301 | 51 (2 rows) -- failure on table creation SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_1").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- cancellation on table creation SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_1").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on colocated table creation SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_2").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- cancellation on colocated table creation SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_2").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on table constraints on replica identity creation SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_1 ADD CONSTRAINT").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- cancellation on table constraints on replica identity creation SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_1 ADD CONSTRAINT").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on publication creation SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- cancellation on publication creation SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on replication slot creation SELECT citus.mitmproxy('conn.onQuery(query="CREATE_REPLICATION_SLOT").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- cancellation on replication slot creation SELECT citus.mitmproxy('conn.onQuery(query="CREATE_REPLICATION_SLOT").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on setting snapshot SELECT citus.mitmproxy('conn.onQuery(query="SET TRANSACTION SNAPSHOT").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- cancellation on setting snapshot SELECT citus.mitmproxy('conn.onQuery(query="SET TRANSACTION SNAPSHOT").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on table population SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(300").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- cancellation on table population SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(300").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on colocated table population SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(302").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- cancellation on colocated table population SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(302").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on replication setup udf SELECT citus.mitmproxy('conn.onQuery(query="worker_split_shard_replication_setup").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: Failed to run worker_split_shard_replication_setup UDF. It should successfully execute for splitting a shard in a non-blocking way. Please retry. -- cancellation on replication setup udf SELECT citus.mitmproxy('conn.onQuery(query="worker_split_shard_replication_setup").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on subscription creation SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- cancellation on subscription creation SELECT citus.mitmproxy('conn.onQuery(query="CREATE SUBSCRIPTION").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on catching up LSN SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_current_wal_lsn").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- cancellation on catching up LSN SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_current_wal_lsn").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").killall()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT public.wait_for_resource_cleanup(); wait_for_resource_cleanup --------------------------------------------------------------------- (1 row) -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="DROP SUBSCRIPTION").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT public.wait_for_resource_cleanup(); wait_for_resource_cleanup --------------------------------------------------------------------- (1 row) -- failure on dropping publication SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").killall()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT public.wait_for_resource_cleanup(); wait_for_resource_cleanup --------------------------------------------------------------------- (1 row) -- cancellation on dropping publication SELECT citus.mitmproxy('conn.onQuery(query="DROP PUBLICATION").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT public.wait_for_resource_cleanup(); wait_for_resource_cleanup --------------------------------------------------------------------- (1 row) -- failure on dropping replication slot SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").killall()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT public.wait_for_resource_cleanup(); wait_for_resource_cleanup --------------------------------------------------------------------- (1 row) -- cancellation on dropping replication slot SELECT citus.mitmproxy('conn.onQuery(query="select pg_drop_replication_slot").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- cleanup leftovers SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT public.wait_for_resource_cleanup(); wait_for_resource_cleanup --------------------------------------------------------------------- (1 row) -- failure on foreign key creation SELECT citus.mitmproxy('conn.onQuery(query="ADD CONSTRAINT table_2_ref_id_fkey FOREIGN KEY").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -- failure on shard split transaction SELECT citus.mitmproxy('conn.onQuery(query="BEGIN").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: failure on connection marked as essential: localhost:xxxxx -- failure on shard split transaction SELECT citus.mitmproxy('conn.onQuery(query="BEGIN").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on shard split transaction commit SELECT citus.mitmproxy('conn.onQuery(query="COMMIT").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: failure on connection marked as essential: localhost:xxxxx -- failure on shard split transaction commit SELECT citus.mitmproxy('conn.onQuery(query="COMMIT").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- failure on transaction prepare for dropping old tables SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) -- due to libpq version differences, the output might change -- hence use code block to catch the error \set VERBOSITY terse DO LANGUAGE plpgsql $$ BEGIN SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); EXCEPTION WHEN OTHERS THEN RAISE 'Command failed to execute'; END; $$; ERROR: Command failed to execute \set VERBOSITY default -- failure on transaction prepare for dropping old tables SELECT citus.mitmproxy('conn.onQuery(query="PREPARE TRANSACTION").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical'); ERROR: canceling statement due to user request -- verify that the tenant is not isolated SELECT * FROM shard_sizes ORDER BY 1; shardid | row_count --------------------------------------------------------------------- 300 | 49 301 | 51 (2 rows) -- Verify that tenant can be isolated after unsuccessful attempts SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- (1 row) -- shard sizes after successful tenant isolation CREATE TABLE old_shards AS SELECT shardid FROM pg_dist_shard; WITH new_shard AS ( SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE', shard_transfer_mode := 'force_logical') AS shardid ) SELECT row_count FROM shard_sizes JOIN new_shard ON shard_sizes.shardid = new_shard.shardid; row_count --------------------------------------------------------------------- 1 (1 row) SELECT row_count FROM shard_sizes WHERE shard_sizes.shardid NOT IN (SELECT * FROM old_shards) ORDER BY 1; row_count --------------------------------------------------------------------- 1 20 28 (3 rows) \set VERBOSITY terse DROP SCHEMA tenant_isolation CASCADE; \set VERBOSITY default