From c56b79b6f7b9f5c05a2cbf5238b2fdcd03a422b5 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Mon, 1 Aug 2022 12:54:30 +0530 Subject: [PATCH] Address review comments --- .../connection/connection_management.c | 4 +- .../distributed/metadata/metadata_utility.c | 21 +++ .../distributed/operations/shard_split.c | 138 +++++++++--------- .../replication/multi_logical_replication.c | 14 +- .../shardsplit_logical_replication.c | 15 +- src/backend/distributed/shared_library_init.c | 3 +- .../sql/downgrades/citus--11.1-1--11.0-3.sql | 6 +- .../11.1-1.sql | 14 +- .../latest.sql | 15 +- .../distributed/utils/citus_safe_lib.c | 94 ++++++------ .../distributed/connection_management.h | 2 +- src/include/distributed/metadata_utility.h | 2 + .../distributed/multi_logical_replication.h | 4 +- ...enterprise_isolation_logicalrep_1_schedule | 2 +- ...plit_shard_replication_colocated_setup.out | 10 +- .../split_shard_replication_setup.out | 6 +- .../split_shard_replication_setup_local.out | 6 +- ...t_shard_replication_setup_remote_local.out | 6 +- ...d_split_with_index_as_replicaIdentity.spec | 2 +- src/test/regress/sql/citus_sameer.sql | 2 +- ...plit_shard_replication_colocated_setup.sql | 10 +- .../sql/split_shard_replication_setup.sql | 6 +- .../split_shard_replication_setup_local.sql | 6 +- ...t_shard_replication_setup_remote_local.sql | 6 +- 24 files changed, 217 insertions(+), 177 deletions(-) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 56e84d063..cb7c28eb4 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -675,7 +675,7 @@ CloseConnection(MultiConnection *connection) strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); key.port = connection->port; - key.replicationConnParam = connection->requiresReplicationOption; + key.replicationConnParam = connection->requiresReplication; strlcpy(key.user, connection->user, NAMEDATALEN); strlcpy(key.database, connection->database, NAMEDATALEN); @@ -1262,7 +1262,7 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key connection->port = key->port; strlcpy(connection->database, key->database, NAMEDATALEN); strlcpy(connection->user, key->user, NAMEDATALEN); - connection->requiresReplicationOption = key->replicationConnParam; + connection->requiresReplication = key->replicationConnParam; connection->pgConn = PQconnectStartParams((const char **) entry->keywords, (const char **) entry->values, diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 0b64f5726..ff85f6930 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1437,6 +1437,27 @@ ActiveShardPlacement(uint64 shardId, bool missingOk) } +/* + * ActiveShardPlacementWorkerNode returns the worker node of the first placement of + * a shard. 
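+ * The shard is expected to have exactly one active placement (replication
+ * factor 1); the Assert below guards this expectation.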
+ */ +WorkerNode * +ActiveShardPlacementWorkerNode(uint64 shardId) +{ + bool missingOK = false; + + List *sourcePlacementList = ActiveShardPlacementList(shardId); + + Assert(sourcePlacementList->length == 1); + + ShardPlacement *sourceShardPlacement = linitial(sourcePlacementList); + WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, + missingOK); + + return sourceShardToCopyNode; +} + + /* * BuildShardPlacementList finds shard placements for the given shardId from * system catalogs, converts these placements to their in-memory diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 8992e81d2..a606044d0 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -88,7 +88,6 @@ static void NonBlockingShardSplit(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *workersForPlacementList); - static void DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, @@ -97,6 +96,8 @@ static void DoSplitCopy(WorkerNode *sourceShardNode, static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List *splitChildrenShardIntervalList, List *workersForPlacementList); +static Task * CreateSplitCopyTask(StringInfo splitCopyUdfCommand, char *snapshotName, int + taskId, uint64 jobId); static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList, @@ -106,7 +107,6 @@ static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, static void TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow); static HTAB * CreateEmptyMapForShardsCreatedByWorkflow(); static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode); - static StringInfo CreateSplitShardReplicationSetupUDF( List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList); @@ -122,7 +122,7 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, List *destinationWorkerNodesList); static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval); static void DropDummyShards(void); -static void TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval); +static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval); /* Customize error message strings based on operation type */ @@ -137,7 +137,11 @@ static const char *const SplitTargetName[] = [ISOLATE_TENANT_TO_NEW_SHARD] = "tenant", }; -/* Map containing list of dummy shards created on target nodes */ +/* + * Map containing list of dummy shards created on target nodes. 
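+ * DropDummyShards() walks this map during cleanup to remove the dummy
+ * shards from the target nodes.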
+ * Key - node id of the target node
+ * Value - list of dummy ShardIntervals created on that node
+ */
static HTAB *DummyShardInfoHashMap = NULL;

/* Function definitions */
@@ -724,8 +728,10 @@ CreateAndCopySplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkfl
                                       shardGroupSplitIntervalListList,
                                       workersForPlacementList);
 
+    /* For a blocking split, the copy is not snapshotted */
+    char *snapshotName = NULL;
     DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
-                shardGroupSplitIntervalListList, workersForPlacementList, NULL);
+                shardGroupSplitIntervalListList, workersForPlacementList, snapshotName);
 
     /* Create auxiliary structures (indexes, stats, replicaindentities, triggers) */
     CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
@@ -766,42 +772,13 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
                                            splitShardIntervalList,
                                            destinationWorkerNodesList);
 
-        /*
-         * TODO(saawasek):1)Potentially refactor query list into a different method.
-         * 2) Assign Distributed Txn(confirm)?
-         */
-        List *ddlCommandList = NIL;
-        StringInfo beginTransaction = makeStringInfo();
-        appendStringInfo(beginTransaction,
-                         "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;");
-        ddlCommandList = lappend(ddlCommandList, beginTransaction->data);
-
-        /* Set snapshot for non-blocking shard split. */
-        if (snapShotName != NULL)
-        {
-            StringInfo snapShotString = makeStringInfo();
-            appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;",
-                             quote_literal_cstr(
-                                 snapShotName));
-            ddlCommandList = lappend(ddlCommandList, snapShotString->data);
-        }
-
-        ddlCommandList = lappend(ddlCommandList, splitCopyUdfCommand->data);
-
-        StringInfo commitCommand = makeStringInfo();
-        appendStringInfo(commitCommand, "COMMIT;");
-        ddlCommandList = lappend(ddlCommandList, commitCommand->data);
-
-        Task *splitCopyTask = CitusMakeNode(Task);
-        splitCopyTask->jobId = sourceShardIntervalToCopy->shardId;
-        splitCopyTask->taskId = taskId;
-        splitCopyTask->taskType = READ_TASK;
-        splitCopyTask->replicationModel = REPLICATION_MODEL_INVALID;
-        SetTaskQueryStringList(splitCopyTask, ddlCommandList);
+        /* Create the copy task; a snapshot name is only set for non-blocking splits */
+        Task *splitCopyTask = CreateSplitCopyTask(splitCopyUdfCommand, snapShotName,
+                                                  taskId,
+                                                  sourceShardIntervalToCopy->shardId);
 
         ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
         SetPlacementNodeMetadata(taskPlacement, sourceShardNode);
-
         splitCopyTask->taskPlacementList = list_make1(taskPlacement);
 
         splitCopyTaskList = lappend(splitCopyTaskList, splitCopyTask);
@@ -878,6 +855,43 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
 }
 
 
+/*
+ * CreateSplitCopyTask builds a task that runs the given split copy UDF command
+ * in a REPEATABLE READ transaction, optionally importing the given snapshot.
+ */
+static Task *
+CreateSplitCopyTask(StringInfo splitCopyUdfCommand, char *snapshotName, int taskId,
+                    uint64 jobId)
+{
+    List *ddlCommandList = NIL;
+    StringInfo beginTransaction = makeStringInfo();
+    appendStringInfo(beginTransaction,
+                     "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;");
+    ddlCommandList = lappend(ddlCommandList, beginTransaction->data);
+
+    /*
+     * Set snapshot for non-blocking shard split.
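+     * The snapshot is expected to be the one exported while creating the
+     * replication slot for the split, keeping the copy consistent with the
+     * logical changes streamed afterwards.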
+     */
+    if (snapshotName != NULL)
+    {
+        StringInfo snapShotString = makeStringInfo();
+        appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;",
+                         quote_literal_cstr(
+                             snapshotName));
+        ddlCommandList = lappend(ddlCommandList, snapShotString->data);
+    }
+
+    ddlCommandList = lappend(ddlCommandList, splitCopyUdfCommand->data);
+
+    StringInfo commitCommand = makeStringInfo();
+    appendStringInfo(commitCommand, "COMMIT;");
+    ddlCommandList = lappend(ddlCommandList, commitCommand->data);
+
+    Task *splitCopyTask = CitusMakeNode(Task);
+    splitCopyTask->jobId = jobId;
+    splitCopyTask->taskId = taskId;
+    splitCopyTask->taskType = READ_TASK;
+    splitCopyTask->replicationModel = REPLICATION_MODEL_INVALID;
+    SetTaskQueryStringList(splitCopyTask, ddlCommandList);
+
+    return splitCopyTask;
+}
+
+
 /*
  * Create an object on a worker node.
  */
@@ -1231,10 +1245,10 @@ TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow)
 /*
  * SplitShard API to split a given shard (or shard group) in non-blocking fashion
  * based on specified split points to a set of destination nodes.
- * 'splitOperation'          : Customer operation that triggered split.
- * 'shardIntervalToSplit'    : Source shard interval to be split.
- * 'shardSplitPointsList'    : Split Points list for the source 'shardInterval'.
- * 'workersForPlacementList' : Placement list corresponding to split children.
+ * splitOperation            : Customer operation that triggered split.
+ * shardIntervalToSplit      : Source shard interval to be split.
+ * shardSplitPointsList      : Split points list for the source 'shardInterval'.
+ * workersForPlacementList   : Placement list corresponding to split children.
 */
 static void
 NonBlockingShardSplit(SplitOperation splitOperation,
@@ -1254,13 +1268,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
         sourceColocatedShardIntervalList,
         shardSplitPointsList);
 
-    /* Only single placement allowed (already validated RelationReplicationFactor = 1) */
-    List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId);
-    Assert(sourcePlacementList->length == 1);
-    ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial(
-        sourcePlacementList);
-    WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
-                                                           false /* missingOk */);
+    WorkerNode *sourceShardToCopyNode = ActiveShardPlacementWorkerNode(
+        shardIntervalToSplit->shardId);
 
     /* Create hashmap to group shards for publication-subscription management */
     HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication(
@@ -1636,7 +1645,7 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
         destinationWorkerNodesList);
 
-    /* Force a new connection to execute the UDF */
-    int connectionFlags = FORCE_NEW_CONNECTION;
+    /* a regular (possibly cached) connection is enough to run the UDF */
+    int connectionFlags = 0;
     MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
                                                                       sourceWorkerNode->
                                                                       workerName,
@@ -1660,14 +1669,14 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
     if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) < 1 ||
         PQnfields(result) != 3)
     {
+        PQclear(result);
+        ForgetResults(sourceConnection);
+        CloseConnection(sourceConnection);
+
         ereport(ERROR,
                 (errcode(ERRCODE_CONNECTION_FAILURE),
                  errmsg(
-                     "Failed to run worker_split_shard_replication_setup UDF. It should successfully execute "
-                     " for splitting a shard in a non-blocking way. Please retry.")));
+                     "Failed to run worker_split_shard_replication_setup UDF. It must execute "
+                     "successfully for splitting a shard in a non-blocking way. 
Please retry."))); - - PQclear(result); - ForgetResults(sourceConnection); - CloseConnection(sourceConnection); } /* Get replication slot information */ @@ -1740,7 +1749,7 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, splitChildShardInterval->maxValue)); appendStringInfo(splitChildrenRows, - "ROW(%lu, %s, %lu, %s, %s, %u)::citus.split_shard_info", + "ROW(%lu, %s, %lu, %s, %s, %u)::pg_catalog.split_shard_info", sourceShardId, quote_literal_cstr(partitionColumnName), splitChildShardInterval->shardId, @@ -1754,7 +1763,7 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, StringInfo splitShardReplicationUDF = makeStringInfo(); appendStringInfo(splitShardReplicationUDF, - "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s]);", + "SELECT * FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[%s]);", splitChildrenRows->data); return splitShardReplicationUDF; @@ -1845,7 +1854,6 @@ DropDummyShards() int connectionFlags = FOR_DDL; connectionFlags |= OUTSIDE_TRANSACTION; - connectionFlags |= FORCE_NEW_CONNECTION; MultiConnection *connection = GetNodeUserDatabaseConnection( connectionFlags, shardToBeDroppedNode->workerName, @@ -1857,7 +1865,7 @@ DropDummyShards() ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, dummyShardIntervalList) { - TryDroppingShard(connection, shardInterval); + DropDummyShard(connection, shardInterval); } CloseConnection(connection); @@ -1866,10 +1874,11 @@ DropDummyShards() /* - * TryDroppingShard drops a given shard on the source node connection. + * DropDummyShard drops a given shard on the node connection. + * It fails if the shard cannot be dropped. */ static void -TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval) +DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval) { char *qualifiedShardName = ConstructQualifiedShardName(shardInterval); StringInfo dropShardQuery = makeStringInfo(); @@ -1879,13 +1888,12 @@ TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval) qualifiedShardName); /* - * Perform a drop in best effort manner. - * The shard may or may not exist and the connection could have died. + * Since the dummy shard is expected to be present on the given node, + * fail if it cannot be dropped during cleanup. */ - ExecuteOptionalRemoteCommand( + ExecuteCriticalRemoteCommand( connection, - dropShardQuery->data, - NULL /* pgResult */); + dropShardQuery->data); } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 5bd41d561..80238fce1 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -1079,19 +1079,19 @@ DropShardMovePublications(MultiConnection *connection, Bitmapset *tableOwnerIds) * If replication slot can not be dropped while dropping the subscriber, drop * it here. */ - DropShardMoveReplicationSlot(connection, ShardSubscriptionName(ownerId, - SHARD_MOVE_SUBSCRIPTION_PREFIX)); + DropShardReplicationSlot(connection, ShardSubscriptionName(ownerId, + SHARD_MOVE_SUBSCRIPTION_PREFIX)); DropShardPublication(connection, ShardMovePublicationName(ownerId)); } } /* - * DropShardMoveReplicationSlot drops the replication slot with the given name + * DropShardReplicationSlot drops the replication slot with the given name * if it exists. 
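+ * Both the shard move and the shard split cleanup paths rely on it, hence
+ * the generic name.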
*/ void -DropShardMoveReplicationSlot(MultiConnection *connection, char *replicationSlotName) +DropShardReplicationSlot(MultiConnection *connection, char *replicationSlotName) { ExecuteCriticalRemoteCommand( connection, @@ -1271,7 +1271,7 @@ DropAllShardMoveReplicationSlots(MultiConnection *connection) char *slotName; foreach_ptr(slotName, slotNameList) { - DropShardMoveReplicationSlot(connection, slotName); + DropShardReplicationSlot(connection, slotName); } } @@ -1469,6 +1469,10 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, /* * The CREATE USER command should not propagate, so we temporarily * disable DDL propagation. + * + * Subscription workers have SUPERUSER permissions. Hence we temporarily + * create a user with SUPERUSER permissions and then alter it to NOSUPERUSER. + * This prevents permission escalations. */ SendCommandListToWorkerOutsideTransaction( connection->hostname, connection->port, connection->user, diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index d4cd37368..36161b819 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -505,17 +505,8 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlot "SELECT * FROM pg_copy_logical_replication_slot ('%s','%s')", templateSlotName, slotName); - PGresult *result = NULL; - int response = ExecuteOptionalRemoteCommand(sourceNodeConnection, - createReplicationSlotCommand->data, - &result); - if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1) - { - ReportResultError(sourceNodeConnection, result, ERROR); - } - - PQclear(result); - ForgetResults(sourceNodeConnection); + ExecuteCriticalRemoteCommand(sourceNodeConnection, + createReplicationSlotCommand->data); } } @@ -665,7 +656,7 @@ DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection) char *slotName; foreach_ptr(slotName, slotNameList) { - DropShardMoveReplicationSlot(cleanupConnection, slotName); + DropShardReplicationSlot(cleanupConnection, slotName); } } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 6705b61e3..52a40e5a6 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -73,6 +73,7 @@ #include "distributed/run_from_same_connection.h" #include "distributed/shard_cleaner.h" #include "distributed/shared_connection_stats.h" +#include "distributed/shardsplit_shared_memory.h" #include "distributed/query_pushdown_planning.h" #include "distributed/time_constants.h" #include "distributed/query_stats.h" @@ -103,8 +104,6 @@ #include "utils/syscache.h" #include "utils/varlena.h" -#include "distributed/shardsplit_shared_memory.h" - #include "columnar/columnar.h" ColumnarSupportsIndexAM_type extern_ColumnarSupportsIndexAM = NULL; diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql index 81712ab38..6853732ee 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql @@ -46,7 +46,6 @@ CREATE FUNCTION pg_catalog.worker_repartition_cleanup(bigint) STRICT AS 'MODULE_PATHNAME', $function$worker_repartition_cleanup$function$; -#include 
"../../../columnar/sql/downgrades/columnar--11.1-1--11.0-3.sql" -- add relations to citus ALTER EXTENSION citus ADD SCHEMA columnar; ALTER EXTENSION citus ADD SEQUENCE columnar.storageid_seq; @@ -74,6 +73,11 @@ DROP FUNCTION pg_catalog.worker_split_copy( splitCopyInfos pg_catalog.split_copy_info[]); DROP TYPE pg_catalog.split_copy_info; +DROP FUNCTION pg_catalog.worker_split_shard_replication_setup( + splitShardInfo pg_catalog.split_shard_info[]); +DROP TYPE pg_catalog.split_shard_info; +DROP TYPE pg_catalog.replication_slot_info; + DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, OUT global_pid int8); diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.1-1.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.1-1.sql index 35ca3b898..3ee131d45 100644 --- a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.1-1.sql +++ b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.1-1.sql @@ -5,15 +5,21 @@ CREATE TYPE citus.split_shard_info AS ( shard_min_value text, shard_max_value text, node_id integer); +ALTER TYPE citus.split_shard_info SET SCHEMA pg_catalog; +COMMENT ON TYPE pg_catalog.split_shard_info + IS 'Stores split child shard information'; CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text); +ALTER TYPE citus.replication_slot_info SET SCHEMA pg_catalog; +COMMENT ON TYPE pg_catalog.replication_slot_info + IS 'Replication slot information to be used for subscriptions during non blocking shard split'; CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup( - splitShardInfo citus.split_shard_info[]) -RETURNS setof citus.replication_slot_info + splitShardInfo pg_catalog.split_shard_info[]) +RETURNS setof pg_catalog.replication_slot_info LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$; -COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[]) +COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[]) IS 'Replication setup for splitting a shard'; -REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(citus.split_shard_info[]) FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql index 6e70b827c..3ee131d45 100644 --- a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql @@ -5,16 +5,21 @@ CREATE TYPE citus.split_shard_info AS ( shard_min_value text, shard_max_value text, node_id integer); +ALTER TYPE citus.split_shard_info SET SCHEMA pg_catalog; +COMMENT ON TYPE pg_catalog.split_shard_info + IS 'Stores split child shard information'; CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text); +ALTER TYPE citus.replication_slot_info SET SCHEMA pg_catalog; +COMMENT ON TYPE pg_catalog.replication_slot_info + IS 'Replication slot information to be used for subscriptions during non blocking shard split'; CREATE OR REPLACE FUNCTION 
pg_catalog.worker_split_shard_replication_setup(
-    splitShardInfo citus.split_shard_info[])
-RETURNS setof citus.replication_slot_info
+    splitShardInfo pg_catalog.split_shard_info[])
+RETURNS setof pg_catalog.replication_slot_info
 LANGUAGE C STRICT
 AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
-COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])
-    IS 'Replication setup for splitting a shard';
+COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[])
+    IS 'Replication setup for splitting a shard';
 
-REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(citus.split_shard_info[]) FROM PUBLIC;
+REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC;
diff --git a/src/backend/distributed/utils/citus_safe_lib.c b/src/backend/distributed/utils/citus_safe_lib.c
index 7e53bfe3f..0b830fafc 100644
--- a/src/backend/distributed/utils/citus_safe_lib.c
+++ b/src/backend/distributed/utils/citus_safe_lib.c
@@ -110,6 +110,53 @@ SafeStringToInt64(const char *str)
 }
 
 
+/*
+ * SafeStringToInt32 converts a string containing a number to an int32. When it
+ * fails it calls ereport.
+ *
+ * The different error cases are inspired by
+ * https://stackoverflow.com/a/26083517/2570866
+ */
+int32
+SafeStringToInt32(const char *str)
+{
+    char *endptr;
+    errno = 0;
+    long number = strtol(str, &endptr, 10);
+
+    if (str == endptr)
+    {
+        ereport(ERROR, (errmsg("Error parsing %s as int32, no digits found\n", str)));
+    }
+    else if ((errno == ERANGE && number == LONG_MIN) || number < INT32_MIN)
+    {
+        ereport(ERROR, (errmsg("Error parsing %s as int32, underflow occurred\n", str)));
+    }
+    else if ((errno == ERANGE && number == LONG_MAX) || number > INT32_MAX)
+    {
+        ereport(ERROR, (errmsg("Error parsing %s as int32, overflow occurred\n", str)));
+    }
+    else if (errno == EINVAL)
+    {
+        ereport(ERROR, (errmsg(
+                            "Error parsing %s as int32, base contains unsupported value\n",
+                            str)));
+    }
+    else if (errno != 0 && number == 0)
+    {
+        int err = errno;
+        ereport(ERROR, (errmsg("Error parsing %s as int32, errno %d\n", str, err)));
+    }
+    else if (errno == 0 && str && *endptr != '\0')
+    {
+        ereport(ERROR, (errmsg(
+                            "Error parsing %s as int32, additional characters remain after int32\n",
+                            str)));
+    }
+    return number;
+}
+
+
 /*
  * SafeStringToUint64 converts a string containing a number to a uint64. When it
  * fails it calls ereport.
@@ -295,50 +342,3 @@ SafeSnprintf(char *restrict buffer, rsize_t bufsz, const char *restrict format,
     va_end(args);
     return result;
 }
-
-
-/*
- * SafeStringToInt32 converts a string containing a number to a int32. When it
- * fails it calls ereport. 
- *
- * The different error cases are inspired by
- * https://stackoverflow.com/a/26083517/2570866
- */
-int32
-SafeStringToInt32(const char *str)
-{
-    char *endptr;
-    errno = 0;
-    long number = strtol(str, &endptr, 10);
-
-    if (str == endptr)
-    {
-        ereport(ERROR, (errmsg("Error parsing %s as int32, no digits found\n", str)));
-    }
-    else if ((errno == ERANGE && number == LONG_MIN) || number < INT32_MIN)
-    {
-        ereport(ERROR, (errmsg("Error parsing %s as int32, underflow occurred\n", str)));
-    }
-    else if ((errno == ERANGE && number == LONG_MAX) || number > INT32_MAX)
-    {
-        ereport(ERROR, (errmsg("Error parsing %s as int32, overflow occurred\n", str)));
-    }
-    else if (errno == EINVAL)
-    {
-        ereport(ERROR, (errmsg(
-                            "Error parsing %s as int32, base contains unsupported value\n",
-                            str)));
-    }
-    else if (errno != 0 && number == 0)
-    {
-        int err = errno;
-        ereport(ERROR, (errmsg("Error parsing %s as int32, errno %d\n", str, err)));
-    }
-    else if (errno == 0 && str && *endptr != '\0')
-    {
-        ereport(ERROR, (errmsg(
-                            "Error parsing %s as int32, aditional characters remain after int32\n",
-                            str)));
-    }
-    return number;
-}
diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h
index 627a59865..6c3b8ae8d 100644
--- a/src/include/distributed/connection_management.h
+++ b/src/include/distributed/connection_management.h
@@ -199,7 +199,7 @@ typedef struct MultiConnection
     uint64 copyBytesWrittenSinceLastFlush;
 
     /* replication option */
-    bool requiresReplicationOption;
+    bool requiresReplication;
 
     MultiConnectionStructInitializationState initilizationState;
 } MultiConnection;
diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h
index bad361ae6..56e15d171 100644
--- a/src/include/distributed/metadata_utility.h
+++ b/src/include/distributed/metadata_utility.h
@@ -25,6 +25,7 @@
 #include "distributed/connection_management.h"
 #include "distributed/errormessage.h"
 #include "distributed/relay_utility.h"
+#include "distributed/worker_manager.h"
 #include "utils/acl.h"
 #include "utils/relcache.h"
 
@@ -226,6 +227,7 @@ extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
 extern List * ActiveShardPlacementList(uint64 shardId);
 extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId);
 extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk);
+extern WorkerNode * ActiveShardPlacementWorkerNode(uint64 shardId);
 extern List * BuildShardPlacementList(int64 shardId);
 extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
 extern List * AllShardPlacementsWithShardPlacementState(ShardState shardState);
diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h
index c80ff6e6c..9537f0224 100644
--- a/src/include/distributed/multi_logical_replication.h
+++ b/src/include/distributed/multi_logical_replication.h
@@ -37,8 +37,8 @@ extern void DropShardSubscription(MultiConnection *connection,
 extern void DropShardPublication(MultiConnection *connection, char *publicationName);
 
 extern void DropShardUser(MultiConnection *connection, char *username);
-extern void DropShardMoveReplicationSlot(MultiConnection *connection,
-                                         char *publicationName);
+extern void DropShardReplicationSlot(MultiConnection *connection,
+                                     char *replicationSlotName);
 
 extern char * ShardSubscriptionRole(Oid ownerId, char *operationPrefix);
 
diff --git a/src/test/regress/enterprise_isolation_logicalrep_1_schedule 
b/src/test/regress/enterprise_isolation_logicalrep_1_schedule index f9fd93f95..96cc9915e 100644 --- a/src/test/regress/enterprise_isolation_logicalrep_1_schedule +++ b/src/test/regress/enterprise_isolation_logicalrep_1_schedule @@ -9,4 +9,4 @@ test: isolation_logical_replication_single_shard_commands test: isolation_logical_replication_multi_shard_commands test: isolation_non_blocking_shard_split test: isolation_non_blocking_shard_split_with_index_as_replicaIdentity -test: isolation_non_blocking_shard_split_fkey \ No newline at end of file +test: isolation_non_blocking_shard_split_fkey diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index 7485f850f..229c8aedf 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -64,11 +64,11 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); SET search_path TO split_shard_replication_setup_schema; CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; -SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info, - ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, + ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ]); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. 
count diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index 7590388ca..be0a46d4b 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -62,9 +62,9 @@ CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; -SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ]); count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out index 4dced5752..77bdf8336 100644 --- a/src/test/regress/expected/split_shard_replication_setup_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_local.out @@ -10,9 +10,9 @@ SET client_min_messages TO ERROR; -- Create publication at worker1 CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; -- Worker1 is target for table_to_split_2 and table_to_split_3 -SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ]); count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out index 9b1ec403c..1f41519ca 100644 --- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out @@ -8,9 +8,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SET search_path TO split_shard_replication_setup_schema; -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; -SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ]); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. 
count diff --git a/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec b/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec index 64316f641..78904b217 100644 --- a/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec +++ b/src/test/regress/spec/isolation_non_blocking_shard_split_with_index_as_replicaIdentity.spec @@ -13,7 +13,7 @@ setup teardown { - DROP TABLE to_split_table CASCADE; + DROP TABLE to_split_table CASCADE; } diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql index 212b771c8..e122221cd 100644 --- a/src/test/regress/sql/citus_sameer.sql +++ b/src/test/regress/sql/citus_sameer.sql @@ -75,4 +75,4 @@ SELECT pg_catalog.citus_split_shard_by_split_points( SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port); \c - - - :worker_2_port -SELECT slot_name FROM pg_replication_slots; \ No newline at end of file +SELECT slot_name FROM pg_replication_slots; diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index 4733e1cf4..7d8f0d71d 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -66,11 +66,11 @@ SET search_path TO split_shard_replication_setup_schema; CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; -SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info, - ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info, + ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ]); SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index 176dd6576..97e1d275f 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -66,9 +66,9 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; -SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ]); SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', 
:worker_2_node), 'citus') \gset diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql index a33b4e27d..40cbd4063 100644 --- a/src/test/regress/sql/split_shard_replication_setup_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_local.sql @@ -13,9 +13,9 @@ SET client_min_messages TO ERROR; CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; -- Worker1 is target for table_to_split_2 and table_to_split_3 -SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ]); SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql index d9f8d527c..47e0c2aa3 100644 --- a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql @@ -11,9 +11,9 @@ SET search_path TO split_shard_replication_setup_schema; -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; -SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, - ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info +SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ]); SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset