Merge branch 'master' into naisila/fix-partitioned-index

pull/5397/head
Önder Kalacı 2021-11-08 10:53:16 +01:00 committed by GitHub
commit d5b371b2e0
33 changed files with 131 additions and 747 deletions

View File

@@ -69,7 +69,6 @@
/* Shard related configuration */
int ShardCount = 32;
int ShardReplicationFactor = 1; /* desired replication factor for shards */
int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */
int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
int NextShardId = 0;
int NextPlacementId = 0;
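These compile-time defaults back the corresponding user-facing citus.* settings; the test harness touched by this same commit overrides some of them (for example citus.shard_count=4 in pg_regress_multi.pl further down). A minimal SQL sketch of inspecting them, assuming a session on a coordinator with the citus extension installed:

SHOW citus.shard_count;                -- 32 unless overridden
SHOW citus.shard_replication_factor;   -- 1 by default
SET citus.shard_count TO 4;            -- what the regression harness does globally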

View File

@@ -229,118 +229,13 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
/*
* master_append_table_to_shard appends the given table's contents to the given
* shard, and updates shard metadata on the master node. If the function fails
* to append table data to all shard placements, it doesn't update any metadata
* and errors out. Else if the function fails to append table data to some of
* the shard placements, it marks those placements as invalid. These invalid
* placements will get cleaned up during shard rebalancing.
* master_append_table_to_shard is a deprecated function for appending data
* to a shard in an append-distributed table.
*/
Datum
master_append_table_to_shard(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 shardId = PG_GETARG_INT64(0);
text *sourceTableNameText = PG_GETARG_TEXT_P(1);
text *sourceNodeNameText = PG_GETARG_TEXT_P(2);
uint32 sourceNodePort = PG_GETARG_UINT32(3);
char *sourceTableName = text_to_cstring(sourceTableNameText);
char *sourceNodeName = text_to_cstring(sourceNodeNameText);
float4 shardFillLevel = 0.0;
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid relationId = shardInterval->relationId;
/* don't allow the table to be dropped */
LockRelationOid(relationId, AccessShareLock);
bool cstoreTable = CStoreTable(relationId);
char storageType = shardInterval->storageType;
EnsureTablePermissions(relationId, ACL_INSERT);
if (storageType != SHARD_STORAGE_TABLE && !cstoreTable)
{
ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId),
errdetail("The underlying shard is not a regular table")));
}
if (IsCitusTableType(relationId, HASH_DISTRIBUTED) || IsCitusTableType(relationId,
CITUS_TABLE_WITH_NO_DIST_KEY))
{
ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId),
errdetail("We currently don't support appending to shards "
"in hash-partitioned, reference and local tables")));
}
/* ensure that the shard placement metadata does not change during the append */
LockShardDistributionMetadata(shardId, ShareLock);
/* serialize appends to the same shard */
LockShardResource(shardId, ExclusiveLock);
/* get schema name of the target shard */
Oid shardSchemaOid = get_rel_namespace(relationId);
char *shardSchemaName = get_namespace_name(shardSchemaOid);
/* Build shard table name. */
char *shardTableName = get_rel_name(relationId);
AppendShardIdToName(&shardTableName, shardId);
char *shardQualifiedName = quote_qualified_identifier(shardSchemaName,
shardTableName);
List *shardPlacementList = ActiveShardPlacementList(shardId);
if (shardPlacementList == NIL)
{
ereport(ERROR, (errmsg("could not find any shard placements for shardId "
UINT64_FORMAT, shardId),
errhint("Try running master_create_empty_shard() first")));
}
UseCoordinatedTransaction();
Use2PCForCoordinatedTransaction();
/* issue command to append table to each shard placement */
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
int connectionFlags = FOR_DML;
MultiConnection *connection =
GetPlacementConnection(connectionFlags, shardPlacement, NULL);
/*
* This code-path doesn't support optional connections, so we don't expect
* NULL connections.
*/
Assert(connection != NULL);
PGresult *queryResult = NULL;
StringInfo workerAppendQuery = makeStringInfo();
appendStringInfo(workerAppendQuery, WORKER_APPEND_TABLE_TO_SHARD,
quote_literal_cstr(shardQualifiedName),
quote_literal_cstr(sourceTableName),
quote_literal_cstr(sourceNodeName), sourceNodePort);
RemoteTransactionBeginIfNecessary(connection);
ExecuteCriticalRemoteCommand(connection, workerAppendQuery->data);
PQclear(queryResult);
ForgetResults(connection);
}
/* update shard statistics and get new shard size */
uint64 newShardSize = UpdateShardStatistics(shardId);
/* calculate ratio of current shard size compared to shard max size */
uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
shardFillLevel = ((float4) newShardSize / (float4) shardMaxSizeInBytes);
PG_RETURN_FLOAT4(shardFillLevel);
ereport(ERROR, (errmsg("master_append_table_to_shard has been deprecated")));
}
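The hunk above strips the whole implementation and leaves only a stub that raises the deprecation error, so append-distributed staging has to move to COPY. A hedged migration sketch, using hypothetical table names (events, events_staging) and port; the COPY ... WITH (append_to_shard ...) form is the one the updated regression tests below use throughout:

-- Citus < 11.0: stage a local table into an append shard via the UDF,
-- which returned the shard fill level as a real.
SELECT master_create_empty_shard('events') AS shardid \gset
SELECT master_append_table_to_shard(:shardid, 'events_staging', 'localhost', 5432);

-- Citus >= 11.0: COPY straight into a chosen shard instead.
SELECT master_create_empty_shard('events') AS shardid \gset
COPY events FROM STDIN WITH (format 'csv', append_to_shard :shardid);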

View File

@@ -1579,21 +1579,6 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.shard_max_size",
gettext_noop("Sets the maximum size a shard will grow before it gets split."),
gettext_noop("Shards store table and file data. When the source "
"file's size for one shard exceeds this configuration "
"value, the database ensures that either a new shard "
"gets created, or the current one gets split. Note that "
"shards read this configuration value at sharded table "
"creation time, and later reuse the initially read value."),
&ShardMaxSize,
1048576, 256, INT_MAX, /* max allowed size not set to MAX_KILOBYTES on purpose */
PGC_USERSET,
GUC_UNIT_KB | GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.shard_placement_policy",
gettext_noop("Sets the policy to use when choosing nodes for shard placement."),
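With the UDF gone, the citus.shard_max_size GUC that fed its fill-level calculation is unregistered as well. For reference, a sketch of how the removed setting behaved, based on the definition above (GUC_UNIT_KB, default 1048576 KB = 1 GB, minimum 256 KB) and on the regression tests deleted later in this diff:

SET citus.shard_max_size TO '500MB';  -- the form the deleted tests used
SHOW citus.shard_max_size;            -- 500MB
RESET citus.shard_max_size;           -- back to the 1 GB default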

View File

@@ -8,6 +8,7 @@
DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text);
DROP FUNCTION pg_catalog.master_get_table_metadata(text);
DROP FUNCTION pg_catalog.master_append_table_to_shard(bigint, text, text, integer);
-- all existing citus local tables are auto converted
-- none of the other tables can have auto-converted as true

View File

@@ -26,5 +26,12 @@ COMMENT ON FUNCTION master_get_table_metadata(relation_name text)
IS 'fetch metadata values for the table';
ALTER TABLE pg_catalog.pg_dist_partition DROP COLUMN autoconverted;
CREATE FUNCTION master_append_table_to_shard(bigint, text, text, integer)
RETURNS real
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_append_table_to_shard$$;
COMMENT ON FUNCTION master_append_table_to_shard(bigint, text, text, integer)
IS 'append given table to all shard placements and update metadata';
GRANT ALL ON FUNCTION start_metadata_sync_to_node(text, integer) TO PUBLIC;
GRANT ALL ON FUNCTION stop_metadata_sync_to_node(text, integer,bool) TO PUBLIC;
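The downgrade script recreates the dropped UDF so that an ALTER EXTENSION citus UPDATE back to the previous version leaves the catalog exactly as that version expects. A quick way to confirm which side of the upgrade an installation is on is to look the function up in pg_proc (a hedged sketch, not part of this commit):

-- Returns one row before the upgrade (or after a downgrade), zero rows after.
SELECT proname, pg_get_function_arguments(oid)
FROM pg_proc
WHERE proname = 'master_append_table_to_shard';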

View File

@@ -199,7 +199,6 @@ extern char * GetTableDDLCommand(TableDDLCommand *command);
/* Config variables managed via guc.c */
extern int ShardCount;
extern int ShardReplicationFactor;
extern int ShardMaxSize;
extern int ShardPlacementPolicy;
extern int NextShardId;
extern int NextPlacementId;
@@ -268,7 +267,6 @@ extern Datum master_stage_shard_placement_row(PG_FUNCTION_ARGS);
/* Function declarations to help with data staging and deletion */
extern Datum master_create_empty_shard(PG_FUNCTION_ARGS);
extern Datum master_append_table_to_shard(PG_FUNCTION_ARGS);
extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS);
extern Datum master_drop_sequences(PG_FUNCTION_ARGS);
extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS);

View File

@@ -5,7 +5,6 @@
/multi_agg_distinct.out
/multi_agg_type_conversion.out
/multi_alter_table_statements.out
/multi_append_table_to_shard.out
/multi_behavioral_analytics_create_table.out
/multi_behavioral_analytics_create_table_superuser.out
/multi_complex_count_distinct.out

View File

@@ -699,10 +699,6 @@ SELECT update_distributed_table_colocation('citus_local_table_4', colocate_with
ERROR: relation citus_local_table_4 should be a hash distributed table
SELECT master_create_empty_shard('citus_local_table_4');
ERROR: relation "citus_local_table_4" is a local table
CREATE TABLE postgres_local_table (a int);
SELECT master_append_table_to_shard(shardId, 'postgres_local_table', 'localhost', :master_port)
FROM (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='citus_local_table_4'::regclass) as shardid;
ERROR: cannot append to shardId xxxxxx
-- return true
SELECT citus_table_is_visible('citus_local_table_4'::regclass::oid);
citus_table_is_visible

View File

@@ -1,42 +0,0 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s2-begin s1-master_append_table_to_shard s2-master_append_table_to_shard s1-commit s2-commit
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-master_append_table_to_shard:
SELECT
master_append_table_to_shard(shardid, 'table_to_be_appended', 'localhost', 57636)
FROM
pg_dist_shard
WHERE
'table_to_append'::regclass::oid = logicalrelid;
master_append_table_to_shard
---------------------------------------------------------------------
0.0426667
(1 row)
step s2-master_append_table_to_shard:
SELECT
master_append_table_to_shard(shardid, 'table_to_be_appended', 'localhost', 57636)
FROM
pg_dist_shard
WHERE
'table_to_append'::regclass::oid = logicalrelid;
<waiting ...>
step s1-commit:
COMMIT;
step s2-master_append_table_to_shard: <... completed>
master_append_table_to_shard
---------------------------------------------------------------------
0.064
(1 row)
step s2-commit:
COMMIT;
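The two fill levels in this deleted output follow from the removed C implementation above: the return value is newShardSize / (ShardMaxSize * 1024), and s2 blocks until s1 commits because appends to the same shard take LockShardResource(shardId, ExclusiveLock). Assuming the citus.shard_max_size=1500kB harness setting (also removed in this commit, see pg_regress_multi.pl below), the numbers back out to plausible whole shard sizes:

SELECT 65536 / (1500 * 1024.0);  -- ≈ 0.0426667: a 64 KB shard after s1's append
SELECT 98304 / (1500 * 1024.0);  -- = 0.064: 96 KB once s2's append lands on top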

View File

@@ -892,14 +892,15 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.0-1
ALTER EXTENSION citus UPDATE TO '11.0-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
function master_apply_delete_command(text) integer |
function master_get_table_metadata(text) record |
| function fix_all_partition_shard_index_names() SETOF regclass
| function fix_partition_shard_index_names(regclass) void
| function worker_fix_partition_shard_index_names(regclass,text,text) void
(5 rows)
function master_append_table_to_shard(bigint,text,text,integer) real |
function master_apply_delete_command(text) integer |
function master_get_table_metadata(text) record |
| function fix_all_partition_shard_index_names() SETOF regclass
| function fix_partition_shard_index_names(regclass) void
| function worker_fix_partition_shard_index_names(regclass,text,text) void
(6 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@@ -1413,10 +1413,6 @@ SELECT master_update_shard_statistics(:a_shard_id);
8192
(1 row)
CREATE TABLE append_reference_tmp_table (id INT);
SELECT master_append_table_to_shard(:a_shard_id, 'append_reference_tmp_table', 'localhost', :master_port);
ERROR: cannot append to shardId xxxxxx
DETAIL: We currently don't support appending to shards in hash-partitioned, reference and local tables
SELECT master_get_table_ddl_events('reference_schema.reference_table_ddl');
master_get_table_ddl_events
---------------------------------------------------------------------
@@ -1633,7 +1629,7 @@ SET client_min_messages TO ERROR;
DROP SEQUENCE example_ref_value_seq;
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite,
colocated_table_test, colocated_table_test_2, append_reference_tmp_table;
colocated_table_test, colocated_table_test_2;
DROP TYPE reference_comp_key;
DROP SCHEMA reference_schema CASCADE;
RESET client_min_messages;

View File

@@ -4,7 +4,7 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000;
-- create schema to test schema support
CREATE SCHEMA test_schema_support;
-- test master_append_table_to_shard with schema
-- test COPY with schema
-- create local table to append
CREATE TABLE public.nation_local(
n_nationkey integer not null,
@@ -25,19 +25,9 @@ SELECT master_create_distributed_table('test_schema_support.nation_append', 'n_n
(1 row)
SELECT master_create_empty_shard('test_schema_support.nation_append');
master_create_empty_shard
---------------------------------------------------------------------
1190000
(1 row)
SELECT master_create_empty_shard('test_schema_support.nation_append') as simple_shardid \gset
-- append table to shard
SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port);
master_append_table_to_shard
---------------------------------------------------------------------
0.00533333
(1 row)
copy test_schema_support.nation_append FROM STDIN with (append_to_shard :simple_shardid, delimiter '|');
-- verify table actually appended to shard
SELECT COUNT(*) FROM test_schema_support.nation_append;
count
@@ -57,18 +47,8 @@ SELECT master_create_distributed_table('test_schema_support."nation._''append"',
(1 row)
SELECT master_create_empty_shard('test_schema_support."nation._''append"');
master_create_empty_shard
---------------------------------------------------------------------
1190001
(1 row)
SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port);
master_append_table_to_shard
---------------------------------------------------------------------
0.00533333
(1 row)
SELECT master_create_empty_shard('test_schema_support."nation._''append"') as special_shardid \gset
copy test_schema_support."nation._'append" FROM STDIN with (append_to_shard :special_shardid, delimiter '|');
-- verify table actually appended to shard
SELECT COUNT(*) FROM test_schema_support."nation._'append";
count
@@ -76,14 +56,9 @@ SELECT COUNT(*) FROM test_schema_support."nation._'append";
6
(1 row)
-- test master_append_table_to_shard with schema when search_path is set
-- test COPY with schema when search_path is set
SET search_path TO test_schema_support;
SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port);
master_append_table_to_shard
---------------------------------------------------------------------
0.00533333
(1 row)
copy nation_append FROM STDIN with (append_to_shard :simple_shardid, delimiter '|');
-- verify table actually appended to shard
SELECT COUNT(*) FROM nation_append;
count
@@ -92,12 +67,7 @@ SELECT COUNT(*) FROM nation_append;
(1 row)
-- test with search_path set and a shard name containing special characters
SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port);
master_append_table_to_shard
---------------------------------------------------------------------
0.00533333
(1 row)
copy "nation._'append" FROM STDIN with (append_to_shard :special_shardid, delimiter '|');
-- verify table actually appended to shard
SELECT COUNT(*) FROM "nation._'append";
count

View File

@@ -179,30 +179,10 @@ SELECT create_distributed_table('test_append_table','id','append');
(1 row)
SELECT master_create_empty_shard('test_append_table');
master_create_empty_shard
---------------------------------------------------------------------
1440010
(1 row)
SELECT * FROM master_append_table_to_shard(1440010, 'append_stage_table', 'localhost', :master_port);
master_append_table_to_shard
---------------------------------------------------------------------
0.00533333
(1 row)
SELECT master_create_empty_shard('test_append_table') AS new_shard_id;
new_shard_id
---------------------------------------------------------------------
1440011
(1 row)
SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table_2', 'localhost', :master_port);
master_append_table_to_shard
---------------------------------------------------------------------
0.00533333
(1 row)
SELECT master_create_empty_shard('test_append_table') AS shardid \gset
COPY test_append_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
SELECT master_create_empty_shard('test_append_table') AS shardid \gset
COPY test_append_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
UPDATE test_append_table SET col_2 = 5;
SELECT * FROM test_append_table ORDER BY 1 DESC, 2 DESC;
id | col_2

View File

@@ -452,15 +452,10 @@ DELETE FROM pg_dist_shard_placement WHERE placementid in (
ORDER BY nodename, nodeport limit 1)
);
-- Upload the test data to the shards
SELECT count(master_append_table_to_shard(shardid, 'shard_rebalancer_test_data',
host(inet_server_addr()), inet_server_port()))
FROM pg_dist_shard
WHERE logicalrelid = 'replication_test_table'::regclass;
count
---------------------------------------------------------------------
4
(1 row)
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-- Verify that there is one node with all placements
SELECT * FROM replication_test_table_placements_per_node;
count
@@ -581,15 +576,12 @@ $$;
CALL create_unbalanced_shards('rebalance_test_table');
SET citus.shard_replication_factor TO 2;
-- Upload the test data to the shards
SELECT count(master_append_table_to_shard(shardid, 'shard_rebalancer_test_data',
host(inet_server_addr()), inet_server_port()))
FROM pg_dist_shard
WHERE logicalrelid = 'rebalance_test_table'::regclass;
count
---------------------------------------------------------------------
6
(1 row)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard xxxxx)
-- Verify that there is one node with all placements
SELECT * FROM table_placements_per_node;
nodeport | logicalrelid | count
@@ -874,44 +866,14 @@ SELECT master_create_distributed_table('test_schema_support.imbalanced_table', '
(1 row)
SET citus.shard_replication_factor TO 1;
SELECT * from master_create_empty_shard('test_schema_support.imbalanced_table');
master_create_empty_shard
---------------------------------------------------------------------
123018
(1 row)
SELECT master_append_table_to_shard(123018, 'test_schema_support.imbalanced_table_local', 'localhost', :master_port);
master_append_table_to_shard
---------------------------------------------------------------------
0.00533333
(1 row)
SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
SET citus.shard_replication_factor TO 2;
SELECT * from master_create_empty_shard('test_schema_support.imbalanced_table');
master_create_empty_shard
---------------------------------------------------------------------
123019
(1 row)
SELECT master_append_table_to_shard(123019, 'test_schema_support.imbalanced_table_local', 'localhost', :master_port);
master_append_table_to_shard
---------------------------------------------------------------------
0.00533333
(1 row)
SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
SET citus.shard_replication_factor TO 1;
SELECT * from master_create_empty_shard('test_schema_support.imbalanced_table');
master_create_empty_shard
---------------------------------------------------------------------
123020
(1 row)
SELECT master_append_table_to_shard(123020, 'test_schema_support.imbalanced_table_local', 'localhost', :master_port);
master_append_table_to_shard
---------------------------------------------------------------------
0.00533333
(1 row)
SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
-- imbalanced_table is now imbalanced
-- Shard counts in each node before rebalance
SELECT * FROM public.table_placements_per_node;

View File

@@ -146,7 +146,6 @@ ORDER BY 1;
function master_add_inactive_node(text,integer,integer,noderole,name)
function master_add_node(text,integer,integer,noderole,name)
function master_add_secondary_node(text,integer,text,integer,name)
function master_append_table_to_shard(bigint,text,text,integer)
function master_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode)
function master_create_empty_shard(text)
function master_disable_node(text,integer)
@@ -259,5 +258,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(243 rows)
(242 rows)

View File

@@ -34,10 +34,8 @@ SELECT master_create_empty_shard('lineitem_range') AS new_shard_id
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SET citus.shard_max_size TO "500MB";
\copy lineitem_range FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_range FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
RESET citus.shard_max_size;
-- Run aggregate(distinct) on partition column for range partitioned table

View File

@@ -1,126 +0,0 @@
--
-- MULTI_APPEND_TABLE_TO_SHARD
--
SET citus.next_shard_id TO 230000;
-- Initialize tables to join
CREATE TABLE multi_append_table_to_shard_right_reference
(
right_number INTEGER not null,
right_text TEXT not null
);
SELECT create_reference_table('multi_append_table_to_shard_right_reference');
CREATE TABLE multi_append_table_to_shard_left
(
left_number INTEGER not null,
left_text TEXT not null
);
SELECT create_distributed_table('multi_append_table_to_shard_left', 'left_number', 'append');
SELECT master_create_empty_shard('multi_append_table_to_shard_left') AS shardid1 \gset
SELECT master_create_empty_shard('multi_append_table_to_shard_left') AS shardid2 \gset
CREATE TABLE multi_append_table_to_shard_right_reference_hash
(
right_number INTEGER not null,
right_text TEXT not null
);
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('multi_append_table_to_shard_right_reference_hash', 'right_number', 'hash');
-- Replicate 'left' table on both workers
SELECT set_config('citus.shard_replication_factor', '2', false);
copy multi_append_table_to_shard_left FROM '@abs_srcdir@/data/agg.data' with (append_to_shard :shardid1);
copy multi_append_table_to_shard_left FROM '@abs_srcdir@/data/agg.data' with (append_to_shard :shardid2);
-- Place 'right' table on both workers
\copy multi_append_table_to_shard_right_reference FROM '@abs_srcdir@/data/agg.data'
-- Reset shard replication factor to ensure tasks will be assigned to both workers
SELECT set_config('citus.shard_replication_factor', '2', false);
-- All 8 rows in left table match a row in right table
SELECT COUNT(*)
FROM multi_append_table_to_shard_left,
multi_append_table_to_shard_right_reference
WHERE left_number = right_number;
-- Now append more data to the 'right' table
CREATE TABLE multi_append_table_to_shard_stage
(
number INTEGER not null,
text TEXT not null
);
\COPY multi_append_table_to_shard_stage FROM '@abs_srcdir@/data/large_records.data' with delimiter '|';
-- Check that we error out if we try to append data to a hash partitioned table.
SELECT master_create_empty_shard('multi_append_table_to_shard_right_reference_hash');
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_right_reference_hash'::regclass::oid = logicalrelid;
-- Clean up after test
DROP TABLE multi_append_table_to_shard_stage;
DROP TABLE multi_append_table_to_shard_right_reference;
DROP TABLE multi_append_table_to_shard_left;
-- Check partitioning by date
CREATE TABLE multi_append_table_to_shard_date
(
event_date DATE,
value INT
);
SELECT master_create_distributed_table('multi_append_table_to_shard_date', 'event_date', 'append');
-- Create an empty shard and check that we can query the table
SELECT master_create_empty_shard('multi_append_table_to_shard_date');
SELECT * FROM multi_append_table_to_shard_date;
-- Create an empty distributed table and check that we can query it
CREATE TABLE multi_append_table_to_shard_stage (LIKE multi_append_table_to_shard_date);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
SELECT * FROM multi_append_table_to_shard_date;
-- INSERT NULL values and check that we can query the table
INSERT INTO multi_append_table_to_shard_stage VALUES (NULL, NULL);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
SELECT * FROM multi_append_table_to_shard_date;
-- INSERT regular values and check that we can query the table
INSERT INTO multi_append_table_to_shard_stage VALUES ('2016-01-01', 3);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
SELECT * FROM multi_append_table_to_shard_date;
-- When run inside an aborted transaction, changes do not persist
INSERT INTO multi_append_table_to_shard_stage VALUES ('2016-02-02', 4);
BEGIN;
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
ROLLBACK;
SELECT * FROM multi_append_table_to_shard_date;
DROP TABLE multi_append_table_to_shard_stage;
DROP TABLE multi_append_table_to_shard_date;

View File

@@ -290,8 +290,6 @@ SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SET citus.shard_max_size TO "1MB";
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'

View File

@@ -1,12 +1,6 @@
--
-- MULTI_LOAD_DATA
--
-- Tests for loading data in a distributed cluster. Please note that the number
-- of shards uploaded depends on two config values: citus.shard_replication_factor and
-- citus.shard_max_size. These values are set in pg_regress_multi.pl. Shard placement
-- policy is left to the default value (round-robin) to test the common install case.
SET citus.next_shard_id TO 290000;
\copy lineitem FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'

View File

@@ -40,7 +40,7 @@ test: isolation_create_citus_local_table
# writes, run this test serially.
test: isolation_create_restore_point
test: isolation_create_distributed_table isolation_master_append_table
test: isolation_create_distributed_table
test: isolation_multi_shard_modify_vs_all
test: isolation_modify_with_subquery_vs_dml
test: isolation_hash_copy_vs_all

View File

@@ -147,12 +147,6 @@ test: multi_utility_warnings data_types
# ----------
test: sequential_modifications
# ---------
# multi_append_table_to_shard loads data to create shards in a way that forces
# shard caching.
# ---------
test: multi_append_table_to_shard
# ---------
# multi_outer_join loads data to create shards to test outer join mappings
# ---------

View File

@@ -34,10 +34,8 @@ SELECT master_create_empty_shard('lineitem_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SET citus.shard_max_size TO "500MB";
\copy lineitem_range FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_range FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
RESET citus.shard_max_size;
-- Run aggregate(distinct) on partition column for range partitioned table
SELECT count(distinct l_orderkey) FROM lineitem_range;
count

View File

@@ -1,190 +0,0 @@
--
-- MULTI_APPEND_TABLE_TO_SHARD
--
SET citus.next_shard_id TO 230000;
-- Initialize tables to join
CREATE TABLE multi_append_table_to_shard_right_reference
(
right_number INTEGER not null,
right_text TEXT not null
);
SELECT create_reference_table('multi_append_table_to_shard_right_reference');
create_reference_table
------------------------
(1 row)
CREATE TABLE multi_append_table_to_shard_left
(
left_number INTEGER not null,
left_text TEXT not null
);
SELECT create_distributed_table('multi_append_table_to_shard_left', 'left_number', 'append');
create_distributed_table
--------------------------
(1 row)
SELECT master_create_empty_shard('multi_append_table_to_shard_left') AS shardid1 \gset
SELECT master_create_empty_shard('multi_append_table_to_shard_left') AS shardid2 \gset
CREATE TABLE multi_append_table_to_shard_right_reference_hash
(
right_number INTEGER not null,
right_text TEXT not null
);
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('multi_append_table_to_shard_right_reference_hash', 'right_number', 'hash');
create_distributed_table
--------------------------
(1 row)
-- Replicate 'left' table on both workers
SELECT set_config('citus.shard_replication_factor', '2', false);
set_config
------------
2
(1 row)
copy multi_append_table_to_shard_left FROM '@abs_srcdir@/data/agg.data' with (append_to_shard :shardid1);
copy multi_append_table_to_shard_left FROM '@abs_srcdir@/data/agg.data' with (append_to_shard :shardid2);
-- Place 'right' table on both workers
\copy multi_append_table_to_shard_right_reference FROM '@abs_srcdir@/data/agg.data'
-- Reset shard replication factor to ensure tasks will be assigned to both workers
SELECT set_config('citus.shard_replication_factor', '2', false);
set_config
------------
2
(1 row)
-- All 8 rows in left table match a row in right table
SELECT COUNT(*)
FROM multi_append_table_to_shard_left,
multi_append_table_to_shard_right_reference
WHERE left_number = right_number;
count
-------
8
(1 row)
-- Now append more data to the 'right' table
CREATE TABLE multi_append_table_to_shard_stage
(
number INTEGER not null,
text TEXT not null
);
\COPY multi_append_table_to_shard_stage FROM '@abs_srcdir@/data/large_records.data' with delimiter '|';
-- Check that we error out if we try to append data to a hash partitioned table.
SELECT master_create_empty_shard('multi_append_table_to_shard_right_reference_hash');
ERROR: relation "multi_append_table_to_shard_right_reference_hash" is a hash partitioned table
DETAIL: We currently don't support creating shards on hash-partitioned tables
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_right_reference_hash'::regclass::oid = logicalrelid;
ERROR: cannot append to shardId 230003
DETAIL: We currently don't support appending to shards in hash-partitioned, reference and local tables
-- Clean up after test
DROP TABLE multi_append_table_to_shard_stage;
DROP TABLE multi_append_table_to_shard_right_reference;
DROP TABLE multi_append_table_to_shard_left;
-- Check partitioning by date
CREATE TABLE multi_append_table_to_shard_date
(
event_date DATE,
value INT
);
SELECT master_create_distributed_table('multi_append_table_to_shard_date', 'event_date', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Create an empty shard and check that we can query the table
SELECT master_create_empty_shard('multi_append_table_to_shard_date');
master_create_empty_shard
---------------------------
230004
(1 row)
SELECT * FROM multi_append_table_to_shard_date;
event_date | value
------------+-------
(0 rows)
-- Create an empty distributed table and check that we can query it
CREATE TABLE multi_append_table_to_shard_stage (LIKE multi_append_table_to_shard_date);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
master_append_table_to_shard
------------------------------
0
(1 row)
SELECT * FROM multi_append_table_to_shard_date;
event_date | value
------------+-------
(0 rows)
-- INSERT NULL values and check that we can query the table
INSERT INTO multi_append_table_to_shard_stage VALUES (NULL, NULL);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
master_append_table_to_shard
------------------------------
0.00533333
(1 row)
SELECT * FROM multi_append_table_to_shard_date;
event_date | value
------------+-------
|
(1 row)
-- INSERT regular values and check that we can query the table
INSERT INTO multi_append_table_to_shard_stage VALUES ('2016-01-01', 3);
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
master_append_table_to_shard
------------------------------
0.00533333
(1 row)
SELECT * FROM multi_append_table_to_shard_date;
event_date | value
------------+-------
|
|
01-01-2016 | 3
(3 rows)
-- When run inside an aborted transaction, changes do not persist
INSERT INTO multi_append_table_to_shard_stage VALUES ('2016-02-02', 4);
BEGIN;
SELECT master_append_table_to_shard(shardid, 'multi_append_table_to_shard_stage', 'localhost', 57636)
FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_date'::regclass::oid = logicalrelid;
master_append_table_to_shard
------------------------------
0.00533333
(1 row)
ROLLBACK;
SELECT * FROM multi_append_table_to_shard_date;
event_date | value
------------+-------
|
|
01-01-2016 | 3
(3 rows)
DROP TABLE multi_append_table_to_shard_stage;
DROP TABLE multi_append_table_to_shard_date;

View File

@@ -330,7 +330,6 @@ SELECT master_create_empty_shard('orders_subquery') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947
WHERE shardid = :new_shard_id;
SET citus.shard_max_size TO "1MB";
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'

View File

@@ -1,11 +1,6 @@
--
-- MULTI_LOAD_DATA
--
-- Tests for loading data in a distributed cluster. Please note that the number
-- of shards uploaded depends on two config values: citus.shard_replication_factor and
-- citus.shard_max_size. These values are set in pg_regress_multi.pl. Shard placement
-- policy is left to the default value (round-robin) to test the common install case.
SET citus.next_shard_id TO 290000;
\copy lineitem FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'

View File

@@ -457,7 +457,6 @@ if ($majorversion >= "14") {
# Citus options set for the tests
push(@pgOptions, "citus.shard_count=4");
push(@pgOptions, "citus.max_adaptive_executor_pool_size=4");
push(@pgOptions, "citus.shard_max_size=1500kB");
push(@pgOptions, "citus.defer_shard_delete_interval=-1");
push(@pgOptions, "citus.repartition_join_bucket_count_per_node=2");
push(@pgOptions, "citus.sort_returning='on'");

View File

@@ -1,67 +0,0 @@
setup
{
SET citus.next_shard_id TO 4080102;
CREATE TABLE table_to_append(id int);
CREATE TABLE table_to_be_appended(id int);
SELECT create_distributed_table('table_to_append', 'id', 'append');
SELECT master_create_empty_shard('table_to_append');
INSERT INTO table_to_be_appended SELECT generate_series(1,1000);
COPY table_to_append FROM PROGRAM 'echo 0 && echo 7 && echo 8 && echo 9 && echo 10000' WITH (append_to_shard 4080102);
}
teardown
{
DROP TABLE table_to_append CASCADE;
DROP TABLE table_to_be_appended CASCADE;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-master_append_table_to_shard"
{
SELECT
master_append_table_to_shard(shardid, 'table_to_be_appended', 'localhost', 57636)
FROM
pg_dist_shard
WHERE
'table_to_append'::regclass::oid = logicalrelid;
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-master_append_table_to_shard"
{
SELECT
master_append_table_to_shard(shardid, 'table_to_be_appended', 'localhost', 57636)
FROM
pg_dist_shard
WHERE
'table_to_append'::regclass::oid = logicalrelid;
}
step "s2-commit"
{
COMMIT;
}
// concurrent master_append_table_to_shard tests
permutation "s1-begin" "s2-begin" "s1-master_append_table_to_shard" "s2-master_append_table_to_shard" "s1-commit" "s2-commit"

View File

@@ -5,7 +5,6 @@
/multi_agg_distinct.sql
/multi_agg_type_conversion.sql
/multi_alter_table_statements.sql
/multi_append_table_to_shard.sql
/multi_behavioral_analytics_create_table.sql
/multi_behavioral_analytics_create_table_superuser.sql
/multi_complex_count_distinct.sql

View File

@@ -458,10 +458,6 @@ SELECT update_distributed_table_colocation('citus_local_table_4', colocate_with
SELECT master_create_empty_shard('citus_local_table_4');
CREATE TABLE postgres_local_table (a int);
SELECT master_append_table_to_shard(shardId, 'postgres_local_table', 'localhost', :master_port)
FROM (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='citus_local_table_4'::regclass) as shardid;
-- return true
SELECT citus_table_is_visible('citus_local_table_4'::regclass::oid);

View File

@@ -902,9 +902,6 @@ SELECT master_create_empty_shard('reference_schema.reference_table_ddl');
SELECT shardid AS a_shard_id FROM pg_dist_shard WHERE logicalrelid = 'reference_schema.reference_table_ddl'::regclass \gset
SELECT master_update_shard_statistics(:a_shard_id);
CREATE TABLE append_reference_tmp_table (id INT);
SELECT master_append_table_to_shard(:a_shard_id, 'append_reference_tmp_table', 'localhost', :master_port);
SELECT master_get_table_ddl_events('reference_schema.reference_table_ddl');
-- in reality, we wouldn't need to repair any reference table shard placements
@@ -1014,7 +1011,7 @@ SET client_min_messages TO ERROR;
DROP SEQUENCE example_ref_value_seq;
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite,
colocated_table_test, colocated_table_test_2, append_reference_tmp_table;
colocated_table_test, colocated_table_test_2;
DROP TYPE reference_comp_key;
DROP SCHEMA reference_schema CASCADE;
RESET client_min_messages;

View File

@@ -8,7 +8,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000;
CREATE SCHEMA test_schema_support;
-- test master_append_table_to_shard with schema
-- test COPY with schema
-- create local table to append
CREATE TABLE public.nation_local(
n_nationkey integer not null,
@@ -33,10 +33,17 @@ CREATE TABLE test_schema_support.nation_append(
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_schema_support.nation_append', 'n_nationkey', 'append');
SELECT master_create_empty_shard('test_schema_support.nation_append');
SELECT master_create_empty_shard('test_schema_support.nation_append') as simple_shardid \gset
-- append table to shard
SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port);
copy test_schema_support.nation_append FROM STDIN with (append_to_shard :simple_shardid, delimiter '|');
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon
2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
5|ETHIOPIA|0|ven packages wake quickly. regu
\.
-- verify table actually appended to shard
SELECT COUNT(*) FROM test_schema_support.nation_append;
@@ -49,23 +56,45 @@ CREATE TABLE test_schema_support."nation._'append" (
n_comment varchar(152));
SELECT master_create_distributed_table('test_schema_support."nation._''append"', 'n_nationkey', 'append');
SELECT master_create_empty_shard('test_schema_support."nation._''append"');
SELECT master_create_empty_shard('test_schema_support."nation._''append"') as special_shardid \gset
SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port);
copy test_schema_support."nation._'append" FROM STDIN with (append_to_shard :special_shardid, delimiter '|');
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon
2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
5|ETHIOPIA|0|ven packages wake quickly. regu
\.
-- verify table actually appended to shard
SELECT COUNT(*) FROM test_schema_support."nation._'append";
-- test master_append_table_to_shard with schema when search_path is set
-- test COPY with schema when search_path is set
SET search_path TO test_schema_support;
SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port);
copy nation_append FROM STDIN with (append_to_shard :simple_shardid, delimiter '|');
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon
2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
5|ETHIOPIA|0|ven packages wake quickly. regu
\.
-- verify table actually appended to shard
SELECT COUNT(*) FROM nation_append;
-- test with search_path set and a shard name containing special characters
SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port);
copy "nation._'append" FROM STDIN with (append_to_shard :special_shardid, delimiter '|');
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon
2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
5|ETHIOPIA|0|ven packages wake quickly. regu
\.
-- verify table actually appended to shard
SELECT COUNT(*) FROM "nation._'append";

View File

@@ -141,10 +141,18 @@ INSERT INTO append_stage_table_2 VALUES(10,4);
CREATE TABLE test_append_table(id int, col_2 int);
SELECT create_distributed_table('test_append_table','id','append');
SELECT master_create_empty_shard('test_append_table');
SELECT * FROM master_append_table_to_shard(1440010, 'append_stage_table', 'localhost', :master_port);
SELECT master_create_empty_shard('test_append_table') AS new_shard_id;
SELECT * FROM master_append_table_to_shard(1440011, 'append_stage_table_2', 'localhost', :master_port);
SELECT master_create_empty_shard('test_append_table') AS shardid \gset
COPY test_append_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
1,3
3,2
5,4
\.
SELECT master_create_empty_shard('test_append_table') AS shardid \gset
COPY test_append_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
8,3
9,2
10,4
\.
UPDATE test_append_table SET col_2 = 5;
SELECT * FROM test_append_table ORDER BY 1 DESC, 2 DESC;

View File

@@ -331,10 +331,10 @@ DELETE FROM pg_dist_shard_placement WHERE placementid in (
-- Upload the test data to the shards
SELECT count(master_append_table_to_shard(shardid, 'shard_rebalancer_test_data',
host(inet_server_addr()), inet_server_port()))
FROM pg_dist_shard
WHERE logicalrelid = 'replication_test_table'::regclass;
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123000)
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123001)
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123002)
\COPY replication_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123003)
-- Verify that there is one node with all placements
@@ -422,10 +422,12 @@ SET citus.shard_replication_factor TO 2;
-- Upload the test data to the shards
SELECT count(master_append_table_to_shard(shardid, 'shard_rebalancer_test_data',
host(inet_server_addr()), inet_server_port()))
FROM pg_dist_shard
WHERE logicalrelid = 'rebalance_test_table'::regclass;
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123004)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123005)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123006)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123007)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123008)
\COPY rebalance_test_table FROM PROGRAM 'echo 1' WITH (format 'csv', append_to_shard 123009)
-- Verify that there is one node with all placements
@@ -598,16 +600,31 @@ CREATE TABLE test_schema_support.imbalanced_table (
SELECT master_create_distributed_table('test_schema_support.imbalanced_table', 'id', 'append');
SET citus.shard_replication_factor TO 1;
SELECT * from master_create_empty_shard('test_schema_support.imbalanced_table');
SELECT master_append_table_to_shard(123018, 'test_schema_support.imbalanced_table_local', 'localhost', :master_port);
SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
1
2
3
4
\.
SET citus.shard_replication_factor TO 2;
SELECT * from master_create_empty_shard('test_schema_support.imbalanced_table');
SELECT master_append_table_to_shard(123019, 'test_schema_support.imbalanced_table_local', 'localhost', :master_port);
SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
1
2
3
4
\.
SET citus.shard_replication_factor TO 1;
SELECT * from master_create_empty_shard('test_schema_support.imbalanced_table');
SELECT master_append_table_to_shard(123020, 'test_schema_support.imbalanced_table_local', 'localhost', :master_port);
SELECT master_create_empty_shard('test_schema_support.imbalanced_table') AS shardid \gset
COPY test_schema_support.imbalanced_table FROM STDIN WITH (format 'csv', append_to_shard :shardid);
1
2
3
4
\.
-- imbalanced_table is now imbalanced