From 77253cdafb2ba1c72ea26219a2ecd6a59e3283e7 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Thu, 16 Jun 2022 14:47:48 -0700 Subject: [PATCH] worker_split_copy UDF --- .../distributed/operations/shard_split.c | 1 + .../operations/worker_shard_copy.c | 26 +++-- .../operations/worker_split_copy.c | 37 ++++--- .../operations/worker_split_copy_udf.c | 103 ++++++++++++++++++ .../sql/udfs/worker_split_copy/11.0-2.sql | 17 +++ .../sql/udfs/worker_split_copy/latest.sql | 17 +++ src/include/distributed/relation_utils.h | 6 - src/include/distributed/worker_shard_copy.h | 2 +- src/include/distributed/worker_split_copy.h | 10 +- 9 files changed, 182 insertions(+), 37 deletions(-) create mode 100644 src/backend/distributed/operations/worker_split_copy_udf.c create mode 100644 src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql create mode 100644 src/backend/distributed/sql/udfs/worker_split_copy/latest.sql diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 1f8857221..032c7a252 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -473,6 +473,7 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, /* Perform Split Copy */ + // TODO(niupre) : Use Adaptive execution for creating multiple indexes parallely. /* Create Indexes post copy */ foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) { diff --git a/src/backend/distributed/operations/worker_shard_copy.c b/src/backend/distributed/operations/worker_shard_copy.c index e16e55cef..dcc14a321 100644 --- a/src/backend/distributed/operations/worker_shard_copy.c +++ b/src/backend/distributed/operations/worker_shard_copy.c @@ -38,7 +38,7 @@ typedef struct ShardCopyDestReceiver DestReceiver pub; /* Destination Relation Name */ - FullRelationName *destinationRelation; + char* destinationShardFullyQualifiedName; /* descriptor of the tuples that are sent to the worker */ TupleDesc tupleDescriptor; @@ -69,7 +69,7 @@ static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver); static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool CanUseLocalCopy(uint64 destinationNodeId); -static StringInfo ConstructCopyStatement(FullRelationName *relation, bool useBinaryFormat); +static StringInfo ConstructCopyStatement(char* destinationShardFullyQualifiedName, bool useBinaryFormat); static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState); static bool ShouldSendCopyNow(StringInfo buffer); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); @@ -113,7 +113,7 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) NULL); ClaimConnectionExclusively(copyDest->connection); - StringInfo copyStatement = ConstructCopyStatement(copyDest->destinationRelation, + StringInfo copyStatement = ConstructCopyStatement(copyDest->destinationShardFullyQualifiedName, copyDest->destinationNodeId); ExecuteCriticalRemoteCommand(copyDest->connection, copyStatement->data); } @@ -139,7 +139,7 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("Failed to COPY to shard %s,", - copyDest->destinationRelation->relationName), + copyDest->destinationShardFullyQualifiedName), errdetail("failed to send %d bytes %s", copyOutState->fe_msgbuf->len, copyOutState->fe_msgbuf->data))); } @@ -201,7 +201,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest) { ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("Failed to COPY to destination shard %s", - copyDest->destinationRelation->relationName))); + copyDest->destinationShardFullyQualifiedName))); } /* check whether there were any COPY errors */ @@ -240,11 +240,11 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest) * for copying into a result table */ static StringInfo -ConstructCopyStatement(FullRelationName *relation, bool useBinaryFormat) +ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool useBinaryFormat) { StringInfo command = makeStringInfo(); appendStringInfo(command, "COPY %s FROM STDIN", - quote_qualified_identifier(relation->schemaName, relation->relationName)); + destinationShardFullyQualifiedName); if(useBinaryFormat) { @@ -255,7 +255,7 @@ ConstructCopyStatement(FullRelationName *relation, bool useBinaryFormat) } DestReceiver * CreateShardCopyDestReceiver( - FullRelationName* destinationRelation, + char* destinationShardFullyQualifiedName, uint32_t destinationNodeId) { ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0( @@ -269,7 +269,7 @@ DestReceiver * CreateShardCopyDestReceiver( copyDest->pub.mydest = DestCopyOut; copyDest->destinationNodeId = destinationNodeId; - copyDest->destinationRelation = destinationRelation; + copyDest->destinationShardFullyQualifiedName = destinationShardFullyQualifiedName; copyDest->tuplesSent = 0; copyDest->connection = NULL; copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId); @@ -320,8 +320,12 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState */ LocalCopyBuffer = localCopyOutState->fe_msgbuf; - Oid destinationSchemaOid = get_namespace_oid(copyDest->destinationRelation->schemaName, false /* missing_ok */); - Oid destinationShardOid = get_relname_relid(copyDest->destinationRelation->relationName, destinationSchemaOid); + char *destinationShardSchemaName = NULL; + char *destinationShardRelationName = NULL; + DeconstructQualifiedName(list_make1(copyDest->destinationShardFullyQualifiedName), &destinationShardSchemaName, &destinationShardRelationName); + + Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false /* missing_ok */); + Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid); DefElem *binaryFormatOption = NULL; if (isBinaryCopy) diff --git a/src/backend/distributed/operations/worker_split_copy.c b/src/backend/distributed/operations/worker_split_copy.c index b1e565ade..38168600e 100644 --- a/src/backend/distributed/operations/worker_split_copy.c +++ b/src/backend/distributed/operations/worker_split_copy.c @@ -14,11 +14,13 @@ #include "postgres.h" #include "catalog/namespace.h" #include "utils/lsyscache.h" +#include "utils/builtins.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/relation_utils.h" #include "distributed/worker_split_copy.h" #include "distributed/worker_shard_copy.h" +#include "distributed/relay_utility.h" typedef struct SplitCopyDestReceiver { @@ -35,10 +37,10 @@ typedef struct SplitCopyDestReceiver uint splitFactor; /* Source shard name */ - FullRelationName *sourceShardName; + char *sourceShardName; /* Source shard Oid */ - Oid sourceShardOid; + Oid sourceShardRelationOid; } SplitCopyDestReceiver; @@ -49,7 +51,7 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, static void SplitCopyDestReceiverShutdown(DestReceiver *dest); static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest); -DestReceiver * CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List* splitCopyInfoList) +DestReceiver * CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* splitCopyInfoList) { SplitCopyDestReceiver *splitCopyDest = palloc0(sizeof(SplitCopyDestReceiver)); @@ -60,22 +62,29 @@ DestReceiver * CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List* splitCopyDest->pub.rShutdown = SplitCopyDestReceiverShutdown; splitCopyDest->pub.rDestroy = SplitCopyDestReceiverDestroy; - Oid sourceSchemaOid = get_namespace_oid(sourceShard->schemaName, false /* missing_ok */); - Oid sourceShardOid = get_relname_relid(sourceShard->relationName, sourceSchemaOid); - splitCopyDest->sourceShardOid = sourceShardOid; - splitCopyDest->splitFactor = splitCopyInfoList->length; + ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(sourceShardIdToCopy); + splitCopyDest->sourceShardRelationOid = shardIntervalToSplitCopy->relationId; DestReceiver **shardCopyDests = palloc0(splitCopyDest->splitFactor * sizeof(DestReceiver *)); SplitCopyInfo **splitCopyInfos = palloc0(splitCopyDest->splitFactor * sizeof(SplitCopyInfo *)); SplitCopyInfo *splitCopyInfo = NULL; int index = 0; + + char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId); foreach_ptr(splitCopyInfo, splitCopyInfoList) { + char *destinationShardSchemaName = get_namespace_name(get_rel_namespace(splitCopyDest->sourceShardRelationOid));; + char *destinationShardNameCopy = strdup(sourceShardNamePrefix); + AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId); + + char *destinationShardFullyQualifiedName = + quote_qualified_identifier(destinationShardSchemaName, destinationShardNameCopy); + DestReceiver *shardCopyDest = CreateShardCopyDestReceiver( - splitCopyInfo->destinationShard, - splitCopyInfo->nodeId); + destinationShardFullyQualifiedName, + splitCopyInfo->destinationShardNodeId); shardCopyDests[index] = shardCopyDest; splitCopyInfos[index] = splitCopyInfo; @@ -103,11 +112,11 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des { SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest; - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(self->sourceShardOid); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(self->sourceShardRelationOid); if (cacheEntry == NULL) { ereport(ERROR, errmsg("Could not find shard %s for split copy.", - self->sourceShardName->relationName)); + self->sourceShardName)); } /* Partition Column Metadata on source shard */ @@ -122,7 +131,7 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des if (columnNulls[partitionColumnIndex]) { ereport(ERROR, errmsg("Found null partition value for shard %s during split copy.", - self->sourceShardName->relationName)); + self->sourceShardName)); } Datum hashedValueDatum = FunctionCall1(hashFunction, columnValues[partitionColumnIndex]); @@ -132,8 +141,8 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des { SplitCopyInfo *splitCopyInfo = self->splitCopyInfoArray[index]; - if (splitCopyInfo->shardMinValue <= hashedValue && - splitCopyInfo->shardMaxValue >= hashedValue) + if (splitCopyInfo->destinationShardMinHashValue <= hashedValue && + splitCopyInfo->destinationShardMaxHashValue >= hashedValue) { DestReceiver *shardCopyDestReceiver = self->shardCopyDestReceiverArray[index]; shardCopyDestReceiver->receiveSlot(slot, shardCopyDestReceiver); diff --git a/src/backend/distributed/operations/worker_split_copy_udf.c b/src/backend/distributed/operations/worker_split_copy_udf.c new file mode 100644 index 000000000..28e5b5254 --- /dev/null +++ b/src/backend/distributed/operations/worker_split_copy_udf.c @@ -0,0 +1,103 @@ +/*------------------------------------------------------------------------- + * + * worker_split_copy_udf.c + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "pg_version_compat.h" +#include "utils/array.h" +#include "utils/builtins.h" +#include "distributed/multi_executor.h" +#include "distributed/worker_split_copy.h" +#include "distributed/citus_ruleutils.h" + +PG_FUNCTION_INFO_V1(worker_split_shardgroup_copy); +PG_FUNCTION_INFO_V1(worker_split_copy); + +static void +ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo** splitCopyInfo); + +/* + * + */ +Datum +worker_split_copy(PG_FUNCTION_ARGS) +{ + uint64 shardIdToSplitCopy = DatumGetUInt64(PG_GETARG_DATUM(0)); + ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(shardIdToSplitCopy); + + ArrayType *splitCopyInfoArrayObject = PG_GETARG_ARRAYTYPE_P(1); + if (array_contains_nulls(splitCopyInfoArrayObject)) + { + ereport(ERROR, + (errmsg("Shard Copy Info cannot have null values."))); + } + + ArrayIterator copyInfo_iterator = array_create_iterator(splitCopyInfoArrayObject, 0 /* slice_ndim */, NULL /* mState */); + Datum copyInfoDatum = 0; + bool isnull = false; + List* splitCopyInfoList = NULL; + while (array_iterate(copyInfo_iterator, ©InfoDatum, &isnull)) + { + SplitCopyInfo *splitCopyInfo = NULL; + ParseSplitCopyInfoDatum(copyInfoDatum, &splitCopyInfo); + + splitCopyInfoList = lappend(splitCopyInfoList, splitCopyInfo); + } + DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(shardIdToSplitCopy, splitCopyInfoList); + + StringInfo selectShardQueryForCopy = makeStringInfo(); + appendStringInfo(selectShardQueryForCopy, + "SELECT * FROM %s;", + generate_qualified_relation_name(shardIntervalToSplitCopy->relationId)); + + ParamListInfo params = NULL; + ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, (DestReceiver *) splitCopyDestReceiver); + + PG_RETURN_VOID(); +} + +static void +ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo** splitCopyInfo) +{ + HeapTupleHeader dataTuple = DatumGetHeapTupleHeader(splitCopyInfoDatum); + + SplitCopyInfo *copyInfo = palloc0(sizeof(SplitCopyInfo)); + + bool isnull = false; + Datum destinationShardIdDatum = GetAttributeByName(dataTuple, "destination_shard_id", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg("destination_shard_id for split_copy_info cannot be null."))); + } + copyInfo->destinationShardId = DatumGetUInt64(destinationShardIdDatum); + + Datum minValueDatum = GetAttributeByName(dataTuple, "destination_shard_min_value", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg("destination_shard_min_value for split_copy_info cannot be null."))); + } + char *destinationMinHash = text_to_cstring(DatumGetTextP(minValueDatum)); + copyInfo->destinationShardMinHashValue = pg_strtoint64(destinationMinHash); + + Datum maxValueDatum = GetAttributeByName(dataTuple, "destination_shard_max_value", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg("destination_shard_max_value for split_copy_info cannot be null."))); + } + char *destinationMaxHash = text_to_cstring(DatumGetTextP(maxValueDatum)); + copyInfo->destinationShardMaxHashValue = pg_strtoint64(destinationMaxHash); + + Datum nodeIdDatum = GetAttributeByName(dataTuple, "destination_shard_node_id", &isnull); + if (isnull) + { + ereport(ERROR, (errmsg("destination_shard_node_id for split_copy_info cannot be null."))); + } + copyInfo->destinationShardNodeId = DatumGetInt32(nodeIdDatum); + + *splitCopyInfo = copyInfo; +} diff --git a/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql b/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql new file mode 100644 index 000000000..a21840783 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql @@ -0,0 +1,17 @@ +DROP TYPE IF EXISTS citus.split_copy_info; +CREATE TYPE citus.split_copy_info AS ( + destination_shard_id bigint, + destination_shard_min_value text, + destination_shard_max_value text, + -- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support. + -- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough. + destination_shard_node_id integer); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy( + source_shard_id bigint, + splitShardInfo citus.split_shard_info[]) +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_split_copy$$; +COMMENT ON FUNCTION pg_catalog.worker_split_copy(splitShardInfo citus.split_shard_info[]) + IS 'Perform split copy for shard' diff --git a/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql b/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql new file mode 100644 index 000000000..a21840783 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql @@ -0,0 +1,17 @@ +DROP TYPE IF EXISTS citus.split_copy_info; +CREATE TYPE citus.split_copy_info AS ( + destination_shard_id bigint, + destination_shard_min_value text, + destination_shard_max_value text, + -- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support. + -- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough. + destination_shard_node_id integer); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy( + source_shard_id bigint, + splitShardInfo citus.split_shard_info[]) +RETURNS void +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$worker_split_copy$$; +COMMENT ON FUNCTION pg_catalog.worker_split_copy(splitShardInfo citus.split_shard_info[]) + IS 'Perform split copy for shard' diff --git a/src/include/distributed/relation_utils.h b/src/include/distributed/relation_utils.h index 0e512efad..873398f00 100644 --- a/src/include/distributed/relation_utils.h +++ b/src/include/distributed/relation_utils.h @@ -15,12 +15,6 @@ #include "utils/relcache.h" -typedef struct FullRelationName -{ - char *schemaName; - char *relationName; -} FullRelationName; - extern char * RelationGetNamespaceName(Relation relation); #endif /* RELATION_UTILS_H */ diff --git a/src/include/distributed/worker_shard_copy.h b/src/include/distributed/worker_shard_copy.h index c61f86ec5..2db6514de 100644 --- a/src/include/distributed/worker_shard_copy.h +++ b/src/include/distributed/worker_shard_copy.h @@ -15,7 +15,7 @@ struct FullRelationName; extern DestReceiver * CreateShardCopyDestReceiver( - struct FullRelationName* relationName, + char* destinationShardFullyQualifiedName, uint32_t destinationNodeId); #endif /* WORKER_SHARD_COPY_H_ */ diff --git a/src/include/distributed/worker_split_copy.h b/src/include/distributed/worker_split_copy.h index 12e0f4621..a9f664f59 100644 --- a/src/include/distributed/worker_split_copy.h +++ b/src/include/distributed/worker_split_copy.h @@ -14,12 +14,12 @@ typedef struct SplitCopyInfo { - FullRelationName *destinationShard; /* destination shard name */ - int32 shardMinValue; /* min hash value of destination shard */ - int32 shardMaxValue; /* max hash value of destination shard */ - uint32_t nodeId; /* node where split child shard is to be placed */ + uint64 destinationShardId; /* destination shard id */ + int32 destinationShardMinHashValue; /* min hash value of destination shard */ + int32 destinationShardMaxHashValue; /* max hash value of destination shard */ + uint32_t destinationShardNodeId; /* node where split child shard is to be placed */ } SplitCopyInfo; -extern DestReceiver* CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List* splitCopyInfoList); +extern DestReceiver* CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* splitCopyInfoList); #endif /* WORKER_SPLIT_COPY_H_ */