Misc code review comments

pull/6029/head
Nitish Upreti 2022-07-08 19:07:25 -07:00
parent f8c2c2555f
commit a515a49f4c
7 changed files with 14 additions and 25 deletions

View File

@ -81,11 +81,11 @@ LookupSplitMode(Oid shardSplitModeOid)
{ {
shardSplitMode = BLOCKING_SPLIT; shardSplitMode = BLOCKING_SPLIT;
} }
/* Extend with other modes as we support them */ /* Extend with other modes as we support them */
else else
{ {
ereport(ERROR, (errmsg("Invalid label for enum: %s", enumLabel))); ereport(ERROR, (errmsg("Invalid split mode: %s. Expected split mode is blocking.",
enumLabel)));
} }
return shardSplitMode; return shardSplitMode;

View File

@ -792,8 +792,10 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
referenceTableForeignConstraintList); referenceTableForeignConstraintList);
List *constraintCommandList = NIL; List *constraintCommandList = NIL;
constraintCommandList = list_concat(constraintCommandList, shardForeignConstraintCommandList); constraintCommandList = list_concat(constraintCommandList,
constraintCommandList = list_concat(constraintCommandList, referenceTableForeignConstraintList); shardForeignConstraintCommandList);
constraintCommandList = list_concat(constraintCommandList,
referenceTableForeignConstraintList);
char *constraintCommand = NULL; char *constraintCommand = NULL;
foreach_ptr(constraintCommand, constraintCommandList) foreach_ptr(constraintCommand, constraintCommandList)

View File

@ -74,7 +74,6 @@ static bool CanUseLocalCopy(uint64 destinationNodeId);
static StringInfo ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool static StringInfo ConstructCopyStatement(List *destinationShardFullyQualifiedName, bool
useBinaryFormat); useBinaryFormat);
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest); static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest);
static bool ShouldSendCopyNow(StringInfo buffer);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState); localCopyOutState);
@ -88,18 +87,6 @@ CanUseLocalCopy(uint64 destinationNodeId)
} }
/*
* ShouldSendCopyNow returns true if the given buffer size exceeds the
* local copy buffer size threshold.
*/
static bool
ShouldSendCopyNow(StringInfo buffer)
{
/* LocalCopyFlushThreshold is in bytes */
return buffer->len > LocalCopyFlushThresholdByte;
}
/* Connect to node with source shard and trigger copy start. */ /* Connect to node with source shard and trigger copy start. */
static void static void
ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest) ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
@ -197,7 +184,7 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
if (copyDest->useLocalCopy) if (copyDest->useLocalCopy)
{ {
WriteLocalTuple(slot, copyDest); WriteLocalTuple(slot, copyDest);
if (ShouldSendCopyNow(copyOutState->fe_msgbuf)) if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte)
{ {
LocalCopyToShard(copyDest, copyOutState); LocalCopyToShard(copyDest, copyOutState);
} }

View File

@ -197,9 +197,9 @@ CreateShardCopyDestReceivers(EState *estate, ShardInterval *shardIntervalToSplit
char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId); char *sourceShardNamePrefix = get_rel_name(shardIntervalToSplitCopy->relationId);
foreach_ptr(splitCopyInfo, splitCopyInfoList) foreach_ptr(splitCopyInfo, splitCopyInfoList)
{ {
char *destinationShardSchemaName = get_namespace_name(get_rel_namespace( char *destinationShardSchemaOid = get_rel_namespace(
shardIntervalToSplitCopy shardIntervalToSplitCopy->relationId);
->relationId)); char *destinationShardSchemaName = get_namespace_name(destinationShardSchemaOid);
char *destinationShardNameCopy = pstrdup(sourceShardNamePrefix); char *destinationShardNameCopy = pstrdup(sourceShardNamePrefix);
AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId); AppendShardIdToName(&destinationShardNameCopy, splitCopyInfo->destinationShardId);

View File

@ -46,8 +46,8 @@ CREATE FUNCTION pg_catalog.worker_repartition_cleanup(bigint)
STRICT STRICT
AS 'MODULE_PATHNAME', $function$worker_repartition_cleanup$function$; AS 'MODULE_PATHNAME', $function$worker_repartition_cleanup$function$;
DROP TYPE IF EXISTS citus.split_mode; DROP TYPE citus.split_mode;
DROP TYPE IF EXISTS citus.split_copy_info; DROP TYPE citus.split_copy_info;
DROP FUNCTION pg_catalog.citus_split_shard_by_split_points( DROP FUNCTION pg_catalog.citus_split_shard_by_split_points(
shard_id bigint, shard_id bigint,
split_points text[], split_points text[],

View File

@ -14,4 +14,4 @@ RETURNS void
LANGUAGE C STRICT LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_copy$$; AS 'MODULE_PATHNAME', $$worker_split_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[]) COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[])
IS 'Perform split copy for shard' IS 'Perform split copy for shard';

View File

@ -14,4 +14,4 @@ RETURNS void
LANGUAGE C STRICT LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_copy$$; AS 'MODULE_PATHNAME', $$worker_split_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[]) COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[])
IS 'Perform split copy for shard' IS 'Perform split copy for shard';