Address PR comments

pull/6029/head
Nitish Upreti 2022-07-11 15:44:48 -07:00
parent 5a1a8d5dcb
commit cf8fea5a3f
5 changed files with 188 additions and 32 deletions

View File

@@ -32,6 +32,7 @@
#include "distributed/pg_dist_shard.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/deparse_shard_query.h"
/* Function declarations */
static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
@@ -70,6 +71,8 @@ static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static Task * CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList,
WorkerNode *workerNode);
/* Customize error message strings based on operation type */
static const char *const SplitOperationName[] =
@@ -354,7 +357,7 @@ SplitShard(SplitMode splitMode,
shardSplitPointsList,
nodeIdsForPlacementList);
List *workersForPlacementList = NULL;
List *workersForPlacementList = NIL;
Datum nodeId;
foreach_int(nodeId, nodeIdsForPlacementList)
{
@@ -472,13 +475,19 @@ static void
CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
/* Iterate on shard interval list for shard group */
List *shardIntervalList = NULL;
/*
* Iterate over all the shards in the shard group.
*/
List *shardIntervalList = NIL;
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
/* Iterate on split shard interval list and corresponding placement worker */
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
/*
* Iterate over the split shards of a given source shard and create
* each shard on its corresponding workerPlacementNode.
*/
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{
@@ -498,6 +507,24 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
}
/* Create a DDL task with the given DDL command list, to be run on the given worker node */
static Task *
CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, WorkerNode *workerNode)
{
Task *ddlTask = CitusMakeNode(Task);
ddlTask->jobId = jobId;
ddlTask->taskType = DDL_TASK;
ddlTask->replicationModel = REPLICATION_MODEL_INVALID;
SetTaskQueryStringList(ddlTask, ddlCommandList);
ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(taskPlacement, workerNode);
ddlTask->taskPlacementList = list_make1(taskPlacement);
return ddlTask;
}
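/*
 * Note: callers collect one such DDL task per split shard placement and run
 * the whole list in a single pass through ExecuteTaskListOutsideTransaction(),
 * as done in CreateAuxiliaryStructuresForShardGroup() below.
 */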
/* Create ShardGroup auxiliary structures (indexes, stats, replica identities, triggers)
* on a list of corresponding workers.
*/
@@ -505,29 +532,45 @@ static void
CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
List *shardIntervalList = NIL;
List *ddlTaskExecList = NIL;
/*
* Create auxiliary structures post copy.
* Iterate over all the shards in the shard group.
*/
List *shardIntervalList = NULL;
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
/* Iterate on split shard interval list and corresponding placement worker */
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
/*
* Iterate over the split shard interval list for a given shard and create
* a DDL task for every split shard in the shard group.
*/
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{
List *indexCommandList = GetPostLoadTableCreationCommands(
List *ddlCommandList = GetPostLoadTableCreationCommands(
shardInterval->relationId,
true /* includeIndexes */,
true /* includeReplicaIdentity */);
indexCommandList = WorkerApplyShardDDLCommandList(
indexCommandList,
ddlCommandList = WorkerApplyShardDDLCommandList(
ddlCommandList,
shardInterval->shardId);
CreateObjectOnPlacement(indexCommandList, workerPlacementNode);
uint64 jobId = shardInterval->shardId;
Task *ddlTask = CreateTaskForDDLCommandList(jobId, ddlCommandList,
workerPlacementNode);
ddlTaskExecList = lappend(ddlTaskExecList, ddlTask);
}
}
ExecuteTaskListOutsideTransaction(
ROW_MODIFY_NONE,
ddlTaskExecList,
MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */);
}
@@ -565,10 +608,10 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList)
{
ShardInterval *sourceShardIntervalToCopy = NULL;
List *splitShardIntervalList = NULL;
List *splitShardIntervalList = NIL;
int taskId = 0;
List *splitCopyTaskList = NULL;
List *splitCopyTaskList = NIL;
forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
splitShardIntervalList, shardGroupSplitIntervalListList)
{
@@ -690,12 +733,12 @@ static List *
CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList,
List *splitPointsForShard)
{
List *shardGroupSplitIntervalListList = NULL;
List *shardGroupSplitIntervalListList = NIL;
ShardInterval *shardToSplitInterval = NULL;
foreach_ptr(shardToSplitInterval, sourceColocatedShardIntervalList)
{
List *shardSplitIntervalList = NULL;
List *shardSplitIntervalList = NIL;
CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard,
&shardSplitIntervalList);
@@ -761,12 +804,17 @@ static void
InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
/* Iterate on shard intervals for shard group */
List *shardIntervalList = NULL;
List *syncedShardList = NULL;
List *shardIntervalList = NIL;
List *syncedShardList = NIL;
/*
* Iterate over all the shards in the shard group.
*/
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
/* Iterate on split children shards along with the respective placement workers */
/*
* Iterate on split shards list for a given shard and insert metadata.
*/
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
@@ -811,12 +859,19 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
/* Create constraints between shards */
List *shardIntervalList = NULL;
List *shardIntervalList = NIL;
/*
* Iterate over all the shards in the shard group.
*/
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
/* Iterate on split children shards along with the respective placement workers */
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
/*
* Iterate on split shards list for a given shard and create constraints.
*/
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{
@@ -922,12 +977,19 @@ static void
TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
List *shardIntervalList = NULL;
List *shardIntervalList = NIL;
/*
* Iterate over all the shards in the shard group.
*/
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
/* Iterate on split shard interval list and corresponding placement worker */
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
/*
* Iterate on split shards list for a given shard and perform drop.
*/
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{

View File

@@ -71,8 +71,9 @@ static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation,
static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver);
static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver);
static bool CanUseLocalCopy(uint64 destinationNodeId);
static StringInfo ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool
useBinaryFormat);
static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName,
bool
useBinaryFormat);
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
@@ -102,7 +103,7 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection);
StringInfo copyStatement = ConstructCopyStatement(
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary);
@@ -344,11 +345,12 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest)
/*
* ConstructCopyStatement constructs the text of a COPY statement
* ConstructShardCopyStatement constructs the text of a COPY statement
* for copying into a result table
*/
static StringInfo
ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool useBinaryFormat)
ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool
useBinaryFormat)
{
char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName);
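/*
 * For reference, a sketch of the kind of statement this helper builds for a
 * destination split shard (the shard name below is a placeholder and the exact
 * option spelling is an assumption, not taken from this diff):
 *
 *   COPY "some_schema"."some_table_81070015" FROM STDIN WITH (format binary);
 */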

View File

@@ -12,6 +12,7 @@
#include "utils/lsyscache.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "distributed/utils/array_type.h"
#include "distributed/listutils.h"
#include "distributed/multi_executor.h"
#include "distributed/worker_shard_copy.h"
@@ -54,10 +55,12 @@ worker_split_copy(PG_FUNCTION_ARGS)
ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(shardIdToSplitCopy);
ArrayType *splitCopyInfoArrayObject = PG_GETARG_ARRAYTYPE_P(1);
if (array_contains_nulls(splitCopyInfoArrayObject))
bool arrayHasNull = ARR_HASNULL(splitCopyInfoArrayObject);
if (arrayHasNull)
{
ereport(ERROR,
(errmsg("Shard Copy Info cannot have null values.")));
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg(
"citus.split_copy_info array cannot contain null values")));
}
const int slice_ndim = 0;
@@ -67,7 +70,7 @@ worker_split_copy(PG_FUNCTION_ARGS)
mState);
Datum copyInfoDatum = 0;
bool isnull = false;
List *splitCopyInfoList = NULL;
List *splitCopyInfoList = NIL;
while (array_iterate(copyInfo_iterator, &copyInfoDatum, &isnull))
{
SplitCopyInfo *splitCopyInfo = NULL;

View File

@@ -51,6 +51,52 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- END: Set worker_1_node and worker_2_node
-- BEGIN: Test Negative scenario
SELECT * from worker_split_copy(
101, -- Invalid source shard id.
ARRAY[
-- split copy info for split children 1
ROW(81070015, -- destination shard id
-2147483648, -- split range begin
-1073741824, --split range end
:worker_1_node)::citus.split_copy_info,
-- split copy info for split children 2
ROW(81070016, --destination shard id
-1073741823, --split range begin
-1, --split range end
:worker_1_node)::citus.split_copy_info
]
);
ERROR: could not find valid entry for shard xxxxx
SELECT * from worker_split_copy(
81070000, -- source shard id to copy
ARRAY[] -- empty array
);
ERROR: cannot determine type of empty array
HINT: Explicitly cast to the desired type, for example ARRAY[]::integer[].
SELECT * from worker_split_copy(
81070000, -- source shard id to copy
ARRAY[NULL] -- array with an untyped null element
);
ERROR: function worker_split_copy(integer, text[]) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
SELECT * from worker_split_copy(
81070000, -- source shard id to copy
ARRAY[NULL::citus.split_copy_info] -- array with a null element
);
ERROR: citus.split_copy_info array cannot contain null values
SELECT * from worker_split_copy(
81070000, -- source shard id to copy
ARRAY[ROW(NULL)] -- row with a single null field
);
ERROR: function worker_split_copy(integer, record[]) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
SELECT * from worker_split_copy(
81070000, -- source shard id to copy
ARRAY[ROW(NULL, NULL, NULL, NULL)::citus.split_copy_info] -- split_copy_info with all null fields
);
ERROR: destination_shard_id for split_copy_info cannot be null.
-- END: Test Negative scenario
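-- For reference when reading the ROW(...) literals above: citus.split_copy_info
-- is a four-field composite type. The layout below is a sketch based on the
-- Citus 11.1 extension scripts and is not part of this diff:
--   CREATE TYPE citus.split_copy_info AS (
--       destination_shard_id bigint,
--       destination_shard_min_value text,
--       destination_shard_max_value text,
--       destination_node_id integer);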
-- BEGIN: Trigger 2-way local shard split copy.
-- Ensure we will perform text copy.
SET citus.enable_binary_protocol = false;

View File

@@ -35,6 +35,49 @@ SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- END: Set worker_1_node and worker_2_node
-- BEGIN: Test Negative scenario
SELECT * from worker_split_copy(
101, -- Invalid source shard id.
ARRAY[
-- split copy info for split children 1
ROW(81070015, -- destination shard id
-2147483648, -- split range begin
-1073741824, --split range end
:worker_1_node)::citus.split_copy_info,
-- split copy info for split children 2
ROW(81070016, --destination shard id
-1073741823, --split range begin
-1, --split range end
:worker_1_node)::citus.split_copy_info
]
);
SELECT * from worker_split_copy(
81070000, -- source shard id to copy
ARRAY[] -- empty array
);
SELECT * from worker_split_copy(
81070000, -- source shard id to copy
ARRAY[NULL] -- array with an untyped null element
);
SELECT * from worker_split_copy(
81070000, -- source shard id to copy
ARRAY[NULL::citus.split_copy_info] -- array with a null element
);
SELECT * from worker_split_copy(
81070000, -- source shard id to copy
ARRAY[ROW(NULL)] -- row with a single null field
);
SELECT * from worker_split_copy(
81070000, -- source shard id to copy
ARRAY[ROW(NULL, NULL, NULL, NULL)::citus.split_copy_info] -- split_copy_info with all null fields
);
-- END: Test Negative scenario
-- BEGIN: Trigger 2-way local shard split copy.
-- Ensure we will perform text copy.
SET citus.enable_binary_protocol = false;