From a4bf6a1dbb5782ec8979f62f6c07e3e37a0119b5 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Tue, 19 Jul 2022 21:02:35 +0530 Subject: [PATCH] Change version of worker_split_shard_replication_setup UDF --- .../distributed/operations/shard_split.c | 78 +++--- .../shardsplit_logical_replication.c | 2 +- .../distributed/sql/citus--11.0-3--11.1-1.sql | 2 +- .../{11.0-2.sql => 11.1-1.sql} | 0 src/test/regress/expected/citus_sameer.out | 241 ++++-------------- src/test/regress/split_schedule | 18 +- src/test/regress/sql/citus_sameer.sql | 6 +- 7 files changed, 103 insertions(+), 244 deletions(-) rename src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/{11.0-2.sql => 11.1-1.sql} (100%) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 31794c09e..3ab7e61f2 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -100,8 +100,7 @@ static void DoSplitCopy(WorkerNode *sourceShardNode, char *snapShotName); static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List *splitChildrenShardIntervalList, - List *workersForPlacementList, - char *snapShotName); + List *workersForPlacementList); static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, @@ -446,6 +445,7 @@ SplitShard(SplitMode splitMode, } else { + /*TODO(saawasek): Discussing about existing bug with the assumption of move shard*/ NonBlockingShardSplit( splitOperation, shardIntervalToSplit, @@ -764,14 +764,37 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, { StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, splitShardIntervalList, - destinationWorkerNodesList, - snapShotName); + destinationWorkerNodesList); - Task *splitCopyTask = CreateBasicTask( - sourceShardIntervalToCopy->shardId, /* jobId */ - taskId, - READ_TASK, - splitCopyUdfCommand->data); + List *ddlCommandList = NIL; + StringInfo beginTransaction = makeStringInfo(); + appendStringInfo(beginTransaction, + "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;"); + ddlCommandList = lappend(ddlCommandList, beginTransaction->data); + + /* Set snapshot */ + if (snapShotName != NULL) + { + StringInfo snapShotString = makeStringInfo(); + appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;", + quote_literal_cstr( + snapShotName)); + ddlCommandList = lappend(ddlCommandList, snapShotString->data); + printf("Sameer final string snapshotted:%s\n", snapShotString->data); + } + + ddlCommandList = lappend(ddlCommandList, splitCopyUdfCommand->data); + + StringInfo commitCommand = makeStringInfo(); + appendStringInfo(commitCommand, "COMMIT;"); + ddlCommandList = lappend(ddlCommandList, commitCommand->data); + + Task *splitCopyTask = CitusMakeNode(Task); + splitCopyTask->jobId = sourceShardIntervalToCopy->shardId; + splitCopyTask->taskId = taskId; + splitCopyTask->taskType = READ_TASK; + splitCopyTask->replicationModel = REPLICATION_MODEL_INVALID; + SetTaskQueryStringList(splitCopyTask, ddlCommandList); ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); SetPlacementNodeMetadata(taskPlacement, sourceShardNode); @@ -813,8 +836,7 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List *splitChildrenShardIntervalList, - List *destinationWorkerNodesList, - char *snapShotName) + List *destinationWorkerNodesList) { StringInfo splitCopyInfoArray = makeStringInfo(); appendStringInfo(splitCopyInfoArray, "ARRAY["); @@ -848,31 +870,7 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, sourceShardSplitInterval->shardId, splitCopyInfoArray->data); - if (snapShotName == NULL) - { - return splitCopyUdf; - } - - - StringInfo beginTransaction = makeStringInfo(); - appendStringInfo(beginTransaction, - "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;"); - - StringInfo commitTransaction = makeStringInfo(); - appendStringInfo(commitTransaction, "COMMIT;"); - - StringInfo snapShotString = makeStringInfo(); - appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;", quote_literal_cstr( - snapShotName)); - - StringInfo snapShottedCopyUDF = makeStringInfo(); - appendStringInfo(snapShottedCopyUDF, "%s%s%s%s", beginTransaction->data, - snapShotString->data, splitCopyUdf->data, commitTransaction->data); - - printf("sameer value:%s\n", snapShottedCopyUDF->data); - printf("sameer actual value :%s \n", splitCopyUdf->data); - - return snapShottedCopyUDF; + return splitCopyUdf; } @@ -1237,7 +1235,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, superUser, databaseName); ClaimConnectionExclusively(sourceConnection); - + HTAB *mapOfShardToPlacementCreatedByWorkflow = CreateEmptyMapForShardsCreatedByWorkflow(); PG_TRY(); @@ -1263,8 +1261,13 @@ NonBlockingShardSplit(SplitOperation splitOperation, CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); /*Create Template Replication Slot */ + char *snapShotName = NULL; + snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(shardIntervalToSplit, sourceShardToCopyNode); /* DoSplitCopy */ + DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, workersForPlacementList, + snapShotName); /*worker_split_replication_setup_udf*/ List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF( @@ -1330,7 +1333,6 @@ NonBlockingShardSplit(SplitOperation splitOperation, */ CreateForeignKeyConstraints(shardGroupSplitIntervalListList, workersForPlacementList); - DropDummyShards(); } PG_CATCH(); diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index 85ee3d813..0504ad860 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -416,7 +416,7 @@ DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalTo /*'snapshot_name' is second column where index starts from zero. * We're using the pstrdup to copy the data into the current memory context */ char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columIndex */)); - printf("Sameer sanpshot name %s \n", snapShotName); + return snapShotName; } diff --git a/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql index d5c48367d..7d7c80b5a 100644 --- a/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql @@ -67,4 +67,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ #include "udfs/get_all_active_transactions/11.1-1.sql" #include "udfs/citus_split_shard_by_split_points/11.1-1.sql" #include "udfs/worker_split_copy/11.1-1.sql" -#include "udfs/worker_split_shard_replication_setup/11.0-2.sql" +#include "udfs/worker_split_shard_replication_setup/11.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.0-2.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.1-1.sql similarity index 100% rename from src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.0-2.sql rename to src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.1-1.sql diff --git a/src/test/regress/expected/citus_sameer.out b/src/test/regress/expected/citus_sameer.out index 2662917b9..fabf31965 100644 --- a/src/test/regress/expected/citus_sameer.out +++ b/src/test/regress/expected/citus_sameer.out @@ -37,8 +37,8 @@ SELECT * FROM pg_dist_shard; table_to_split | 1 | t | -2147483648 | 2147483647 (1 row) -SET client_min_messages TO LOG; -SET citus.log_remote_commands TO on; +--SET client_min_messages TO LOG; +--SET citus.log_remote_commands TO on; CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema", c.relname as "Name", pg_catalog.pg_get_userbyid(c.relowner) as "Owner" @@ -50,49 +50,9 @@ WHERE c.relkind IN ('r','p','') AND n.nspname <> 'information_schema' AND pg_catalog.pg_table_is_visible(c.oid) ORDER BY 1,2; -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET citus.enable_ddl_propagation TO 'off' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET citus.enable_ddl_propagation TO 'off' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET search_path TO citus_split_shard_by_split_points_negative; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET search_path TO citus_split_shard_by_split_points_negative; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE OR REPLACE VIEW citus_split_shard_by_split_points_negative.show_catalog ("Schema","Name","Owner") AS SELECT n.nspname AS "Schema", - c.relname AS "Name", - pg_get_userbyid(c.relowner) AS "Owner" - FROM (pg_class c - LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace))) - WHERE ((c.relkind = ANY (ARRAY['r'::"char", 'p'::"char", ''::"char"])) AND (n.nspname <> 'pg_catalog'::name) AND (n.nspname !~ '^pg_toast'::text) AND (n.nspname <> 'information_schema'::name) AND pg_table_is_visible(c.oid)) - ORDER BY n.nspname, c.relname; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE OR REPLACE VIEW citus_split_shard_by_split_points_negative.show_catalog ("Schema","Name","Owner") AS SELECT n.nspname AS "Schema", - c.relname AS "Name", - pg_get_userbyid(c.relowner) AS "Owner" - FROM (pg_class c - LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace))) - WHERE ((c.relkind = ANY (ARRAY['r'::"char", 'p'::"char", ''::"char"])) AND (n.nspname <> 'pg_catalog'::name) AND (n.nspname !~ '^pg_toast'::text) AND (n.nspname <> 'information_schema'::name) AND pg_table_is_visible(c.oid)) - ORDER BY n.nspname, c.relname; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('view', ARRAY['citus_split_shard_by_split_points_negative', 'show_catalog']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('view', ARRAY['citus_split_shard_by_split_points_negative', 'show_catalog']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -- UDF fails for range partitioned tables. \c - - - :master_port -SET citus.log_remote_commands TO on; +--SET citus.log_remote_commands TO on; SET citus.next_shard_id TO 100; SET search_path TO citus_split_shard_by_split_points_negative; SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset @@ -100,138 +60,12 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SELECT citus_split_shard_by_split_points( 1, ARRAY['0'], - ARRAY[:worker_2_node, :worker_2_node], + ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (1, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (1, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT subname FROM pg_subscription WHERE subname LIKE 'citus_shard_split_subscription_' || '%' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT rolname FROM pg_roles WHERE rolname LIKE 'citus_shard_split_subscription_role_' || '%' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT pubname FROM pg_publication WHERE pubname LIKE 'citus_shard_split_publication_' || '%' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_100,citus_split_shard_by_split_points_negative.table_to_split_1,citus_split_shard_by_split_points_negative.table_to_split_101 -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '0', 18)::citus.split_shard_info,ROW(1, 101, '1', '2147483647', 18)::citus.split_shard_info]); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE USER citus_shard_split_subscription_role_10 SUPERUSER IN ROLE postgres -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_10 CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_18_10 WITH (citus_use_authinfo=true, enabled=false) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 OWNER TO citus_shard_split_subscription_role_10 -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER ROLE citus_shard_split_subscription_role_10 NOSUPERUSER -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 ENABLE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT sum(pg_total_relation_size(srrelid)) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT pg_current_wal_lsn() -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT pg_current_wal_lsn() -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1]) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1]) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT pg_current_wal_lsn() -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '0'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '1', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '0'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '1', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 84), (101, 1, 0, 16, 85)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 84), (101, 1, 0, 16, 85)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_100 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_101 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +WARNING: replication slot "citus_split_replicationslot_for_shard_1" does not exist +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection claimed exclusively at transaction commit +WARNING: connection claimed exclusively at transaction commit WARNING: connection claimed exclusively at transaction commit WARNING: connection claimed exclusively at transaction commit citus_split_shard_by_split_points @@ -239,46 +73,69 @@ WARNING: connection claimed exclusively at transaction commit (1 row) --- On worker2, we want child shard xxxxx and dummy shard xxxxx -- --- on worker1, we want child shard xxxxx and 1 and dummy shard xxxxx -- +INSERT INTO table_to_split values(100,'a'); +INSERT INTO table_to_split values(400, 'a'); +INSERT INTO table_to_split values(500, 'a'); \c - - - :worker_2_port SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; Schema | Name | Owner --------------------------------------------------------------------- citus_split_shard_by_split_points_negative | table_to_split | postgres - citus_split_shard_by_split_points_negative | table_to_split_100 | postgres citus_split_shard_by_split_points_negative | table_to_split_101 | postgres -(3 rows) +(2 rows) SELECT * FROM pg_subscription; - oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications + oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications --------------------------------------------------------------------- - 17324 | 16384 | citus_shard_split_subscription_10 | 17323 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_shard_split_subscription_10 | off | {citus_shard_split_publication_18_10} + 17311 | 16384 | citus_shard_split_subscription_10 | 17310 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_18_10 | off | {citus_shard_split_publication_18_10} (1 row) +SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM table_to_split_101; + id | value +--------------------------------------------------------------------- + 100 | a + 500 | a +(2 rows) + \c - - - :worker_1_port SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; - Schema | Name | Owner + Schema | Name | Owner --------------------------------------------------------------------- - citus_split_shard_by_split_points_negative | table_to_split | postgres -(1 row) + citus_split_shard_by_split_points_negative | table_to_split | postgres + citus_split_shard_by_split_points_negative | table_to_split_100 | postgres +(2 rows) SELECT * FROM pg_publication; oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot --------------------------------------------------------------------- - 17381 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f -(1 row) + 17371 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f + 17374 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f +(2 rows) SELECT * FROM pg_subscription; - oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications + oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications --------------------------------------------------------------------- -(0 rows) - -SELECT slot_name FROM pg_replication_slots; - slot_name ---------------------------------------------------------------------- - citus_shard_split_subscription_10 + 17378 | 16384 | citus_shard_split_subscription_10 | 17377 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_16_10 | off | {citus_shard_split_publication_16_10} +(1 row) + +SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_split_replicationslot_for_shard_1 + citus_split_16_10 + citus_split_18_10 +(3 rows) + +SELECT * FROM table_to_split_100; + id | value +--------------------------------------------------------------------- + 400 | a (1 row) diff --git a/src/test/regress/split_schedule b/src/test/regress/split_schedule index 4cc29e635..4e603af98 100644 --- a/src/test/regress/split_schedule +++ b/src/test/regress/split_schedule @@ -8,13 +8,13 @@ test: tablespace #test: foreign_key_to_reference_table # Split tests go here. #test: citus_sameer -#test: split_shard_replication_setup -#test: split_shard_replication_setup_remote_local -#test: split_shard_replication_setup_local -#test: split_shard_replication_colocated_setup +test: split_shard_replication_setup +test: split_shard_replication_setup_remote_local +test: split_shard_replication_setup_local +test: split_shard_replication_colocated_setup test: worker_split_copy_test -#test: worker_split_binary_copy_test -#test: worker_split_text_copy_test -#test: citus_split_shard_by_split_points_negative -#test: citus_split_shard_by_split_points -#test: citus_split_shard_by_split_points_failure \ No newline at end of file +test: worker_split_binary_copy_test +test: worker_split_text_copy_test +test: citus_split_shard_by_split_points_negative +test: citus_split_shard_by_split_points +test: citus_split_shard_by_split_points_failure diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql index b0184d824..acc59a4c8 100644 --- a/src/test/regress/sql/citus_sameer.sql +++ b/src/test/regress/sql/citus_sameer.sql @@ -21,8 +21,8 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SELECT * FROM citus_shards; SELECT * FROM pg_dist_shard; -SET client_min_messages TO LOG; -SET citus.log_remote_commands TO on; +--SET client_min_messages TO LOG; +--SET citus.log_remote_commands TO on; CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema", c.relname as "Name", @@ -38,7 +38,7 @@ ORDER BY 1,2; -- UDF fails for range partitioned tables. \c - - - :master_port -SET citus.log_remote_commands TO on; +--SET citus.log_remote_commands TO on; SET citus.next_shard_id TO 100; SET search_path TO citus_split_shard_by_split_points_negative;