Address review comments

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-08-01 12:54:30 +05:30
parent 8c0599d409
commit c56b79b6f7
24 changed files with 217 additions and 177 deletions

View File

@@ -675,7 +675,7 @@ CloseConnection(MultiConnection *connection)
 	strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH);
 	key.port = connection->port;
-	key.replicationConnParam = connection->requiresReplicationOption;
+	key.replicationConnParam = connection->requiresReplication;
 	strlcpy(key.user, connection->user, NAMEDATALEN);
 	strlcpy(key.database, connection->database, NAMEDATALEN);
@@ -1262,7 +1262,7 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key
 	connection->port = key->port;
 	strlcpy(connection->database, key->database, NAMEDATALEN);
 	strlcpy(connection->user, key->user, NAMEDATALEN);
-	connection->requiresReplicationOption = key->replicationConnParam;
+	connection->requiresReplication = key->replicationConnParam;
 
 	connection->pgConn = PQconnectStartParams((const char **) entry->keywords,
 											  (const char **) entry->values,
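The rename above only touches the field on MultiConnection; the pooling key keeps its own name. Because the key carries the replication flag, a connection opened with the replication option is never handed out for ordinary traffic. A minimal sketch of the key, with fields inferred from the assignments in these two hunks (not copied from the header file):

    /* Sketch only: field set inferred from the strlcpy/assignment lines above. */
    typedef struct ConnectionHashKey
    {
    	char hostname[MAX_NODE_LENGTH];
    	int32 port;
    	bool replicationConnParam;   /* set from connection->requiresReplication */
    	char user[NAMEDATALEN];
    	char database[NAMEDATALEN];
    } ConnectionHashKey;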

View File

@@ -1437,6 +1437,27 @@ ActiveShardPlacement(uint64 shardId, bool missingOk)
 }
 
 
+/*
+ * ActiveShardPlacementWorkerNode returns the worker node of the first placement of
+ * a shard.
+ */
+WorkerNode *
+ActiveShardPlacementWorkerNode(uint64 shardId)
+{
+	bool missingOK = false;
+
+	List *sourcePlacementList = ActiveShardPlacementList(shardId);
+	Assert(sourcePlacementList->length == 1);
+
+	ShardPlacement *sourceShardPlacement = linitial(sourcePlacementList);
+	WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
+														   missingOK);
+
+	return sourceShardToCopyNode;
+}
+
+
 /*
  * BuildShardPlacementList finds shard placements for the given shardId from
  * system catalogs, converts these placements to their in-memory

View File

@@ -88,7 +88,6 @@ static void NonBlockingShardSplit(SplitOperation splitOperation,
 									  ShardInterval *shardIntervalToSplit,
 									  List *shardSplitPointsList,
 									  List *workersForPlacementList);
-
 static void DoSplitCopy(WorkerNode *sourceShardNode,
 						List *sourceColocatedShardIntervalList,
 						List *shardGroupSplitIntervalListList,
@@ -97,6 +96,8 @@ static void DoSplitCopy(WorkerNode *sourceShardNode,
 static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
 										 List *splitChildrenShardIntervalList,
 										 List *workersForPlacementList);
+static Task * CreateSplitCopyTask(StringInfo splitCopyUdfCommand, char *snapshotName, int
+								  taskId, uint64 jobId);
 static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
 											 List *workersForPlacementList);
 static void CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList,
@@ -106,7 +107,6 @@ static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
 static void TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow);
 static HTAB * CreateEmptyMapForShardsCreatedByWorkflow();
 static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode);
-
 static StringInfo CreateSplitShardReplicationSetupUDF(
 	List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList,
 	List *destinationWorkerNodesList);
@@ -122,7 +122,7 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
 												   List *destinationWorkerNodesList);
 static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval);
 static void DropDummyShards(void);
-static void TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval);
+static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval);
 
 /* Customize error message strings based on operation type */
@@ -137,7 +137,11 @@ static const char *const SplitTargetName[] =
 	[ISOLATE_TENANT_TO_NEW_SHARD] = "tenant",
 };
 
-/* Map containing list of dummy shards created on target nodes */
+/*
+ * Map containing list of dummy shards created on target nodes.
+ * Key - <nodeId, tableOwnerId>
+ * Value - ShardInterval
+ */
 static HTAB *DummyShardInfoHashMap = NULL;
 
 /* Function definitions */
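Going by the key/value description in the new comment, an entry in this map could look roughly like the following sketch; the struct and field names here are illustrative, not taken from the commit:

    /* Illustrative only: one entry of DummyShardInfoHashMap. */
    typedef struct DummyShardMapKey
    {
    	uint32 nodeId;        /* target node that holds the dummy shards */
    	Oid tableOwnerId;     /* owner of the distributed table */
    } DummyShardMapKey;
    
    typedef struct DummyShardMapEntry
    {
    	DummyShardMapKey key;
    	List *dummyShardIntervalList;   /* ShardInterval pointers, dropped during cleanup */
    } DummyShardMapEntry;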
@@ -724,8 +728,10 @@ CreateAndCopySplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkfl
 									  shardGroupSplitIntervalListList,
 									  workersForPlacementList);
 
+	/* For Blocking split, copy isn't snapshotted */
+	char *snapshotName = NULL;
 	DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
-				shardGroupSplitIntervalListList, workersForPlacementList, NULL);
+				shardGroupSplitIntervalListList, workersForPlacementList, snapshotName);
 
 	/* Create auxiliary structures (indexes, stats, replicaindentities, triggers) */
 	CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
@@ -766,42 +772,13 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
 															splitShardIntervalList,
 															destinationWorkerNodesList);
 
-		/*
-		 * TODO(saawasek):1)Potentially refactor query list into a different method.
-		 * 2) Assign Distributed Txn(confirm)?
-		 */
-		List *ddlCommandList = NIL;
-		StringInfo beginTransaction = makeStringInfo();
-		appendStringInfo(beginTransaction,
-						 "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;");
-		ddlCommandList = lappend(ddlCommandList, beginTransaction->data);
-
-		/* Set snapshot for non-blocking shard split. */
-		if (snapShotName != NULL)
-		{
-			StringInfo snapShotString = makeStringInfo();
-			appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;",
-							 quote_literal_cstr(snapShotName));
-			ddlCommandList = lappend(ddlCommandList, snapShotString->data);
-		}
-
-		ddlCommandList = lappend(ddlCommandList, splitCopyUdfCommand->data);
-
-		StringInfo commitCommand = makeStringInfo();
-		appendStringInfo(commitCommand, "COMMIT;");
-		ddlCommandList = lappend(ddlCommandList, commitCommand->data);
-
-		Task *splitCopyTask = CitusMakeNode(Task);
-		splitCopyTask->jobId = sourceShardIntervalToCopy->shardId;
-		splitCopyTask->taskId = taskId;
-		splitCopyTask->taskType = READ_TASK;
-		splitCopyTask->replicationModel = REPLICATION_MODEL_INVALID;
-		SetTaskQueryStringList(splitCopyTask, ddlCommandList);
+		/* Create copy task. Snapshot name is required for nonblocking splits */
+		Task *splitCopyTask = CreateSplitCopyTask(splitCopyUdfCommand, snapShotName,
+												  taskId,
+												  sourceShardIntervalToCopy->shardId);
 
 		ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
 		SetPlacementNodeMetadata(taskPlacement, sourceShardNode);
 		splitCopyTask->taskPlacementList = list_make1(taskPlacement);
 
 		splitCopyTaskList = lappend(splitCopyTaskList, splitCopyTask);
@@ -878,6 +855,43 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval,
 }
 
 
+static Task *
+CreateSplitCopyTask(StringInfo splitCopyUdfCommand, char *snapshotName, int taskId, uint64
+					jobId)
+{
+	List *ddlCommandList = NIL;
+	StringInfo beginTransaction = makeStringInfo();
+	appendStringInfo(beginTransaction,
+					 "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;");
+	ddlCommandList = lappend(ddlCommandList, beginTransaction->data);
+
+	/* Set snapshot for non-blocking shard split. */
+	if (snapshotName != NULL)
+	{
+		StringInfo snapShotString = makeStringInfo();
+		appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;",
+						 quote_literal_cstr(snapshotName));
+		ddlCommandList = lappend(ddlCommandList, snapShotString->data);
+	}
+
+	ddlCommandList = lappend(ddlCommandList, splitCopyUdfCommand->data);
+
+	StringInfo commitCommand = makeStringInfo();
+	appendStringInfo(commitCommand, "COMMIT;");
+	ddlCommandList = lappend(ddlCommandList, commitCommand->data);
+
+	Task *splitCopyTask = CitusMakeNode(Task);
+	splitCopyTask->jobId = jobId;
+	splitCopyTask->taskId = taskId;
+	splitCopyTask->taskType = READ_TASK;
+	splitCopyTask->replicationModel = REPLICATION_MODEL_INVALID;
+	SetTaskQueryStringList(splitCopyTask, ddlCommandList);
+
+	return splitCopyTask;
+}
+
+
 /*
  * Create an object on a worker node.
  */
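Put together, the task produced by CreateSplitCopyTask runs a short command list on the source placement. A sketch of that sequence for a non-blocking split (the snapshot name below is a made-up example; the real one is exported on the source node):

    /*
     * BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
     * SET TRANSACTION SNAPSHOT '00000003-00000002-1';
     * SELECT * FROM worker_split_copy(...);   -- built by CreateSplitCopyCommand
     * COMMIT;
     *
     * For a blocking split, snapshotName is NULL and the SET TRANSACTION
     * SNAPSHOT step is skipped, so the copy simply runs in its own
     * REPEATABLE READ transaction.
     */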
@@ -1231,10 +1245,10 @@ TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow)
 /*
  * SplitShard API to split a given shard (or shard group) in non-blocking fashion
  * based on specified split points to a set of destination nodes.
- * 'splitOperation' : Customer operation that triggered split.
- * 'shardIntervalToSplit' : Source shard interval to be split.
- * 'shardSplitPointsList' : Split Points list for the source 'shardInterval'.
- * 'workersForPlacementList' : Placement list corresponding to split children.
+ * splitOperation : Customer operation that triggered split.
+ * shardIntervalToSplit : Source shard interval to be split.
+ * shardSplitPointsList : Split Points list for the source 'shardInterval'.
+ * workersForPlacementList : Placement list corresponding to split children.
  */
 static void
 NonBlockingShardSplit(SplitOperation splitOperation,
@@ -1254,13 +1268,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 											sourceColocatedShardIntervalList,
 											shardSplitPointsList);
 
-	/* Only single placement allowed (already validated RelationReplicationFactor = 1) */
-	List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId);
-	Assert(sourcePlacementList->length == 1);
-	ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial(
-		sourcePlacementList);
-	WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
-														   false /* missingOk */);
+	WorkerNode *sourceShardToCopyNode = ActiveShardPlacementWorkerNode(
+		shardIntervalToSplit->shardId);
 
 	/* Create hashmap to group shards for publication-subscription management */
 	HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication(
@@ -1636,7 +1645,7 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
 													destinationWorkerNodesList);
 
 	/* Force a new connection to execute the UDF */
-	int connectionFlags = FORCE_NEW_CONNECTION;
+	int connectionFlags = 0;
 	MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
 																	  sourceWorkerNode->
 																	  workerName,
@@ -1660,14 +1669,14 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
 	if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) < 1 ||
 		PQnfields(result) != 3)
 	{
+		PQclear(result);
+		ForgetResults(sourceConnection);
+		CloseConnection(sourceConnection);
+
 		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
 						errmsg(
 							"Failed to run worker_split_shard_replication_setup UDF. It should successfully execute "
 							" for splitting a shard in a non-blocking way. Please retry.")));
-
-		PQclear(result);
-		ForgetResults(sourceConnection);
-		CloseConnection(sourceConnection);
 	}
 
 	/* Get replication slot information */
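Moving the cleanup calls above the ereport is not cosmetic: in PostgreSQL, ereport(ERROR, ...) performs a non-local exit to the enclosing error handler, so any statement placed after it is dead code. The fixed hunk follows the usual pattern, sketched here with a placeholder condition:

    if (commandFailed)                   /* placeholder condition */
    {
    	PQclear(result);                 /* release the libpq result first */
    	ForgetResults(sourceConnection);
    	CloseConnection(sourceConnection);
    	ereport(ERROR, (errmsg("...")));  /* does not return */
    }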
@@ -1740,7 +1749,7 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
 								 splitChildShardInterval->maxValue));
 
 		appendStringInfo(splitChildrenRows,
-						 "ROW(%lu, %s, %lu, %s, %s, %u)::citus.split_shard_info",
+						 "ROW(%lu, %s, %lu, %s, %s, %u)::pg_catalog.split_shard_info",
 						 sourceShardId,
 						 quote_literal_cstr(partitionColumnName),
 						 splitChildShardInterval->shardId,
@@ -1754,7 +1763,7 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
 	StringInfo splitShardReplicationUDF = makeStringInfo();
 	appendStringInfo(splitShardReplicationUDF,
-					 "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s]);",
+					 "SELECT * FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[%s]);",
 					 splitChildrenRows->data);
 
 	return splitShardReplicationUDF;
@@ -1845,7 +1854,6 @@ DropDummyShards()
 		int connectionFlags = FOR_DDL;
 		connectionFlags |= OUTSIDE_TRANSACTION;
-		connectionFlags |= FORCE_NEW_CONNECTION;
 		MultiConnection *connection = GetNodeUserDatabaseConnection(
 			connectionFlags,
 			shardToBeDroppedNode->workerName,
@@ -1857,7 +1865,7 @@ DropDummyShards()
 		ShardInterval *shardInterval = NULL;
 		foreach_ptr(shardInterval, dummyShardIntervalList)
 		{
-			TryDroppingShard(connection, shardInterval);
+			DropDummyShard(connection, shardInterval);
 		}
 
 		CloseConnection(connection);
@@ -1866,10 +1874,11 @@ DropDummyShards()
 /*
- * TryDroppingShard drops a given shard on the source node connection.
+ * DropDummyShard drops a given shard on the node connection.
+ * It fails if the shard cannot be dropped.
  */
 static void
-TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval)
+DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval)
 {
 	char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
 	StringInfo dropShardQuery = makeStringInfo();
@@ -1879,13 +1888,12 @@ DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval)
 					 qualifiedShardName);
 
 	/*
-	 * Perform a drop in best effort manner.
-	 * The shard may or may not exist and the connection could have died.
+	 * Since the dummy shard is expected to be present on the given node,
+	 * fail if it cannot be dropped during cleanup.
 	 */
-	ExecuteOptionalRemoteCommand(
+	ExecuteCriticalRemoteCommand(
 		connection,
-		dropShardQuery->data,
-		NULL /* pgResult */);
+		dropShardQuery->data);
 }
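The same change in failure semantics appears in two places in this file: ExecuteOptionalRemoteCommand returns a response code and leaves the decision to the caller, while ExecuteCriticalRemoteCommand raises an error on any failure. Side by side, using the drop query from this hunk:

    /* best effort: inspect the response code, or ignore it */
    int response = ExecuteOptionalRemoteCommand(connection,
    											dropShardQuery->data,
    											NULL /* pgResult */);
    
    /* fail hard: any failure surfaces immediately as an error */
    ExecuteCriticalRemoteCommand(connection, dropShardQuery->data);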

View File

@@ -1079,19 +1079,19 @@ DropShardMovePublications(MultiConnection *connection, Bitmapset *tableOwnerIds)
 		 * If replication slot can not be dropped while dropping the subscriber, drop
 		 * it here.
 		 */
-		DropShardMoveReplicationSlot(connection, ShardSubscriptionName(ownerId,
-																	   SHARD_MOVE_SUBSCRIPTION_PREFIX));
+		DropShardReplicationSlot(connection, ShardSubscriptionName(ownerId,
+																   SHARD_MOVE_SUBSCRIPTION_PREFIX));
 		DropShardPublication(connection, ShardMovePublicationName(ownerId));
 	}
 }
 
 
 /*
- * DropShardMoveReplicationSlot drops the replication slot with the given name
+ * DropShardReplicationSlot drops the replication slot with the given name
  * if it exists.
  */
 void
-DropShardMoveReplicationSlot(MultiConnection *connection, char *replicationSlotName)
+DropShardReplicationSlot(MultiConnection *connection, char *replicationSlotName)
 {
 	ExecuteCriticalRemoteCommand(
 		connection,
@@ -1271,7 +1271,7 @@ DropAllShardMoveReplicationSlots(MultiConnection *connection)
 	char *slotName;
 	foreach_ptr(slotName, slotNameList)
 	{
-		DropShardMoveReplicationSlot(connection, slotName);
+		DropShardReplicationSlot(connection, slotName);
 	}
 }
@@ -1469,6 +1469,10 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
 		/*
 		 * The CREATE USER command should not propagate, so we temporarily
 		 * disable DDL propagation.
+		 *
+		 * Subscription workers have SUPERUSER permissions. Hence we temporarily
+		 * create a user with SUPERUSER permissions and then alter it to NOSUPERUSER.
+		 * This prevents permission escalations.
 		 */
 		SendCommandListToWorkerOutsideTransaction(
 			connection->hostname, connection->port, connection->user,
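A sketch of the sequence the new comment describes, with a hypothetical role name (Citus generates the real one) and the commands reduced to their essentials:

    /*
     * SET citus.enable_ddl_propagation TO off;
     * CREATE USER shard_move_subscription_role SUPERUSER;
     * -- CREATE SUBSCRIPTION ... runs with this role as owner, then:
     * ALTER ROLE shard_move_subscription_role NOSUPERUSER;
     */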

View File

@@ -505,17 +505,8 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlot
 						 "SELECT * FROM pg_copy_logical_replication_slot ('%s','%s')",
 						 templateSlotName, slotName);
 
-		PGresult *result = NULL;
-		int response = ExecuteOptionalRemoteCommand(sourceNodeConnection,
-													createReplicationSlotCommand->data,
-													&result);
-
-		if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
-		{
-			ReportResultError(sourceNodeConnection, result, ERROR);
-		}
-
-		PQclear(result);
-		ForgetResults(sourceNodeConnection);
+		ExecuteCriticalRemoteCommand(sourceNodeConnection,
+									 createReplicationSlotCommand->data);
 	}
 }
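For context, pg_copy_logical_replication_slot is a stock PostgreSQL function that creates a new logical slot as a copy of an existing one, so each per-split slot starts from the template slot's position. The command the loop above sends looks like this (both slot names are made up for the example):

    /*
     * SELECT * FROM pg_copy_logical_replication_slot(
     *          'citus_split_template_slot', 'citus_shard_split_slot_18_10');
     */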
@@ -665,7 +656,7 @@ DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection)
 	char *slotName;
 	foreach_ptr(slotName, slotNameList)
 	{
-		DropShardMoveReplicationSlot(cleanupConnection, slotName);
+		DropShardReplicationSlot(cleanupConnection, slotName);
 	}
 }

View File

@@ -73,6 +73,7 @@
 #include "distributed/run_from_same_connection.h"
 #include "distributed/shard_cleaner.h"
 #include "distributed/shared_connection_stats.h"
+#include "distributed/shardsplit_shared_memory.h"
 #include "distributed/query_pushdown_planning.h"
 #include "distributed/time_constants.h"
 #include "distributed/query_stats.h"
@@ -103,8 +104,6 @@
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
-#include "distributed/shardsplit_shared_memory.h"
-
 #include "columnar/columnar.h"
 
 ColumnarSupportsIndexAM_type extern_ColumnarSupportsIndexAM = NULL;

View File

@@ -46,7 +46,6 @@ CREATE FUNCTION pg_catalog.worker_repartition_cleanup(bigint)
     STRICT
     AS 'MODULE_PATHNAME', $function$worker_repartition_cleanup$function$;
 
-#include "../../../columnar/sql/downgrades/columnar--11.1-1--11.0-3.sql"
 
 -- add relations to citus
 ALTER EXTENSION citus ADD SCHEMA columnar;
 ALTER EXTENSION citus ADD SEQUENCE columnar.storageid_seq;
@@ -74,6 +73,11 @@ DROP FUNCTION pg_catalog.worker_split_copy(
     splitCopyInfos pg_catalog.split_copy_info[]);
 DROP TYPE pg_catalog.split_copy_info;
 
+DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(
+    splitShardInfo pg_catalog.split_shard_info[]);
+DROP TYPE pg_catalog.split_shard_info;
+DROP TYPE pg_catalog.replication_slot_info;
+
 DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4,
                                                      OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz,
                                                      OUT global_pid int8);

View File

@@ -5,15 +5,21 @@ CREATE TYPE citus.split_shard_info AS (
     shard_min_value text,
     shard_max_value text,
     node_id integer);
+ALTER TYPE citus.split_shard_info SET SCHEMA pg_catalog;
+COMMENT ON TYPE pg_catalog.split_shard_info
+    IS 'Stores split child shard information';
 
 CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text);
+ALTER TYPE citus.replication_slot_info SET SCHEMA pg_catalog;
+COMMENT ON TYPE pg_catalog.replication_slot_info
+    IS 'Replication slot information to be used for subscriptions during non blocking shard split';
 
 CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
-    splitShardInfo citus.split_shard_info[])
-RETURNS setof citus.replication_slot_info
+    splitShardInfo pg_catalog.split_shard_info[])
+RETURNS setof pg_catalog.replication_slot_info
 LANGUAGE C STRICT
 AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
 
-COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])
+COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[])
     IS 'Replication setup for splitting a shard';
 
-REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(citus.split_shard_info[]) FROM PUBLIC;
+REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC;

View File

@@ -5,16 +5,21 @@ CREATE TYPE citus.split_shard_info AS (
     shard_min_value text,
     shard_max_value text,
     node_id integer);
+ALTER TYPE citus.split_shard_info SET SCHEMA pg_catalog;
+COMMENT ON TYPE pg_catalog.split_shard_info
+    IS 'Stores split child shard information';
 
 CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text);
+ALTER TYPE citus.replication_slot_info SET SCHEMA pg_catalog;
+COMMENT ON TYPE pg_catalog.replication_slot_info
+    IS 'Replication slot information to be used for subscriptions during non blocking shard split';
 
 CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
-    splitShardInfo citus.split_shard_info[])
-RETURNS setof citus.replication_slot_info
+    splitShardInfo pg_catalog.split_shard_info[])
+RETURNS setof pg_catalog.replication_slot_info
 LANGUAGE C STRICT
 AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
 
-COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])
-    IS 'Replication setup for splitting a shard';
+COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo pg_catalog.split_shard_info[])
+    IS 'Replication setup for splitting a shard';
 
-REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(citus.split_shard_info[]) FROM PUBLIC;
+REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC;

View File

@@ -110,6 +110,53 @@ SafeStringToInt64(const char *str)
 }
 
 
+/*
+ * SafeStringToInt32 converts a string containing a number to a int32. When it
+ * fails it calls ereport.
+ *
+ * The different error cases are inspired by
+ * https://stackoverflow.com/a/26083517/2570866
+ */
+int32
+SafeStringToInt32(const char *str)
+{
+	char *endptr;
+	errno = 0;
+	long number = strtol(str, &endptr, 10);
+
+	if (str == endptr)
+	{
+		ereport(ERROR, (errmsg("Error parsing %s as int32, no digits found\n", str)));
+	}
+	else if ((errno == ERANGE && number == LONG_MIN) || number < INT32_MIN)
+	{
+		ereport(ERROR, (errmsg("Error parsing %s as int32, underflow occurred\n", str)));
+	}
+	else if ((errno == ERANGE && number == LONG_MAX) || number > INT32_MAX)
+	{
+		ereport(ERROR, (errmsg("Error parsing %s as int32, overflow occurred\n", str)));
+	}
+	else if (errno == EINVAL)
+	{
+		ereport(ERROR, (errmsg(
+							"Error parsing %s as int32, base contains unsupported value\n",
+							str)));
+	}
+	else if (errno != 0 && number == 0)
+	{
+		int err = errno;
+		ereport(ERROR, (errmsg("Error parsing %s as int32, errno %d\n", str, err)));
+	}
+	else if (errno == 0 && str && *endptr != '\0')
+	{
+		ereport(ERROR, (errmsg(
+							"Error parsing %s as int32, aditional characters remain after int32\n",
+							str)));
+	}
+
+	return number;
+}
+
+
 /*
  * SafeStringToUint64 converts a string containing a number to a uint64. When it
  * fails it calls ereport.
@@ -295,50 +342,3 @@ SafeSnprintf(char *restrict buffer, rsize_t bufsz, const char *restrict format,
 	va_end(args);
 	return result;
 }
-
-
-/*
- * SafeStringToInt32 converts a string containing a number to a int32. When it
- * fails it calls ereport.
- *
- * The different error cases are inspired by
- * https://stackoverflow.com/a/26083517/2570866
- */
-int32
-SafeStringToInt32(const char *str)
-{
-	char *endptr;
-	errno = 0;
-	long number = strtol(str, &endptr, 10);
-
-	if (str == endptr)
-	{
-		ereport(ERROR, (errmsg("Error parsing %s as int32, no digits found\n", str)));
-	}
-	else if ((errno == ERANGE && number == LONG_MIN) || number < INT32_MIN)
-	{
-		ereport(ERROR, (errmsg("Error parsing %s as int32, underflow occurred\n", str)));
-	}
-	else if ((errno == ERANGE && number == LONG_MAX) || number > INT32_MAX)
-	{
-		ereport(ERROR, (errmsg("Error parsing %s as int32, overflow occurred\n", str)));
-	}
-	else if (errno == EINVAL)
-	{
-		ereport(ERROR, (errmsg(
-							"Error parsing %s as int32, base contains unsupported value\n",
-							str)));
-	}
-	else if (errno != 0 && number == 0)
-	{
-		int err = errno;
-		ereport(ERROR, (errmsg("Error parsing %s as int32, errno %d\n", str, err)));
-	}
-	else if (errno == 0 && str && *endptr != '\0')
-	{
-		ereport(ERROR, (errmsg(
-							"Error parsing %s as int32, aditional characters remain after int32\n",
-							str)));
-	}
-
-	return number;
-}
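The move is a straight relocation of SafeStringToInt32 to sit next to SafeStringToInt64, not a rewrite, so its contract is unchanged; a hypothetical caller illustrates it:

    int32 nodeId = SafeStringToInt32("42");          /* returns 42 */
    int32 bad = SafeStringToInt32("42abc");          /* ereports: characters remain */
    int32 wide = SafeStringToInt32("3000000000");    /* ereports: overflow */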

View File

@@ -199,7 +199,7 @@ typedef struct MultiConnection
 	uint64 copyBytesWrittenSinceLastFlush;
 
 	/* replication option */
-	bool requiresReplicationOption;
+	bool requiresReplication;
 
 	MultiConnectionStructInitializationState initilizationState;
 } MultiConnection;

View File

@@ -25,6 +25,7 @@
 #include "distributed/connection_management.h"
 #include "distributed/errormessage.h"
 #include "distributed/relay_utility.h"
+#include "distributed/worker_manager.h"
 #include "utils/acl.h"
 #include "utils/relcache.h"
 
@@ -226,6 +227,7 @@ extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
 extern List * ActiveShardPlacementList(uint64 shardId);
 extern List * ShardPlacementListWithoutOrphanedPlacements(uint64 shardId);
 extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk);
+extern WorkerNode * ActiveShardPlacementWorkerNode(uint64 shardId);
 extern List * BuildShardPlacementList(int64 shardId);
 extern List * AllShardPlacementsOnNodeGroup(int32 groupId);
 extern List * AllShardPlacementsWithShardPlacementState(ShardState shardState);

View File

@@ -37,8 +37,8 @@ extern void DropShardSubscription(MultiConnection *connection,
 extern void DropShardPublication(MultiConnection *connection, char *publicationName);
 
 extern void DropShardUser(MultiConnection *connection, char *username);
-extern void DropShardMoveReplicationSlot(MultiConnection *connection,
-										 char *publicationName);
+extern void DropShardReplicationSlot(MultiConnection *connection,
+									 char *publicationName);
 
 extern char * ShardSubscriptionRole(Oid ownerId, char *operationPrefix);

View File

@@ -9,4 +9,4 @@ test: isolation_logical_replication_single_shard_commands
 test: isolation_logical_replication_multi_shard_commands
 test: isolation_non_blocking_shard_split
 test: isolation_non_blocking_shard_split_with_index_as_replicaIdentity
-test: isolation_non_blocking_shard_split_fkey
\ No newline at end of file
+test: isolation_non_blocking_shard_split_fkey

View File

@@ -64,11 +64,11 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
 SET search_path TO split_shard_replication_setup_schema;
 CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
 CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
-SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
-    ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
-    ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
-    ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
-    ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
+SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
+    ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
+    ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info,
+    ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
+    ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
 ]);
 WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow.
  count

View File

@@ -62,9 +62,9 @@ CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
 CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
 -- Create publication at worker1
 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
-SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
-    ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
-    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
+SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
+    ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
+    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
 ]);
  count
---------------------------------------------------------------------

View File

@@ -10,9 +10,9 @@ SET client_min_messages TO ERROR;
 -- Create publication at worker1
 CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
 -- Worker1 is target for table_to_split_2 and table_to_split_3
-SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
-    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
-    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
+SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
+    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
+    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
 ]);
  count
---------------------------------------------------------------------

View File

@@ -8,9 +8,9 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
 SET search_path TO split_shard_replication_setup_schema;
 -- Create publication at worker1
 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
-SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
-    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
-    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
+SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
+    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
+    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
 ]);
 WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow.
  count

View File

@@ -13,7 +13,7 @@ setup
 
 teardown
 {
 	DROP TABLE to_split_table CASCADE;
 }

View File

@@ -75,4 +75,4 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
 SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port);
 
 \c - - - :worker_2_port
 SELECT slot_name FROM pg_replication_slots;

View File

@@ -66,11 +66,11 @@ SET search_path TO split_shard_replication_setup_schema;
 CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
 CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
 
-SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
-    ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
-    ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
-    ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
-    ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
+SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
+    ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
+    ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info,
+    ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
+    ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
 ]);
 
 SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset

View File

@@ -66,9 +66,9 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
 -- Create publication at worker1
 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
 
-SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
-    ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
-    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
+SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
+    ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
+    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
 ]);
 
 SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset

View File

@@ -13,9 +13,9 @@ SET client_min_messages TO ERROR;
 CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
 
 -- Worker1 is target for table_to_split_2 and table_to_split_3
-SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
-    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
-    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
+SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
+    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
+    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
 ]);
 
 SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset

View File

@@ -11,9 +11,9 @@ SET search_path TO split_shard_replication_setup_schema;
 -- Create publication at worker1
 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
 
-SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
-    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
-    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
+SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
+    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
+    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
 ]);
 
 SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset