--
-- MULTI_COPY
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 560000;
-- Create a new hash-partitioned table into which to COPY
CREATE TABLE customer_copy_hash (
    c_custkey integer,
    c_name varchar(25) not null,
    c_address varchar(40),
    c_nationkey integer,
    c_phone char(15),
    c_acctbal decimal(15,2),
    c_mktsegment char(10),
    c_comment varchar(117),
    primary key (c_custkey));
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
master_create_distributed_table
---------------------------------

(1 row)

-- Test COPY into empty hash-partitioned table
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
ERROR: could not find any shards into which to copy
DETAIL: No shards exist for distributed table "customer_copy_hash".
HINT: Run master_create_worker_shards to create shards and try again.
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
master_create_worker_shards
-----------------------------

(1 row)

-- Test empty copy
COPY customer_copy_hash FROM STDIN;
-- Test syntax error
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
ERROR: invalid input syntax for integer: "1,customer1"
CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1"
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
0
(1 row)

-- Test primary key violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_560048"
DETAIL: Key (c_custkey)=(2) already exists.
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
0
(1 row)

-- Test headers option
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
-- Confirm that only first row was skipped
SELECT count(*) FROM customer_copy_hash;
count
-------
3
(1 row)

-- Test force_not_null option
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
-- Confirm that value is not null
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
count
-------
1
(1 row)

-- Test force_null option
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
-- Confirm that value is null
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
count
-------
0
(1 row)

-- Test null violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
ERROR: null value in column "c_name" violates not-null constraint
DETAIL: Failing row contains (8, null, null, null, null, null, null, null).
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
5
(1 row)

-- Test server-side copy from program
COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
WITH (DELIMITER ' ');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
count
-------
1
(1 row)

-- Test server-side copy from file
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
1006
(1 row)

-- Test client-side copy from file
\copy customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
2006
(1 row)

-- Make sure that master_update_shard_statistics() only updates shard length for
-- hash-partitioned tables
SELECT master_update_shard_statistics(560000);
master_update_shard_statistics
--------------------------------
8192
(1 row)

SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560000;
shardid | shardminvalue | shardmaxvalue
---------+---------------+---------------
560000 | -2147483648 | -2080374785
(1 row)

SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000;
shardid | shardlength
---------+-------------
560000 | 8192
(1 row)

-- Create a new hash-partitioned table with default now() function
CREATE TABLE customer_with_default(
    c_custkey integer,
    c_name varchar(25) not null,
    c_time timestamp default now());
SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
master_create_distributed_table
---------------------------------

(1 row)

SELECT master_create_worker_shards('customer_with_default', 64, 1);
master_create_worker_shards
-----------------------------

(1 row)

-- Test with default values for now() function
COPY customer_with_default (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
-- Confirm that data was copied with now() function
SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
count
-------
2
(1 row)

-- Add columns to the table and perform a COPY
ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
SELECT * FROM customer_copy_hash WHERE extra1 = 1;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra1 | extra2
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------+--------
10 | customer10 | | | | | | | 1 | 5
(1 row)

-- Test dropping an intermediate column
ALTER TABLE customer_copy_hash DROP COLUMN extra1;
COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra2
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------
11 | customer11 | | | | | | | 5
(1 row)

-- Test dropping the last column
ALTER TABLE customer_copy_hash DROP COLUMN extra2;
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------
12 | customer12 | | | | | |
(1 row)

-- Create a new range-partitioned table into which to COPY
CREATE TABLE customer_copy_range (
    c_custkey integer,
    c_name varchar(25),
    c_address varchar(40),
    c_nationkey integer,
    c_phone char(15),
    c_acctbal decimal(15,2),
    c_mktsegment char(10),
    c_comment varchar(117),
    primary key (c_custkey));
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
master_create_distributed_table
---------------------------------

(1 row)

-- Test COPY into empty range-partitioned table
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
ERROR: could not find any shards into which to copy
DETAIL: No shards exist for distributed table "customer_copy_range".
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
WHERE shardid = :new_shard_id;
-- Test copy into range-partitioned table
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
-- Check whether data went into the right shard (maybe)
SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
FROM customer_copy_range WHERE c_custkey <= 500;
min | max | avg | count
-----+-----+----------------------+-------
1 | 500 | 250.5000000000000000 | 500
(1 row)

-- Check whether data was copied
SELECT count(*) FROM customer_copy_range;
count
-------
1000
(1 row)

-- Manipulate min/max values and check shard statistics for new shard
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000
WHERE shardid = :new_shard_id;
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
shardid | shardminvalue | shardmaxvalue
---------+---------------+---------------
560129 | 1501 | 2000
(1 row)

SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
shardid | shardlength
---------+-------------
560129 | 0
560129 | 0
(2 rows)

-- Update shard statistics for range-partitioned shard and check that only the
-- shard length is updated.
SELECT master_update_shard_statistics(:new_shard_id);
master_update_shard_statistics
--------------------------------
131072
(1 row)

SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
shardid | shardminvalue | shardmaxvalue
---------+---------------+---------------
560129 | 1501 | 2000
(1 row)

SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
shardid | shardlength
---------+-------------
560129 | 131072
560129 | 131072
(2 rows)

-- Revert back min/max value updates
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
WHERE shardid = :new_shard_id;
-- Create a new append-partitioned table into which to COPY
CREATE TABLE customer_copy_append (
    c_custkey integer,
    c_name varchar(25) not null,
    c_address varchar(40),
    c_nationkey integer,
    c_phone char(15),
    c_acctbal decimal(15,2),
    c_mktsegment char(10),
    c_comment varchar(117));
SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
master_create_distributed_table
---------------------------------

(1 row)

-- Test syntax error
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
ERROR: invalid input syntax for integer: "notinteger"
CONTEXT: COPY customer_copy_append, line 3, column c_custkey: "notinteger"
-- Test that no shard is created for failing copy
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
count
-------
0
(1 row)

-- Test empty copy
COPY customer_copy_append FROM STDIN;
-- Test that no shard is created for copying zero rows
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
count
-------
0
(1 row)

-- Test proper copy
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
-- Check whether data was copied properly
SELECT * FROM customer_copy_append;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
-----------+-----------+-----------+-------------+---------+-----------+--------------+-----------
1 | customer1 | | | | | |
2 | customer2 | | | | | |
(2 rows)

-- Manipulate and check shard statistics for append-partitioned table shard
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000 WHERE shardid = 560131;
UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560131;
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560131;
shardid | shardminvalue | shardmaxvalue
---------+---------------+---------------
560131 | 1501 | 2000
(1 row)

SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560131;
shardid | shardlength
---------+-------------
560131 | 0
560131 | 0
(2 rows)

-- Update shard statistics for append-partitioned shard
SELECT master_update_shard_statistics(560131);
master_update_shard_statistics
--------------------------------
8192
(1 row)

SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560131;
shardid | shardminvalue | shardmaxvalue
---------+---------------+---------------
560131 | 1 | 2
(1 row)

SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560131;
shardid | shardlength
---------+-------------
560131 | 8192
560131 | 8192
(2 rows)

-- Create lineitem table
CREATE TABLE lineitem_copy_append (
    l_orderkey bigint not null,
    l_partkey integer not null,
    l_suppkey integer not null,
    l_linenumber integer not null,
    l_quantity decimal(15, 2) not null,
    l_extendedprice decimal(15, 2) not null,
    l_discount decimal(15, 2) not null,
    l_tax decimal(15, 2) not null,
    l_returnflag char(1) not null,
    l_linestatus char(1) not null,
    l_shipdate date not null,
    l_commitdate date not null,
    l_receiptdate date not null,
    l_shipinstruct char(25) not null,
    l_shipmode char(10) not null,
    l_comment varchar(44) not null);
SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
master_create_distributed_table
---------------------------------

(1 row)

-- Test multiple shard creation
SET citus.shard_max_size TO '256kB';
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|';
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
count
-------
5
(1 row)

-- Test round robin shard policy
SET citus.shard_replication_factor TO 1;
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|';
SELECT
    pg_dist_shard_placement.shardid,
    pg_dist_shard_placement.nodeport
FROM
    pg_dist_shard,
    pg_dist_shard_placement
WHERE
    pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
    logicalrelid = 'lineitem_copy_append'::regclass
ORDER BY
    pg_dist_shard.shardid DESC
LIMIT
    5;
shardid | nodeport
---------+----------
560141 | 57637
560140 | 57638
560139 | 57637
560138 | 57638
560137 | 57637
(5 rows)

-- Ensure that copy from worker node of table with serial column fails
CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial);
SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append');
master_create_distributed_table
---------------------------------

(1 row)

-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
ERROR: relation "public.customer_worker_copy_append_seq_seq_seq" does not exist
-- Connect back to the master node
\c - - - 57636
-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
    c_custkey integer,
    c_name varchar(25) not null,
    c_address varchar(40),
    c_nationkey integer,
    c_phone char(15),
    c_acctbal decimal(15,2),
    c_mktsegment char(10),
    c_comment varchar(117),
    primary key (c_custkey));
CREATE INDEX ON customer_worker_copy_append (c_name);
SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append');
WARNING: table "customer_worker_copy_append" has a UNIQUE or EXCLUDE constraint
DETAIL: UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced.
HINT: Consider using hash partitioning.
master_create_distributed_table
---------------------------------

(1 row)

-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Make sure we don't use 2PC when connecting to master, even if requested
BEGIN;
SET LOCAL citus.multi_shard_commit_protocol TO '2pc';
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
COMMIT;
-- Test the worker copy when there is no relation to copy data into
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
WARNING: relation "lineitem_copy_none" does not exist
CONTEXT: while executing command on localhost:57636
ERROR: could not run copy from the worker node
-- Connect back to the master node
\c - - - 57636
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append;
min | max | avg | count
-----+------+-----------------------+-------
1 | 7000 | 4443.8028800000000000 | 2000
(1 row)

-- Test schema support on append partitioned tables
CREATE SCHEMA append;
CREATE TABLE append.customer_copy (
    c_custkey integer,
    c_name varchar(25) not null,
    c_address varchar(40),
    c_nationkey integer,
    c_phone char(15),
    c_acctbal decimal(15,2),
    c_mktsegment char(10),
    c_comment varchar(117));
SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'append');
master_create_distributed_table
---------------------------------

(1 row)

-- Test copy from the master node
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|');
-- Test copy from the worker node
\c - - - 57637
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy;
min | max | avg | count
-----+------+-----------------------+-------
1 | 7000 | 4443.8028800000000000 | 2000
(1 row)

-- Test with a table name that contains a special character
CREATE TABLE "customer_with_special_\\_character"(
    c_custkey integer,
    c_name varchar(25) not null);
SELECT master_create_distributed_table('"customer_with_special_\\_character"', 'c_custkey', 'hash');
master_create_distributed_table
---------------------------------

(1 row)

SELECT master_create_worker_shards('"customer_with_special_\\_character"', 4, 1);
master_create_worker_shards
-----------------------------

(1 row)

COPY "customer_with_special_\\_character" (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
-- Confirm that data was copied
SELECT count(*) FROM "customer_with_special_\\_character";
count
-------
2
(1 row)

-- Test with a table name that starts with a number
CREATE TABLE "1_customer"(
    c_custkey integer,
    c_name varchar(25) not null);
SELECT master_create_distributed_table('"1_customer"', 'c_custkey', 'hash');
master_create_distributed_table
---------------------------------

(1 row)

SELECT master_create_worker_shards('"1_customer"', 4, 1);
master_create_worker_shards
-----------------------------

(1 row)

COPY "1_customer" (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
-- Confirm that data was copied
SELECT count(*) FROM "1_customer";
count
-------
2
(1 row)

-- Test COPY with types having different Oid at master and workers
CREATE TYPE number_pack AS (
    number1 integer,
    number2 integer
);
CREATE TYPE super_number_pack AS (
    packed_number1 number_pack,
    packed_number2 number_pack
);
-- Create same types in worker1
\c - - - :worker_1_port
CREATE TYPE number_pack AS (
    number1 integer,
    number2 integer
);
CREATE TYPE super_number_pack AS (
    packed_number1 number_pack,
    packed_number2 number_pack
);
-- Create same types in worker2
\c - - - :worker_2_port
CREATE TYPE number_pack AS (
    number1 integer,
    number2 integer
);
CREATE TYPE super_number_pack AS (
    packed_number1 number_pack,
    packed_number2 number_pack
);
-- Connect back to master
\c - - - :master_port
-- Test array of user-defined type with hash distribution
CREATE TABLE packed_numbers_hash (
    id integer,
    packed_numbers number_pack[]
);
SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash');
master_create_distributed_table
---------------------------------

(1 row)

SELECT master_create_worker_shards('packed_numbers_hash', 4, 1);
master_create_worker_shards
-----------------------------

(1 row)

COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO :'temp_dir''copy_test_array_of_composite';
COPY packed_numbers_hash FROM :'temp_dir''copy_test_array_of_composite';
-- Verify data is actually copied
SELECT * FROM packed_numbers_hash;
id | packed_numbers
----+-----------------------
1 | {"(42,42)","(42,42)"}
(1 row)

-- Test composite type containing an element with different Oid with hash distribution
CREATE TABLE super_packed_numbers_hash (
    id integer,
    super_packed_number super_number_pack
);
SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash');
master_create_distributed_table
---------------------------------

(1 row)

SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1);
master_create_worker_shards
-----------------------------

(1 row)

COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO :'temp_dir''copy_test_composite_of_composite';
COPY super_packed_numbers_hash FROM :'temp_dir''copy_test_composite_of_composite';
-- Verify data is actually copied
SELECT * FROM super_packed_numbers_hash;
id | super_packed_number
----+-----------------------
1 | ("(42,42)","(42,42)")
(1 row)

-- Test array of user-defined type with append distribution
CREATE TABLE packed_numbers_append (
    id integer,
    packed_numbers number_pack[]
);
SELECT master_create_distributed_table('packed_numbers_append', 'id', 'append');
master_create_distributed_table
---------------------------------

(1 row)

COPY packed_numbers_append FROM :'temp_dir''copy_test_array_of_composite';
-- Verify data is actually copied
SELECT * FROM packed_numbers_append;
id | packed_numbers
----+-----------------------
1 | {"(42,42)","(42,42)"}
(1 row)

-- Test composite type containing an element with different Oid with append distribution
CREATE TABLE super_packed_numbers_append (
    id integer,
    super_packed_number super_number_pack
);
SELECT master_create_distributed_table('super_packed_numbers_append', 'id', 'append');
master_create_distributed_table
---------------------------------

(1 row)

COPY super_packed_numbers_append FROM :'temp_dir''copy_test_composite_of_composite';
-- Verify data is actually copied
SELECT * FROM super_packed_numbers_append;
id | super_packed_number
----+-----------------------
1 | ("(42,42)","(42,42)")
(1 row)

-- Test copy on append for composite type partition column
CREATE TABLE composite_partition_column_table(
    id integer,
    composite_column number_pack
);
SELECT master_create_distributed_table('composite_partition_column_table', 'composite_column', 'append');
master_create_distributed_table
---------------------------------

(1 row)

\COPY composite_partition_column_table FROM STDIN WITH (FORMAT 'csv');
WARNING: function min(number_pack) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
CONTEXT: while executing command on localhost:57637
WARNING: function min(number_pack) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
CONTEXT: while executing command on localhost:57638
WARNING: could not get statistics for shard public.composite_partition_column_table_560164
DETAIL: Setting shard statistics to NULL
ERROR: failure on connection marked as essential: localhost:57637
-- Test that copy on append distributed tables does not create shards on removed workers
CREATE TABLE numbers_append (a int, b int);
SELECT master_create_distributed_table('numbers_append', 'a', 'append');
master_create_distributed_table
---------------------------------

(1 row)

-- no shards are created yet
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+----------+----------
(0 rows)

COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
-- verify there are shards at both workers
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+-----------+----------
560165 | localhost | 57637
560165 | localhost | 57638
560166 | localhost | 57638
560166 | localhost | 57637
(4 rows)

-- disable the first node
SELECT master_disable_node('localhost', :worker_1_port);
NOTICE: Node localhost:57637 has active shard placements. Some queries may fail after this operation. Use SELECT master_activate_node('localhost', 57637) to activate this node back.
master_disable_node
---------------------

(1 row)

-- set replication factor to 1 so that copy will
-- succeed without replication count error
SET citus.shard_replication_factor TO 1;
-- add two new shards and verify they are created at the other node
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+-----------+----------
560165 | localhost | 57637
560165 | localhost | 57638
560166 | localhost | 57638
560166 | localhost | 57637
560167 | localhost | 57638
560168 | localhost | 57638
(6 rows)

-- add the node back
SELECT 1 FROM master_activate_node('localhost', :worker_1_port);
NOTICE: Replicating reference table "orders_reference" to the node localhost:57637
NOTICE: Replicating reference table "customer" to the node localhost:57637
NOTICE: Replicating reference table "nation" to the node localhost:57637
NOTICE: Replicating reference table "part" to the node localhost:57637
NOTICE: Replicating reference table "supplier" to the node localhost:57637
NOTICE: Replicating reference table "multi_outer_join_right_reference" to the node localhost:57637
NOTICE: Replicating reference table "multi_outer_join_third_reference" to the node localhost:57637
?column?
----------
1
(1 row)

RESET citus.shard_replication_factor;
-- add two new shards and verify they are created at both workers
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+-----------+----------
560165 | localhost | 57637
560165 | localhost | 57638
560166 | localhost | 57638
560166 | localhost | 57637
560167 | localhost | 57638
560168 | localhost | 57638
560169 | localhost | 57637
560169 | localhost | 57638
560170 | localhost | 57638
560170 | localhost | 57637
(10 rows)

DROP TABLE numbers_append;
-- Test copy failures against connection failures
-- create and switch to test user
CREATE USER test_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
SELECT * FROM run_command_on_workers('CREATE USER test_user');
nodename | nodeport | success | result
-----------+----------+---------+-------------
localhost | 57637 | t | CREATE ROLE
localhost | 57638 | t | CREATE ROLE
(2 rows)

\c - test_user
SET citus.shard_count to 4;
CREATE TABLE numbers_hash (a int, b int);
SELECT create_distributed_table('numbers_hash', 'a');
create_distributed_table
--------------------------

(1 row)

COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
-- verify each placement is active
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560171 | 1 | localhost | 57637
560171 | 1 | localhost | 57638
560172 | 1 | localhost | 57637
560172 | 1 | localhost | 57638
560173 | 1 | localhost | 57637
560173 | 1 | localhost | 57638
560174 | 1 | localhost | 57637
560174 | 1 | localhost | 57638
(8 rows)

-- create a reference table
CREATE TABLE numbers_reference(a int, b int);
SELECT create_reference_table('numbers_reference');
create_reference_table
------------------------

(1 row)

COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
-- create another hash distributed table
CREATE TABLE numbers_hash_other(a int, b int);
SELECT create_distributed_table('numbers_hash_other', 'a');
create_distributed_table
--------------------------

(1 row)

SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560176 | 1 | localhost | 57637
560176 | 1 | localhost | 57638
560177 | 1 | localhost | 57637
560177 | 1 | localhost | 57638
560178 | 1 | localhost | 57637
560178 | 1 | localhost | 57638
560179 | 1 | localhost | 57637
560179 | 1 | localhost | 57638
(8 rows)

-- manually corrupt pg_dist_shard such that both copies of one shard are placed in
-- worker_1. This is to test the behavior when no replica of a shard is accessible.
-- The whole copy operation is supposed to fail and roll back.
\c - :default_user
UPDATE pg_dist_shard_placement SET nodeport = :worker_1_port WHERE shardid = 560176;
-- disable test_user on the first worker
\c - :default_user - :worker_1_port
ALTER USER test_user WITH nologin;
\c - test_user - :master_port
-- reissue copy
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 1: "1,1"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 2: "2,2"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 3: "3,3"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 6: "6,6"
-- verify shards in the first worker are marked invalid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560171 | 3 | localhost | 57637
560171 | 1 | localhost | 57638
560172 | 3 | localhost | 57637
560172 | 1 | localhost | 57638
560173 | 3 | localhost | 57637
560173 | 1 | localhost | 57638
560174 | 3 | localhost | 57637
560174 | 1 | localhost | 57638
(8 rows)

-- try to insert into a reference table; copy should fail
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
ERROR: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_reference, line 1: "3,1"
-- verify shards for reference table are still valid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_reference'::regclass order by placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560175 | 1 | localhost | 57637
560175 | 1 | localhost | 57638
(2 rows)

-- try to insert into numbers_hash_other. copy should fail and roll back
-- since it cannot insert into either copy of a shard. shards are expected to
-- stay valid since the operation is rolled back.
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
ERROR: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
-- verify shards for numbers_hash_other are still valid
-- since copy has failed altogether
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560176 | 1 | localhost | 57637
560176 | 1 | localhost | 57637
560177 | 1 | localhost | 57637
560177 | 1 | localhost | 57638
560178 | 1 | localhost | 57637
560178 | 1 | localhost | 57638
560179 | 1 | localhost | 57637
560179 | 1 | localhost | 57638
(8 rows)

-- re-enable test_user on the first worker
\c - :default_user - :worker_1_port
ALTER USER test_user WITH login;
-- there is a dangling shard in worker_2, drop it
\c - test_user - :worker_2_port
DROP TABLE numbers_hash_other_560176;
\c - test_user - :master_port
DROP TABLE numbers_hash;
DROP TABLE numbers_hash_other;
DROP TABLE numbers_reference;
\c - :default_user
-- test copy failure inside the node
-- it will be done by changing the definition of a shard table
SET citus.shard_count to 4;
CREATE TABLE numbers_hash(a int, b int);
SELECT create_distributed_table('numbers_hash', 'a');
create_distributed_table
--------------------------

(1 row)

\c - - - :worker_1_port
ALTER TABLE numbers_hash_560180 DROP COLUMN b;
\c - - - :master_port
-- operation will fail to modify a shard and roll back
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
ERROR: column "b" of relation "numbers_hash_560180" does not exist
CONTEXT: while executing command on localhost:57637
COPY numbers_hash, line 1: "1,1"
-- verify no row is inserted
SELECT count(a) FROM numbers_hash;
count
-------
0
(1 row)

-- verify shard is still marked as valid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560180 | 1 | localhost | 57637
560180 | 1 | localhost | 57638
560181 | 1 | localhost | 57637
560181 | 1 | localhost | 57638
560182 | 1 | localhost | 57637
560182 | 1 | localhost | 57638
560183 | 1 | localhost | 57637
560183 | 1 | localhost | 57638
(8 rows)

DROP TABLE numbers_hash;
SELECT * FROM run_command_on_workers('DROP USER test_user');
nodename | nodeport | success | result
-----------+----------+---------+-----------
localhost | 57637 | t | DROP ROLE
localhost | 57638 | t | DROP ROLE
(2 rows)

DROP USER test_user;
-- Test copy with built-in type without binary output function
CREATE TABLE test_binaryless_builtin (
    col1 aclitem NOT NULL,
    col2 character varying(255) NOT NULL
);
SELECT create_reference_table('test_binaryless_builtin');
create_reference_table
------------------------

(1 row)

\COPY test_binaryless_builtin FROM STDIN WITH (format CSV)
SELECT * FROM test_binaryless_builtin;
col1 | col2
---------------------+-------
postgres=r/postgres | test
(1 row)

DROP TABLE test_binaryless_builtin;
-- Test drop table with copy in the same transaction
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
create_distributed_table
--------------------------

(1 row)

\copy tt1 from STDIN;
DROP TABLE tt1;
END;
-- Test dropping a column in front of the partition column
CREATE TABLE drop_copy_test_table (col1 int, col2 int, col3 int, col4 int);
SELECT create_distributed_table('drop_copy_test_table','col3');
create_distributed_table
--------------------------

(1 row)

ALTER TABLE drop_copy_test_table drop column col1;
COPY drop_copy_test_table (col2,col3,col4) from STDIN with CSV;
SELECT * FROM drop_copy_test_table WHERE col3 = 1;
col2 | col3 | col4
------+------+------
| 1 |
(1 row)

ALTER TABLE drop_copy_test_table drop column col4;
COPY drop_copy_test_table (col2,col3) from STDIN with CSV;
SELECT * FROM drop_copy_test_table WHERE col3 = 1;
col2 | col3
------+------
| 1
| 1
(2 rows)

DROP TABLE drop_copy_test_table;
-- There should be no "tt1" shard on the worker nodes
\c - - - :worker_1_port
SELECT relname FROM pg_class WHERE relname LIKE 'tt1%';
relname
---------
(0 rows)

\c - - - :master_port
-- copy >8MB to a single worker to trigger a flush in PutRemoteCopyData
BEGIN;
CREATE UNLOGGED TABLE trigger_flush AS
SELECT 1 AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s;
SELECT create_distributed_table('trigger_flush','a');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------

(1 row)

ABORT;
-- trigger switch-over when using single connection per worker
BEGIN;
SET citus.shard_count TO 3;
SET citus.multi_shard_modify_mode TO 'sequential';
CREATE UNLOGGED TABLE trigger_switchover(a int, b int, c int, d int, e int, f int, g int, h int);
SELECT create_distributed_table('trigger_switchover','a');
create_distributed_table
--------------------------

(1 row)

INSERT INTO trigger_switchover
SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,250000) s;
ABORT;
-- copy into a table with a JSONB column
CREATE TABLE copy_jsonb (key text, value jsonb, extra jsonb default '["default"]'::jsonb);
SELECT create_distributed_table('copy_jsonb', 'key', colocate_with => 'none');
create_distributed_table
--------------------------

(1 row)

-- JSONB from text should work
\COPY copy_jsonb (key, value) FROM STDIN
SELECT * FROM copy_jsonb ORDER BY key;
key | value | extra
-------+----------------------------+-------------
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
(2 rows)

-- JSONB from binary should work
COPY copy_jsonb TO :'temp_dir''copy_jsonb.pgcopy' WITH (format binary);
COPY copy_jsonb FROM :'temp_dir''copy_jsonb.pgcopy' WITH (format binary);
SELECT * FROM copy_jsonb ORDER BY key;
key | value | extra
-------+----------------------------+-------------
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
(4 rows)

-- JSONB parsing error without validation: no line number
\COPY copy_jsonb (key, value) FROM STDIN
ERROR: invalid input syntax for type json
DETAIL: The input string ended unexpectedly.
CONTEXT: JSON data, line 1: {"r":255,"g":0,"b":0
COPY copy_jsonb, line 1, column value: "{"r":255,"g":0,"b":0"
TRUNCATE copy_jsonb;
SET citus.skip_jsonb_validation_in_copy TO off;
-- JSONB from text should work
\COPY copy_jsonb (key, value) FROM STDIN
SELECT * FROM copy_jsonb ORDER BY key;
key | value | extra
-------+----------------------------+-------------
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
(2 rows)

-- JSONB from binary should work
COPY copy_jsonb TO :'temp_dir''copy_jsonb.pgcopy' WITH (format binary);
COPY copy_jsonb FROM :'temp_dir''copy_jsonb.pgcopy' WITH (format binary);
SELECT * FROM copy_jsonb ORDER BY key;
key | value | extra
-------+----------------------------+-------------
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
(4 rows)

-- JSONB parsing error with validation: should see line number
\COPY copy_jsonb (key, value) FROM STDIN
ERROR: invalid input syntax for type json
DETAIL: The input string ended unexpectedly.
CONTEXT: JSON data, line 1: {"r":255,"g":0,"b":0
COPY copy_jsonb, line 1, column value: "{"r":255,"g":0,"b":0"
DROP TABLE copy_jsonb;