-- -- SINGLE_NODE -- -- This test file has an alternative output because of the change in the -- display of SQL-standard function's arguments in INSERT/SELECT in PG15. -- The alternative output can be deleted when we drop support for PG14 -- SHOW server_version \gset SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15; server_version_ge_15 --------------------------------------------------------------------- t (1 row) CREATE SCHEMA single_node; SET search_path TO single_node; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 90630500; -- Ensure tuple data in explain analyze output is the same on all PG versions SET citus.enable_binary_protocol = TRUE; -- do not cache any connections for now, will enable it back soon ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; -- adding the coordinator as inactive is disallowed SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); ERROR: coordinator node cannot be added as inactive node -- before adding a node we are not officially a coordinator SELECT citus_is_coordinator(); citus_is_coordinator --------------------------------------------------------------------- f (1 row) -- idempotently add node to allow this test to run without add_coordinator SET client_min_messages TO WARNING; SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port); ?column? --------------------------------------------------------------------- 1 (1 row) -- after adding a node we are officially a coordinator SELECT citus_is_coordinator(); citus_is_coordinator --------------------------------------------------------------------- t (1 row) -- coordinator cannot be disabled SELECT 1 FROM citus_disable_node('localhost', :master_port); ERROR: cannot change "isactive" field of the coordinator node RESET client_min_messages; SELECT 1 FROM master_remove_node('localhost', :master_port); ?column? --------------------------------------------------------------------- 1 (1 row) SELECT count(*) FROM pg_dist_node; count --------------------------------------------------------------------- 0 (1 row) -- there are no workers now, but we should still be able to create Citus tables -- force local execution when creating the index ALTER SYSTEM SET citus.local_shared_pool_size TO -1; -- Postmaster might not ack SIGHUP signal sent by pg_reload_conf() immediately, -- so we need to sleep for some amount of time to do our best to ensure that -- postmaster reflects GUC changes. SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(0.1); pg_sleep --------------------------------------------------------------------- (1 row) CREATE TABLE failover_to_local (a int); SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE INDEX CONCURRENTLY ON failover_to_local(a); WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state. If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object, if applicable, and then re-attempt the original command. ERROR: the total number of connections on the server is more than max_connections(100) HINT: Consider using a higher value for max_connections -- reset global GUC changes ALTER SYSTEM RESET citus.local_shared_pool_size; ALTER SYSTEM RESET citus.max_cached_conns_per_worker; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) CREATE TABLE single_node_nullkey_c1(a int, b int); SELECT create_distributed_table('single_node_nullkey_c1', null, colocate_with=>'none', distribution_type=>null); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE single_node_nullkey_c2(a int, b int); SELECT create_distributed_table('single_node_nullkey_c2', null, colocate_with=>'none', distribution_type=>null); create_distributed_table --------------------------------------------------------------------- (1 row) -- created on different colocation groups .. SELECT ( SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass ) != ( SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass ); ?column? --------------------------------------------------------------------- t (1 row) -- .. but both are associated to coordinator SELECT groupid = 0 FROM pg_dist_placement WHERE shardid = ( SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass ); ?column? --------------------------------------------------------------------- t (1 row) SELECT groupid = 0 FROM pg_dist_placement WHERE shardid = ( SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass ); ?column? --------------------------------------------------------------------- t (1 row) -- try creating a single-shard table from a shard relation SELECT shardid AS round_robin_test_c1_shard_id FROM pg_dist_shard WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass \gset SELECT create_distributed_table('single_node_nullkey_c1_' || :round_robin_test_c1_shard_id , null, colocate_with=>'none', distribution_type=>null); ERROR: relation "single_node_nullkey_c1_90630532" is a shard relation -- create a tenant schema on single node setup SET citus.enable_schema_based_sharding TO ON; CREATE SCHEMA tenant_1; CREATE TABLE tenant_1.tbl_1 (a int); -- verify that we recorded tenant_1 in pg_dist_schema SELECT COUNT(*)=1 FROM pg_dist_schema WHERE schemaid::regnamespace::text = 'tenant_1'; ?column? --------------------------------------------------------------------- t (1 row) -- verify that tenant_1.tbl_1 is recorded in pg_dist_partition, as a single-shard table SELECT COUNT(*)=1 FROM pg_dist_partition WHERE logicalrelid = 'tenant_1.tbl_1'::regclass AND partmethod = 'n' AND repmodel = 's' AND colocationid IS NOT NULL; ?column? --------------------------------------------------------------------- t (1 row) RESET citus.enable_schema_based_sharding; -- Test lazy conversion from Citus local to single-shard tables -- and reference tables, on single node. This means that no shard -- replication should be needed. CREATE TABLE ref_table_conversion_test ( a int PRIMARY KEY ); SELECT citus_add_local_table_to_metadata('ref_table_conversion_test'); citus_add_local_table_to_metadata --------------------------------------------------------------------- (1 row) -- save old shardid and placementid SELECT get_shard_id_for_distribution_column('single_node.ref_table_conversion_test') AS ref_table_conversion_test_old_shard_id \gset SELECT placementid AS ref_table_conversion_test_old_coord_placement_id FROM pg_dist_placement WHERE shardid = :ref_table_conversion_test_old_shard_id \gset SELECT create_reference_table('ref_table_conversion_test'); create_reference_table --------------------------------------------------------------------- (1 row) SELECT public.verify_pg_dist_partition_for_reference_table('single_node.ref_table_conversion_test'); verify_pg_dist_partition_for_reference_table --------------------------------------------------------------------- t (1 row) SELECT public.verify_shard_placements_for_reference_table('single_node.ref_table_conversion_test', :ref_table_conversion_test_old_shard_id, :ref_table_conversion_test_old_coord_placement_id); verify_shard_placements_for_reference_table --------------------------------------------------------------------- t (1 row) CREATE TABLE single_shard_conversion_test_1 ( int_col_1 int PRIMARY KEY, text_col_1 text UNIQUE, int_col_2 int ); SELECT citus_add_local_table_to_metadata('single_shard_conversion_test_1'); citus_add_local_table_to_metadata --------------------------------------------------------------------- (1 row) -- save old shardid SELECT get_shard_id_for_distribution_column('single_node.single_shard_conversion_test_1') AS single_shard_conversion_test_1_old_shard_id \gset SELECT create_distributed_table('single_shard_conversion_test_1', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT public.verify_pg_dist_partition_for_single_shard_table('single_node.single_shard_conversion_test_1'); verify_pg_dist_partition_for_single_shard_table --------------------------------------------------------------------- t (1 row) SELECT public.verify_shard_placement_for_single_shard_table('single_node.single_shard_conversion_test_1', :single_shard_conversion_test_1_old_shard_id, true); verify_shard_placement_for_single_shard_table --------------------------------------------------------------------- t (1 row) CREATE TABLE single_shard_conversion_test_2 ( int_col_1 int ); SELECT citus_add_local_table_to_metadata('single_shard_conversion_test_2'); citus_add_local_table_to_metadata --------------------------------------------------------------------- (1 row) -- save old shardid SELECT get_shard_id_for_distribution_column('single_node.single_shard_conversion_test_2') AS single_shard_conversion_test_2_old_shard_id \gset SELECT create_distributed_table('single_shard_conversion_test_2', null, colocate_with=>'none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT public.verify_pg_dist_partition_for_single_shard_table('single_node.single_shard_conversion_test_2'); verify_pg_dist_partition_for_single_shard_table --------------------------------------------------------------------- t (1 row) SELECT public.verify_shard_placement_for_single_shard_table('single_node.single_shard_conversion_test_2', :single_shard_conversion_test_2_old_shard_id, true); verify_shard_placement_for_single_shard_table --------------------------------------------------------------------- t (1 row) -- make sure that they're created on different colocation groups SELECT ( SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'single_node.single_shard_conversion_test_1'::regclass ) != ( SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'single_node.single_shard_conversion_test_2'::regclass ); ?column? --------------------------------------------------------------------- t (1 row) SET client_min_messages TO WARNING; DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2, ref_table_conversion_test, single_shard_conversion_test_1, single_shard_conversion_test_2; DROP SCHEMA tenant_1 CASCADE; RESET client_min_messages; -- so that we don't have to update rest of the test output SET citus.next_shard_id TO 90630500; CREATE TABLE ref(x int, y int); SELECT create_reference_table('ref'); create_reference_table --------------------------------------------------------------------- (1 row) SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node; groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced --------------------------------------------------------------------- 0 | localhost | 57636 | t | t | t | t (1 row) DROP TABLE ref; -- remove the coordinator to try again with create_reference_table SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0; master_remove_node --------------------------------------------------------------------- (1 row) CREATE TABLE loc(x int, y int); SELECT citus_add_local_table_to_metadata('loc'); citus_add_local_table_to_metadata --------------------------------------------------------------------- (1 row) SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node; groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced --------------------------------------------------------------------- 0 | localhost | 57636 | t | t | t | t (1 row) DROP TABLE loc; -- remove the coordinator to try again with create_distributed_table SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0; master_remove_node --------------------------------------------------------------------- (1 row) -- verify the coordinator gets auto added with the localhost guc ALTER SYSTEM SET citus.local_hostname TO '127.0.0.1'; --although not a hostname, should work for connecting locally SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC pg_sleep --------------------------------------------------------------------- (1 row) CREATE TABLE test(x int, y int); SELECT create_distributed_table('test','x'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node; groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced --------------------------------------------------------------------- 0 | 127.0.0.1 | 57636 | t | t | t | t (1 row) DROP TABLE test; -- remove the coordinator to try again SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0; master_remove_node --------------------------------------------------------------------- (1 row) ALTER SYSTEM RESET citus.local_hostname; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(.1); -- wait to make sure the config has changed before running the GUC pg_sleep --------------------------------------------------------------------- (1 row) CREATE TABLE test(x int, y int); SELECT create_distributed_table('test','x'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node; groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced --------------------------------------------------------------------- 0 | localhost | 57636 | t | t | t | t (1 row) BEGIN; -- we should not enable MX for this temporary node just because -- it'd spawn a bg worker targeting this node -- and that changes the connection count specific tests -- here SET LOCAL citus.enable_metadata_sync TO OFF; -- cannot add workers with specific IP as long as I have a placeholder coordinator record SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port); ERROR: cannot add a worker node when the coordinator hostname is set to localhost DETAIL: Worker nodes need to be able to connect to the coordinator to transfer data. HINT: Use SELECT citus_set_coordinator_host('') to configure the coordinator hostname COMMIT; BEGIN; -- we should not enable MX for this temporary node just because -- it'd spawn a bg worker targeting this node -- and that changes the connection count specific tests -- here SET LOCAL citus.enable_metadata_sync TO OFF; -- adding localhost workers is ok SELECT 1 FROM master_add_node('localhost', :worker_1_port); NOTICE: shards are still on the coordinator after adding the new node HINT: Use SELECT rebalance_table_shards(); to balance shards data between workers and coordinator or SELECT citus_drain_node('localhost',57636); to permanently move shards away from the coordinator. ?column? --------------------------------------------------------------------- 1 (1 row) COMMIT; -- we don't need this node anymore SELECT 1 FROM master_remove_node('localhost', :worker_1_port); ?column? --------------------------------------------------------------------- 1 (1 row) -- set the coordinator host to something different than localhost SELECT 1 FROM citus_set_coordinator_host('127.0.0.1'); ?column? --------------------------------------------------------------------- 1 (1 row) BEGIN; -- we should not enable MX for this temporary node just because -- it'd spawn a bg worker targeting this node -- and that changes the connection count specific tests -- here SET LOCAL citus.enable_metadata_sync TO OFF; -- adding workers with specific IP is ok now SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port); NOTICE: shards are still on the coordinator after adding the new node HINT: Use SELECT rebalance_table_shards(); to balance shards data between workers and coordinator or SELECT citus_drain_node('127.0.0.1',57636); to permanently move shards away from the coordinator. ?column? --------------------------------------------------------------------- 1 (1 row) COMMIT; -- we don't need this node anymore SELECT 1 FROM master_remove_node('127.0.0.1', :worker_1_port); ?column? --------------------------------------------------------------------- 1 (1 row) -- set the coordinator host back to localhost for the remainder of tests SELECT 1 FROM citus_set_coordinator_host('localhost'); ?column? --------------------------------------------------------------------- 1 (1 row) -- should have shards setting should not really matter for a single node SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); ?column? --------------------------------------------------------------------- 1 (1 row) CREATE TYPE new_type AS (n int, m text); CREATE TABLE test_2(x int, y int, z new_type); SELECT create_distributed_table('test_2','x'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE ref(a int, b int); SELECT create_reference_table('ref'); create_reference_table --------------------------------------------------------------------- (1 row) CREATE TABLE local(c int, d int); CREATE TABLE public.another_schema_table(a int, b int); SELECT create_distributed_table('public.another_schema_table', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE non_binary_copy_test (key int PRIMARY KEY, value new_type); SELECT create_distributed_table('non_binary_copy_test', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO non_binary_copy_test SELECT i, (i, 'citus9.5')::new_type FROM generate_series(0,1000)i; -- Confirm the basics work INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5); SELECT * FROM test WHERE x = 1; x | y --------------------------------------------------------------------- 1 | 2 (1 row) SELECT count(*) FROM test; count --------------------------------------------------------------------- 5 (1 row) SELECT * FROM test ORDER BY x; x | y --------------------------------------------------------------------- 1 | 2 2 | 7 3 | 4 4 | 5 5 | 6 (5 rows) UPDATE test SET y = y + 1 RETURNING *; x | y --------------------------------------------------------------------- 1 | 3 2 | 8 3 | 5 4 | 6 5 | 7 (5 rows) WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2; x | y --------------------------------------------------------------------- 1 | 2 2 | 7 3 | 4 4 | 5 5 | 6 (5 rows) -- show that we can filter remote commands -- given that citus.grep_remote_commands, we log all commands SET citus.log_local_commands to true; SELECT count(*) FROM public.another_schema_table WHERE a = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) -- grep matches all commands SET citus.grep_remote_commands TO "%%"; SELECT count(*) FROM public.another_schema_table WHERE a = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) -- only filter a specific shard for the local execution BEGIN; SET LOCAL citus.grep_remote_commands TO "%90630515%"; SELECT count(*) FROM public.another_schema_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE true count --------------------------------------------------------------------- 0 (1 row) -- match nothing SET LOCAL citus.grep_remote_commands TO '%nothing%'; SELECT count(*) FROM public.another_schema_table; count --------------------------------------------------------------------- 0 (1 row) COMMIT; -- only filter a specific shard for the remote execution BEGIN; SET LOCAL citus.enable_local_execution TO FALSE; SET LOCAL citus.grep_remote_commands TO '%90630515%'; SET LOCAL citus.log_remote_commands TO ON; SELECT count(*) FROM public.another_schema_table; NOTICE: issuing SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE true DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx count --------------------------------------------------------------------- 0 (1 row) -- match nothing SET LOCAL citus.grep_remote_commands TO '%nothing%'; SELECT count(*) FROM public.another_schema_table; count --------------------------------------------------------------------- 0 (1 row) COMMIT; RESET citus.log_local_commands; RESET citus.grep_remote_commands; -- Test upsert with constraint CREATE TABLE upsert_test ( part_key int UNIQUE, other_col int, third_col int ); -- distribute the table SELECT create_distributed_table('upsert_test', 'part_key'); create_distributed_table --------------------------------------------------------------------- (1 row) -- do a regular insert INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *; part_key | other_col | third_col --------------------------------------------------------------------- 1 | 1 | 2 | 2 | (2 rows) SET citus.log_remote_commands to true; -- observe that there is a conflict and the following query does nothing INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *; NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630523 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col part_key | other_col | third_col --------------------------------------------------------------------- (0 rows) -- same as the above with different syntax INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *; NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630523 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col part_key | other_col | third_col --------------------------------------------------------------------- (0 rows) -- again the same query with another syntax INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630523 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630523 DO NOTHING RETURNING part_key, other_col, third_col part_key | other_col | third_col --------------------------------------------------------------------- (0 rows) BEGIN; -- force local execution SELECT count(*) FROM upsert_test WHERE part_key = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630523 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 1 (1 row) SET citus.log_remote_commands to false; -- multi-shard pushdown query that goes through local execution INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; part_key | other_col | third_col --------------------------------------------------------------------- (0 rows) -- multi-shard pull-to-coordinator query that goes through local execution INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test LIMIT 100 ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; part_key | other_col | third_col --------------------------------------------------------------------- (0 rows) COMMIT; -- to test citus local tables select undistribute_table('upsert_test'); NOTICE: creating a new table for single_node.upsert_test NOTICE: moving the data of single_node.upsert_test NOTICE: dropping the old single_node.upsert_test NOTICE: renaming the new table to single_node.upsert_test undistribute_table --------------------------------------------------------------------- (1 row) -- create citus local table select citus_add_local_table_to_metadata('upsert_test'); citus_add_local_table_to_metadata --------------------------------------------------------------------- (1 row) -- test the constraint with local execution INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; part_key | other_col | third_col --------------------------------------------------------------------- (0 rows) DROP TABLE upsert_test; CREATE TABLE relation_tracking_table_1(id int, nonid int); SELECT create_distributed_table('relation_tracking_table_1', 'id', colocate_with := 'none'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO relation_tracking_table_1 select generate_series(6, 10000, 1), 0; CREATE or REPLACE function foo() returns setof relation_tracking_table_1 AS $$ BEGIN RETURN query select * from relation_tracking_table_1 order by 1 limit 10; end; $$ language plpgsql; CREATE TABLE relation_tracking_table_2 (id int, nonid int); -- use the relation-access in this session select foo(); foo --------------------------------------------------------------------- (6,0) (7,0) (8,0) (9,0) (10,0) (11,0) (12,0) (13,0) (14,0) (15,0) (10 rows) -- we should be able to use sequential mode, as the previous multi-shard -- relation access has been cleaned-up BEGIN; SET LOCAL citus.multi_shard_modify_mode TO sequential; INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0; SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$single_node.relation_tracking_table_2$$) create_distributed_table --------------------------------------------------------------------- (1 row) SELECT count(*) FROM relation_tracking_table_2; count --------------------------------------------------------------------- 995 (1 row) ROLLBACK; BEGIN; INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0; SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none'); NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$single_node.relation_tracking_table_2$$) create_distributed_table --------------------------------------------------------------------- (1 row) SELECT count(*) FROM relation_tracking_table_2; count --------------------------------------------------------------------- 995 (1 row) COMMIT; SET client_min_messages TO ERROR; DROP TABLE relation_tracking_table_2, relation_tracking_table_1 CASCADE; RESET client_min_messages; CREATE SCHEMA "Quoed.Schema"; SET search_path TO "Quoed.Schema"; CREATE TABLE "long_constraint_upsert\_test" ( part_key int, other_col int, third_col int, CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" UNIQUE (part_key) ); NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted " -- distribute the table and create shards SELECT create_distributed_table('"long_constraint_upsert\_test"', 'part_key'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO "long_constraint_upsert\_test" (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *; NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted " part_key | other_col | third_col --------------------------------------------------------------------- 1 | 1 | (1 row) ALTER TABLE "long_constraint_upsert\_test" RENAME TO simple_table_name; INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *; NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted " part_key | other_col | third_col --------------------------------------------------------------------- (0 rows) -- this is currently not supported, but once we support -- make sure that the following query also works fine ALTER TABLE simple_table_name RENAME CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" TO simple_constraint_name; NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted " ERROR: renaming constraints belonging to distributed tables is currently unsupported --INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *; SET search_path TO single_node; SET client_min_messages TO ERROR; DROP SCHEMA "Quoed.Schema" CASCADE; RESET client_min_messages; -- test partitioned index creation with long name CREATE TABLE test_index_creation1 ( tenant_id integer NOT NULL, timeperiod timestamp without time zone NOT NULL, field1 integer NOT NULL, inserted_utc timestamp without time zone NOT NULL DEFAULT now(), PRIMARY KEY(tenant_id, timeperiod) ) PARTITION BY RANGE (timeperiod); CREATE TABLE test_index_creation1_p2020_09_26 PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00'); CREATE TABLE test_index_creation1_p2020_09_27 PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00'); select create_distributed_table('test_index_creation1', 'tenant_id'); create_distributed_table --------------------------------------------------------------------- (1 row) -- should be able to create indexes with INCLUDE/WHERE CREATE INDEX ix_test_index_creation5 ON test_index_creation1 USING btree(tenant_id, timeperiod) INCLUDE (field1) WHERE (tenant_id = 100); -- test if indexes are created SELECT 1 AS created WHERE EXISTS(SELECT * FROM pg_indexes WHERE indexname LIKE '%test_index_creation%'); created --------------------------------------------------------------------- 1 (1 row) -- test citus size functions in transaction with modification CREATE TABLE test_citus_size_func (a int); SELECT create_distributed_table('test_citus_size_func', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO test_citus_size_func VALUES(1), (2); BEGIN; -- DDL with citus_table_size ALTER TABLE test_citus_size_func ADD COLUMN newcol INT; SELECT citus_table_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; -- DDL with citus_relation_size ALTER TABLE test_citus_size_func ADD COLUMN newcol INT; SELECT citus_relation_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; -- DDL with citus_total_relation_size ALTER TABLE test_citus_size_func ADD COLUMN newcol INT; SELECT citus_total_relation_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; -- single shard insert with citus_table_size INSERT INTO test_citus_size_func VALUES (3); SELECT citus_table_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; -- multi shard modification with citus_table_size INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func; SELECT citus_table_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; -- single shard insert with citus_relation_size INSERT INTO test_citus_size_func VALUES (3); SELECT citus_relation_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; -- multi shard modification with citus_relation_size INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func; SELECT citus_relation_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; -- single shard insert with citus_total_relation_size INSERT INTO test_citus_size_func VALUES (3); SELECT citus_total_relation_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; -- multi shard modification with citus_total_relation_size INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func; SELECT citus_total_relation_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; -- we should be able to limit intermediate results BEGIN; SET LOCAL citus.max_intermediate_result_size TO 0; WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1; ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 0 kB) DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place. HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. ROLLBACK; -- the first cte (cte_1) does not exceed the limit -- but the second (cte_2) exceeds, so we error out BEGIN; SET LOCAL citus.max_intermediate_result_size TO '1kB'; INSERT INTO test SELECT i,i from generate_series(0,1000)i; -- only pulls 1 row, should not hit the limit WITH cte_1 AS (SELECT * FROM test LIMIT 1) SELECT count(*) FROM cte_1; count --------------------------------------------------------------------- 1 (1 row) -- cte_1 only pulls 1 row, but cte_2 all rows WITH cte_1 AS (SELECT * FROM test LIMIT 1), cte_2 AS (SELECT * FROM test OFFSET 0) SELECT count(*) FROM cte_1, cte_2; ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 1 kB) DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place. HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable. ROLLBACK; -- single shard and multi-shard delete -- inside a transaction block BEGIN; DELETE FROM test WHERE y = 5; INSERT INTO test VALUES (4, 5); DELETE FROM test WHERE x = 1; INSERT INTO test VALUES (1, 2); COMMIT; CREATE INDEX single_node_i1 ON test(x); CREATE INDEX single_node_i2 ON test(x,y); REINDEX SCHEMA single_node; REINDEX SCHEMA CONCURRENTLY single_node; -- keep one of the indexes -- drop w/wout tx blocks BEGIN; DROP INDEX single_node_i2; ROLLBACK; DROP INDEX single_node_i2; -- change the schema w/wout TX block BEGIN; ALTER TABLE public.another_schema_table SET SCHEMA single_node; ROLLBACK; ALTER TABLE public.another_schema_table SET SCHEMA single_node; BEGIN; TRUNCATE test; SELECT * FROM test; x | y --------------------------------------------------------------------- (0 rows) ROLLBACK; VACUUM test; VACUUM test, test_2; VACUUM ref, test; VACUUM ANALYZE test(x); ANALYZE ref; ANALYZE test_2; VACUUM local; VACUUM local, ref, test, test_2; VACUUM FULL test, ref; BEGIN; ALTER TABLE test ADD COLUMN z INT DEFAULT 66; SELECT count(*) FROM test WHERE z = 66; count --------------------------------------------------------------------- 5 (1 row) ROLLBACK; -- explain analyze should work on a single node EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) SELECT * FROM test; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=5 loops=1) Task Count: 4 Tuple data received from nodes: 40 bytes Tasks Shown: One of 4 -> Task Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on test_90630506 test (actual rows=2 loops=1) (8 rows) -- common utility command SELECT pg_size_pretty(citus_relation_size('test'::regclass)); pg_size_pretty --------------------------------------------------------------------- 24 kB (1 row) -- basic view queries CREATE VIEW single_node_view AS SELECT count(*) as cnt FROM test t1 JOIN test t2 USING (x); SELECT * FROM single_node_view; cnt --------------------------------------------------------------------- 5 (1 row) SELECT * FROM single_node_view, test WHERE test.x = single_node_view.cnt; cnt | x | y --------------------------------------------------------------------- 5 | 5 | 6 (1 row) -- copy in/out BEGIN; COPY test(x) FROM PROGRAM 'seq 32'; SELECT count(*) FROM test; count --------------------------------------------------------------------- 37 (1 row) COPY (SELECT count(DISTINCT x) FROM test) TO STDOUT; 32 INSERT INTO test SELECT i,i FROM generate_series(0,100)i; ROLLBACK; -- master_create_empty_shard on coordinator BEGIN; CREATE TABLE append_table (a INT, b INT); SELECT create_distributed_table('append_table','a','append'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT master_create_empty_shard('append_table'); NOTICE: Creating placements for the append partitioned tables on the coordinator is not supported, skipping coordinator ... ERROR: could only create 0 of 1 of required shard replicas END; -- alter table inside a tx block BEGIN; ALTER TABLE test ADD COLUMN z single_node.new_type; INSERT INTO test VALUES (99, 100, (1, 'onder')::new_type) RETURNING *; x | y | z --------------------------------------------------------------------- 99 | 100 | (1,onder) (1 row) ROLLBACK; -- prepared statements with custom types PREPARE single_node_prepare_p1(int, int, new_type) AS INSERT INTO test_2 VALUES ($1, $2, $3); EXECUTE single_node_prepare_p1(1, 1, (95, 'citus9.5')::new_type); EXECUTE single_node_prepare_p1(2 ,2, (94, 'citus9.4')::new_type); EXECUTE single_node_prepare_p1(3 ,2, (93, 'citus9.3')::new_type); EXECUTE single_node_prepare_p1(4 ,2, (92, 'citus9.2')::new_type); EXECUTE single_node_prepare_p1(5 ,2, (91, 'citus9.1')::new_type); EXECUTE single_node_prepare_p1(6 ,2, (90, 'citus9.0')::new_type); PREPARE use_local_query_cache(int) AS SELECT count(*) FROM test_2 WHERE x = $1; EXECUTE use_local_query_cache(1); count --------------------------------------------------------------------- 1 (1 row) EXECUTE use_local_query_cache(1); count --------------------------------------------------------------------- 1 (1 row) EXECUTE use_local_query_cache(1); count --------------------------------------------------------------------- 1 (1 row) EXECUTE use_local_query_cache(1); count --------------------------------------------------------------------- 1 (1 row) EXECUTE use_local_query_cache(1); count --------------------------------------------------------------------- 1 (1 row) SET client_min_messages TO DEBUG2; -- the 6th execution will go through the planner -- the 7th execution will skip the planner as it uses the cache EXECUTE use_local_query_cache(1); DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan count --------------------------------------------------------------------- 1 (1 row) EXECUTE use_local_query_cache(1); count --------------------------------------------------------------------- 1 (1 row) RESET client_min_messages; -- partitioned table should be fine, adding for completeness CREATE TABLE collections_list ( key bigint, ts timestamptz DEFAULT now(), collection_id integer, value numeric, PRIMARY KEY(key, collection_id) ) PARTITION BY LIST (collection_id ); SELECT create_distributed_table('collections_list', 'key'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE collections_list_0 PARTITION OF collections_list (key, ts, collection_id, value) FOR VALUES IN ( 0 ); CREATE TABLE collections_list_1 PARTITION OF collections_list (key, ts, collection_id, value) FOR VALUES IN ( 1 ); INSERT INTO collections_list SELECT i, '2011-01-01', i % 2, i * i FROM generate_series(0, 100) i; SELECT count(*) FROM collections_list WHERE key < 10 AND collection_id = 1; count --------------------------------------------------------------------- 5 (1 row) SELECT count(*) FROM collections_list_0 WHERE key < 10 AND collection_id = 1; count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM collections_list_1 WHERE key = 11; count --------------------------------------------------------------------- 1 (1 row) ALTER TABLE collections_list DROP COLUMN ts; SELECT * FROM collections_list, collections_list_0 WHERE collections_list.key=collections_list_0.key ORDER BY 1 DESC,2 DESC,3 DESC,4 DESC LIMIT 1; key | collection_id | value | key | collection_id | value --------------------------------------------------------------------- 100 | 0 | 10000 | 100 | 0 | 10000 (1 row) -- test hash distribution using INSERT with generate_series() function CREATE OR REPLACE FUNCTION part_hashint4_noop(value int4, seed int8) RETURNS int8 AS $$ SELECT value + seed; $$ LANGUAGE SQL IMMUTABLE; CREATE OPERATOR CLASS part_test_int4_ops FOR TYPE int4 USING HASH AS operator 1 =, function 2 part_hashint4_noop(int4, int8); CREATE TABLE hash_parted ( a int, b int ) PARTITION BY HASH (a part_test_int4_ops); CREATE TABLE hpart0 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 0); CREATE TABLE hpart1 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 1); CREATE TABLE hpart2 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 2); CREATE TABLE hpart3 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 3); -- Disable metadata sync since citus doesn't support distributing -- operator class for now. SET citus.enable_metadata_sync TO OFF; SELECT create_distributed_table('hash_parted ', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO hash_parted VALUES (1, generate_series(1, 10)); SELECT * FROM hash_parted ORDER BY 1, 2; a | b --------------------------------------------------------------------- 1 | 1 1 | 2 1 | 3 1 | 4 1 | 5 1 | 6 1 | 7 1 | 8 1 | 9 1 | 10 (10 rows) ALTER TABLE hash_parted DETACH PARTITION hpart0; ALTER TABLE hash_parted DETACH PARTITION hpart1; ALTER TABLE hash_parted DETACH PARTITION hpart2; ALTER TABLE hash_parted DETACH PARTITION hpart3; RESET citus.enable_metadata_sync; -- test range partition without creating partitions and inserting with generate_series() -- should error out even in plain PG since no partition of relation "parent_tab" is found for row -- in Citus it errors out because it fails to evaluate partition key in insert CREATE TABLE parent_tab (id int) PARTITION BY RANGE (id); SELECT create_distributed_table('parent_tab', 'id'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO parent_tab VALUES (generate_series(0, 3)); ERROR: failed to evaluate partition key in insert HINT: try using constant values for partition column -- now it should work CREATE TABLE parent_tab_1_2 PARTITION OF parent_tab FOR VALUES FROM (1) to (2); ALTER TABLE parent_tab ADD COLUMN b int; INSERT INTO parent_tab VALUES (1, generate_series(0, 3)); SELECT * FROM parent_tab ORDER BY 1, 2; id | b --------------------------------------------------------------------- 1 | 0 1 | 1 1 | 2 1 | 3 (4 rows) -- make sure that parallel accesses are good SET citus.force_max_query_parallelization TO ON; SELECT * FROM test_2 ORDER BY 1 DESC; x | y | z --------------------------------------------------------------------- 6 | 2 | (90,citus9.0) 5 | 2 | (91,citus9.1) 4 | 2 | (92,citus9.2) 3 | 2 | (93,citus9.3) 2 | 2 | (94,citus9.4) 1 | 1 | (95,citus9.5) (6 rows) DELETE FROM test_2 WHERE y = 1000 RETURNING *; x | y | z --------------------------------------------------------------------- (0 rows) RESET citus.force_max_query_parallelization ; BEGIN; INSERT INTO test_2 VALUES (7 ,2, (83, 'citus8.3')::new_type); SAVEPOINT s1; INSERT INTO test_2 VALUES (9 ,1, (82, 'citus8.2')::new_type); SAVEPOINT s2; ROLLBACK TO SAVEPOINT s1; SELECT * FROM test_2 WHERE z = (83, 'citus8.3')::new_type OR z = (82, 'citus8.2')::new_type; x | y | z --------------------------------------------------------------------- 7 | 2 | (83,citus8.3) (1 row) RELEASE SAVEPOINT s1; COMMIT; SELECT * FROM test_2 WHERE z = (83, 'citus8.3')::new_type OR z = (82, 'citus8.2')::new_type; x | y | z --------------------------------------------------------------------- 7 | 2 | (83,citus8.3) (1 row) -- final query is only intermediate result -- we want PG 11/12/13 behave consistently, the CTEs should be MATERIALIZED WITH cte_1 AS (SELECT * FROM test_2) SELECT * FROM cte_1 ORDER BY 1,2; x | y | z --------------------------------------------------------------------- 1 | 1 | (95,citus9.5) 2 | 2 | (94,citus9.4) 3 | 2 | (93,citus9.3) 4 | 2 | (92,citus9.2) 5 | 2 | (91,citus9.1) 6 | 2 | (90,citus9.0) 7 | 2 | (83,citus8.3) (7 rows) -- final query is router query WITH cte_1 AS (SELECT * FROM test_2) SELECT * FROM cte_1, test_2 WHERE test_2.x = cte_1.x AND test_2.x = 7 ORDER BY 1,2; x | y | z | x | y | z --------------------------------------------------------------------- 7 | 2 | (83,citus8.3) | 7 | 2 | (83,citus8.3) (1 row) -- final query is a distributed query WITH cte_1 AS (SELECT * FROM test_2) SELECT * FROM cte_1, test_2 WHERE test_2.x = cte_1.x AND test_2.y != 2 ORDER BY 1,2; x | y | z | x | y | z --------------------------------------------------------------------- 1 | 1 | (95,citus9.5) | 1 | 1 | (95,citus9.5) (1 row) -- query pushdown should work SELECT * FROM (SELECT x, count(*) FROM test_2 GROUP BY x) as foo, (SELECT x, count(*) FROM test_2 GROUP BY x) as bar WHERE foo.x = bar.x ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC LIMIT 1; x | count | x | count --------------------------------------------------------------------- 7 | 1 | 7 | 1 (1 row) -- make sure that foreign keys work fine ALTER TABLE test_2 ADD CONSTRAINT first_pkey PRIMARY KEY (x); ALTER TABLE test ADD CONSTRAINT foreign_key FOREIGN KEY (x) REFERENCES test_2(x) ON DELETE CASCADE; -- show that delete on test_2 cascades to test SELECT * FROM test WHERE x = 5; x | y --------------------------------------------------------------------- 5 | 6 (1 row) DELETE FROM test_2 WHERE x = 5; SELECT * FROM test WHERE x = 5; x | y --------------------------------------------------------------------- (0 rows) INSERT INTO test_2 VALUES (5 ,2, (91, 'citus9.1')::new_type); INSERT INTO test VALUES (5, 6); INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8); SELECT count(*) FROM ref; count --------------------------------------------------------------------- 3 (1 row) SELECT * FROM ref ORDER BY a; a | b --------------------------------------------------------------------- 1 | 2 5 | 6 7 | 8 (3 rows) SELECT * FROM test, ref WHERE x = a ORDER BY x; x | y | a | b --------------------------------------------------------------------- 1 | 2 | 1 | 2 5 | 6 | 5 | 6 (2 rows) INSERT INTO local VALUES (1, 2), (3, 4), (7, 8); SELECT count(*) FROM local; count --------------------------------------------------------------------- 3 (1 row) SELECT * FROM local ORDER BY c; c | d --------------------------------------------------------------------- 1 | 2 3 | 4 7 | 8 (3 rows) SELECT * FROM ref, local WHERE a = c ORDER BY a; a | b | c | d --------------------------------------------------------------------- 1 | 2 | 1 | 2 7 | 8 | 7 | 8 (2 rows) -- Check repartition joins are supported SET citus.enable_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; x | y | x | y --------------------------------------------------------------------- 2 | 7 | 1 | 2 4 | 5 | 3 | 4 5 | 6 | 4 | 5 (3 rows) SET citus.enable_single_hash_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; x | y | x | y --------------------------------------------------------------------- 2 | 7 | 1 | 2 4 | 5 | 3 | 4 5 | 6 | 4 | 5 (3 rows) SET search_path TO public; SET citus.enable_single_hash_repartition_joins TO OFF; SELECT * FROM single_node.test t1, single_node.test t2 WHERE t1.x = t2.y ORDER BY t1.x; x | y | x | y --------------------------------------------------------------------- 2 | 7 | 1 | 2 4 | 5 | 3 | 4 5 | 6 | 4 | 5 (3 rows) SET citus.enable_single_hash_repartition_joins TO ON; SELECT * FROM single_node.test t1, single_node.test t2 WHERE t1.x = t2.y ORDER BY t1.x; x | y | x | y --------------------------------------------------------------------- 2 | 7 | 1 | 2 4 | 5 | 3 | 4 5 | 6 | 4 | 5 (3 rows) SET search_path TO single_node; SET citus.task_assignment_policy TO 'round-robin'; SET citus.enable_single_hash_repartition_joins TO ON; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; x | y | x | y --------------------------------------------------------------------- 2 | 7 | 1 | 2 4 | 5 | 3 | 4 5 | 6 | 4 | 5 (3 rows) SET citus.task_assignment_policy TO 'greedy'; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; x | y | x | y --------------------------------------------------------------------- 2 | 7 | 1 | 2 4 | 5 | 3 | 4 5 | 6 | 4 | 5 (3 rows) SET citus.task_assignment_policy TO 'first-replica'; SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x; x | y | x | y --------------------------------------------------------------------- 2 | 7 | 1 | 2 4 | 5 | 3 | 4 5 | 6 | 4 | 5 (3 rows) RESET citus.enable_repartition_joins; RESET citus.enable_single_hash_repartition_joins; -- INSERT SELECT router BEGIN; INSERT INTO test(x, y) SELECT x, y FROM test WHERE x = 1; SELECT count(*) from test; count --------------------------------------------------------------------- 6 (1 row) ROLLBACK; -- INSERT SELECT pushdown BEGIN; INSERT INTO test(x, y) SELECT x, y FROM test; SELECT count(*) from test; count --------------------------------------------------------------------- 10 (1 row) ROLLBACK; -- INSERT SELECT analytical query BEGIN; INSERT INTO test(x, y) SELECT count(x), max(y) FROM test; SELECT count(*) from test; count --------------------------------------------------------------------- 6 (1 row) ROLLBACK; -- INSERT SELECT repartition BEGIN; INSERT INTO test(x, y) SELECT y, x FROM test; SELECT count(*) from test; count --------------------------------------------------------------------- 10 (1 row) ROLLBACK; -- INSERT SELECT from reference table into distributed BEGIN; INSERT INTO test(x, y) SELECT a, b FROM ref; SELECT count(*) from test; count --------------------------------------------------------------------- 8 (1 row) ROLLBACK; -- INSERT SELECT from local table into distributed BEGIN; INSERT INTO test(x, y) SELECT c, d FROM local; SELECT count(*) from test; count --------------------------------------------------------------------- 8 (1 row) ROLLBACK; -- INSERT SELECT from distributed table to local table BEGIN; INSERT INTO ref(a, b) SELECT x, y FROM test; SELECT count(*) from ref; count --------------------------------------------------------------------- 8 (1 row) ROLLBACK; -- INSERT SELECT from distributed table to local table BEGIN; INSERT INTO ref(a, b) SELECT c, d FROM local; SELECT count(*) from ref; count --------------------------------------------------------------------- 6 (1 row) ROLLBACK; -- INSERT SELECT from distributed table to local table BEGIN; INSERT INTO local(c, d) SELECT x, y FROM test; SELECT count(*) from local; count --------------------------------------------------------------------- 8 (1 row) ROLLBACK; -- INSERT SELECT from distributed table to local table BEGIN; INSERT INTO local(c, d) SELECT a, b FROM ref; SELECT count(*) from local; count --------------------------------------------------------------------- 6 (1 row) ROLLBACK; -- Confirm that dummy placements work SELECT count(*) FROM test WHERE false; count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); count --------------------------------------------------------------------- (0 rows) -- Confirm that they work with round-robin task assignment policy SET citus.task_assignment_policy TO 'round-robin'; SELECT count(*) FROM test WHERE false; count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y); count --------------------------------------------------------------------- (0 rows) RESET citus.task_assignment_policy; SELECT count(*) FROM test; count --------------------------------------------------------------------- 5 (1 row) -- INSERT SELECT from distributed table to local table BEGIN; INSERT INTO ref(a, b) SELECT x, y FROM test; SELECT count(*) from ref; count --------------------------------------------------------------------- 8 (1 row) ROLLBACK; -- INSERT SELECT from distributed table to local table BEGIN; INSERT INTO ref(a, b) SELECT c, d FROM local; SELECT count(*) from ref; count --------------------------------------------------------------------- 6 (1 row) ROLLBACK; -- INSERT SELECT from distributed table to local table BEGIN; INSERT INTO local(c, d) SELECT x, y FROM test; SELECT count(*) from local; count --------------------------------------------------------------------- 8 (1 row) ROLLBACK; -- INSERT SELECT from distributed table to local table BEGIN; INSERT INTO local(c, d) SELECT a, b FROM ref; SELECT count(*) from local; count --------------------------------------------------------------------- 6 (1 row) ROLLBACK; -- query fails on the shards should be handled -- nicely SELECT x/0 FROM test; ERROR: division by zero CONTEXT: while executing command on localhost:xxxxx -- Add "fake" pg_dist_transaction records and run recovery -- to show that it is recovered -- Temporarily disable automatic 2PC recovery ALTER SYSTEM SET citus.recover_2pc_interval TO -1; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) BEGIN; CREATE TABLE should_commit (value int); PREPARE TRANSACTION 'citus_0_should_commit'; -- zero is the coordinator's group id, so we can hard code it INSERT INTO pg_dist_transaction VALUES (0, 'citus_0_should_commit'); SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- 1 (1 row) -- the table should be seen SELECT * FROM should_commit; value --------------------------------------------------------------------- (0 rows) -- set the original back ALTER SYSTEM RESET citus.recover_2pc_interval; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) RESET citus.task_executor_type; -- make sure undistribute table works fine ALTER TABLE test DROP CONSTRAINT foreign_key; SELECT undistribute_table('test_2'); NOTICE: creating a new table for single_node.test_2 NOTICE: moving the data of single_node.test_2 NOTICE: dropping the old single_node.test_2 NOTICE: renaming the new table to single_node.test_2 undistribute_table --------------------------------------------------------------------- (1 row) SELECT * FROM pg_dist_partition WHERE logicalrelid = 'test_2'::regclass; logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted --------------------------------------------------------------------- (0 rows) CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1)); SELECT create_reference_table('reference_table_1'); create_reference_table --------------------------------------------------------------------- (1 row) CREATE TABLE distributed_table_1 (col_1 INT UNIQUE); SELECT create_distributed_table('distributed_table_1', 'col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE); SELECT citus_add_local_table_to_metadata('citus_local_table_1'); citus_add_local_table_to_metadata --------------------------------------------------------------------- (1 row) CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1); CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200); CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300); SELECT create_distributed_table('partitioned_table_1', 'col_1'); create_distributed_table --------------------------------------------------------------------- (1 row) ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1); ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1); ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2); SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true); NOTICE: converting the partitions of single_node.partitioned_table_1 NOTICE: creating a new table for single_node.partitioned_table_1 NOTICE: dropping the old single_node.partitioned_table_1 NOTICE: renaming the new table to single_node.partitioned_table_1 NOTICE: creating a new table for single_node.reference_table_1 NOTICE: moving the data of single_node.reference_table_1 NOTICE: dropping the old single_node.reference_table_1 NOTICE: renaming the new table to single_node.reference_table_1 NOTICE: creating a new table for single_node.distributed_table_1 NOTICE: moving the data of single_node.distributed_table_1 NOTICE: dropping the old single_node.distributed_table_1 NOTICE: renaming the new table to single_node.distributed_table_1 NOTICE: creating a new table for single_node.citus_local_table_1 NOTICE: moving the data of single_node.citus_local_table_1 NOTICE: dropping the old single_node.citus_local_table_1 NOTICE: renaming the new table to single_node.citus_local_table_1 NOTICE: creating a new table for single_node.partitioned_table_1_100_200 NOTICE: moving the data of single_node.partitioned_table_1_100_200 NOTICE: dropping the old single_node.partitioned_table_1_100_200 NOTICE: renaming the new table to single_node.partitioned_table_1_100_200 NOTICE: creating a new table for single_node.partitioned_table_1_200_300 NOTICE: moving the data of single_node.partitioned_table_1_200_300 NOTICE: dropping the old single_node.partitioned_table_1_200_300 NOTICE: renaming the new table to single_node.partitioned_table_1_200_300 undistribute_table --------------------------------------------------------------------- (1 row) CREATE TABLE local_table_1 (col_1 INT UNIQUE); CREATE TABLE local_table_2 (col_1 INT UNIQUE); CREATE TABLE local_table_3 (col_1 INT UNIQUE); ALTER TABLE local_table_2 ADD CONSTRAINT fkey_6 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1); ALTER TABLE local_table_3 ADD CONSTRAINT fkey_7 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1); ALTER TABLE local_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1); SELECT citus_add_local_table_to_metadata('local_table_2', cascade_via_foreign_keys=>true); citus_add_local_table_to_metadata --------------------------------------------------------------------- (1 row) CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$ BEGIN INSERT INTO test (x) VALUES ($1); END;$$; SELECT * FROM pg_dist_node; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards --------------------------------------------------------------------- 5 | 0 | localhost | 57636 | default | t | t | primary | default | t | t (1 row) SELECT create_distributed_function('call_delegation(int)', '$1', 'test'); create_distributed_function --------------------------------------------------------------------- (1 row) CREATE FUNCTION function_delegation(int) RETURNS void AS $$ BEGIN UPDATE test SET y = y + 1 WHERE x < $1; END; $$ LANGUAGE plpgsql; SELECT create_distributed_function('function_delegation(int)', '$1', 'test'); create_distributed_function --------------------------------------------------------------------- (1 row) SET client_min_messages TO DEBUG1; CALL call_delegation(1); DEBUG: not pushing down procedure to the same node SELECT function_delegation(1); DEBUG: not pushing down function to the same node function_delegation --------------------------------------------------------------------- (1 row) SET client_min_messages TO WARNING; DROP TABLE test CASCADE; CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_client_backend_count() RETURNS bigint LANGUAGE C STRICT AS 'citus', $$get_all_active_client_backend_count$$; -- set the cached connections to zero -- and execute a distributed query so that -- we end up with zero cached connections afterwards ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) -- disable deadlock detection and re-trigger 2PC recovery -- once more when citus.max_cached_conns_per_worker is zero -- so that we can be sure that the connections established for -- maintanince daemon is closed properly. -- this is to prevent random failures in the tests (otherwise, we -- might see connections established for this operations) ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms'; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(0.1); pg_sleep --------------------------------------------------------------------- (1 row) -- now that last 2PC recovery is done, we're good to disable it ALTER SYSTEM SET citus.recover_2pc_interval TO '-1'; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) -- test alter_distributed_table UDF CREATE TABLE adt_table (a INT, b INT); CREATE TABLE adt_col (a INT UNIQUE, b INT); CREATE TABLE adt_ref (a INT REFERENCES adt_col(a)); SELECT create_distributed_table('adt_table', 'a', colocate_with:='none'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('adt_col', 'a', colocate_with:='adt_table'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT create_distributed_table('adt_ref', 'a', colocate_with:='adt_table'); create_distributed_table --------------------------------------------------------------------- (1 row) INSERT INTO adt_table VALUES (1, 2), (3, 4), (5, 6); INSERT INTO adt_col VALUES (3, 4), (5, 6), (7, 8); INSERT INTO adt_ref VALUES (3), (5); SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_col | distributed | a | 4 adt_ref | distributed | a | 4 adt_table | distributed | a | 4 (3 rows) SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1; Colocation Groups --------------------------------------------------------------------- adt_col, adt_ref, adt_table (1 row) SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1; Referencing Table | Definition --------------------------------------------------------------------- adt_col | UNIQUE (a) adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a) (2 rows) SELECT alter_distributed_table('adt_table', shard_count:=6, cascade_to_colocated:=true); alter_distributed_table --------------------------------------------------------------------- (1 row) SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_col | distributed | a | 6 adt_ref | distributed | a | 6 adt_table | distributed | a | 6 (3 rows) SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1; Colocation Groups --------------------------------------------------------------------- adt_col, adt_ref, adt_table (1 row) SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1; Referencing Table | Definition --------------------------------------------------------------------- adt_col | UNIQUE (a) adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a) (2 rows) SELECT alter_distributed_table('adt_table', distribution_column:='b', colocate_with:='none'); alter_distributed_table --------------------------------------------------------------------- (1 row) SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_col | distributed | a | 6 adt_ref | distributed | a | 6 adt_table | distributed | b | 6 (3 rows) SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1; Colocation Groups --------------------------------------------------------------------- adt_col, adt_ref adt_table (2 rows) SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1; Referencing Table | Definition --------------------------------------------------------------------- adt_col | UNIQUE (a) adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a) (2 rows) SELECT * FROM adt_table ORDER BY 1; a | b --------------------------------------------------------------------- 1 | 2 3 | 4 5 | 6 (3 rows) SELECT * FROM adt_col ORDER BY 1; a | b --------------------------------------------------------------------- 3 | 4 5 | 6 7 | 8 (3 rows) SELECT * FROM adt_ref ORDER BY 1; a --------------------------------------------------------------------- 3 5 (2 rows) -- make sure that COPY (e.g., INSERT .. SELECT) and -- alter_distributed_table works in the same TX BEGIN; SET LOCAL citus.enable_local_execution=OFF; INSERT INTO adt_table SELECT x, x+1 FROM generate_series(1, 1000) x; SELECT alter_distributed_table('adt_table', distribution_column:='a'); alter_distributed_table --------------------------------------------------------------------- (1 row) ROLLBACK; BEGIN; INSERT INTO adt_table SELECT x, x+1 FROM generate_series(1, 1000) x; SELECT alter_distributed_table('adt_table', distribution_column:='a'); alter_distributed_table --------------------------------------------------------------------- (1 row) SELECT COUNT(*) FROM adt_table; count --------------------------------------------------------------------- 1003 (1 row) END; SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text = 'adt_table'; table_name | citus_table_type | distribution_column | shard_count --------------------------------------------------------------------- adt_table | distributed | a | 6 (1 row) \c - - - :master_port -- sometimes Postgres is a little slow to terminate the backends -- even if PGFinish is sent. So, to prevent any flaky tests, sleep SELECT pg_sleep(0.1); pg_sleep --------------------------------------------------------------------- (1 row) -- since max_cached_conns_per_worker == 0 at this point, the -- backend(s) that execute on the shards will be terminated -- so show that there no internal backends SET search_path TO single_node; SET citus.next_shard_id TO 90730500; SELECT count(*) from should_commit; count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%'; count --------------------------------------------------------------------- 0 (1 row) SELECT get_all_active_client_backend_count(); get_all_active_client_backend_count --------------------------------------------------------------------- 1 (1 row) BEGIN; SET LOCAL citus.shard_count TO 32; SET LOCAL citus.force_max_query_parallelization TO ON; SET LOCAL citus.enable_local_execution TO false; CREATE TABLE test (a int); SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('test', 'a'); create_distributed_table --------------------------------------------------------------------- (1 row) SELECT count(*) FROM test; count --------------------------------------------------------------------- 0 (1 row) -- now, we should have additional 32 connections SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%'; count --------------------------------------------------------------------- 32 (1 row) -- single external connection SELECT get_all_active_client_backend_count(); get_all_active_client_backend_count --------------------------------------------------------------------- 1 (1 row) ROLLBACK; \c - - - :master_port SET search_path TO single_node; SET citus.next_shard_id TO 90830500; -- simulate that even if there is no connection slots -- to connect, Citus can switch to local execution SET citus.force_max_query_parallelization TO false; SET citus.log_remote_commands TO ON; ALTER SYSTEM SET citus.local_shared_pool_size TO -1; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) SELECT pg_sleep(0.1); pg_sleep --------------------------------------------------------------------- (1 row) SET citus.executor_slow_start_interval TO 10; SELECT count(*) from another_schema_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630515 another_schema_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630516 another_schema_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630517 another_schema_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630518 another_schema_table WHERE true count --------------------------------------------------------------------- 0 (1 row) UPDATE another_schema_table SET b = b; NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630516 another_schema_table SET b = b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630517 another_schema_table SET b = b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630518 another_schema_table SET b = b -- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning -- not that we ignore INSERT .. SELECT via coordinator as it relies on -- COPY command INSERT INTO another_schema_table SELECT * FROM another_schema_table; NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630515 AS citus_table_alias (a, b) SELECT another_schema_table.a, another_schema_table.b FROM single_node.another_schema_table_90630515 another_schema_table WHERE (another_schema_table.a IS NOT NULL) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630516 AS citus_table_alias (a, b) SELECT another_schema_table.a, another_schema_table.b FROM single_node.another_schema_table_90630516 another_schema_table WHERE (another_schema_table.a IS NOT NULL) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630517 AS citus_table_alias (a, b) SELECT another_schema_table.a, another_schema_table.b FROM single_node.another_schema_table_90630517 another_schema_table WHERE (another_schema_table.a IS NOT NULL) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630518 AS citus_table_alias (a, b) SELECT another_schema_table.a, another_schema_table.b FROM single_node.another_schema_table_90630518 another_schema_table WHERE (another_schema_table.a IS NOT NULL) INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630515_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630515_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630515 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630516_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630516_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630516 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630517_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630517_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630517 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630518_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630518_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630518 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 -- multi-row INSERTs INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630515 AS citus_table_alias (a, b) VALUES (1,1), (5,5) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630516 AS citus_table_alias (a, b) VALUES (3,3), (4,4), (7,7) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630517 AS citus_table_alias (a, b) VALUES (6,6) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630518 AS citus_table_alias (a, b) VALUES (2,2) -- INSERT..SELECT with re-partitioning when using local execution BEGIN; INSERT INTO another_schema_table VALUES (1,100); NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630515 (a, b) VALUES (1, 100) INSERT INTO another_schema_table VALUES (2,100); NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630518 (a, b) VALUES (2, 100) INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630515_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630515_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630515 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630516_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630516_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630516 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630517_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630517_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630517 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630518_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630518_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630518 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630515 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630515_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630516 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630516_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630517 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630515_to_2,repartitioned_results_xxxxx_from_90630517_to_2,repartitioned_results_xxxxx_from_90630518_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630518 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630518_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) SELECT * FROM another_schema_table WHERE a = 100 ORDER BY b; NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630517 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 100) ORDER BY b a | b --------------------------------------------------------------------- 100 | 1 100 | 2 (2 rows) ROLLBACK; -- intermediate results WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) SELECT count(*) FROM cte_1; NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630515 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630516 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630517 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630518 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 count --------------------------------------------------------------------- 7 (1 row) -- this is to get ready for the next tests TRUNCATE another_schema_table; NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE -- copy can use local execution even if there is no connection available COPY another_schema_table(a) FROM PROGRAM 'seq 32'; NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY another_schema_table, line 1: "1" NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY another_schema_table, line 2: "2" NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY another_schema_table, line 3: "3" NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY another_schema_table, line 6: "6" -- INSERT .. SELECT with co-located intermediate results SET citus.log_remote_commands to false; CREATE UNIQUE INDEX another_schema_table_pk ON another_schema_table(a); SET citus.log_local_commands to true; INSERT INTO another_schema_table SELECT * FROM another_schema_table LIMIT 10000 ON CONFLICT(a) DO NOTHING; NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630515 another_schema_table WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630516 another_schema_table WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630517 another_schema_table WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630518 another_schema_table WHERE true LIMIT '10000'::bigint NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630515 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630516 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630517 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630518 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10 ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b + 1 RETURNING *; NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630515 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630516 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630517 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630518 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630515 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630516 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630517 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630518 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b a | b --------------------------------------------------------------------- 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | (10 rows) -- INSERT .. SELECT with co-located intermediate result for non-binary input WITH cte_1 AS (INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING value) SELECT count(*) FROM cte_1; NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630519 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630520 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630521 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630522 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630519 AS citus_table_alias (key, value) SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('insert_select_XXX_90630519'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630520 AS citus_table_alias (key, value) SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('insert_select_XXX_90630520'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630521 AS citus_table_alias (key, value) SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('insert_select_XXX_90630521'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630522 AS citus_table_alias (key, value) SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('insert_select_XXX_90630522'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(value single_node.new_type)) cte_1 count --------------------------------------------------------------------- 1001 (1 row) -- test with NULL columns ALTER TABLE non_binary_copy_test ADD COLUMN z INT; NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630519, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z integer;') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630520, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z integer;') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630521, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z integer;') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630522, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z integer;') WITH cte_1 AS (INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z) SELECT bool_and(z is null) FROM cte_1; NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630519 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630520 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630521 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630522 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630519 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630519'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630520 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630520'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630521 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630521'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630522 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630522'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and FROM (SELECT intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(z integer)) cte_1 bool_and --------------------------------------------------------------------- t (1 row) -- test with type coersion (int -> text) and also NULL values with coersion WITH cte_1 AS (INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING key, z) SELECT count(DISTINCT key::text), count(DISTINCT z::text) FROM cte_1; NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630519 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630520 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630521 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630522 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630519 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630519'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630520 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630520'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630521 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630521'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630522 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630522'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS count, count(DISTINCT (z)::text) AS count FROM (SELECT intermediate_result.key, intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, z integer)) cte_1 count | count --------------------------------------------------------------------- 1001 | 0 (1 row) -- test disabling drop and truncate for known shards SET citus.shard_replication_factor TO 1; CREATE TABLE test_disabling_drop_and_truncate (a int); SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a'); NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830500, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830500, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830501, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830501, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830502, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830502, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830503, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830503, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') create_distributed_table --------------------------------------------------------------------- (1 row) SET citus.enable_manual_changes_to_shards TO off; -- these should error out DROP TABLE test_disabling_drop_and_truncate_90830500; ERROR: cannot modify "test_disabling_drop_and_truncate_90830500" because it is a shard of a distributed table HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly TRUNCATE TABLE test_disabling_drop_and_truncate_90830500; ERROR: cannot modify "test_disabling_drop_and_truncate_90830500" because it is a shard of a distributed table HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly RESET citus.enable_manual_changes_to_shards ; -- these should work as expected TRUNCATE TABLE test_disabling_drop_and_truncate_90830500; DROP TABLE test_disabling_drop_and_truncate_90830500; DROP TABLE test_disabling_drop_and_truncate; -- test creating distributed or reference tables from shards CREATE TABLE test_creating_distributed_relation_table_from_shard (a int); SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a'); NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830504, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830504, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830505, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830505, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830506, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830506, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830507, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830507, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') create_distributed_table --------------------------------------------------------------------- (1 row) -- these should error because shards cannot be used to: -- create distributed table SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_90830504', 'a'); ERROR: relation "test_creating_distributed_relation_table_from_shard_90830504" is a shard relation -- create reference table SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_90830504'); ERROR: relation "test_creating_distributed_relation_table_from_shard_90830504" is a shard relation RESET citus.shard_replication_factor; DROP TABLE test_creating_distributed_relation_table_from_shard; -- lets flush the copy often to make sure everyhing is fine SET citus.local_copy_flush_threshold TO 1; TRUNCATE another_schema_table; NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE INSERT INTO another_schema_table(a) SELECT i from generate_Series(0,10000)i; NOTICE: executing the copy locally for shard xxxxx NOTICE: executing the copy locally for shard xxxxx NOTICE: executing the copy locally for shard xxxxx NOTICE: executing the copy locally for shard xxxxx WITH cte_1 AS (INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10000 ON CONFLICT(a) DO NOTHING RETURNING *) SELECT count(*) FROM cte_1; NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630515 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630516 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630517 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630518 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630515 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630516 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630517 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630518 AS citus_table_alias (a, b) SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 count --------------------------------------------------------------------- 0 (1 row) WITH cte_1 AS (INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z) SELECT bool_and(z is null) FROM cte_1; NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630519 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630520 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630521 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630522 non_binary_copy_test WHERE true LIMIT '10000'::bigint NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630519 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630519'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630520 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630520'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630521 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630521'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630522 AS citus_table_alias (key, value, z) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.z FROM read_intermediate_result('insert_select_XXX_90630522'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and FROM (SELECT intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(z integer)) cte_1 bool_and --------------------------------------------------------------------- t (1 row) RESET citus.local_copy_flush_threshold; RESET citus.local_copy_flush_threshold; CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', $$coordinated_transaction_should_use_2PC$$; -- a multi-shard/single-shard select that is failed over to local -- execution doesn't start a 2PC BEGIN; SELECT count(*) FROM another_schema_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630515 another_schema_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630516 another_schema_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630517 another_schema_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630518 another_schema_table WHERE true count --------------------------------------------------------------------- 10001 (1 row) SELECT count(*) FROM another_schema_table WHERE a = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630515 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 1 (1 row) WITH cte_1 as (SELECT * FROM another_schema_table LIMIT 10) SELECT count(*) FROM cte_1; NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630515 another_schema_table WHERE true LIMIT '10'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630516 another_schema_table WHERE true LIMIT '10'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630517 another_schema_table WHERE true LIMIT '10'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630518 another_schema_table WHERE true LIMIT '10'::bigint NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 count --------------------------------------------------------------------- 10 (1 row) WITH cte_1 as (SELECT * FROM another_schema_table WHERE a = 1 LIMIT 10) SELECT count(*) FROM cte_1; NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT another_schema_table.a, another_schema_table.b FROM single_node.another_schema_table_90630515 another_schema_table WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) LIMIT 10) cte_1 count --------------------------------------------------------------------- 1 (1 row) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- f (1 row) ROLLBACK; -- same without a transaction block WITH cte_1 AS (SELECT count(*) as cnt FROM another_schema_table LIMIT 1000), cte_2 AS (SELECT coordinated_transaction_should_use_2PC() as enabled_2pc) SELECT cnt, enabled_2pc FROM cte_1, cte_2; NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630515 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630516 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630517 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT count(*) AS cnt FROM single_node.another_schema_table_90630518 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT cte_1.cnt, cte_2.enabled_2pc FROM (SELECT intermediate_result.cnt FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint)) cte_1, (SELECT intermediate_result.enabled_2pc FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(enabled_2pc boolean)) cte_2 cnt | enabled_2pc --------------------------------------------------------------------- 10001 | f (1 row) -- a multi-shard modification that is failed over to local -- execution starts a 2PC BEGIN; UPDATE another_schema_table SET b = b + 1; NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630516 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630517 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630518 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) ROLLBACK; -- a multi-shard modification that is failed over to local -- execution starts a 2PC BEGIN; WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) SELECT count(*) FROM cte_1; NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630516 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630517 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630518 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 count --------------------------------------------------------------------- 10001 (1 row) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) ROLLBACK; -- same without transaction block WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 RETURNING *) SELECT coordinated_transaction_should_use_2PC(); NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630516 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630517 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630518 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) RETURNING a, b NOTICE: executing the command locally: SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) -- a single-shard modification that is failed over to local -- starts 2PC execution BEGIN; UPDATE another_schema_table SET b = b + 1 WHERE a = 1; NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = (b OPERATOR(pg_catalog.+) 1) WHERE (a OPERATOR(pg_catalog.=) 1) SELECT coordinated_transaction_should_use_2PC(); coordinated_transaction_should_use_2pc --------------------------------------------------------------------- t (1 row) ROLLBACK; -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; SELECT count(*) from another_schema_table; ERROR: the total number of connections on the server is more than max_connections(100) HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true; UPDATE another_schema_table SET b = b; ERROR: the total number of connections on the server is more than max_connections(100) HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true; INSERT INTO another_schema_table SELECT * FROM another_schema_table; ERROR: the total number of connections on the server is more than max_connections(100) HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true; INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; ERROR: the total number of connections on the server is more than max_connections(100) HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true; WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) SELECT count(*) FROM cte_1; ERROR: the total number of connections on the server is more than max_connections(100) HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true; INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); ERROR: the total number of connections on the server is more than max_connections(100) HINT: This command supports local execution. Consider enabling local execution using SET citus.enable_local_execution TO true; -- copy fails if local execution is disabled and there is no connection slot COPY another_schema_table(a) FROM PROGRAM 'seq 32'; ERROR: could not find an available connection HINT: Set citus.max_shared_pool_size TO -1 to let COPY command finish CONTEXT: COPY another_schema_table, line 1: "1" -- set the values to originals back ALTER SYSTEM RESET citus.max_cached_conns_per_worker; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; ALTER SYSTEM RESET citus.recover_2pc_interval; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; ALTER SYSTEM RESET citus.local_shared_pool_size; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- t (1 row) -- suppress notices SET client_min_messages TO error; -- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added SELECT 1 FROM master_remove_node('localhost', :master_port); ERROR: cannot remove or disable the node localhost:xxxxx because because it contains the only shard placement for shard xxxxx DETAIL: One of the table(s) that prevents the operation complete successfully is single_node.ref HINT: To proceed, either drop the tables or use undistribute_table() function to convert them to local tables -- Cleanup DROP SCHEMA single_node CASCADE; -- Remove the coordinator again SELECT 1 FROM master_remove_node('localhost', :master_port); ?column? --------------------------------------------------------------------- 1 (1 row) -- restart nodeid sequence so that multi_cluster_management still has the same -- nodeids ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART 1;