mirror of https://github.com/citusdata/citus.git
526 lines
16 KiB
Plaintext
526 lines
16 KiB
Plaintext
--
|
|
-- MULTI_COPY
|
|
--
|
|
|
|
|
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 560000;
|
|
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 560000;
|
|
|
|
|
|
-- Create a new hash-partitioned table into which to COPY
|
|
CREATE TABLE customer_copy_hash (
|
|
c_custkey integer,
|
|
c_name varchar(25) not null,
|
|
c_address varchar(40),
|
|
c_nationkey integer,
|
|
c_phone char(15),
|
|
c_acctbal decimal(15,2),
|
|
c_mktsegment char(10),
|
|
c_comment varchar(117),
|
|
primary key (c_custkey));
|
|
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
|
|
|
|
-- Test COPY into empty hash-partitioned table
|
|
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
|
|
|
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
|
|
|
|
-- Test empty copy
|
|
COPY customer_copy_hash FROM STDIN;
|
|
\.
|
|
|
|
-- Test syntax error
|
|
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
|
|
1,customer1
|
|
2,customer2,
|
|
notinteger,customernot
|
|
\.
|
|
|
|
-- Confirm that no data was copied
|
|
SELECT count(*) FROM customer_copy_hash;
|
|
|
|
-- Test primary key violation
|
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
|
WITH (FORMAT 'csv');
|
|
1,customer1
|
|
2,customer2
|
|
2,customer2
|
|
\.
|
|
|
|
-- Confirm that no data was copied
|
|
SELECT count(*) FROM customer_copy_hash;
|
|
|
|
-- Test headers option
|
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
|
WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
|
|
# header
|
|
1,customer1
|
|
2,customer2
|
|
3,customer3
|
|
\.
|
|
|
|
-- Confirm that only first row was skipped
|
|
SELECT count(*) FROM customer_copy_hash;
|
|
|
|
-- Test force_not_null option
|
|
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
|
|
WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
|
|
"4","customer4",""
|
|
\.
|
|
|
|
-- Confirm that value is not null
|
|
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
|
|
|
|
-- Test force_null option
|
|
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
|
|
WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
|
|
"5","customer5",""
|
|
\.
|
|
|
|
-- Confirm that value is null
|
|
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
|
|
|
|
-- Test null violation
|
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
|
WITH (FORMAT 'csv');
|
|
6,customer6
|
|
7,customer7
|
|
8,
|
|
\.
|
|
|
|
-- Confirm that no data was copied
|
|
SELECT count(*) FROM customer_copy_hash;
|
|
|
|
-- Test server-side copy from program
|
|
COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
|
|
WITH (DELIMITER ' ');
|
|
|
|
-- Confirm that data was copied
|
|
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
|
|
|
|
-- Test server-side copy from file
|
|
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|');
|
|
|
|
-- Confirm that data was copied
|
|
SELECT count(*) FROM customer_copy_hash;
|
|
|
|
-- Test client-side copy from file
|
|
\copy customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|');
|
|
|
|
-- Confirm that data was copied
|
|
SELECT count(*) FROM customer_copy_hash;
|
|
|
|
-- Make sure that master_update_shard_statistics() only updates shard length for
|
|
-- hash-partitioned tables
|
|
SELECT master_update_shard_statistics(560000);
|
|
|
|
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560000;
|
|
|
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000;
|
|
|
|
|
|
-- Create a new hash-partitioned table with default now() function
|
|
CREATE TABLE customer_with_default(
|
|
c_custkey integer,
|
|
c_name varchar(25) not null,
|
|
c_time timestamp default now());
|
|
|
|
SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
|
|
|
|
SELECT master_create_worker_shards('customer_with_default', 64, 1);
|
|
|
|
-- Test with default values for now() function
|
|
COPY customer_with_default (c_custkey, c_name) FROM STDIN
|
|
WITH (FORMAT 'csv');
|
|
1,customer1
|
|
2,customer2
|
|
\.
|
|
|
|
-- Confirm that data was copied with now() function
|
|
SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
|
|
|
|
-- Add columns to the table and perform a COPY
|
|
ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
|
|
ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
|
|
|
|
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
|
|
10,customer10,1,5
|
|
\.
|
|
|
|
SELECT * FROM customer_copy_hash WHERE extra1 = 1;
|
|
|
|
-- Test dropping an intermediate column
|
|
ALTER TABLE customer_copy_hash DROP COLUMN extra1;
|
|
|
|
COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
|
|
11,customer11,5
|
|
\.
|
|
|
|
SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
|
|
|
|
-- Test dropping the last column
|
|
ALTER TABLE customer_copy_hash DROP COLUMN extra2;
|
|
|
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
|
|
12,customer12
|
|
\.
|
|
|
|
SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
|
|
|
|
-- Create a new range-partitioned table into which to COPY
|
|
CREATE TABLE customer_copy_range (
|
|
c_custkey integer,
|
|
c_name varchar(25),
|
|
c_address varchar(40),
|
|
c_nationkey integer,
|
|
c_phone char(15),
|
|
c_acctbal decimal(15,2),
|
|
c_mktsegment char(10),
|
|
c_comment varchar(117),
|
|
primary key (c_custkey));
|
|
|
|
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
|
|
|
|
-- Test COPY into empty range-partitioned table
|
|
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
|
|
|
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
|
|
\gset
|
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
|
WHERE shardid = :new_shard_id;
|
|
|
|
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
|
|
\gset
|
|
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
|
WHERE shardid = :new_shard_id;
|
|
|
|
-- Test copy into range-partitioned table
|
|
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
|
|
|
-- Check whether data went into the right shard (maybe)
|
|
SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
|
|
FROM customer_copy_range WHERE c_custkey <= 500;
|
|
|
|
-- Check whether data was copied
|
|
SELECT count(*) FROM customer_copy_range;
|
|
|
|
-- Manipulate min/max values and check shard statistics for new shard
|
|
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000
|
|
WHERE shardid = :new_shard_id;
|
|
|
|
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
|
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
|
|
|
-- Update shard statistics for range-partitioned shard and check that only the
|
|
-- shard length is updated.
|
|
SELECT master_update_shard_statistics(:new_shard_id);
|
|
|
|
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id;
|
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id;
|
|
|
|
-- Revert back min/max value updates
|
|
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
|
WHERE shardid = :new_shard_id;
|
|
|
|
-- Create a new append-partitioned table into which to COPY
|
|
CREATE TABLE customer_copy_append (
|
|
c_custkey integer,
|
|
c_name varchar(25) not null,
|
|
c_address varchar(40),
|
|
c_nationkey integer,
|
|
c_phone char(15),
|
|
c_acctbal decimal(15,2),
|
|
c_mktsegment char(10),
|
|
c_comment varchar(117));
|
|
SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
|
|
|
|
-- Test syntax error
|
|
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
|
|
1,customer1
|
|
2,customer2
|
|
notinteger,customernot
|
|
\.
|
|
|
|
-- Test that no shard is created for failing copy
|
|
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
|
|
|
|
-- Test empty copy
|
|
COPY customer_copy_append FROM STDIN;
|
|
\.
|
|
|
|
-- Test that no shard is created for copying zero rows
|
|
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
|
|
|
|
-- Test proper copy
|
|
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
|
|
1,customer1
|
|
2,customer2
|
|
\.
|
|
|
|
-- Check whether data was copied properly
|
|
SELECT * FROM customer_copy_append;
|
|
|
|
-- Manipulate manipulate and check shard statistics for append-partitioned table shard
|
|
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000 WHERE shardid = 560131;
|
|
UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560131;
|
|
|
|
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560131;
|
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560131;
|
|
|
|
-- Update shard statistics for append-partitioned shard
|
|
SELECT master_update_shard_statistics(560131);
|
|
|
|
SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560131;
|
|
SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560131;
|
|
|
|
-- Create lineitem table
|
|
CREATE TABLE lineitem_copy_append (
|
|
l_orderkey bigint not null,
|
|
l_partkey integer not null,
|
|
l_suppkey integer not null,
|
|
l_linenumber integer not null,
|
|
l_quantity decimal(15, 2) not null,
|
|
l_extendedprice decimal(15, 2) not null,
|
|
l_discount decimal(15, 2) not null,
|
|
l_tax decimal(15, 2) not null,
|
|
l_returnflag char(1) not null,
|
|
l_linestatus char(1) not null,
|
|
l_shipdate date not null,
|
|
l_commitdate date not null,
|
|
l_receiptdate date not null,
|
|
l_shipinstruct char(25) not null,
|
|
l_shipmode char(10) not null,
|
|
l_comment varchar(44) not null);
|
|
SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
|
|
|
|
-- Test multiple shard creation
|
|
SET citus.shard_max_size TO '256kB';
|
|
|
|
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|';
|
|
|
|
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
|
|
|
|
-- Test round robin shard policy
|
|
SET citus.shard_replication_factor TO 1;
|
|
|
|
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|';
|
|
|
|
SELECT
|
|
pg_dist_shard_placement.shardid,
|
|
pg_dist_shard_placement.nodeport
|
|
FROM
|
|
pg_dist_shard,
|
|
pg_dist_shard_placement
|
|
WHERE
|
|
pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
|
|
logicalrelid = 'lineitem_copy_append'::regclass
|
|
ORDER BY
|
|
pg_dist_shard.shardid DESC
|
|
LIMIT
|
|
5;
|
|
|
|
-- Ensure that copy from worker node of table with serial column fails
|
|
CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial);
|
|
SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append');
|
|
|
|
-- Connect to the first worker node
|
|
\c - - - 57637
|
|
|
|
-- Test copy from the worker node
|
|
COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
|
|
|
-- Connect back to the master node
|
|
\c - - - 57636
|
|
|
|
-- Create customer table for the worker copy with constraint and index
|
|
CREATE TABLE customer_worker_copy_append (
|
|
c_custkey integer ,
|
|
c_name varchar(25) not null,
|
|
c_address varchar(40),
|
|
c_nationkey integer,
|
|
c_phone char(15),
|
|
c_acctbal decimal(15,2),
|
|
c_mktsegment char(10),
|
|
c_comment varchar(117),
|
|
primary key (c_custkey));
|
|
|
|
CREATE INDEX ON customer_worker_copy_append (c_name);
|
|
|
|
SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append');
|
|
|
|
-- Connect to the first worker node
|
|
\c - - - 57637
|
|
|
|
-- Test copy from the worker node
|
|
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
|
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
|
|
|
-- Test if there is no relation to copy data with the worker copy
|
|
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
|
|
|
-- Connect back to the master node
|
|
\c - - - 57636
|
|
|
|
-- Test the content of the table
|
|
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append;
|
|
|
|
-- Test schema support on append partitioned tables
|
|
CREATE SCHEMA append;
|
|
CREATE TABLE append.customer_copy (
|
|
c_custkey integer ,
|
|
c_name varchar(25) not null,
|
|
c_address varchar(40),
|
|
c_nationkey integer,
|
|
c_phone char(15),
|
|
c_acctbal decimal(15,2),
|
|
c_mktsegment char(10),
|
|
c_comment varchar(117));
|
|
|
|
SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'append');
|
|
|
|
-- Test copy from the master node
|
|
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|');
|
|
|
|
-- Test copy from the worker node
|
|
\c - - - 57637
|
|
|
|
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
|
|
|
-- Connect back to the master node
|
|
\c - - - 57636
|
|
|
|
-- Test the content of the table
|
|
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy;
|
|
|
|
-- Test with table name which contains special character
|
|
CREATE TABLE "customer_with_special_\\_character"(
|
|
c_custkey integer,
|
|
c_name varchar(25) not null);
|
|
|
|
SELECT master_create_distributed_table('"customer_with_special_\\_character"', 'c_custkey', 'hash');
|
|
|
|
SELECT master_create_worker_shards('"customer_with_special_\\_character"', 4, 1);
|
|
|
|
COPY "customer_with_special_\\_character" (c_custkey, c_name) FROM STDIN
|
|
WITH (FORMAT 'csv');
|
|
1,customer1
|
|
2,customer2
|
|
\.
|
|
|
|
-- Confirm that data was copied
|
|
SELECT count(*) FROM "customer_with_special_\\_character";
|
|
|
|
-- Test with table name which starts with number
|
|
CREATE TABLE "1_customer"(
|
|
c_custkey integer,
|
|
c_name varchar(25) not null);
|
|
|
|
SELECT master_create_distributed_table('"1_customer"', 'c_custkey', 'hash');
|
|
|
|
SELECT master_create_worker_shards('"1_customer"', 4, 1);
|
|
|
|
COPY "1_customer" (c_custkey, c_name) FROM STDIN
|
|
WITH (FORMAT 'csv');
|
|
1,customer1
|
|
2,customer2
|
|
\.
|
|
|
|
-- Confirm that data was copied
|
|
SELECT count(*) FROM "1_customer";
|
|
|
|
-- Test COPY with types having different Oid at master and workers
|
|
CREATE TYPE number_pack AS (
|
|
number1 integer,
|
|
number2 integer
|
|
);
|
|
|
|
CREATE TYPE super_number_pack AS (
|
|
packed_number1 number_pack,
|
|
packed_number2 number_pack
|
|
);
|
|
|
|
-- Create same types in worker1
|
|
\c - - - :worker_1_port
|
|
|
|
CREATE TYPE number_pack AS (
|
|
number1 integer,
|
|
number2 integer
|
|
);
|
|
|
|
CREATE TYPE super_number_pack AS (
|
|
packed_number1 number_pack,
|
|
packed_number2 number_pack
|
|
);
|
|
|
|
-- Create same types in worker2
|
|
\c - - - :worker_2_port
|
|
|
|
CREATE TYPE number_pack AS (
|
|
number1 integer,
|
|
number2 integer
|
|
);
|
|
|
|
CREATE TYPE super_number_pack AS (
|
|
packed_number1 number_pack,
|
|
packed_number2 number_pack
|
|
);
|
|
|
|
|
|
-- Connect back to master
|
|
\c - - - :master_port
|
|
|
|
|
|
-- Test array of user-defined type with hash distribution
|
|
CREATE TABLE packed_numbers_hash (
|
|
id integer,
|
|
packed_numbers number_pack[]
|
|
);
|
|
|
|
SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash');
|
|
SELECT master_create_worker_shards('packed_numbers_hash', 4, 1);
|
|
COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO '/tmp/copy_test_array_of_composite';
|
|
COPY packed_numbers_hash FROM '/tmp/copy_test_array_of_composite';
|
|
|
|
-- Verify data is actually copied
|
|
SELECT * FROM packed_numbers_hash;
|
|
|
|
-- Test composite type containing an element with different Oid with hash distribution
|
|
|
|
CREATE TABLE super_packed_numbers_hash (
|
|
id integer,
|
|
super_packed_number super_number_pack
|
|
);
|
|
|
|
SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash');
|
|
SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1);
|
|
COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO '/tmp/copy_test_composite_of_composite';
|
|
COPY super_packed_numbers_hash FROM '/tmp/copy_test_composite_of_composite';
|
|
|
|
-- Verify data is actually copied
|
|
SELECT * FROM super_packed_numbers_hash;
|
|
|
|
-- Test array of user-defined type with append distribution
|
|
CREATE TABLE packed_numbers_append (
|
|
id integer,
|
|
packed_numbers number_pack[]
|
|
);
|
|
|
|
SELECT master_create_distributed_table('packed_numbers_append', 'id', 'append');
|
|
COPY packed_numbers_append FROM '/tmp/copy_test_array_of_composite';
|
|
|
|
-- Verify data is actually copied
|
|
SELECT * FROM packed_numbers_append;
|
|
|
|
-- Test composite type containing an element with different Oid with append distribution
|
|
|
|
CREATE TABLE super_packed_numbers_append (
|
|
id integer,
|
|
super_packed_number super_number_pack
|
|
);
|
|
|
|
SELECT master_create_distributed_table('super_packed_numbers_append', 'id', 'append');
|
|
COPY super_packed_numbers_append FROM '/tmp/copy_test_composite_of_composite';
|
|
|
|
-- Verify data is actually copied
|
|
SELECT * FROM super_packed_numbers_append;
|