Change version of worker_split_shard_replication_setup UDF

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-19 21:02:35 +05:30
parent 1243fe20ae
commit a4bf6a1dbb
7 changed files with 103 additions and 244 deletions

View File

@ -100,8 +100,7 @@ static void DoSplitCopy(WorkerNode *sourceShardNode,
char *snapShotName);
static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
List *splitChildrenShardIntervalList,
List *workersForPlacementList,
char *snapShotName);
List *workersForPlacementList);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
@ -446,6 +445,7 @@ SplitShard(SplitMode splitMode,
}
else
{
/*TODO(saawasek): Discussing about existing bug with the assumption of move shard*/
NonBlockingShardSplit(
splitOperation,
shardIntervalToSplit,
@ -764,14 +764,37 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
{
StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy,
splitShardIntervalList,
destinationWorkerNodesList,
snapShotName);
destinationWorkerNodesList);
Task *splitCopyTask = CreateBasicTask(
sourceShardIntervalToCopy->shardId, /* jobId */
taskId,
READ_TASK,
splitCopyUdfCommand->data);
List *ddlCommandList = NIL;
StringInfo beginTransaction = makeStringInfo();
appendStringInfo(beginTransaction,
"BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;");
ddlCommandList = lappend(ddlCommandList, beginTransaction->data);
/* Set snapshot */
if (snapShotName != NULL)
{
StringInfo snapShotString = makeStringInfo();
appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;",
quote_literal_cstr(
snapShotName));
ddlCommandList = lappend(ddlCommandList, snapShotString->data);
printf("Sameer final string snapshotted:%s\n", snapShotString->data);
}
ddlCommandList = lappend(ddlCommandList, splitCopyUdfCommand->data);
StringInfo commitCommand = makeStringInfo();
appendStringInfo(commitCommand, "COMMIT;");
ddlCommandList = lappend(ddlCommandList, commitCommand->data);
Task *splitCopyTask = CitusMakeNode(Task);
splitCopyTask->jobId = sourceShardIntervalToCopy->shardId;
splitCopyTask->taskId = taskId;
splitCopyTask->taskType = READ_TASK;
splitCopyTask->replicationModel = REPLICATION_MODEL_INVALID;
SetTaskQueryStringList(splitCopyTask, ddlCommandList);
ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(taskPlacement, sourceShardNode);
@ -813,8 +836,7 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
static StringInfo
CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
List *splitChildrenShardIntervalList,
List *destinationWorkerNodesList,
char *snapShotName)
List *destinationWorkerNodesList)
{
StringInfo splitCopyInfoArray = makeStringInfo();
appendStringInfo(splitCopyInfoArray, "ARRAY[");
@ -848,31 +870,7 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
sourceShardSplitInterval->shardId,
splitCopyInfoArray->data);
if (snapShotName == NULL)
{
return splitCopyUdf;
}
StringInfo beginTransaction = makeStringInfo();
appendStringInfo(beginTransaction,
"BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;");
StringInfo commitTransaction = makeStringInfo();
appendStringInfo(commitTransaction, "COMMIT;");
StringInfo snapShotString = makeStringInfo();
appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;", quote_literal_cstr(
snapShotName));
StringInfo snapShottedCopyUDF = makeStringInfo();
appendStringInfo(snapShottedCopyUDF, "%s%s%s%s", beginTransaction->data,
snapShotString->data, splitCopyUdf->data, commitTransaction->data);
printf("sameer value:%s\n", snapShottedCopyUDF->data);
printf("sameer actual value :%s \n", splitCopyUdf->data);
return snapShottedCopyUDF;
return splitCopyUdf;
}
@ -1237,7 +1235,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
superUser,
databaseName);
ClaimConnectionExclusively(sourceConnection);
HTAB *mapOfShardToPlacementCreatedByWorkflow =
CreateEmptyMapForShardsCreatedByWorkflow();
PG_TRY();
@ -1263,8 +1261,13 @@ NonBlockingShardSplit(SplitOperation splitOperation,
CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/*Create Template Replication Slot */
char *snapShotName = NULL;
snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(shardIntervalToSplit, sourceShardToCopyNode);
/* DoSplitCopy */
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList,
snapShotName);
/*worker_split_replication_setup_udf*/
List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
@ -1330,7 +1333,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
*/
CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
workersForPlacementList);
DropDummyShards();
}
PG_CATCH();

View File

@ -416,7 +416,7 @@ DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalTo
/*'snapshot_name' is second column where index starts from zero.
* We're using the pstrdup to copy the data into the current memory context */
char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columIndex */));
printf("Sameer sanpshot name %s \n", snapShotName);
return snapShotName;
}

View File

@ -67,4 +67,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
#include "udfs/get_all_active_transactions/11.1-1.sql"
#include "udfs/citus_split_shard_by_split_points/11.1-1.sql"
#include "udfs/worker_split_copy/11.1-1.sql"
#include "udfs/worker_split_shard_replication_setup/11.0-2.sql"
#include "udfs/worker_split_shard_replication_setup/11.1-1.sql"

View File

@ -37,8 +37,8 @@ SELECT * FROM pg_dist_shard;
table_to_split | 1 | t | -2147483648 | 2147483647
(1 row)
SET client_min_messages TO LOG;
SET citus.log_remote_commands TO on;
--SET client_min_messages TO LOG;
--SET citus.log_remote_commands TO on;
CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema",
c.relname as "Name",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner"
@ -50,49 +50,9 @@ WHERE c.relkind IN ('r','p','')
AND n.nspname <> 'information_schema'
AND pg_catalog.pg_table_is_visible(c.oid)
ORDER BY 1,2;
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET search_path TO citus_split_shard_by_split_points_negative;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET search_path TO citus_split_shard_by_split_points_negative;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE OR REPLACE VIEW citus_split_shard_by_split_points_negative.show_catalog ("Schema","Name","Owner") AS SELECT n.nspname AS "Schema",
c.relname AS "Name",
pg_get_userbyid(c.relowner) AS "Owner"
FROM (pg_class c
LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
WHERE ((c.relkind = ANY (ARRAY['r'::"char", 'p'::"char", ''::"char"])) AND (n.nspname <> 'pg_catalog'::name) AND (n.nspname !~ '^pg_toast'::text) AND (n.nspname <> 'information_schema'::name) AND pg_table_is_visible(c.oid))
ORDER BY n.nspname, c.relname;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE OR REPLACE VIEW citus_split_shard_by_split_points_negative.show_catalog ("Schema","Name","Owner") AS SELECT n.nspname AS "Schema",
c.relname AS "Name",
pg_get_userbyid(c.relowner) AS "Owner"
FROM (pg_class c
LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
WHERE ((c.relkind = ANY (ARRAY['r'::"char", 'p'::"char", ''::"char"])) AND (n.nspname <> 'pg_catalog'::name) AND (n.nspname !~ '^pg_toast'::text) AND (n.nspname <> 'information_schema'::name) AND pg_table_is_visible(c.oid))
ORDER BY n.nspname, c.relname;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('view', ARRAY['citus_split_shard_by_split_points_negative', 'show_catalog']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('view', ARRAY['citus_split_shard_by_split_points_negative', 'show_catalog']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-- UDF fails for range partitioned tables.
\c - - - :master_port
SET citus.log_remote_commands TO on;
--SET citus.log_remote_commands TO on;
SET citus.next_shard_id TO 100;
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
@ -100,138 +60,12 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SELECT citus_split_shard_by_split_points(
1,
ARRAY['0'],
ARRAY[:worker_2_node, :worker_2_node],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (1, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (1, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT subname FROM pg_subscription WHERE subname LIKE 'citus_shard_split_subscription_' || '%'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT rolname FROM pg_roles WHERE rolname LIKE 'citus_shard_split_subscription_role_' || '%'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pubname FROM pg_publication WHERE pubname LIKE 'citus_shard_split_publication_' || '%'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_100,citus_split_shard_by_split_points_negative.table_to_split_1,citus_split_shard_by_split_points_negative.table_to_split_101
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '0', 18)::citus.split_shard_info,ROW(1, 101, '1', '2147483647', 18)::citus.split_shard_info]);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE USER citus_shard_split_subscription_role_10 SUPERUSER IN ROLE postgres
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_10 CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_18_10 WITH (citus_use_authinfo=true, enabled=false)
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 OWNER TO citus_shard_split_subscription_role_10
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER ROLE citus_shard_split_subscription_role_10 NOSUPERUSER
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 ENABLE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT sum(pg_total_relation_size(srrelid)) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pg_current_wal_lsn()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pg_current_wal_lsn()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1])
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1])
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT pg_current_wal_lsn()
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1);
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '0'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '1', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '0'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '1', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 84), (101, 1, 0, 16, 85)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 84), (101, 1, 0, 16, 85)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_100 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_101 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
WARNING: replication slot "citus_split_replicationslot_for_shard_1" does not exist
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit
citus_split_shard_by_split_points
@ -239,46 +73,69 @@ WARNING: connection claimed exclusively at transaction commit
(1 row)
-- On worker2, we want child shard xxxxx and dummy shard xxxxx --
-- on worker1, we want child shard xxxxx and 1 and dummy shard xxxxx --
INSERT INTO table_to_split values(100,'a');
INSERT INTO table_to_split values(400, 'a');
INSERT INTO table_to_split values(500, 'a');
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
Schema | Name | Owner
---------------------------------------------------------------------
citus_split_shard_by_split_points_negative | table_to_split | postgres
citus_split_shard_by_split_points_negative | table_to_split_100 | postgres
citus_split_shard_by_split_points_negative | table_to_split_101 | postgres
(3 rows)
(2 rows)
SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
---------------------------------------------------------------------
17324 | 16384 | citus_shard_split_subscription_10 | 17323 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_shard_split_subscription_10 | off | {citus_shard_split_publication_18_10}
17311 | 16384 | citus_shard_split_subscription_10 | 17310 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_18_10 | off | {citus_shard_split_publication_18_10}
(1 row)
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
(0 rows)
SELECT * FROM table_to_split_101;
id | value
---------------------------------------------------------------------
100 | a
500 | a
(2 rows)
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
Schema | Name | Owner
Schema | Name | Owner
---------------------------------------------------------------------
citus_split_shard_by_split_points_negative | table_to_split | postgres
(1 row)
citus_split_shard_by_split_points_negative | table_to_split | postgres
citus_split_shard_by_split_points_negative | table_to_split_100 | postgres
(2 rows)
SELECT * FROM pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
---------------------------------------------------------------------
17381 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f
(1 row)
17371 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f
17374 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f
(2 rows)
SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
---------------------------------------------------------------------
(0 rows)
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_subscription_10
17378 | 16384 | citus_shard_split_subscription_10 | 17377 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_16_10 | off | {citus_shard_split_publication_16_10}
(1 row)
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_split_replicationslot_for_shard_1
citus_split_16_10
citus_split_18_10
(3 rows)
SELECT * FROM table_to_split_100;
id | value
---------------------------------------------------------------------
400 | a
(1 row)

View File

@ -8,13 +8,13 @@ test: tablespace
#test: foreign_key_to_reference_table
# Split tests go here.
#test: citus_sameer
#test: split_shard_replication_setup
#test: split_shard_replication_setup_remote_local
#test: split_shard_replication_setup_local
#test: split_shard_replication_colocated_setup
test: split_shard_replication_setup
test: split_shard_replication_setup_remote_local
test: split_shard_replication_setup_local
test: split_shard_replication_colocated_setup
test: worker_split_copy_test
#test: worker_split_binary_copy_test
#test: worker_split_text_copy_test
#test: citus_split_shard_by_split_points_negative
#test: citus_split_shard_by_split_points
#test: citus_split_shard_by_split_points_failure
test: worker_split_binary_copy_test
test: worker_split_text_copy_test
test: citus_split_shard_by_split_points_negative
test: citus_split_shard_by_split_points
test: citus_split_shard_by_split_points_failure

View File

@ -21,8 +21,8 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SELECT * FROM citus_shards;
SELECT * FROM pg_dist_shard;
SET client_min_messages TO LOG;
SET citus.log_remote_commands TO on;
--SET client_min_messages TO LOG;
--SET citus.log_remote_commands TO on;
CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema",
c.relname as "Name",
@ -38,7 +38,7 @@ ORDER BY 1,2;
-- UDF fails for range partitioned tables.
\c - - - :master_port
SET citus.log_remote_commands TO on;
--SET citus.log_remote_commands TO on;
SET citus.next_shard_id TO 100;
SET search_path TO citus_split_shard_by_split_points_negative;