mirror of https://github.com/citusdata/citus.git
Added ShardtransferMode and removed duplicates from SplitPoints
parent
88cc7faa52
commit
3d02752b10
|
@ -36,7 +36,7 @@ typedef struct ShardInfoData
|
|||
int64 shardminvalue;
|
||||
int64 shardmaxvalue;
|
||||
int64 shardid;
|
||||
int64 nodeid;
|
||||
int32 nodeid;
|
||||
char *tablename;
|
||||
char *distributionColumn;
|
||||
char *datatype;
|
||||
|
@ -77,7 +77,7 @@ ErrorOnConcurrentOperation()
|
|||
* For a given SplitPoints , it creates the SQL query for the shard Splitting
|
||||
*/
|
||||
StringInfo
|
||||
GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints)
|
||||
GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode)
|
||||
{
|
||||
StringInfo splitQuery = makeStringInfo();
|
||||
|
||||
|
@ -87,16 +87,16 @@ GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints)
|
|||
|
||||
for (int i = 0; i < length - 1; i++)
|
||||
{
|
||||
appendStringInfo(splitQuery, "'%ld',", DatumGetInt64(list_nth(SplitPoints, i)));
|
||||
appendStringInfo(splitQuery, "'%d',", list_nth_int(SplitPoints, i));
|
||||
}
|
||||
appendStringInfo(splitQuery, "'%ld'], ARRAY[", DatumGetInt64(list_nth(SplitPoints,
|
||||
length - 1)));
|
||||
appendStringInfo(splitQuery, "'%d'], ARRAY[", list_nth_int(SplitPoints,
|
||||
length - 1));
|
||||
|
||||
for (int i = 0; i < length; i++)
|
||||
{
|
||||
appendStringInfo(splitQuery, "%ld,", shardinfo->nodeid);
|
||||
appendStringInfo(splitQuery, "%d,", shardinfo->nodeid);
|
||||
}
|
||||
appendStringInfo(splitQuery, "%ld], 'block_writes')", shardinfo->nodeid);
|
||||
appendStringInfo(splitQuery, "%d], %s)", shardinfo->nodeid, quote_literal_cstr(shardSplitMode));
|
||||
|
||||
return splitQuery;
|
||||
}
|
||||
|
@ -106,10 +106,10 @@ GetShardSplitQuery(ShardInfo shardinfo, List *SplitPoints)
|
|||
* It creates a background job for citus_split_shard_by_split_points and executes it in background.
|
||||
*/
|
||||
void
|
||||
ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints)
|
||||
ExecuteSplitBackgroundJob(int64 jobid, ShardInfo shardinfo, List *SplitPoints , char* shardSplitMode)
|
||||
{
|
||||
StringInfo splitQuery = makeStringInfo();
|
||||
splitQuery = GetShardSplitQuery(shardinfo, SplitPoints);
|
||||
splitQuery = GetShardSplitQuery(shardinfo, SplitPoints , shardSplitMode);
|
||||
ereport(LOG, (errmsg(splitQuery->data)));
|
||||
int32 nodesInvolved[1];
|
||||
nodesInvolved[0] = shardinfo->nodeid;
|
||||
|
@ -193,7 +193,8 @@ FindShardSplitPoints(ShardInfo shardinfo)
|
|||
SPI_exec(CommonValueQuery->data, 0);
|
||||
MemoryContext spiContext = CurrentMemoryContext;
|
||||
int64 rowCount = SPI_processed;
|
||||
int64 average, hashedValue;
|
||||
int64 average;
|
||||
int32 hashedValue;
|
||||
ereport(LOG, errmsg("%ld", rowCount));
|
||||
|
||||
if (rowCount > 0)
|
||||
|
@ -212,20 +213,20 @@ FindShardSplitPoints(ShardInfo shardinfo)
|
|||
varcollid,
|
||||
tenantIdDatum);
|
||||
hashedValue = DatumGetInt32(hashedValueDatum);
|
||||
ereport(LOG, errmsg("%ld", hashedValue));
|
||||
ereport(LOG, errmsg("%d", hashedValue));
|
||||
MemoryContextSwitchTo(originalContext);
|
||||
if (hashedValue == shardinfo->shardminvalue)
|
||||
{
|
||||
SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue));
|
||||
SplitPoints = lappend_unique_int(SplitPoints,hashedValue);
|
||||
}
|
||||
else if (hashedValue == shardinfo->shardmaxvalue)
|
||||
{
|
||||
SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue - 1));
|
||||
SplitPoints = lappend_unique_int(SplitPoints, hashedValue - 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue - 1));
|
||||
SplitPoints = lappend(SplitPoints, Int64GetDatum(hashedValue));
|
||||
SplitPoints = lappend_unique_int(SplitPoints, hashedValue - 1);
|
||||
SplitPoints = lappend_unique_int(SplitPoints, hashedValue);
|
||||
}
|
||||
MemoryContextSwitchTo(spiContext);
|
||||
}
|
||||
|
@ -247,7 +248,7 @@ FindShardSplitPoints(ShardInfo shardinfo)
|
|||
{
|
||||
if (shardinfo->shardminvalue <= average)
|
||||
{
|
||||
SplitPoints = lappend(SplitPoints, Int64GetDatum(average));
|
||||
SplitPoints = lappend_int(SplitPoints, average);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -260,7 +261,7 @@ FindShardSplitPoints(ShardInfo shardinfo)
|
|||
* split and then executes the background job for the shard split.
|
||||
*/
|
||||
void
|
||||
ScheduleShardSplit(ShardInfo shardinfo)
|
||||
ScheduleShardSplit(ShardInfo shardinfo , char* shardSplitMode)
|
||||
{
|
||||
List *SplitPoints = FindShardSplitPoints(shardinfo);
|
||||
if (list_length(SplitPoints) > 0)
|
||||
|
@ -268,7 +269,8 @@ ScheduleShardSplit(ShardInfo shardinfo)
|
|||
ErrorOnConcurrentOperation();
|
||||
int64 jobId = CreateBackgroundJob("Automatic Shard Split",
|
||||
"Split using SplitPoints List");
|
||||
ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints);
|
||||
ereport(LOG,errmsg("%s",GetShardSplitQuery(shardinfo,SplitPoints,shardSplitMode)->data));
|
||||
ExecuteSplitBackgroundJob(jobId, shardinfo, SplitPoints , shardSplitMode);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -304,8 +306,12 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
|
|||
MaxShardSize
|
||||
);
|
||||
|
||||
ereport(LOG, errmsg("%s", query->data));
|
||||
|
||||
ereport(LOG, errmsg("%s", query->data));
|
||||
char *shardSplitMode;
|
||||
Oid shardTransferModeOid = PG_GETARG_OID(0);
|
||||
Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardTransferModeOid);
|
||||
shardSplitMode = DatumGetCString(enumLabelDatum);
|
||||
ereport(LOG,errmsg("%s",shardSplitMode));
|
||||
if (SPI_connect() != SPI_OK_CONNECT)
|
||||
{
|
||||
elog(ERROR, "SPI_connect to the query failed");
|
||||
|
@ -331,7 +337,7 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
|
|||
shardinfo.shardsize = DatumGetInt64(shardSize);
|
||||
|
||||
Datum nodeId = SPI_getbinval(tuple, tupletable->tupdesc, 5, &isnull);
|
||||
shardinfo.nodeid = DatumGetInt64(nodeId);
|
||||
shardinfo.nodeid = DatumGetInt32(nodeId);
|
||||
|
||||
char *shardMinVal = SPI_getvalue(tuple, tupletable->tupdesc, 2);
|
||||
shardinfo.shardminvalue = strtoi64(shardMinVal, NULL, 10);
|
||||
|
@ -356,11 +362,10 @@ citus_auto_shard_split_start(PG_FUNCTION_ARGS)
|
|||
distributionColumn);
|
||||
shardinfo.datatype = format_type_be(shardinfo.distributionColumnId);
|
||||
|
||||
char *shardSplitMode;
|
||||
|
||||
ScheduleShardSplit(&shardinfo);
|
||||
|
||||
ScheduleShardSplit(&shardinfo , shardSplitMode);
|
||||
ereport(LOG, (errmsg(
|
||||
"Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %ld",
|
||||
"Shard ID: %ld,ShardMinValue: %ld, ShardMaxValue: %ld , totalSize: %ld , nodeId: %d",
|
||||
shardinfo.shardid, shardinfo.shardminvalue,
|
||||
shardinfo.shardmaxvalue,
|
||||
shardinfo.shardsize, shardinfo.nodeid)));
|
||||
|
|
Loading…
Reference in New Issue