mirror of https://github.com/citusdata/citus.git
1915 lines
92 KiB
Plaintext
1915 lines
92 KiB
Plaintext
CREATE SCHEMA single_node;
|
|
SET search_path TO single_node;
|
|
SET citus.shard_count TO 4;
|
|
SET citus.shard_replication_factor TO 1;
|
|
SET citus.next_shard_id TO 90630500;
|
|
SET citus.replication_model TO 'streaming';
|
|
-- adding the coordinator as inactive is disallowed
|
|
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
|
|
ERROR: coordinator node cannot be added as inactive node
|
|
-- idempotently add node to allow this test to run without add_coordinator
|
|
SET client_min_messages TO WARNING;
|
|
SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- coordinator cannot be disabled
|
|
SELECT 1 FROM master_disable_node('localhost', :master_port);
|
|
ERROR: Disabling localhost:xxxxx failed
|
|
DETAIL: cannot change "isactive" field of the coordinator node
|
|
RESET client_min_messages;
|
|
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM pg_dist_node;
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
-- there are no workers now, but we should still be able to create Citus tables
|
|
CREATE TABLE ref(x int, y int);
|
|
SELECT create_reference_table('ref');
|
|
create_reference_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
|
|
groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced
|
|
---------------------------------------------------------------------
|
|
0 | localhost | 57636 | t | t | t | t
|
|
(1 row)
|
|
|
|
DROP TABLE ref;
|
|
-- remove the coordinator to try again with create_reference_table
|
|
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
|
|
master_remove_node
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE loc(x int, y int);
|
|
SELECT citus_add_local_table_to_metadata('loc');
|
|
citus_add_local_table_to_metadata
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
|
|
groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced
|
|
---------------------------------------------------------------------
|
|
0 | localhost | 57636 | t | t | t | t
|
|
(1 row)
|
|
|
|
DROP TABLE loc;
|
|
-- remove the coordinator to try again with create_distributed_table
|
|
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
|
|
master_remove_node
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE test(x int, y int);
|
|
SELECT create_distributed_table('test','x');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
|
|
groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced
|
|
---------------------------------------------------------------------
|
|
0 | localhost | 57636 | t | t | t | t
|
|
(1 row)
|
|
|
|
-- cannot add workers with specific IP as long as I have a placeholder coordinator record
|
|
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
|
|
ERROR: cannot add a worker node when the coordinator hostname is set to localhost
|
|
DETAIL: Worker nodes need to be able to connect to the coordinator to transfer data.
|
|
HINT: Use SELECT citus_set_coordinator_host('<hostname>') to configure the coordinator hostname
|
|
-- adding localhost workers is ok
|
|
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
|
|
NOTICE: shards are still on the coordinator after adding the new node
|
|
HINT: Use SELECT rebalance_table_shards(); to balance shards data between workers and coordinator or SELECT citus_drain_node('localhost',57636); to permanently move shards away from the coordinator.
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- set the coordinator host to something different than localhost
|
|
SELECT 1 FROM citus_set_coordinator_host('127.0.0.1');
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- adding workers with specific IP is ok now
|
|
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
|
|
NOTICE: shards are still on the coordinator after adding the new node
|
|
HINT: Use SELECT rebalance_table_shards(); to balance shards data between workers and coordinator or SELECT citus_drain_node('127.0.0.1',57636); to permanently move shards away from the coordinator.
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SELECT 1 FROM master_remove_node('127.0.0.1', :worker_1_port);
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- set the coordinator host back to localhost for the remainder of tests
|
|
SELECT 1 FROM citus_set_coordinator_host('localhost');
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- the shouldhaveshards setting should not really matter for a single node
|
|
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
CREATE TYPE new_type AS (n int, m text);
|
|
CREATE TABLE test_2(x int, y int, z new_type);
|
|
SELECT create_distributed_table('test_2','x');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE ref(a int, b int);
|
|
SELECT create_reference_table('ref');
|
|
create_reference_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE local(c int, d int);
|
|
CREATE TABLE public.another_schema_table(a int, b int);
|
|
SELECT create_distributed_table('public.another_schema_table', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE non_binary_copy_test (key int PRIMARY KEY, value new_type);
|
|
SELECT create_distributed_table('non_binary_copy_test', 'key');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO non_binary_copy_test SELECT i, (i, 'citus9.5')::new_type FROM generate_series(0,1000)i;
|
|
-- Confirm the basics work
|
|
INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
|
|
SELECT * FROM test WHERE x = 1;
|
|
x | y
|
|
---------------------------------------------------------------------
|
|
1 | 2
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
5
|
|
(1 row)
|
|
|
|
SELECT * FROM test ORDER BY x;
|
|
x | y
|
|
---------------------------------------------------------------------
|
|
1 | 2
|
|
2 | 7
|
|
3 | 4
|
|
4 | 5
|
|
5 | 6
|
|
(5 rows)
|
|
|
|
UPDATE test SET y = y + 1 RETURNING *;
|
|
x | y
|
|
---------------------------------------------------------------------
|
|
1 | 3
|
|
2 | 8
|
|
3 | 5
|
|
4 | 6
|
|
5 | 7
|
|
(5 rows)
|
|
|
|
WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2;
|
|
x | y
|
|
---------------------------------------------------------------------
|
|
1 | 2
|
|
2 | 7
|
|
3 | 4
|
|
4 | 5
|
|
5 | 6
|
|
(5 rows)
|
|
|
|
-- Test upsert with constraint
|
|
CREATE TABLE upsert_test
|
|
(
|
|
part_key int UNIQUE,
|
|
other_col int,
|
|
third_col int
|
|
);
|
|
-- distribute the table
|
|
SELECT create_distributed_table('upsert_test', 'part_key');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- do a regular insert
|
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *;
|
|
part_key | other_col | third_col
|
|
---------------------------------------------------------------------
|
|
1 | 1 |
|
|
2 | 2 |
|
|
(2 rows)
|
|
|
|
SET citus.log_remote_commands to true;
|
|
-- observe that there is a conflict and the following query does nothing
|
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *;
|
|
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col
|
|
part_key | other_col | third_col
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
-- same as the above with different syntax
|
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *;
|
|
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col
|
|
part_key | other_col | third_col
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
-- again the same query with another syntax
|
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
|
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630519 DO NOTHING RETURNING part_key, other_col, third_col
|
|
part_key | other_col | third_col
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
BEGIN;
|
|
-- force local execution
|
|
SELECT count(*) FROM upsert_test WHERE part_key = 1;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630519 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1)
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SET citus.log_remote_commands to false;
|
|
-- multi-shard pushdown query that goes through local execution
|
|
INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
|
part_key | other_col | third_col
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
-- multi-shard pull-to-coordinator query that goes through local execution
|
|
INSERT INTO upsert_test (part_key, other_col) SELECT part_key, other_col FROM upsert_test LIMIT 100 ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
|
part_key | other_col | third_col
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
COMMIT;
|
|
-- to test citus local tables
|
|
select undistribute_table('upsert_test');
|
|
NOTICE: creating a new table for single_node.upsert_test
|
|
NOTICE: moving the data of single_node.upsert_test
|
|
NOTICE: dropping the old single_node.upsert_test
|
|
NOTICE: renaming the new table to single_node.upsert_test
|
|
undistribute_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- create citus local table
|
|
select citus_add_local_table_to_metadata('upsert_test');
|
|
citus_add_local_table_to_metadata
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- test the constraint with local execution
|
|
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
|
part_key | other_col | third_col
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
DROP TABLE upsert_test;
|
|
CREATE SCHEMA "Quoed.Schema";
|
|
SET search_path TO "Quoed.Schema";
|
|
CREATE TABLE "long_constraint_upsert\_test"
|
|
(
|
|
part_key int,
|
|
other_col int,
|
|
third_col int,
|
|
CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" UNIQUE (part_key)
|
|
);
|
|
NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted "
|
|
-- distribute the table and create shards
|
|
SELECT create_distributed_table('"long_constraint_upsert\_test"', 'part_key');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO "long_constraint_upsert\_test" (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *;
|
|
NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted "
|
|
part_key | other_col | third_col
|
|
---------------------------------------------------------------------
|
|
1 | 1 |
|
|
(1 row)
|
|
|
|
ALTER TABLE "long_constraint_upsert\_test" RENAME TO simple_table_name;
|
|
INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" DO NOTHING RETURNING *;
|
|
NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted "
|
|
part_key | other_col | third_col
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
-- this is currently not supported, but once we support it,
|
|
-- make sure that the following query also works fine
|
|
ALTER TABLE simple_table_name RENAME CONSTRAINT "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" TO simple_constraint_name;
|
|
NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted \aconstraint" will be truncated to "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quoted "
|
|
ERROR: renaming constraints belonging to distributed tables is currently unsupported
|
|
--INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *;
|
|
SET search_path TO single_node;
|
|
DROP SCHEMA "Quoed.Schema" CASCADE;
|
|
NOTICE: drop cascades to 5 other objects
|
|
DETAIL: drop cascades to table "Quoed.Schema".simple_table_name
|
|
drop cascades to table "Quoed.Schema".simple_table_name_90630524
|
|
drop cascades to table "Quoed.Schema".simple_table_name_90630525
|
|
drop cascades to table "Quoed.Schema".simple_table_name_90630526
|
|
drop cascades to table "Quoed.Schema".simple_table_name_90630527
|
|
-- test partitioned index creation with long name
|
|
CREATE TABLE test_index_creation1
|
|
(
|
|
tenant_id integer NOT NULL,
|
|
timeperiod timestamp without time zone NOT NULL,
|
|
field1 integer NOT NULL,
|
|
inserted_utc timestamp without time zone NOT NULL DEFAULT now(),
|
|
PRIMARY KEY(tenant_id, timeperiod)
|
|
) PARTITION BY RANGE (timeperiod);
|
|
CREATE TABLE test_index_creation1_p2020_09_26
|
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00');
|
|
CREATE TABLE test_index_creation1_p2020_09_27
|
|
PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00');
|
|
select create_distributed_table('test_index_creation1', 'tenant_id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- should be able to create indexes with INCLUDE/WHERE
|
|
CREATE INDEX ix_test_index_creation5 ON test_index_creation1
|
|
USING btree(tenant_id, timeperiod)
|
|
INCLUDE (field1) WHERE (tenant_id = 100);
|
|
-- test if indexes are created
|
|
SELECT 1 AS created WHERE EXISTS(SELECT * FROM pg_indexes WHERE indexname LIKE '%test_index_creation%');
|
|
created
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- test citus size functions in transaction with modification
|
|
CREATE TABLE test_citus_size_func (a int);
|
|
SELECT create_distributed_table('test_citus_size_func', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO test_citus_size_func VALUES(1), (2);
|
|
BEGIN;
|
|
-- DDL with citus_table_size
|
|
ALTER TABLE test_citus_size_func ADD COLUMN newcol INT;
|
|
SELECT citus_table_size('test_citus_size_func');
|
|
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
|
ROLLBACK;
|
|
BEGIN;
|
|
-- DDL with citus_relation_size
|
|
ALTER TABLE test_citus_size_func ADD COLUMN newcol INT;
|
|
SELECT citus_relation_size('test_citus_size_func');
|
|
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
|
ROLLBACK;
|
|
BEGIN;
|
|
-- DDL with citus_total_relation_size
|
|
ALTER TABLE test_citus_size_func ADD COLUMN newcol INT;
|
|
SELECT citus_total_relation_size('test_citus_size_func');
|
|
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
|
ROLLBACK;
|
|
BEGIN;
|
|
-- single shard insert with citus_table_size
|
|
INSERT INTO test_citus_size_func VALUES (3);
|
|
SELECT citus_table_size('test_citus_size_func');
|
|
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
|
ROLLBACK;
|
|
BEGIN;
|
|
-- multi shard modification with citus_table_size
|
|
INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func;
|
|
SELECT citus_table_size('test_citus_size_func');
|
|
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
|
ROLLBACK;
|
|
BEGIN;
|
|
-- single shard insert with citus_relation_size
|
|
INSERT INTO test_citus_size_func VALUES (3);
|
|
SELECT citus_relation_size('test_citus_size_func');
|
|
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
|
ROLLBACK;
|
|
BEGIN;
|
|
-- multi shard modification with citus_relation_size
|
|
INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func;
|
|
SELECT citus_relation_size('test_citus_size_func');
|
|
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
|
ROLLBACK;
|
|
BEGIN;
|
|
-- single shard insert with citus_total_relation_size
|
|
INSERT INTO test_citus_size_func VALUES (3);
|
|
SELECT citus_total_relation_size('test_citus_size_func');
|
|
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
|
ROLLBACK;
|
|
BEGIN;
|
|
-- multi shard modification with citus_total_relation_size
|
|
INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func;
|
|
SELECT citus_total_relation_size('test_citus_size_func');
|
|
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
|
|
ROLLBACK;
|
|
-- we should be able to limit intermediate results
|
|
BEGIN;
|
|
SET LOCAL citus.max_intermediate_result_size TO 0;
|
|
WITH cte_1 AS (SELECT * FROM test OFFSET 0) SELECT * FROM cte_1;
|
|
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 0 kB)
|
|
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
|
|
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
|
|
ROLLBACK;
|
|
-- the first cte (cte_1) does not exceed the limit
|
|
-- but the second (cte_2) exceeds, so we error out
|
|
BEGIN;
|
|
SET LOCAL citus.max_intermediate_result_size TO '1kB';
|
|
INSERT INTO test SELECT i,i from generate_series(0,1000)i;
|
|
-- only pulls 1 row, should not hit the limit
|
|
WITH cte_1 AS (SELECT * FROM test LIMIT 1) SELECT count(*) FROM cte_1;
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- cte_1 only pulls 1 row, but cte_2 all rows
|
|
WITH cte_1 AS (SELECT * FROM test LIMIT 1),
|
|
cte_2 AS (SELECT * FROM test OFFSET 0)
|
|
SELECT count(*) FROM cte_1, cte_2;
|
|
ERROR: the intermediate result size exceeds citus.max_intermediate_result_size (currently 1 kB)
|
|
DETAIL: Citus restricts the size of intermediate results of complex subqueries and CTEs to avoid accidentally pulling large result sets into once place.
|
|
HINT: To run the current query, set citus.max_intermediate_result_size to a higher value or -1 to disable.
|
|
ROLLBACK;
|
|
-- single shard and multi-shard delete
|
|
-- inside a transaction block
|
|
BEGIN;
|
|
DELETE FROM test WHERE y = 5;
|
|
INSERT INTO test VALUES (4, 5);
|
|
DELETE FROM test WHERE x = 1;
|
|
INSERT INTO test VALUES (1, 2);
|
|
COMMIT;
|
|
CREATE INDEX single_node_i1 ON test(x);
|
|
CREATE INDEX single_node_i2 ON test(x,y);
|
|
REINDEX SCHEMA single_node;
|
|
-- PG 11 does not support CONCURRENTLY
|
|
-- and we do not want to add a new output
|
|
-- file just for that. Enable the test
|
|
-- once we remove PG_VERSION_11
|
|
--REINDEX SCHEMA CONCURRENTLY single_node;
|
|
-- keep one of the indexes
|
|
-- drop with and without tx blocks
|
|
BEGIN;
|
|
DROP INDEX single_node_i2;
|
|
ROLLBACK;
|
|
DROP INDEX single_node_i2;
|
|
-- change the schema with and without a tx block
|
|
BEGIN;
|
|
ALTER TABLE public.another_schema_table SET SCHEMA single_node;
|
|
ROLLBACK;
|
|
ALTER TABLE public.another_schema_table SET SCHEMA single_node;
|
|
BEGIN;
|
|
TRUNCATE test;
|
|
SELECT * FROM test;
|
|
x | y
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
ROLLBACK;
|
|
VACUUM test;
|
|
VACUUM test, test_2;
|
|
VACUUM ref, test;
|
|
VACUUM ANALYZE test(x);
|
|
ANALYZE ref;
|
|
ANALYZE test_2;
|
|
VACUUM local;
|
|
VACUUM local, ref, test, test_2;
|
|
VACUUM FULL test, ref;
|
|
BEGIN;
|
|
ALTER TABLE test ADD COLUMN z INT DEFAULT 66;
|
|
SELECT count(*) FROM test WHERE z = 66;
|
|
count
|
|
---------------------------------------------------------------------
|
|
5
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- explain analyze should work on a single node
|
|
EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE)
|
|
SELECT * FROM test;
|
|
QUERY PLAN
|
|
---------------------------------------------------------------------
|
|
Custom Scan (Citus Adaptive) (actual rows=5 loops=1)
|
|
Task Count: 4
|
|
Tuple data received from nodes: 10 bytes
|
|
Tasks Shown: One of 4
|
|
-> Task
|
|
Tuple data received from node: 4 bytes
|
|
Node: host=localhost port=xxxxx dbname=regression
|
|
-> Seq Scan on test_90630502 test (actual rows=2 loops=1)
|
|
(8 rows)
|
|
|
|
-- common utility command
|
|
SELECT pg_size_pretty(citus_relation_size('test'::regclass));
|
|
pg_size_pretty
|
|
---------------------------------------------------------------------
|
|
24 kB
|
|
(1 row)
|
|
|
|
-- basic view queries
|
|
CREATE VIEW single_node_view AS
|
|
SELECT count(*) as cnt FROM test t1 JOIN test t2 USING (x);
|
|
SELECT * FROM single_node_view;
|
|
cnt
|
|
---------------------------------------------------------------------
|
|
5
|
|
(1 row)
|
|
|
|
SELECT * FROM single_node_view, test WHERE test.x = single_node_view.cnt;
|
|
cnt | x | y
|
|
---------------------------------------------------------------------
|
|
5 | 5 | 6
|
|
(1 row)
|
|
|
|
-- copy in/out
|
|
BEGIN;
|
|
COPY test(x) FROM PROGRAM 'seq 32';
|
|
SELECT count(*) FROM test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
37
|
|
(1 row)
|
|
|
|
COPY (SELECT count(DISTINCT x) FROM test) TO STDOUT;
|
|
32
|
|
INSERT INTO test SELECT i,i FROM generate_series(0,100)i;
|
|
ROLLBACK;
|
|
-- alter table inside a tx block
|
|
BEGIN;
|
|
ALTER TABLE test ADD COLUMN z single_node.new_type;
|
|
INSERT INTO test VALUES (99, 100, (1, 'onder')::new_type) RETURNING *;
|
|
x | y | z
|
|
---------------------------------------------------------------------
|
|
99 | 100 | (1,onder)
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- prepared statements with custom types
|
|
PREPARE single_node_prepare_p1(int, int, new_type) AS
|
|
INSERT INTO test_2 VALUES ($1, $2, $3);
|
|
EXECUTE single_node_prepare_p1(1, 1, (95, 'citus9.5')::new_type);
|
|
EXECUTE single_node_prepare_p1(2 ,2, (94, 'citus9.4')::new_type);
|
|
EXECUTE single_node_prepare_p1(3 ,2, (93, 'citus9.3')::new_type);
|
|
EXECUTE single_node_prepare_p1(4 ,2, (92, 'citus9.2')::new_type);
|
|
EXECUTE single_node_prepare_p1(5 ,2, (91, 'citus9.1')::new_type);
|
|
EXECUTE single_node_prepare_p1(6 ,2, (90, 'citus9.0')::new_type);
|
|
PREPARE use_local_query_cache(int) AS SELECT count(*) FROM test_2 WHERE x = $1;
|
|
EXECUTE use_local_query_cache(1);
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
EXECUTE use_local_query_cache(1);
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
EXECUTE use_local_query_cache(1);
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
EXECUTE use_local_query_cache(1);
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
EXECUTE use_local_query_cache(1);
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
SET client_min_messages TO DEBUG2;
|
|
-- the 6th execution will go through the planner
|
|
-- the 7th execution will skip the planner as it uses the cache
|
|
EXECUTE use_local_query_cache(1);
|
|
DEBUG: Deferred pruning for a fast-path router query
|
|
DEBUG: Creating router plan
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
EXECUTE use_local_query_cache(1);
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
RESET client_min_messages;
|
|
-- partitioned table should be fine, adding for completeness
|
|
CREATE TABLE collections_list (
|
|
key bigint,
|
|
ts timestamptz DEFAULT now(),
|
|
collection_id integer,
|
|
value numeric,
|
|
PRIMARY KEY(key, collection_id)
|
|
) PARTITION BY LIST (collection_id );
|
|
SELECT create_distributed_table('collections_list', 'key');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE collections_list_0
|
|
PARTITION OF collections_list (key, ts, collection_id, value)
|
|
FOR VALUES IN ( 0 );
|
|
CREATE TABLE collections_list_1
|
|
PARTITION OF collections_list (key, ts, collection_id, value)
|
|
FOR VALUES IN ( 1 );
|
|
INSERT INTO collections_list SELECT i, '2011-01-01', i % 2, i * i FROM generate_series(0, 100) i;
|
|
SELECT count(*) FROM collections_list WHERE key < 10 AND collection_id = 1;
|
|
count
|
|
---------------------------------------------------------------------
|
|
5
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM collections_list_0 WHERE key < 10 AND collection_id = 1;
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM collections_list_1 WHERE key = 11;
|
|
count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
ALTER TABLE collections_list DROP COLUMN ts;
|
|
SELECT * FROM collections_list, collections_list_0 WHERE collections_list.key=collections_list_0.key ORDER BY 1 DESC,2 DESC,3 DESC,4 DESC LIMIT 1;
|
|
key | collection_id | value | key | collection_id | value
|
|
---------------------------------------------------------------------
|
|
100 | 0 | 10000 | 100 | 0 | 10000
|
|
(1 row)
|
|
|
|
-- test hash distribution using INSERT with generate_series() function
|
|
-- part_hashint4_noop(value int4, seed int8) -> int8: SQL function that simply returns value + seed; it is registered as "function 2" of the part_test_int4_ops hash operator class created right after it
CREATE OR REPLACE FUNCTION part_hashint4_noop(value int4, seed int8)
|
|
RETURNS int8 AS $$
|
|
SELECT value + seed;
|
|
$$ LANGUAGE SQL IMMUTABLE;
|
|
-- Custom hash operator class for int4: operator strategy 1 is "=" and
-- support function 2 is part_hashint4_noop(int4, int8).
-- It is used as the partitioning opclass of hash_parted below.
CREATE OPERATOR CLASS part_test_int4_ops
|
|
FOR TYPE int4
|
|
USING HASH AS
|
|
operator 1 =,
|
|
function 2 part_hashint4_noop(int4, int8);
|
|
CREATE TABLE hash_parted (
|
|
a int,
|
|
b int
|
|
) PARTITION BY HASH (a part_test_int4_ops);
|
|
CREATE TABLE hpart0 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 0);
|
|
CREATE TABLE hpart1 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 1);
|
|
CREATE TABLE hpart2 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 2);
|
|
CREATE TABLE hpart3 PARTITION OF hash_parted FOR VALUES WITH (modulus 4, remainder 3);
|
|
SELECT create_distributed_table('hash_parted ', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO hash_parted VALUES (1, generate_series(1, 10));
|
|
SELECT * FROM hash_parted ORDER BY 1, 2;
|
|
a | b
|
|
---------------------------------------------------------------------
|
|
1 | 1
|
|
1 | 2
|
|
1 | 3
|
|
1 | 4
|
|
1 | 5
|
|
1 | 6
|
|
1 | 7
|
|
1 | 8
|
|
1 | 9
|
|
1 | 10
|
|
(10 rows)
|
|
|
|
ALTER TABLE hash_parted DETACH PARTITION hpart0;
|
|
ALTER TABLE hash_parted DETACH PARTITION hpart1;
|
|
ALTER TABLE hash_parted DETACH PARTITION hpart2;
|
|
ALTER TABLE hash_parted DETACH PARTITION hpart3;
|
|
-- test range partition without creating partitions and inserting with generate_series()
|
|
-- should error out even in plain PG since no partition of relation "parent_tab" is found for row
|
|
-- in Citus it errors out because it fails to evaluate partition key in insert
|
|
CREATE TABLE parent_tab (id int) PARTITION BY RANGE (id);
|
|
SELECT create_distributed_table('parent_tab', 'id');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO parent_tab VALUES (generate_series(0, 3));
|
|
ERROR: failed to evaluate partition key in insert
|
|
HINT: try using constant values for partition column
|
|
-- now it should work
|
|
CREATE TABLE parent_tab_1_2 PARTITION OF parent_tab FOR VALUES FROM (1) to (2);
|
|
ALTER TABLE parent_tab ADD COLUMN b int;
|
|
INSERT INTO parent_tab VALUES (1, generate_series(0, 3));
|
|
SELECT * FROM parent_tab ORDER BY 1, 2;
|
|
id | b
|
|
---------------------------------------------------------------------
|
|
1 | 0
|
|
1 | 1
|
|
1 | 2
|
|
1 | 3
|
|
(4 rows)
|
|
|
|
-- make sure that parallel accesses are good
|
|
SET citus.force_max_query_parallelization TO ON;
|
|
SELECT * FROM test_2 ORDER BY 1 DESC;
|
|
x | y | z
|
|
---------------------------------------------------------------------
|
|
6 | 2 | (90,citus9.0)
|
|
5 | 2 | (91,citus9.1)
|
|
4 | 2 | (92,citus9.2)
|
|
3 | 2 | (93,citus9.3)
|
|
2 | 2 | (94,citus9.4)
|
|
1 | 1 | (95,citus9.5)
|
|
(6 rows)
|
|
|
|
DELETE FROM test_2 WHERE y = 1000 RETURNING *;
|
|
x | y | z
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
RESET citus.force_max_query_parallelization ;
|
|
BEGIN;
|
|
INSERT INTO test_2 VALUES (7 ,2, (83, 'citus8.3')::new_type);
|
|
SAVEPOINT s1;
|
|
INSERT INTO test_2 VALUES (9 ,1, (82, 'citus8.2')::new_type);
|
|
SAVEPOINT s2;
|
|
ROLLBACK TO SAVEPOINT s1;
|
|
SELECT * FROM test_2 WHERE z = (83, 'citus8.3')::new_type OR z = (82, 'citus8.2')::new_type;
|
|
x | y | z
|
|
---------------------------------------------------------------------
|
|
7 | 2 | (83,citus8.3)
|
|
(1 row)
|
|
|
|
RELEASE SAVEPOINT s1;
|
|
COMMIT;
|
|
SELECT * FROM test_2 WHERE z = (83, 'citus8.3')::new_type OR z = (82, 'citus8.2')::new_type;
|
|
x | y | z
|
|
---------------------------------------------------------------------
|
|
7 | 2 | (83,citus8.3)
|
|
(1 row)
|
|
|
|
-- final query is only intermediate result
|
|
-- we want PG 11/12/13 to behave consistently, so the CTEs should be MATERIALIZED
|
|
SET citus.enable_cte_inlining TO FALSE;
|
|
WITH cte_1 AS (SELECT * FROM test_2) SELECT * FROM cte_1 ORDER BY 1,2;
|
|
x | y | z
|
|
---------------------------------------------------------------------
|
|
1 | 1 | (95,citus9.5)
|
|
2 | 2 | (94,citus9.4)
|
|
3 | 2 | (93,citus9.3)
|
|
4 | 2 | (92,citus9.2)
|
|
5 | 2 | (91,citus9.1)
|
|
6 | 2 | (90,citus9.0)
|
|
7 | 2 | (83,citus8.3)
|
|
(7 rows)
|
|
|
|
-- final query is router query
|
|
WITH cte_1 AS (SELECT * FROM test_2) SELECT * FROM cte_1, test_2 WHERE test_2.x = cte_1.x AND test_2.x = 7 ORDER BY 1,2;
|
|
x | y | z | x | y | z
|
|
---------------------------------------------------------------------
|
|
7 | 2 | (83,citus8.3) | 7 | 2 | (83,citus8.3)
|
|
(1 row)
|
|
|
|
-- final query is a distributed query
|
|
WITH cte_1 AS (SELECT * FROM test_2) SELECT * FROM cte_1, test_2 WHERE test_2.x = cte_1.x AND test_2.y != 2 ORDER BY 1,2;
|
|
x | y | z | x | y | z
|
|
---------------------------------------------------------------------
|
|
1 | 1 | (95,citus9.5) | 1 | 1 | (95,citus9.5)
|
|
(1 row)
|
|
|
|
-- query pushdown should work
|
|
SELECT
|
|
*
|
|
FROM
|
|
(SELECT x, count(*) FROM test_2 GROUP BY x) as foo,
|
|
(SELECT x, count(*) FROM test_2 GROUP BY x) as bar
|
|
WHERE
|
|
foo.x = bar.x
|
|
ORDER BY 1 DESC, 2 DESC, 3 DESC, 4 DESC
|
|
LIMIT 1;
|
|
x | count | x | count
|
|
---------------------------------------------------------------------
|
|
7 | 1 | 7 | 1
|
|
(1 row)
|
|
|
|
-- make sure that foreign keys work fine
|
|
ALTER TABLE test_2 ADD CONSTRAINT first_pkey PRIMARY KEY (x);
|
|
ALTER TABLE test ADD CONSTRAINT foreign_key FOREIGN KEY (x) REFERENCES test_2(x) ON DELETE CASCADE;
|
|
-- show that delete on test_2 cascades to test
|
|
SELECT * FROM test WHERE x = 5;
|
|
x | y
|
|
---------------------------------------------------------------------
|
|
5 | 6
|
|
(1 row)
|
|
|
|
DELETE FROM test_2 WHERE x = 5;
|
|
SELECT * FROM test WHERE x = 5;
|
|
x | y
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
INSERT INTO test_2 VALUES (5 ,2, (91, 'citus9.1')::new_type);
|
|
INSERT INTO test VALUES (5, 6);
|
|
INSERT INTO ref VALUES (1, 2), (5, 6), (7, 8);
|
|
SELECT count(*) FROM ref;
|
|
count
|
|
---------------------------------------------------------------------
|
|
3
|
|
(1 row)
|
|
|
|
SELECT * FROM ref ORDER BY a;
|
|
a | b
|
|
---------------------------------------------------------------------
|
|
1 | 2
|
|
5 | 6
|
|
7 | 8
|
|
(3 rows)
|
|
|
|
SELECT * FROM test, ref WHERE x = a ORDER BY x;
|
|
x | y | a | b
|
|
---------------------------------------------------------------------
|
|
1 | 2 | 1 | 2
|
|
5 | 6 | 5 | 6
|
|
(2 rows)
|
|
|
|
INSERT INTO local VALUES (1, 2), (3, 4), (7, 8);
|
|
SELECT count(*) FROM local;
|
|
count
|
|
---------------------------------------------------------------------
|
|
3
|
|
(1 row)
|
|
|
|
SELECT * FROM local ORDER BY c;
|
|
c | d
|
|
---------------------------------------------------------------------
|
|
1 | 2
|
|
3 | 4
|
|
7 | 8
|
|
(3 rows)
|
|
|
|
SELECT * FROM ref, local WHERE a = c ORDER BY a;
|
|
a | b | c | d
|
|
---------------------------------------------------------------------
|
|
1 | 2 | 1 | 2
|
|
7 | 8 | 7 | 8
|
|
(2 rows)
|
|
|
|
-- Check repartition joins are supported
|
|
SET citus.enable_repartition_joins TO ON;
|
|
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
|
x | y | x | y
|
|
---------------------------------------------------------------------
|
|
2 | 7 | 1 | 2
|
|
4 | 5 | 3 | 4
|
|
5 | 6 | 4 | 5
|
|
(3 rows)
|
|
|
|
SET citus.enable_single_hash_repartition_joins TO ON;
|
|
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
|
x | y | x | y
|
|
---------------------------------------------------------------------
|
|
2 | 7 | 1 | 2
|
|
4 | 5 | 3 | 4
|
|
5 | 6 | 4 | 5
|
|
(3 rows)
|
|
|
|
SET search_path TO public;
|
|
SET citus.enable_single_hash_repartition_joins TO OFF;
|
|
SELECT * FROM single_node.test t1, single_node.test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
|
x | y | x | y
|
|
---------------------------------------------------------------------
|
|
2 | 7 | 1 | 2
|
|
4 | 5 | 3 | 4
|
|
5 | 6 | 4 | 5
|
|
(3 rows)
|
|
|
|
SET citus.enable_single_hash_repartition_joins TO ON;
|
|
SELECT * FROM single_node.test t1, single_node.test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
|
x | y | x | y
|
|
---------------------------------------------------------------------
|
|
2 | 7 | 1 | 2
|
|
4 | 5 | 3 | 4
|
|
5 | 6 | 4 | 5
|
|
(3 rows)
|
|
|
|
SET search_path TO single_node;
|
|
SET citus.task_assignment_policy TO 'round-robin';
|
|
SET citus.enable_single_hash_repartition_joins TO ON;
|
|
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
|
x | y | x | y
|
|
---------------------------------------------------------------------
|
|
2 | 7 | 1 | 2
|
|
4 | 5 | 3 | 4
|
|
5 | 6 | 4 | 5
|
|
(3 rows)
|
|
|
|
SET citus.task_assignment_policy TO 'greedy';
|
|
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
|
x | y | x | y
|
|
---------------------------------------------------------------------
|
|
2 | 7 | 1 | 2
|
|
4 | 5 | 3 | 4
|
|
5 | 6 | 4 | 5
|
|
(3 rows)
|
|
|
|
SET citus.task_assignment_policy TO 'first-replica';
|
|
SELECT * FROM test t1, test t2 WHERE t1.x = t2.y ORDER BY t1.x;
|
|
x | y | x | y
|
|
---------------------------------------------------------------------
|
|
2 | 7 | 1 | 2
|
|
4 | 5 | 3 | 4
|
|
5 | 6 | 4 | 5
|
|
(3 rows)
|
|
|
|
RESET citus.enable_repartition_joins;
|
|
RESET citus.enable_single_hash_repartition_joins;
|
|
-- INSERT SELECT router
|
|
BEGIN;
|
|
INSERT INTO test(x, y) SELECT x, y FROM test WHERE x = 1;
|
|
SELECT count(*) from test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
6
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT pushdown
|
|
BEGIN;
|
|
INSERT INTO test(x, y) SELECT x, y FROM test;
|
|
SELECT count(*) from test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
10
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT analytical query
|
|
BEGIN;
|
|
INSERT INTO test(x, y) SELECT count(x), max(y) FROM test;
|
|
SELECT count(*) from test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
6
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT repartition
|
|
BEGIN;
|
|
INSERT INTO test(x, y) SELECT y, x FROM test;
|
|
SELECT count(*) from test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
10
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT from reference table into distributed
|
|
BEGIN;
|
|
INSERT INTO test(x, y) SELECT a, b FROM ref;
|
|
SELECT count(*) from test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
8
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT from local table into distributed
|
|
BEGIN;
|
|
INSERT INTO test(x, y) SELECT c, d FROM local;
|
|
SELECT count(*) from test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
8
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT from distributed table to reference table
|
|
BEGIN;
|
|
INSERT INTO ref(a, b) SELECT x, y FROM test;
|
|
SELECT count(*) from ref;
|
|
count
|
|
---------------------------------------------------------------------
|
|
8
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT from local table to reference table
|
|
BEGIN;
|
|
INSERT INTO ref(a, b) SELECT c, d FROM local;
|
|
SELECT count(*) from ref;
|
|
count
|
|
---------------------------------------------------------------------
|
|
6
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT from distributed table to local table
|
|
BEGIN;
|
|
INSERT INTO local(c, d) SELECT x, y FROM test;
|
|
SELECT count(*) from local;
|
|
count
|
|
---------------------------------------------------------------------
|
|
8
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT from reference table to local table
|
|
BEGIN;
|
|
INSERT INTO local(c, d) SELECT a, b FROM ref;
|
|
SELECT count(*) from local;
|
|
count
|
|
---------------------------------------------------------------------
|
|
6
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- Confirm that dummy placements work
|
|
SELECT count(*) FROM test WHERE false;
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
|
count
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
-- Confirm that they work with round-robin task assignment policy
|
|
SET citus.task_assignment_policy TO 'round-robin';
|
|
SELECT count(*) FROM test WHERE false;
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM test WHERE false GROUP BY GROUPING SETS (x,y);
|
|
count
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
RESET citus.task_assignment_policy;
|
|
SELECT count(*) FROM test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
5
|
|
(1 row)
|
|
|
|
-- INSERT SELECT from distributed table to reference table
|
|
BEGIN;
|
|
INSERT INTO ref(a, b) SELECT x, y FROM test;
|
|
SELECT count(*) from ref;
|
|
count
|
|
---------------------------------------------------------------------
|
|
8
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT from local table to reference table
|
|
BEGIN;
|
|
INSERT INTO ref(a, b) SELECT c, d FROM local;
|
|
SELECT count(*) from ref;
|
|
count
|
|
---------------------------------------------------------------------
|
|
6
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT from distributed table to local table
|
|
BEGIN;
|
|
INSERT INTO local(c, d) SELECT x, y FROM test;
|
|
SELECT count(*) from local;
|
|
count
|
|
---------------------------------------------------------------------
|
|
8
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- INSERT SELECT from reference table to local table
|
|
BEGIN;
|
|
INSERT INTO local(c, d) SELECT a, b FROM ref;
|
|
SELECT count(*) from local;
|
|
count
|
|
---------------------------------------------------------------------
|
|
6
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
-- query failures on the shards should be handled
|
|
-- nicely
|
|
SELECT x/0 FROM test;
|
|
ERROR: division by zero
|
|
CONTEXT: while executing command on localhost:xxxxx
|
|
-- Add "fake" pg_dist_transaction records and run recovery
|
|
-- to show that it is recovered
|
|
-- Temporarily disable automatic 2PC recovery
|
|
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
|
|
SELECT pg_reload_conf();
|
|
pg_reload_conf
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
BEGIN;
|
|
CREATE TABLE should_commit (value int);
|
|
PREPARE TRANSACTION 'citus_0_should_commit';
|
|
-- zero is the coordinator's group id, so we can hard code it
|
|
INSERT INTO pg_dist_transaction VALUES (0, 'citus_0_should_commit');
|
|
SELECT recover_prepared_transactions();
|
|
recover_prepared_transactions
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- the table should be seen
|
|
SELECT * FROM should_commit;
|
|
value
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
-- set the original back
|
|
ALTER SYSTEM RESET citus.recover_2pc_interval;
|
|
SELECT pg_reload_conf();
|
|
pg_reload_conf
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
RESET citus.task_executor_type;
|
|
-- make sure undistribute table works fine
|
|
ALTER TABLE test DROP CONSTRAINT foreign_key;
|
|
SELECT undistribute_table('test_2');
|
|
NOTICE: creating a new table for single_node.test_2
|
|
NOTICE: moving the data of single_node.test_2
|
|
NOTICE: dropping the old single_node.test_2
|
|
NOTICE: renaming the new table to single_node.test_2
|
|
undistribute_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'test_2'::regclass;
|
|
logicalrelid | partmethod | partkey | colocationid | repmodel
|
|
---------------------------------------------------------------------
|
|
(0 rows)
|
|
|
|
CREATE TABLE reference_table_1 (col_1 INT UNIQUE, col_2 INT UNIQUE, UNIQUE (col_2, col_1));
|
|
SELECT create_reference_table('reference_table_1');
|
|
create_reference_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE distributed_table_1 (col_1 INT UNIQUE);
|
|
SELECT create_distributed_table('distributed_table_1', 'col_1');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE citus_local_table_1 (col_1 INT UNIQUE);
|
|
SELECT citus_add_local_table_to_metadata('citus_local_table_1');
|
|
citus_add_local_table_to_metadata
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE partitioned_table_1 (col_1 INT UNIQUE, col_2 INT) PARTITION BY RANGE (col_1);
|
|
CREATE TABLE partitioned_table_1_100_200 PARTITION OF partitioned_table_1 FOR VALUES FROM (100) TO (200);
|
|
CREATE TABLE partitioned_table_1_200_300 PARTITION OF partitioned_table_1 FOR VALUES FROM (200) TO (300);
|
|
SELECT create_distributed_table('partitioned_table_1', 'col_1');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
|
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_2 FOREIGN KEY (col_2) REFERENCES reference_table_1(col_1);
|
|
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_3 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_1);
|
|
ALTER TABLE citus_local_table_1 ADD CONSTRAINT fkey_4 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
|
ALTER TABLE partitioned_table_1 ADD CONSTRAINT fkey_5 FOREIGN KEY (col_1) REFERENCES reference_table_1(col_2);
|
|
SELECT undistribute_table('partitioned_table_1', cascade_via_foreign_keys=>true);
|
|
NOTICE: converting the partitions of single_node.partitioned_table_1
|
|
NOTICE: creating a new table for single_node.partitioned_table_1_100_200
|
|
NOTICE: moving the data of single_node.partitioned_table_1_100_200
|
|
NOTICE: dropping the old single_node.partitioned_table_1_100_200
|
|
NOTICE: renaming the new table to single_node.partitioned_table_1_100_200
|
|
NOTICE: creating a new table for single_node.partitioned_table_1_200_300
|
|
NOTICE: moving the data of single_node.partitioned_table_1_200_300
|
|
NOTICE: dropping the old single_node.partitioned_table_1_200_300
|
|
NOTICE: renaming the new table to single_node.partitioned_table_1_200_300
|
|
NOTICE: creating a new table for single_node.partitioned_table_1
|
|
NOTICE: dropping the old single_node.partitioned_table_1
|
|
NOTICE: renaming the new table to single_node.partitioned_table_1
|
|
NOTICE: creating a new table for single_node.reference_table_1
|
|
NOTICE: moving the data of single_node.reference_table_1
|
|
NOTICE: dropping the old single_node.reference_table_1
|
|
NOTICE: renaming the new table to single_node.reference_table_1
|
|
NOTICE: creating a new table for single_node.distributed_table_1
|
|
NOTICE: moving the data of single_node.distributed_table_1
|
|
NOTICE: dropping the old single_node.distributed_table_1
|
|
NOTICE: renaming the new table to single_node.distributed_table_1
|
|
NOTICE: creating a new table for single_node.citus_local_table_1
|
|
NOTICE: moving the data of single_node.citus_local_table_1
|
|
NOTICE: dropping the old single_node.citus_local_table_1
|
|
NOTICE: renaming the new table to single_node.citus_local_table_1
|
|
undistribute_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
CREATE TABLE local_table_1 (col_1 INT UNIQUE);
|
|
CREATE TABLE local_table_2 (col_1 INT UNIQUE);
|
|
CREATE TABLE local_table_3 (col_1 INT UNIQUE);
|
|
ALTER TABLE local_table_2 ADD CONSTRAINT fkey_6 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
|
ALTER TABLE local_table_3 ADD CONSTRAINT fkey_7 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
|
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_8 FOREIGN KEY (col_1) REFERENCES local_table_1(col_1);
|
|
SELECT citus_add_local_table_to_metadata('local_table_2', cascade_via_foreign_keys=>true);
|
|
citus_add_local_table_to_metadata
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Procedure that inserts a single row into "test", using its int argument
-- as the value of column x.
CREATE PROCEDURE call_delegation(x int) LANGUAGE plpgsql AS $$
|
|
BEGIN
|
|
INSERT INTO test (x) VALUES ($1);
|
|
END;$$;
|
|
SELECT * FROM pg_dist_node;
|
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
|
|
---------------------------------------------------------------------
|
|
4 | 0 | localhost | 57636 | default | t | t | primary | default | t | t
|
|
(1 row)
|
|
|
|
SELECT create_distributed_function('call_delegation(int)', '$1', 'test');
|
|
create_distributed_function
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- Function that increments y by one for every row of "test" whose x is
-- less than the int argument; returns void.
CREATE FUNCTION function_delegation(int) RETURNS void AS $$
|
|
BEGIN
|
|
UPDATE test SET y = y + 1 WHERE x < $1;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
SELECT create_distributed_function('function_delegation(int)', '$1', 'test');
|
|
create_distributed_function
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SET client_min_messages TO DEBUG1;
|
|
CALL call_delegation(1);
|
|
DEBUG: not pushing down procedure to the same node
|
|
SELECT function_delegation(1);
|
|
DEBUG: not pushing down function to the same node
|
|
function_delegation
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SET client_min_messages TO WARNING;
|
|
DROP TABLE test CASCADE;
|
|
-- SQL-callable binding for the C symbol get_all_active_client_backend_count
-- in the 'citus' library; takes no arguments and returns a bigint.
-- It is called later in this test to check the number of client backends.
CREATE OR REPLACE FUNCTION pg_catalog.get_all_active_client_backend_count()
|
|
RETURNS bigint
|
|
LANGUAGE C STRICT
|
|
AS 'citus', $$get_all_active_client_backend_count$$;
|
|
-- set the cached connections to zero
|
|
-- and execute a distributed query so that
|
|
-- we end up with zero cached connections afterwards
|
|
ALTER SYSTEM SET citus.max_cached_conns_per_worker TO 0;
|
|
SELECT pg_reload_conf();
|
|
pg_reload_conf
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
-- disable deadlock detection and re-trigger 2PC recovery
|
|
-- once more when citus.max_cached_conns_per_worker is zero
|
|
-- so that we can be sure that the connections established for
|
|
-- the maintenance daemon are closed properly.
|
|
-- this is to prevent random failures in the tests (otherwise, we
|
|
-- might see connections established for these operations)
|
|
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
|
|
ALTER SYSTEM SET citus.recover_2pc_interval TO '1ms';
|
|
SELECT pg_reload_conf();
|
|
pg_reload_conf
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
SELECT pg_sleep(0.1);
|
|
pg_sleep
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- now that last 2PC recovery is done, we're good to disable it
|
|
ALTER SYSTEM SET citus.recover_2pc_interval TO '-1';
|
|
SELECT pg_reload_conf();
|
|
pg_reload_conf
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
-- test alter_distributed_table UDF
|
|
CREATE TABLE adt_table (a INT, b INT);
|
|
CREATE TABLE adt_col (a INT UNIQUE, b INT);
|
|
CREATE TABLE adt_ref (a INT REFERENCES adt_col(a));
|
|
SELECT create_distributed_table('adt_table', 'a', colocate_with:='none');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT create_distributed_table('adt_col', 'a', colocate_with:='adt_table');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT create_distributed_table('adt_ref', 'a', colocate_with:='adt_table');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
INSERT INTO adt_table VALUES (1, 2), (3, 4), (5, 6);
|
|
INSERT INTO adt_col VALUES (3, 4), (5, 6), (7, 8);
|
|
INSERT INTO adt_ref VALUES (3), (5);
|
|
SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%';
|
|
table_name | citus_table_type | distribution_column | shard_count
|
|
---------------------------------------------------------------------
|
|
adt_col | distributed | a | 4
|
|
adt_ref | distributed | a | 4
|
|
adt_table | distributed | a | 4
|
|
(3 rows)
|
|
|
|
SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1;
|
|
Colocation Groups
|
|
---------------------------------------------------------------------
|
|
adt_col, adt_ref, adt_table
|
|
(1 row)
|
|
|
|
SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint
|
|
WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1;
|
|
Referencing Table | Definition
|
|
---------------------------------------------------------------------
|
|
adt_col | UNIQUE (a)
|
|
adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a)
|
|
(2 rows)
|
|
|
|
SELECT alter_distributed_table('adt_table', shard_count:=6, cascade_to_colocated:=true);
|
|
alter_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%';
|
|
table_name | citus_table_type | distribution_column | shard_count
|
|
---------------------------------------------------------------------
|
|
adt_col | distributed | a | 6
|
|
adt_ref | distributed | a | 6
|
|
adt_table | distributed | a | 6
|
|
(3 rows)
|
|
|
|
SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1;
|
|
Colocation Groups
|
|
---------------------------------------------------------------------
|
|
adt_col, adt_ref, adt_table
|
|
(1 row)
|
|
|
|
SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint
|
|
WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1;
|
|
Referencing Table | Definition
|
|
---------------------------------------------------------------------
|
|
adt_col | UNIQUE (a)
|
|
adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a)
|
|
(2 rows)
|
|
|
|
SELECT alter_distributed_table('adt_table', distribution_column:='b', colocate_with:='none');
|
|
alter_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text LIKE 'adt%';
|
|
table_name | citus_table_type | distribution_column | shard_count
|
|
---------------------------------------------------------------------
|
|
adt_col | distributed | a | 6
|
|
adt_ref | distributed | a | 6
|
|
adt_table | distributed | b | 6
|
|
(3 rows)
|
|
|
|
SELECT STRING_AGG(table_name::text, ', ' ORDER BY 1) AS "Colocation Groups" FROM public.citus_tables WHERE table_name::text LIKE 'adt%' GROUP BY colocation_id ORDER BY 1;
|
|
Colocation Groups
|
|
---------------------------------------------------------------------
|
|
adt_col, adt_ref
|
|
adt_table
|
|
(2 rows)
|
|
|
|
SELECT conrelid::regclass::text AS "Referencing Table", pg_get_constraintdef(oid, true) AS "Definition" FROM pg_constraint
|
|
WHERE (conrelid::regclass::text = 'adt_col' OR confrelid::regclass::text = 'adt_col') ORDER BY 1;
|
|
Referencing Table | Definition
|
|
---------------------------------------------------------------------
|
|
adt_col | UNIQUE (a)
|
|
adt_ref | FOREIGN KEY (a) REFERENCES adt_col(a)
|
|
(2 rows)
|
|
|
|
SELECT * FROM adt_table ORDER BY 1;
|
|
a | b
|
|
---------------------------------------------------------------------
|
|
1 | 2
|
|
3 | 4
|
|
5 | 6
|
|
(3 rows)
|
|
|
|
SELECT * FROM adt_col ORDER BY 1;
|
|
a | b
|
|
---------------------------------------------------------------------
|
|
3 | 4
|
|
5 | 6
|
|
7 | 8
|
|
(3 rows)
|
|
|
|
SELECT * FROM adt_ref ORDER BY 1;
|
|
a
|
|
---------------------------------------------------------------------
|
|
3
|
|
5
|
|
(2 rows)
|
|
|
|
-- make sure that COPY (e.g., INSERT .. SELECT) and
|
|
-- alter_distributed_table works in the same TX
|
|
BEGIN;
|
|
SET LOCAL citus.enable_local_execution=OFF;
|
|
INSERT INTO adt_table SELECT x, x+1 FROM generate_series(1, 1000) x;
|
|
SELECT alter_distributed_table('adt_table', distribution_column:='a');
|
|
alter_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
BEGIN;
|
|
INSERT INTO adt_table SELECT x, x+1 FROM generate_series(1, 1000) x;
|
|
SELECT alter_distributed_table('adt_table', distribution_column:='a');
|
|
alter_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT COUNT(*) FROM adt_table;
|
|
count
|
|
---------------------------------------------------------------------
|
|
1003
|
|
(1 row)
|
|
|
|
END;
|
|
SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text = 'adt_table';
|
|
table_name | citus_table_type | distribution_column | shard_count
|
|
---------------------------------------------------------------------
|
|
adt_table | distributed | a | 6
|
|
(1 row)
|
|
|
|
\c - - - :master_port
|
|
-- sometimes Postgres is a little slow to terminate the backends
|
|
-- even if PGFinish is sent. So, to prevent any flaky tests, sleep
|
|
SELECT pg_sleep(0.1);
|
|
pg_sleep
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
-- since max_cached_conns_per_worker == 0 at this point, the
|
|
-- backend(s) that execute on the shards will be terminated
|
|
-- so show that there is only a single client backend,
|
|
-- which is actually the backend that executes here
|
|
SET search_path TO single_node;
|
|
SELECT count(*) from should_commit;
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
SELECT pg_catalog.get_all_active_client_backend_count();
|
|
get_all_active_client_backend_count
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
BEGIN;
|
|
SET LOCAL citus.shard_count TO 32;
|
|
SET LOCAL citus.force_max_query_parallelization TO ON;
|
|
SET LOCAL citus.enable_local_execution TO false;
|
|
CREATE TABLE test (a int);
|
|
SET citus.shard_replication_factor TO 1;
|
|
SELECT create_distributed_table('test', 'a');
|
|
create_distributed_table
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SELECT count(*) FROM test;
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
-- now, we should have additional 32 connections
|
|
SELECT pg_catalog.get_all_active_client_backend_count();
|
|
get_all_active_client_backend_count
|
|
---------------------------------------------------------------------
|
|
33
|
|
(1 row)
|
|
|
|
ROLLBACK;
|
|
\c - - - :master_port
|
|
SET search_path TO single_node;
|
|
-- simulate that even if there are no connection slots
|
|
-- to connect, Citus can switch to local execution
|
|
SET citus.force_max_query_parallelization TO false;
|
|
SET citus.log_remote_commands TO ON;
|
|
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
|
|
SELECT pg_reload_conf();
|
|
pg_reload_conf
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
SELECT pg_sleep(0.1);
|
|
pg_sleep
|
|
---------------------------------------------------------------------
|
|
|
|
(1 row)
|
|
|
|
SET citus.executor_slow_start_interval TO 10;
|
|
SELECT count(*) from another_schema_table;
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630513 another_schema_table WHERE true
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630514 another_schema_table WHERE true
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
UPDATE another_schema_table SET b = b;
|
|
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = b
|
|
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = b
|
|
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = b
|
|
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = b
|
|
-- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning
|
|
-- not that we ignore INSERT .. SELECT via coordinator as it relies on
|
|
-- COPY command
|
|
INSERT INTO another_schema_table SELECT * FROM another_schema_table;
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a IS NOT NULL)
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE (a IS NOT NULL)
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE (a IS NOT NULL)
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE (a IS NOT NULL)
|
|
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630513_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630513_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630514_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630514_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
-- multi-row INSERTs
|
|
INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) VALUES (1,1), (5,5)
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) VALUES (3,3), (4,4), (7,7)
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) VALUES (6,6)
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) VALUES (2,2)
|
|
-- INSERT..SELECT with re-partitioning when using local execution
|
|
BEGIN;
|
|
INSERT INTO another_schema_table VALUES (1,100);
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 (a, b) VALUES (1, 100)
|
|
INSERT INTO another_schema_table VALUES (2,100);
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 (a, b) VALUES (2, 100)
|
|
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630513_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630513_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630514_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630514_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630511_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630512_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630511_to_2,repartitioned_results_xxxxx_from_90630513_to_2,repartitioned_results_xxxxx_from_90630514_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630514_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
|
|
SELECT * FROM another_schema_table WHERE a = 100 ORDER BY b;
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 100) ORDER BY b
|
|
a | b
|
|
---------------------------------------------------------------------
|
|
100 | 1
|
|
100 | 2
|
|
(2 rows)
|
|
|
|
ROLLBACK;
|
|
-- intermediate results
|
|
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
|
|
SELECT count(*) FROM cte_1;
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '1000'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '1000'::bigint
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
|
|
count
|
|
---------------------------------------------------------------------
|
|
7
|
|
(1 row)
|
|
|
|
-- this is to get ready for the next tests
|
|
TRUNCATE another_schema_table;
|
|
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
|
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
|
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
|
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
|
-- copy can use local execution even if there is no connection available
|
|
COPY another_schema_table(a) FROM PROGRAM 'seq 32';
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
CONTEXT: COPY another_schema_table, line 1: "1"
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
CONTEXT: COPY another_schema_table, line 2: "2"
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
CONTEXT: COPY another_schema_table, line 3: "3"
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
CONTEXT: COPY another_schema_table, line 6: "6"
|
|
-- INSERT .. SELECT with co-located intermediate results
|
|
SET citus.log_remote_commands to false;
|
|
CREATE UNIQUE INDEX another_schema_table_pk ON another_schema_table(a);
|
|
SET citus.log_local_commands to true;
|
|
INSERT INTO another_schema_table SELECT * FROM another_schema_table LIMIT 10000 ON CONFLICT(a) DO NOTHING;
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
|
|
INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10 ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b + 1 RETURNING *;
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
|
|
a | b
|
|
---------------------------------------------------------------------
|
|
1 |
|
|
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
|
6 |
|
|
7 |
|
|
8 |
|
|
9 |
|
|
10 |
|
|
(10 rows)
|
|
|
|
-- INSERT .. SELECT with co-located intermediate result for non-binary input
|
|
WITH cte_1 AS
|
|
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING value)
|
|
SELECT count(*) FROM cte_1;
|
|
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(value single_node.new_type)) cte_1
|
|
count
|
|
---------------------------------------------------------------------
|
|
1001
|
|
(1 row)
|
|
|
|
-- test with NULL columns
|
|
ALTER TABLE non_binary_copy_test ADD COLUMN z INT;
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630515, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630516, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630517, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
|
|
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630518, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
|
|
WITH cte_1 AS
|
|
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z)
|
|
SELECT bool_and(z is null) FROM cte_1;
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
|
NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and FROM (SELECT intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(z integer)) cte_1
|
|
bool_and
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
-- test with type coersion (int -> text) and also NULL values with coersion
|
|
WITH cte_1 AS
|
|
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING key, z)
|
|
SELECT count(DISTINCT key::text), count(DISTINCT z::text) FROM cte_1;
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
|
|
NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS count, count(DISTINCT (z)::text) AS count FROM (SELECT intermediate_result.key, intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, z integer)) cte_1
|
|
count | count
|
|
---------------------------------------------------------------------
|
|
1001 | 0
|
|
(1 row)
|
|
|
|
-- lets flush the copy often to make sure everyhing is fine
|
|
SET citus.local_copy_flush_threshold TO 1;
|
|
TRUNCATE another_schema_table;
|
|
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
|
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
|
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
|
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
|
INSERT INTO another_schema_table(a) SELECT i from generate_Series(0,10000)i;
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
NOTICE: executing the copy locally for shard xxxxx
|
|
WITH cte_1 AS
|
|
(INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10000 ON CONFLICT(a) DO NOTHING RETURNING *)
|
|
SELECT count(*) FROM cte_1;
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
|
|
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
|
|
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
|
|
count
|
|
---------------------------------------------------------------------
|
|
0
|
|
(1 row)
|
|
|
|
WITH cte_1 AS
|
|
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z)
|
|
SELECT bool_and(z is null) FROM cte_1;
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
|
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
|
NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and FROM (SELECT intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(z integer)) cte_1
|
|
bool_and
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
RESET citus.local_copy_flush_threshold;
|
|
-- if local execution is disabled, we cannot fail over to
|
|
-- local execution and the queries would fail
|
|
SET citus.enable_local_execution TO false;
|
|
SELECT count(*) from another_schema_table;
|
|
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
|
|
HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
|
UPDATE another_schema_table SET b = b;
|
|
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
|
|
HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
|
INSERT INTO another_schema_table SELECT * FROM another_schema_table;
|
|
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
|
|
HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
|
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
|
|
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
|
|
HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
|
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
|
|
SELECT count(*) FROM cte_1;
|
|
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
|
|
HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
|
INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
|
|
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
|
|
HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
|
-- copy fails if local execution is disabled and there is no connection slot
|
|
COPY another_schema_table(a) FROM PROGRAM 'seq 32';
|
|
ERROR: could not find an available connection
|
|
HINT: Set citus.max_shared_pool_size TO -1 to let COPY command finish
|
|
CONTEXT: COPY another_schema_table, line 1: "1"
|
|
-- set the values back to their originals
|
|
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
|
|
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
|
ALTER SYSTEM RESET citus.recover_2pc_interval;
|
|
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
|
ALTER SYSTEM RESET citus.local_shared_pool_size;
|
|
SELECT pg_reload_conf();
|
|
pg_reload_conf
|
|
---------------------------------------------------------------------
|
|
t
|
|
(1 row)
|
|
|
|
-- suppress notices
|
|
SET client_min_messages TO error;
|
|
-- cannot remove coordinator since a reference table exists on coordinator and no other worker nodes are added
|
|
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
|
ERROR: cannot remove the last worker node because there are reference tables and it would cause data loss on reference tables
|
|
HINT: To proceed, either drop the reference tables or use undistribute_table() function to convert them to local tables
|
|
-- Cleanup
|
|
DROP SCHEMA single_node CASCADE;
|
|
-- Remove the coordinator again
|
|
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
|
?column?
|
|
---------------------------------------------------------------------
|
|
1
|
|
(1 row)
|
|
|
|
-- restart nodeid sequence so that multi_cluster_management still has the same
|
|
-- nodeids
|
|
ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART 1;
|