diff --git a/src/backend/distributed/operations/auto_shard_split.c b/src/backend/distributed/operations/auto_shard_split.c index 7f1431254..2fd56be15 100644 --- a/src/backend/distributed/operations/auto_shard_split.c +++ b/src/backend/distributed/operations/auto_shard_split.c @@ -36,7 +36,7 @@ typedef struct ShardInfoData int64 shardminvalue; int64 shardmaxvalue; int64 shardid; - int64 nodeid; + int32 nodeid; char *tablename; char *distributionColumn; char *datatype; @@ -77,7 +77,7 @@ ErrorOnConcurrentOperation() * For a given SplitPoints , it creates the SQL query for the shard Splitting */ StringInfo -GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints) +GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode) { StringInfo splitQuery = makeStringInfo(); @@ -87,16 +87,16 @@ GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints) for (int i = 0; i < length - 1; i++) { - appendStringInfo(splitQuery, "'%ld',", DatumGetInt64(list_nth(SplitPoints, i))); + appendStringInfo(splitQuery, "'%d',", list_nth_int(SplitPoints, i)); } - appendStringInfo(splitQuery, "'%ld'], ARRAY[", DatumGetInt64(list_nth(SplitPoints, - length - 1))); + appendStringInfo(splitQuery, "'%d'], ARRAY[", list_nth_int(SplitPoints, + length - 1)); for (int i = 0; i < length; i++) { - appendStringInfo(splitQuery, "%ld,", shardinfo->nodeid); + appendStringInfo(splitQuery, "%d,", shardinfo->nodeid); } - appendStringInfo(splitQuery, "%ld], 'block_writes')", shardinfo->nodeid); + appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeid, quote_literal_cstr(shardSplitMode)); return splitQuery; } @@ -106,10 +106,10 @@ GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints) * It creates a background job for citus_split_shard_by_split_points and executes it in background. */ void -ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints) +ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode) { StringInfo splitQuery = makeStringInfo(); - splitQuery = GetShardSplitQuery(shardinfo, SplitPoints); + splitQuery = GetShardSplitQuery(shardinfo, SplitPoints , shardSplitMode); ereport(LOG, (errmsg(splitQuery->data))); int32 nodesInvolved[1]; nodesInvolved[0] = shardinfo->nodeid; @@ -193,7 +193,8 @@ FindShardSplitPoints(ShardInfo shardinfo) SPI_exec(CommonValueQuery->data, 0); MemoryContext spiContext = CurrentMemoryContext; int64 rowCount = SPI_processed; - int64 average, hashedValue; + int64 average; + int32 hashedValue; ereport(LOG, errmsg("%ld", rowCount)); if (rowCount > 0) @@ -212,20 +213,20 @@ FindShardSplitPoints(ShardInfo shardinfo) varcollid, tenantIdDatum); hashedValue = DatumGetInt32(hashedValueDatum); - ereport(LOG, errmsg("%ld", hashedValue)); + ereport(LOG, errmsg("%d", hashedValue)); MemoryContextSwitchTo(originalContext); if (hashedValue == shardinfo->shardminvalue) { - SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue)); + SplitPoints = lappend_unique_int(SplitPoints,hashedValue); } else if (hashedValue == shardinfo->shardmaxvalue) { - SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue - 1)); + SplitPoints = lappend_unique_int(SplitPoints, hashedValue - 1); } else { - SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue - 1)); - SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue)); + SplitPoints = lappend_unique_int(SplitPoints, hashedValue - 1); + SplitPoints = lappend_unique_int(SplitPoints, hashedValue); } MemoryContextSwitchTo(spiContext); } @@ -247,7 +248,7 @@ FindShardSplitPoints(ShardInfo shardinfo) { if (shardinfo->shardminvalue <= average) { - SplitPoints = lappend(SplitPoints, Int64GetDatum(average)); + SplitPoints = lappend_int(SplitPoints, average); } } @@ -260,7 +261,7 @@ FindShardSplitPoints(ShardInfo shardinfo) * split and then executes the background job for the shard split. */ void -ScheduleShardSplit(ShardInfo shardinfo) +ScheduleShardSplit(ShardInfo shardinfo , char* shardSplitMode) { List *SplitPoints = FindShardSplitPoints(shardinfo); if (list_length(SplitPoints) > 0) @@ -268,7 +269,8 @@ ScheduleShardSplit(ShardInfo shardinfo) ErrorOnConcurrentOperation(); int64 jobId = CreateBackgroundJob("Automatic Shard Split", "Split using SplitPoints List"); - ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints); + ereport(LOG,errmsg("%s",GetShardSplitQuery(shardinfo,SplitPoints,shardSplitMode)->data)); + ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints , shardSplitMode); } else { @@ -304,8 +306,12 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS) MaxShardSize ); - ereport(LOG, errmsg("%s", query->data)); - + ereport(LOG, errmsg("%s", query->data)); + char *shardSplitMode; + Oid shardTransferModeOid = PG_GETARG_OID(0); + Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid); + shardSplitMode = DatumGetCString(enumLabelDatum); + ereport(LOG,errmsg("%s",shardSplitMode)); if (SPI_connect() != SPI_OK_CONNECT) { elog(ERROR, "SPI_connect to the query failed"); @@ -331,7 +337,7 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS) shardinfo.shardsize = DatumGetInt64(shardSize); Datum nodeId = SPI_getbinval(tuple, tupletable->tupdesc, 5, &isnull); - shardinfo.nodeid = DatumGetInt64(nodeId); + shardinfo.nodeid = DatumGetInt32(nodeId); char *shardMinVal = SPI_getvalue(tuple, tupletable->tupdesc, 2); shardinfo.shardminvalue = strtoi64(shardMinVal, NULL, 10); @@ -356,11 +362,10 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS) distributionColumn); shardinfo.datatype = format_type_be(shardinfo.distributionColumnId); - char *shardSplitMode; - - ScheduleShardSplit(&shardinfo); + + ScheduleShardSplit(&shardinfo , shardSplitMode); ereport(LOG, (errmsg( - "Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %ld", + "Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %d", shardinfo.shardid, shardinfo.shardminvalue, shardinfo.shardmaxvalue, shardinfo.shardsize, shardinfo.nodeid)));