worker_split_copy UDF

pull/6029/head
Nitish Upreti 2022-06-16 14:47:48 -07:00
parent ab9f92eaa3
commit 77253cdafb
9 changed files with 182 additions and 37 deletions

View File

@ -473,6 +473,7 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
/* Perform Split Copy */
// TODO(niupre) : Use Adaptive execution for creating multiple indexes parallely.
/* Create Indexes post copy */
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{

View File

@ -38,7 +38,7 @@ typedef struct ShardCopyDestReceiver
DestReceiver pub;
/* Destination Relation Name */
FullRelationName *destinationRelation;
char* destinationShardFullyQualifiedName;
/* descriptor of the tuples that are sent to the worker */
TupleDesc tupleDescriptor;
@ -69,7 +69,7 @@ static void ShardCopyDestReceiverStartup(DestReceiver *dest, int operation,
static void ShardCopyDestReceiverShutdown(DestReceiver *destReceiver);
static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver);
static bool CanUseLocalCopy(uint64 destinationNodeId);
static StringInfo ConstructCopyStatement(FullRelationName *relation, bool useBinaryFormat);
static StringInfo ConstructCopyStatement(char* destinationShardFullyQualifiedName, bool useBinaryFormat);
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState);
static bool ShouldSendCopyNow(StringInfo buffer);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
@ -113,7 +113,7 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
NULL);
ClaimConnectionExclusively(copyDest->connection);
StringInfo copyStatement = ConstructCopyStatement(copyDest->destinationRelation,
StringInfo copyStatement = ConstructCopyStatement(copyDest->destinationShardFullyQualifiedName,
copyDest->destinationNodeId);
ExecuteCriticalRemoteCommand(copyDest->connection, copyStatement->data);
}
@ -139,7 +139,7 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to COPY to shard %s,",
copyDest->destinationRelation->relationName),
copyDest->destinationShardFullyQualifiedName),
errdetail("failed to send %d bytes %s", copyOutState->fe_msgbuf->len,
copyOutState->fe_msgbuf->data)));
}
@ -201,7 +201,7 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)
{
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("Failed to COPY to destination shard %s",
copyDest->destinationRelation->relationName)));
copyDest->destinationShardFullyQualifiedName)));
}
/* check whether there were any COPY errors */
@ -240,11 +240,11 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest)
* for copying into a result table
*/
static StringInfo
ConstructCopyStatement(FullRelationName *relation, bool useBinaryFormat)
ConstructCopyStatement(char *destinationShardFullyQualifiedName, bool useBinaryFormat)
{
StringInfo command = makeStringInfo();
appendStringInfo(command, "COPY %s FROM STDIN",
quote_qualified_identifier(relation->schemaName, relation->relationName));
destinationShardFullyQualifiedName);
if(useBinaryFormat)
{
@ -255,7 +255,7 @@ ConstructCopyStatement(FullRelationName *relation, bool useBinaryFormat)
}
DestReceiver * CreateShardCopyDestReceiver(
FullRelationName* destinationRelation,
char* destinationShardFullyQualifiedName,
uint32_t destinationNodeId)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) palloc0(
@ -269,7 +269,7 @@ DestReceiver * CreateShardCopyDestReceiver(
copyDest->pub.mydest = DestCopyOut;
copyDest->destinationNodeId = destinationNodeId;
copyDest->destinationRelation = destinationRelation;
copyDest->destinationShardFullyQualifiedName = destinationShardFullyQualifiedName;
copyDest->tuplesSent = 0;
copyDest->connection = NULL;
copyDest->useLocalCopy = CanUseLocalCopy(destinationNodeId);
@ -320,8 +320,12 @@ LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState localCopyOutState
*/
LocalCopyBuffer = localCopyOutState->fe_msgbuf;
Oid destinationSchemaOid = get_namespace_oid(copyDest->destinationRelation->schemaName, false /* missing_ok */);
Oid destinationShardOid = get_relname_relid(copyDest->destinationRelation->relationName, destinationSchemaOid);
char *destinationShardSchemaName = NULL;
char *destinationShardRelationName = NULL;
DeconstructQualifiedName(list_make1(copyDest->destinationShardFullyQualifiedName), &destinationShardSchemaName, &destinationShardRelationName);
Oid destinationSchemaOid = get_namespace_oid(destinationShardSchemaName, false /* missing_ok */);
Oid destinationShardOid = get_relname_relid(destinationShardRelationName, destinationSchemaOid);
DefElem *binaryFormatOption = NULL;
if (isBinaryCopy)

View File

@ -14,11 +14,13 @@
#include "postgres.h"
#include "catalog/namespace.h"
#include "utils/lsyscache.h"
#include "utils/builtins.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/relation_utils.h"
#include "distributed/worker_split_copy.h"
#include "distributed/worker_shard_copy.h"
#include "distributed/relay_utility.h"
typedef struct SplitCopyDestReceiver
{
@ -35,10 +37,10 @@ typedef struct SplitCopyDestReceiver
uint splitFactor;
/* Source shard name */
FullRelationName *sourceShardName;
char *sourceShardName;
/* Source shard Oid */
Oid sourceShardOid;
Oid sourceShardRelationOid;
} SplitCopyDestReceiver;
@ -49,7 +51,7 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot,
static void SplitCopyDestReceiverShutdown(DestReceiver *dest);
static void SplitCopyDestReceiverDestroy(DestReceiver *copyDest);
DestReceiver * CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List* splitCopyInfoList)
DestReceiver * CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* splitCopyInfoList)
{
SplitCopyDestReceiver *splitCopyDest =
palloc0(sizeof(SplitCopyDestReceiver));
@ -60,22 +62,29 @@ DestReceiver * CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List*
splitCopyDest->pub.rShutdown = SplitCopyDestReceiverShutdown;
splitCopyDest->pub.rDestroy = SplitCopyDestReceiverDestroy;
Oid sourceSchemaOid = get_namespace_oid(sourceShard->schemaName, false /* missing_ok */);
Oid sourceShardOid = get_relname_relid(sourceShard->relationName, sourceSchemaOid);
splitCopyDest->sourceShardOid = sourceShardOid;
splitCopyDest->splitFactor = splitCopyInfoList->length;
ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(sourceShardIdToCopy);
splitCopyDest->sourceShardRelationOid = shardIntervalToSplitCopy->relationId;
DestReceiver **shardCopyDests = palloc0(splitCopyDest->splitFactor * sizeof(DestReceiver *));
SplitCopyInfo **splitCopyInfos = palloc0(splitCopyDest->splitFactor * sizeof(SplitCopyInfo *));
SplitCopyInfo *splitCopyInfo = NULL;
int index = 0;
char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId);
foreach_ptr(splitCopyInfo, splitCopyInfoList)
{
char *destinationShardSchemaName = get_namespace_name(get_rel_namespace(splitCopyDest->sourceShardRelationOid));;
char *destinationShardNameCopy = strdup(sourceShardNamePrefix);
AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId);
char *destinationShardFullyQualifiedName =
quote_qualified_identifier(destinationShardSchemaName, destinationShardNameCopy);
DestReceiver *shardCopyDest = CreateShardCopyDestReceiver(
splitCopyInfo->destinationShard,
splitCopyInfo->nodeId);
destinationShardFullyQualifiedName,
splitCopyInfo->destinationShardNodeId);
shardCopyDests[index] = shardCopyDest;
splitCopyInfos[index] = splitCopyInfo;
@ -103,11 +112,11 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des
{
SplitCopyDestReceiver *self = (SplitCopyDestReceiver *) dest;
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(self->sourceShardOid);
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(self->sourceShardRelationOid);
if (cacheEntry == NULL)
{
ereport(ERROR, errmsg("Could not find shard %s for split copy.",
self->sourceShardName->relationName));
self->sourceShardName));
}
/* Partition Column Metadata on source shard */
@ -122,7 +131,7 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des
if (columnNulls[partitionColumnIndex])
{
ereport(ERROR, errmsg("Found null partition value for shard %s during split copy.",
self->sourceShardName->relationName));
self->sourceShardName));
}
Datum hashedValueDatum = FunctionCall1(hashFunction, columnValues[partitionColumnIndex]);
@ -132,8 +141,8 @@ static bool SplitCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *des
{
SplitCopyInfo *splitCopyInfo = self->splitCopyInfoArray[index];
if (splitCopyInfo->shardMinValue <= hashedValue &&
splitCopyInfo->shardMaxValue >= hashedValue)
if (splitCopyInfo->destinationShardMinHashValue <= hashedValue &&
splitCopyInfo->destinationShardMaxHashValue >= hashedValue)
{
DestReceiver *shardCopyDestReceiver = self->shardCopyDestReceiverArray[index];
shardCopyDestReceiver->receiveSlot(slot, shardCopyDestReceiver);

View File

@ -0,0 +1,103 @@
/*-------------------------------------------------------------------------
*
* worker_split_copy_udf.c
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "pg_version_compat.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "distributed/multi_executor.h"
#include "distributed/worker_split_copy.h"
#include "distributed/citus_ruleutils.h"
PG_FUNCTION_INFO_V1(worker_split_shardgroup_copy);
PG_FUNCTION_INFO_V1(worker_split_copy);
static void
ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo** splitCopyInfo);
/*
*
*/
Datum
worker_split_copy(PG_FUNCTION_ARGS)
{
uint64 shardIdToSplitCopy = DatumGetUInt64(PG_GETARG_DATUM(0));
ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(shardIdToSplitCopy);
ArrayType *splitCopyInfoArrayObject = PG_GETARG_ARRAYTYPE_P(1);
if (array_contains_nulls(splitCopyInfoArrayObject))
{
ereport(ERROR,
(errmsg("Shard Copy Info cannot have null values.")));
}
ArrayIterator copyInfo_iterator = array_create_iterator(splitCopyInfoArrayObject, 0 /* slice_ndim */, NULL /* mState */);
Datum copyInfoDatum = 0;
bool isnull = false;
List* splitCopyInfoList = NULL;
while (array_iterate(copyInfo_iterator, &copyInfoDatum, &isnull))
{
SplitCopyInfo *splitCopyInfo = NULL;
ParseSplitCopyInfoDatum(copyInfoDatum, &splitCopyInfo);
splitCopyInfoList = lappend(splitCopyInfoList, splitCopyInfo);
}
DestReceiver *splitCopyDestReceiver = CreateSplitCopyDestReceiver(shardIdToSplitCopy, splitCopyInfoList);
StringInfo selectShardQueryForCopy = makeStringInfo();
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;",
generate_qualified_relation_name(shardIntervalToSplitCopy->relationId));
ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params, (DestReceiver *) splitCopyDestReceiver);
PG_RETURN_VOID();
}
static void
ParseSplitCopyInfoDatum(Datum splitCopyInfoDatum, SplitCopyInfo** splitCopyInfo)
{
HeapTupleHeader dataTuple = DatumGetHeapTupleHeader(splitCopyInfoDatum);
SplitCopyInfo *copyInfo = palloc0(sizeof(SplitCopyInfo));
bool isnull = false;
Datum destinationShardIdDatum = GetAttributeByName(dataTuple, "destination_shard_id", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg("destination_shard_id for split_copy_info cannot be null.")));
}
copyInfo->destinationShardId = DatumGetUInt64(destinationShardIdDatum);
Datum minValueDatum = GetAttributeByName(dataTuple, "destination_shard_min_value", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg("destination_shard_min_value for split_copy_info cannot be null.")));
}
char *destinationMinHash = text_to_cstring(DatumGetTextP(minValueDatum));
copyInfo->destinationShardMinHashValue = pg_strtoint64(destinationMinHash);
Datum maxValueDatum = GetAttributeByName(dataTuple, "destination_shard_max_value", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg("destination_shard_max_value for split_copy_info cannot be null.")));
}
char *destinationMaxHash = text_to_cstring(DatumGetTextP(maxValueDatum));
copyInfo->destinationShardMaxHashValue = pg_strtoint64(destinationMaxHash);
Datum nodeIdDatum = GetAttributeByName(dataTuple, "destination_shard_node_id", &isnull);
if (isnull)
{
ereport(ERROR, (errmsg("destination_shard_node_id for split_copy_info cannot be null.")));
}
copyInfo->destinationShardNodeId = DatumGetInt32(nodeIdDatum);
*splitCopyInfo = copyInfo;
}

View File

@ -0,0 +1,17 @@
DROP TYPE IF EXISTS citus.split_copy_info;
CREATE TYPE citus.split_copy_info AS (
destination_shard_id bigint,
destination_shard_min_value text,
destination_shard_max_value text,
-- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support.
-- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough.
destination_shard_node_id integer);
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy(
source_shard_id bigint,
splitShardInfo citus.split_shard_info[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_split_copy(splitShardInfo citus.split_shard_info[])
IS 'Perform split copy for shard'

View File

@ -0,0 +1,17 @@
DROP TYPE IF EXISTS citus.split_copy_info;
CREATE TYPE citus.split_copy_info AS (
destination_shard_id bigint,
destination_shard_min_value text,
destination_shard_max_value text,
-- A 'nodeId' is a uint32 in CITUS [1, 4294967296] but postgres does not have unsigned type support.
-- Use integer (consistent with other previously defined UDFs that take nodeId as integer) as for all practical purposes it is big enough.
destination_shard_node_id integer);
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy(
source_shard_id bigint,
splitShardInfo citus.split_shard_info[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_split_copy(splitShardInfo citus.split_shard_info[])
IS 'Perform split copy for shard'

View File

@ -15,12 +15,6 @@
#include "utils/relcache.h"
typedef struct FullRelationName
{
char *schemaName;
char *relationName;
} FullRelationName;
extern char * RelationGetNamespaceName(Relation relation);
#endif /* RELATION_UTILS_H */

View File

@ -15,7 +15,7 @@
struct FullRelationName;
extern DestReceiver * CreateShardCopyDestReceiver(
struct FullRelationName* relationName,
char* destinationShardFullyQualifiedName,
uint32_t destinationNodeId);
#endif /* WORKER_SHARD_COPY_H_ */

View File

@ -14,12 +14,12 @@
typedef struct SplitCopyInfo
{
FullRelationName *destinationShard; /* destination shard name */
int32 shardMinValue; /* min hash value of destination shard */
int32 shardMaxValue; /* max hash value of destination shard */
uint32_t nodeId; /* node where split child shard is to be placed */
uint64 destinationShardId; /* destination shard id */
int32 destinationShardMinHashValue; /* min hash value of destination shard */
int32 destinationShardMaxHashValue; /* max hash value of destination shard */
uint32_t destinationShardNodeId; /* node where split child shard is to be placed */
} SplitCopyInfo;
extern DestReceiver* CreateSplitCopyDestReceiver(FullRelationName *sourceShard, List* splitCopyInfoList);
extern DestReceiver* CreateSplitCopyDestReceiver(uint64 sourceShardIdToCopy, List* splitCopyInfoList);
#endif /* WORKER_SPLIT_COPY_H_ */