First formatting attempt

Skipped csql, ruleutils, readfuncs, and functions obviously copied from
PostgreSQL. Seeing how this looks, then continuing.
pull/327/head
Jason Petersen 2016-02-05 13:18:54 -07:00
parent 334f800016
commit fdb37682b2
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
55 changed files with 1391 additions and 1314 deletions

View File

@ -271,17 +271,27 @@ ReceiveCopyData(StringInfo copyData)
switch (messageType) switch (messageType)
{ {
case 'd': /* CopyData */ case 'd': /* CopyData */
{
copyDone = false; copyDone = false;
break; break;
}
case 'c': /* CopyDone */ case 'c': /* CopyDone */
{
copyDone = true; copyDone = true;
break; break;
}
case 'f': /* CopyFail */ case 'f': /* CopyFail */
{
ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY data failed: %s", pq_getmsgstring(copyData)))); errmsg("COPY data failed: %s", pq_getmsgstring(copyData))));
break; break;
}
case 'H': /* Flush */ case 'H': /* Flush */
case 'S': /* Sync */ case 'S': /* Sync */
{
/* /*
* Ignore Flush/Sync for the convenience of client libraries (such * Ignore Flush/Sync for the convenience of client libraries (such
* as libpq) that may send those without noticing that the command * as libpq) that may send those without noticing that the command
@ -289,12 +299,16 @@ ReceiveCopyData(StringInfo copyData)
*/ */
copyDone = false; copyDone = false;
break; break;
}
default: default:
{
ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected message type 0x%02X during COPY data", errmsg("unexpected message type 0x%02X during COPY data",
messageType))); messageType)));
break; break;
} }
}
return copyDone; return copyDone;
} }

View File

@ -157,7 +157,6 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
queryDesc->plannedstmt = masterSelectPlan; queryDesc->plannedstmt = masterSelectPlan;
eflags |= EXEC_FLAG_CITUS_MASTER_SELECT; eflags |= EXEC_FLAG_CITUS_MASTER_SELECT;
} }
} }
/* if the execution is not done for router executor, drop into standard executor */ /* if the execution is not done for router executor, drop into standard executor */

View File

@ -287,8 +287,9 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
uint32 currentCount = taskExecution->connectPollCount; uint32 currentCount = taskExecution->connectPollCount;
if (currentCount >= maxCount) if (currentCount >= maxCount)
{ {
ereport(WARNING, (errmsg("could not establish asynchronous connection " ereport(WARNING, (errmsg("could not establish asynchronous "
"after %u ms", REMOTE_NODE_CONNECT_TIMEOUT))); "connection after %u ms",
REMOTE_NODE_CONNECT_TIMEOUT)));
taskStatusArray[currentIndex] = EXEC_TASK_FAILED; taskStatusArray[currentIndex] = EXEC_TASK_FAILED;
} }
@ -342,7 +343,8 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
{ {
List *dataFetchTaskList = task->dependedTaskList; List *dataFetchTaskList = task->dependedTaskList;
int32 dataFetchTaskIndex = taskExecution->dataFetchTaskIndex; int32 dataFetchTaskIndex = taskExecution->dataFetchTaskIndex;
Task *dataFetchTask = (Task *) list_nth(dataFetchTaskList, dataFetchTaskIndex); Task *dataFetchTask = (Task *) list_nth(dataFetchTaskList,
dataFetchTaskIndex);
char *dataFetchQuery = dataFetchTask->queryString; char *dataFetchQuery = dataFetchTask->queryString;
int32 connectionId = connectionIdArray[currentIndex]; int32 connectionId = connectionIdArray[currentIndex];
@ -411,11 +413,13 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
StringInfo computeTaskQuery = makeStringInfo(); StringInfo computeTaskQuery = makeStringInfo();
if (BinaryMasterCopyFormat) if (BinaryMasterCopyFormat)
{ {
appendStringInfo(computeTaskQuery, COPY_QUERY_TO_STDOUT_BINARY, queryString); appendStringInfo(computeTaskQuery, COPY_QUERY_TO_STDOUT_BINARY,
queryString);
} }
else else
{ {
appendStringInfo(computeTaskQuery, COPY_QUERY_TO_STDOUT_TEXT, queryString); appendStringInfo(computeTaskQuery, COPY_QUERY_TO_STDOUT_TEXT,
queryString);
} }
querySent = MultiClientSendQuery(connectionId, computeTaskQuery->data); querySent = MultiClientSendQuery(connectionId, computeTaskQuery->data);
@ -475,7 +479,8 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution)
else else
{ {
ereport(WARNING, (errcode_for_file_access(), ereport(WARNING, (errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", filename))); errmsg("could not open file \"%s\": %m",
filename)));
taskStatusArray[currentIndex] = EXEC_TASK_FAILED; taskStatusArray[currentIndex] = EXEC_TASK_FAILED;
} }

View File

@ -80,6 +80,7 @@ RouterExecutorStart(QueryDesc *queryDesc, int eflags, Task *task)
queryDesc->estate = executorState; queryDesc->estate = executorState;
#if (PG_VERSION_NUM < 90500) #if (PG_VERSION_NUM < 90500)
/* make sure that upsertQuery is false for versions that UPSERT is not available */ /* make sure that upsertQuery is false for versions that UPSERT is not available */
Assert(task->upsertQuery == false); Assert(task->upsertQuery == false);
#endif #endif
@ -219,9 +220,9 @@ RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, Tas
} }
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
} }
/* /*
* ExecuteDistributedModify is the main entry point for modifying distributed * ExecuteDistributedModify is the main entry point for modifying distributed
* tables. A distributed modification is successful if any placement of the * tables. A distributed modification is successful if any placement of the
@ -532,6 +533,7 @@ StoreQueryResult(PGconn *connection, TupleDesc tupleDescriptor,
return true; return true;
} }
/* /*
* RouterExecutorFinish cleans up after a distributed execution. * RouterExecutorFinish cleans up after a distributed execution.
*/ */

View File

@ -41,7 +41,6 @@ typedef struct TaskMapKey
TaskType taskType; TaskType taskType;
uint64 jobId; uint64 jobId;
uint32 taskId; uint32 taskId;
} TaskMapKey; } TaskMapKey;
@ -53,7 +52,6 @@ typedef struct TaskMapEntry
{ {
TaskMapKey key; TaskMapKey key;
Task *task; Task *task;
} TaskMapEntry; } TaskMapEntry;
@ -83,7 +81,8 @@ static TaskTracker * TrackerHashLookup(HTAB *trackerHash, const char *nodeName,
static TaskExecStatus ManageTaskExecution(TaskTracker *taskTracker, static TaskExecStatus ManageTaskExecution(TaskTracker *taskTracker,
TaskTracker *sourceTaskTracker, TaskTracker *sourceTaskTracker,
Task *task, TaskExecution *taskExecution); Task *task, TaskExecution *taskExecution);
static TransmitExecStatus ManageTransmitExecution(TaskTracker *transmitTracker, Task *task, static TransmitExecStatus ManageTransmitExecution(TaskTracker *transmitTracker,
Task *task,
TaskExecution *taskExecution); TaskExecution *taskExecution);
static bool TaskExecutionsCompleted(List *taskList); static bool TaskExecutionsCompleted(List *taskList);
static StringInfo MapFetchTaskQueryString(Task *mapFetchTask, Task *mapTask); static StringInfo MapFetchTaskQueryString(Task *mapFetchTask, Task *mapTask);
@ -826,7 +825,8 @@ TrackerConnectPoll(TaskTracker *taskTracker)
uint32 nodePort = taskTracker->workerPort; uint32 nodePort = taskTracker->workerPort;
char *nodeDatabase = get_database_name(MyDatabaseId); char *nodeDatabase = get_database_name(MyDatabaseId);
int32 connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase); int32 connectionId = MultiClientConnectStart(nodeName, nodePort,
nodeDatabase);
if (connectionId != INVALID_CONNECTION_ID) if (connectionId != INVALID_CONNECTION_ID)
{ {
taskTracker->connectionId = connectionId; taskTracker->connectionId = connectionId;
@ -869,8 +869,9 @@ TrackerConnectPoll(TaskTracker *taskTracker)
uint32 currentCount = taskTracker->connectPollCount; uint32 currentCount = taskTracker->connectPollCount;
if (currentCount >= maxCount) if (currentCount >= maxCount)
{ {
ereport(WARNING, (errmsg("could not establish asynchronous connection " ereport(WARNING, (errmsg("could not establish asynchronous "
"after %u ms", REMOTE_NODE_CONNECT_TIMEOUT))); "connection after %u ms",
REMOTE_NODE_CONNECT_TIMEOUT)));
taskTracker->trackerStatus = TRACKER_CONNECTION_FAILED; taskTracker->trackerStatus = TRACKER_CONNECTION_FAILED;
@ -1212,7 +1213,8 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker,
default: default:
{ {
/* we fatal here to avoid leaking client-side resources */ /* we fatal here to avoid leaking client-side resources */
ereport(FATAL, (errmsg("invalid execution status: %d", currentExecutionStatus))); ereport(FATAL, (errmsg("invalid execution status: %d",
currentExecutionStatus)));
break; break;
} }
} }
@ -1327,7 +1329,8 @@ ManageTransmitExecution(TaskTracker *transmitTracker,
else else
{ {
ereport(WARNING, (errcode_for_file_access(), ereport(WARNING, (errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", filename))); errmsg("could not open file \"%s\": %m",
filename)));
nextTransmitStatus = EXEC_TRANSMIT_TRACKER_RETRY; nextTransmitStatus = EXEC_TRANSMIT_TRACKER_RETRY;
} }
@ -1463,7 +1466,8 @@ ManageTransmitExecution(TaskTracker *transmitTracker,
default: default:
{ {
/* we fatal here to avoid leaking client-side resources */ /* we fatal here to avoid leaking client-side resources */
ereport(FATAL, (errmsg("invalid transmit status: %d", currentTransmitStatus))); ereport(FATAL, (errmsg("invalid transmit status: %d",
currentTransmitStatus)));
break; break;
} }
} }
@ -2833,7 +2837,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
if (queryStatus == CLIENT_QUERY_DONE) if (queryStatus == CLIENT_QUERY_DONE)
{ {
ereport(DEBUG4, (errmsg("completed cleanup query for job " UINT64_FORMAT ereport(DEBUG4, (errmsg("completed cleanup query for job " UINT64_FORMAT
" on node \"%s:%u\"", jobId, nodeName, nodePort))); " on node \"%s:%u\"", jobId, nodeName,
nodePort)));
/* clear connection for future cleanup queries */ /* clear connection for future cleanup queries */
taskTracker->connectionBusy = false; taskTracker->connectionBusy = false;

View File

@ -905,8 +905,9 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString,
} }
else else
{ {
ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT " on " ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT
"node %s:%d", shardId, workerName, workerPort))); " on node %s:%d", shardId, workerName,
workerPort)));
} }
isFirstPlacement = false; isFirstPlacement = false;

View File

@ -167,7 +167,8 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
shardPlacementList = ShardPlacementList(shardId); shardPlacementList = ShardPlacementList(shardId);
foreach(shardPlacementCell, shardPlacementList) foreach(shardPlacementCell, shardPlacementList)
{ {
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); ShardPlacement *shardPlacement =
(ShardPlacement *) lfirst(shardPlacementCell);
char *workerName = shardPlacement->nodeName; char *workerName = shardPlacement->nodeName;
uint32 workerPort = shardPlacement->nodePort; uint32 workerPort = shardPlacement->nodePort;
bool dropSuccessful = false; bool dropSuccessful = false;
@ -176,14 +177,17 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
char tableType = get_rel_relkind(relationId); char tableType = get_rel_relkind(relationId);
if (tableType == RELKIND_RELATION) if (tableType == RELKIND_RELATION)
{ {
appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND, quotedShardName); appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND,
quotedShardName);
} }
else if (tableType == RELKIND_FOREIGN_TABLE) else if (tableType == RELKIND_FOREIGN_TABLE)
{ {
appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND, quotedShardName); appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND,
quotedShardName);
} }
dropSuccessful = ExecuteRemoteCommand(workerName, workerPort, workerDropQuery); dropSuccessful = ExecuteRemoteCommand(workerName, workerPort,
workerDropQuery);
if (dropSuccessful) if (dropSuccessful)
{ {
droppedPlacementList = lappend(droppedPlacementList, shardPlacement); droppedPlacementList = lappend(droppedPlacementList, shardPlacement);
@ -227,7 +231,8 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
if (QueryCancelPending) if (QueryCancelPending)
{ {
ereport(WARNING, (errmsg("cancel requests are ignored during shard deletion"))); ereport(WARNING, (errmsg("cancel requests are ignored during shard "
"deletion")));
QueryCancelPending = false; QueryCancelPending = false;
} }

View File

@ -709,11 +709,16 @@ hostname_client_addr(void)
#ifdef HAVE_IPV6 #ifdef HAVE_IPV6
case AF_INET6: case AF_INET6:
#endif #endif
{
break; break;
}
default: default:
{
ereport(ERROR, (errmsg("invalid address family in connection"))); ereport(ERROR, (errmsg("invalid address family in connection")));
break; break;
} }
}
remoteHost = palloc0(remoteHostLen); remoteHost = palloc0(remoteHostLen);

View File

@ -393,6 +393,7 @@ DistributedModifyTask(Query *query)
query->onConflict = RebuildOnConflict(relationId, query->onConflict); query->onConflict = RebuildOnConflict(relationId, query->onConflict);
} }
#else #else
/* always set to false for PG_VERSION_NUM < 90500 */ /* always set to false for PG_VERSION_NUM < 90500 */
upsertQuery = false; upsertQuery = false;
#endif #endif
@ -414,6 +415,7 @@ DistributedModifyTask(Query *query)
#if (PG_VERSION_NUM >= 90500) #if (PG_VERSION_NUM >= 90500)
/* /*
* RebuildOnConflict rebuilds OnConflictExpr for correct deparsing. The function * RebuildOnConflict rebuilds OnConflictExpr for correct deparsing. The function
* makes WHERE clause elements explicit and filters dropped columns * makes WHERE clause elements explicit and filters dropped columns
@ -468,6 +470,8 @@ RebuildOnConflict(Oid relationId, OnConflictExpr *originalOnConflict)
return updatedOnConflict; return updatedOnConflict;
} }
#endif #endif

View File

@ -54,7 +54,8 @@ static bool JoinExprListWalker(Node *node, List **joinList);
static bool ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex); static bool ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex);
static List * MergeShardIntervals(List *leftShardIntervalList, static List * MergeShardIntervals(List *leftShardIntervalList,
List *rightShardIntervalList, JoinType joinType); List *rightShardIntervalList, JoinType joinType);
static bool ShardIntervalsMatch(List *leftShardIntervalList, List *rightShardIntervalList); static bool ShardIntervalsMatch(List *leftShardIntervalList,
List *rightShardIntervalList);
static List * LoadSortedShardIntervalList(Oid relationId); static List * LoadSortedShardIntervalList(Oid relationId);
static List * JoinOrderForTable(TableEntry *firstTable, List *tableEntryList, static List * JoinOrderForTable(TableEntry *firstTable, List *tableEntryList,
List *joinClauseList); List *joinClauseList);
@ -68,31 +69,41 @@ static List * TableEntryListDifference(List *lhsTableList, List *rhsTableList);
static TableEntry * FindTableEntry(List *tableEntryList, uint32 tableId); static TableEntry * FindTableEntry(List *tableEntryList, uint32 tableId);
/* Local functions forward declarations for join evaluations */ /* Local functions forward declarations for join evaluations */
static JoinOrderNode * EvaluateJoinRules(List *joinedTableList, JoinOrderNode *currentJoinNode, static JoinOrderNode * EvaluateJoinRules(List *joinedTableList,
TableEntry *candidateTable, List *candidateShardList, JoinOrderNode *currentJoinNode,
TableEntry *candidateTable,
List *candidateShardList,
List *joinClauseList, JoinType joinType); List *joinClauseList, JoinType joinType);
static List * RangeTableIdList(List *tableList); static List * RangeTableIdList(List *tableList);
static RuleEvalFunction JoinRuleEvalFunction(JoinRuleType ruleType); static RuleEvalFunction JoinRuleEvalFunction(JoinRuleType ruleType);
static char * JoinRuleName(JoinRuleType ruleType); static char * JoinRuleName(JoinRuleType ruleType);
static JoinOrderNode * BroadcastJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, static JoinOrderNode * BroadcastJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
List *candidateShardList, List *applicableJoinClauses, List *candidateShardList,
List *applicableJoinClauses,
JoinType joinType); JoinType joinType);
static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
List *candidateShardList, List *applicableJoinClauses, List *candidateShardList, List *applicableJoinClauses,
JoinType joinType); JoinType joinType);
static bool JoinOnColumns(Var *currentPartitioncolumn, Var *candidatePartitionColumn, static bool JoinOnColumns(Var *currentPartitioncolumn, Var *candidatePartitionColumn,
List *joinClauseList); List *joinClauseList);
static JoinOrderNode * SinglePartitionJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, static JoinOrderNode * SinglePartitionJoin(JoinOrderNode *joinNode,
List *candidateShardList, List *applicableJoinClauses, TableEntry *candidateTable,
List *candidateShardList,
List *applicableJoinClauses,
JoinType joinType); JoinType joinType);
static JoinOrderNode * DualPartitionJoin(JoinOrderNode *joinNode, TableEntry *candidateTable, static JoinOrderNode * DualPartitionJoin(JoinOrderNode *joinNode,
List *candidateShardList, List *applicableJoinClauses, TableEntry *candidateTable,
List *candidateShardList,
List *applicableJoinClauses,
JoinType joinType); JoinType joinType);
static JoinOrderNode * CartesianProduct(JoinOrderNode *joinNode, TableEntry *candidateTable, static JoinOrderNode * CartesianProduct(JoinOrderNode *joinNode,
List *candidateShardList, List *applicableJoinClauses, TableEntry *candidateTable,
List *candidateShardList,
List *applicableJoinClauses,
JoinType joinType); JoinType joinType);
static JoinOrderNode * MakeJoinOrderNode(TableEntry *tableEntry, JoinRuleType joinRuleType, static JoinOrderNode * MakeJoinOrderNode(TableEntry *tableEntry, JoinRuleType
Var *partitionColumn, char partitionMethod); joinRuleType, Var *partitionColumn,
char partitionMethod);
/* /*
@ -199,7 +210,6 @@ FixedJoinOrderList(FromExpr *fromExpr, List *tableEntryList)
"query"), "query"),
errdetail("Shards of relations in outer join queries " errdetail("Shards of relations in outer join queries "
"must have 1-to-1 shard partitioning"))); "must have 1-to-1 shard partitioning")));
} }
} }
else else
@ -586,7 +596,8 @@ ShardIntervalsMatch(List *leftShardIntervalList, List *rightShardIntervalList)
nextRightIntervalCell = lnext(rightShardIntervalCell); nextRightIntervalCell = lnext(rightShardIntervalCell);
if (nextRightIntervalCell != NULL) if (nextRightIntervalCell != NULL)
{ {
ShardInterval *nextRightInterval = (ShardInterval *) lfirst(nextRightIntervalCell); ShardInterval *nextRightInterval =
(ShardInterval *) lfirst(nextRightIntervalCell);
shardIntervalsIntersect = ShardIntervalsOverlap(leftInterval, shardIntervalsIntersect = ShardIntervalsOverlap(leftInterval,
nextRightInterval); nextRightInterval);
if (shardIntervalsIntersect) if (shardIntervalsIntersect)

View File

@ -91,7 +91,8 @@ static void ParentSetNewChild(MultiNode *parentNode, MultiNode *oldChildNode,
/* Local functions forward declarations for aggregate expressions */ /* Local functions forward declarations for aggregate expressions */
static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode, static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode,
MultiExtendedOp *masterNode, MultiExtendedOp *workerNode); MultiExtendedOp *masterNode,
MultiExtendedOp *workerNode);
static void TransformSubqueryNode(MultiTable *subqueryNode); static void TransformSubqueryNode(MultiTable *subqueryNode);
static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode); static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode);
static Node * MasterAggregateMutator(Node *originalNode, AttrNumber *columnId); static Node * MasterAggregateMutator(Node *originalNode, AttrNumber *columnId);
@ -117,7 +118,8 @@ static void ErrorIfUnsupportedArrayAggregate(Aggref *arrayAggregateExpression);
static void ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression, static void ErrorIfUnsupportedAggregateDistinct(Aggref *aggregateExpression,
MultiNode *logicalPlanNode); MultiNode *logicalPlanNode);
static Var * AggregateDistinctColumn(Aggref *aggregateExpression); static Var * AggregateDistinctColumn(Aggref *aggregateExpression);
static bool TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode, static bool TablePartitioningSupportsDistinct(List *tableNodeList,
MultiExtendedOp *opNode,
Var *distinctColumn); Var *distinctColumn);
static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column); static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column);
@ -1980,23 +1982,31 @@ CountDistinctHashFunctionName(Oid argumentType)
switch (argumentType) switch (argumentType)
{ {
case INT4OID: case INT4OID:
{
hashFunctionName = pstrdup(HLL_HASH_INTEGER_FUNC_NAME); hashFunctionName = pstrdup(HLL_HASH_INTEGER_FUNC_NAME);
break; break;
}
case INT8OID: case INT8OID:
{
hashFunctionName = pstrdup(HLL_HASH_BIGINT_FUNC_NAME); hashFunctionName = pstrdup(HLL_HASH_BIGINT_FUNC_NAME);
break; break;
}
case TEXTOID: case TEXTOID:
case BPCHAROID: case BPCHAROID:
case VARCHAROID: case VARCHAROID:
{
hashFunctionName = pstrdup(HLL_HASH_TEXT_FUNC_NAME); hashFunctionName = pstrdup(HLL_HASH_TEXT_FUNC_NAME);
break; break;
}
default: default:
{
hashFunctionName = pstrdup(HLL_HASH_ANY_FUNC_NAME); hashFunctionName = pstrdup(HLL_HASH_ANY_FUNC_NAME);
break; break;
} }
}
return hashFunctionName; return hashFunctionName;
} }
@ -3229,6 +3239,7 @@ SupportedLateralQuery(Query *parentQuery, Query *lateralQuery)
CompositeFieldRecursive(outerQueryExpression, parentQuery); CompositeFieldRecursive(outerQueryExpression, parentQuery);
FieldSelect *localCompositeField = FieldSelect *localCompositeField =
CompositeFieldRecursive(localQueryExpression, lateralQuery); CompositeFieldRecursive(localQueryExpression, lateralQuery);
/* /*
* If partition colums are composite fields, add them to list to * If partition colums are composite fields, add them to list to
* check later if all composite fields are used. * check later if all composite fields are used.

View File

@ -285,6 +285,7 @@ MultiPlanTree(Query *queryTree)
else else
{ {
bool hasOuterJoin = false; bool hasOuterJoin = false;
/* /*
* We calculate the join order using the list of tables in the query and * We calculate the join order using the list of tables in the query and
* the join clauses between them. Note that this function owns the table * the join clauses between them. Note that this function owns the table
@ -465,6 +466,7 @@ ErrorIfQueryNotSupported(Query *queryTree)
#if (PG_VERSION_NUM >= 90500) #if (PG_VERSION_NUM >= 90500)
/* HasTablesample returns tree if the query contains tablesample */ /* HasTablesample returns tree if the query contains tablesample */
static bool static bool
HasTablesample(Query *queryTree) HasTablesample(Query *queryTree)
@ -485,6 +487,8 @@ HasTablesample(Query *queryTree)
return hasTablesample; return hasTablesample;
} }
#endif #endif
@ -529,7 +533,8 @@ HasUnsupportedJoinWalker(Node *node, void *context)
* ErrorIfSubqueryNotSupported checks that we can perform distributed planning for * ErrorIfSubqueryNotSupported checks that we can perform distributed planning for
* the given subquery. * the given subquery.
*/ */
static void ErrorIfSubqueryNotSupported(Query *subqueryTree) static void
ErrorIfSubqueryNotSupported(Query *subqueryTree)
{ {
char *errorDetail = NULL; char *errorDetail = NULL;
bool preconditionsSatisfied = true; bool preconditionsSatisfied = true;
@ -587,7 +592,6 @@ HasOuterJoin(Query *queryTree)
static bool static bool
HasOuterJoinWalker(Node *node, void *context) HasOuterJoinWalker(Node *node, void *context)
{ {
bool hasOuterJoin = false; bool hasOuterJoin = false;
if (node == NULL) if (node == NULL)
{ {

View File

@ -155,7 +155,8 @@ static StringInfo DatumArrayString(Datum *datumArray, uint32 datumCount, Oid dat
static Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, static Task * CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType,
char *queryString); char *queryString);
static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList); static void UpdateRangeTableAlias(List *rangeTableList, List *fragmentList);
static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment); static Alias * FragmentAlias(RangeTblEntry *rangeTableEntry,
RangeTableFragment *fragment);
static uint64 AnchorShardId(List *fragmentList, uint32 anchorRangeTableId); static uint64 AnchorShardId(List *fragmentList, uint32 anchorRangeTableId);
static List * PruneSqlTaskDependencies(List *sqlTaskList); static List * PruneSqlTaskDependencies(List *sqlTaskList);
static List * AssignTaskList(List *sqlTaskList); static List * AssignTaskList(List *sqlTaskList);
@ -309,6 +310,7 @@ BuildJobTree(MultiTreeRoot *multiTree)
partitionKey, partitionType, partitionKey, partitionType,
baseRelationId, baseRelationId,
JOIN_MAP_MERGE_JOB); JOIN_MAP_MERGE_JOB);
/* reset depended job list */ /* reset depended job list */
loopDependedJobList = NIL; loopDependedJobList = NIL;
loopDependedJobList = list_make1(mapMergeJob); loopDependedJobList = list_make1(mapMergeJob);
@ -1093,7 +1095,8 @@ QueryJoinTree(MultiNode *multiNode, List *dependedJobList, List **rangeTableList
uint32 columnCount = (uint32) list_length(dependedTargetList); uint32 columnCount = (uint32) list_length(dependedTargetList);
List *columnNameList = DerivedColumnNameList(columnCount, dependedJob->jobId); List *columnNameList = DerivedColumnNameList(columnCount, dependedJob->jobId);
RangeTblEntry *rangeTableEntry = DerivedRangeTableEntry(multiNode, columnNameList, RangeTblEntry *rangeTableEntry = DerivedRangeTableEntry(multiNode,
columnNameList,
tableIdList); tableIdList);
RangeTblRef *rangeTableRef = makeNode(RangeTblRef); RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
@ -1864,6 +1867,7 @@ SplitPointObject(ShardInterval **shardIntervalArray, uint32 shardIntervalCount)
return splitPointObject; return splitPointObject;
} }
/* ------------------------------------------------------------ /* ------------------------------------------------------------
* Functions that relate to building and assigning tasks follow * Functions that relate to building and assigning tasks follow
* ------------------------------------------------------------ * ------------------------------------------------------------
@ -4046,6 +4050,7 @@ FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment)
return alias; return alias;
} }
/* /*
* AnchorShardId walks over each fragment in the given fragment list, finds the * AnchorShardId walks over each fragment in the given fragment list, finds the
* fragment that corresponds to the given anchor range tableId, and returns this * fragment that corresponds to the given anchor range tableId, and returns this
@ -4984,6 +4989,7 @@ RoundRobinAssignTaskList(List *taskList)
return taskList; return taskList;
} }
/* /*
* RoundRobinReorder implements the core of the round-robin assignment policy. * RoundRobinReorder implements the core of the round-robin assignment policy.
* It takes a task and placement list and rotates a copy of the placement list * It takes a task and placement list and rotates a copy of the placement list
@ -5116,7 +5122,8 @@ ActiveShardPlacementLists(List *taskList)
List *activeShardPlacementList = ActivePlacementList(shardPlacementList); List *activeShardPlacementList = ActivePlacementList(shardPlacementList);
/* sort shard placements by their insertion time */ /* sort shard placements by their insertion time */
activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements); activeShardPlacementList = SortList(activeShardPlacementList,
CompareShardPlacements);
shardPlacementLists = lappend(shardPlacementLists, activeShardPlacementList); shardPlacementLists = lappend(shardPlacementLists, activeShardPlacementList);
} }
@ -5257,7 +5264,8 @@ AssignDualHashTaskList(List *taskList)
uint32 replicaIndex = 0; uint32 replicaIndex = 0;
for (replicaIndex = 0; replicaIndex < ShardReplicationFactor; replicaIndex++) for (replicaIndex = 0; replicaIndex < ShardReplicationFactor; replicaIndex++)
{ {
uint32 assignmentOffset = beginningNodeIndex + assignedTaskIndex + replicaIndex; uint32 assignmentOffset = beginningNodeIndex + assignedTaskIndex +
replicaIndex;
uint32 assignmentIndex = assignmentOffset % workerNodeCount; uint32 assignmentIndex = assignmentOffset % workerNodeCount;
WorkerNode *workerNode = list_nth(workerNodeList, assignmentIndex); WorkerNode *workerNode = list_nth(workerNodeList, assignmentIndex);

View File

@ -205,20 +205,31 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
switch (relationNameListLength) switch (relationNameListLength)
{ {
case 1: case 1:
{
relationNameValue = linitial(relationNameList); relationNameValue = linitial(relationNameList);
break; break;
}
case 2: case 2:
{
relationNameValue = lsecond(relationNameList); relationNameValue = lsecond(relationNameList);
break; break;
}
case 3: case 3:
{
relationNameValue = lthird(relationNameList); relationNameValue = lthird(relationNameList);
break; break;
}
default: default:
{
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
errmsg("improper relation name: \"%s\"", errmsg("improper relation name: \"%s\"",
NameListToString(relationNameList)))); NameListToString(relationNameList))));
break; break;
} }
}
relationName = &(relationNameValue->val.str); relationName = &(relationNameValue->val.str);
AppendShardIdToName(relationName, shardId); AppendShardIdToName(relationName, shardId);

View File

@ -206,9 +206,10 @@ RegisterCitusConfigVariables(void)
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citusdb.expire_cached_shards", "citusdb.expire_cached_shards",
gettext_noop("Enables shard cache expiration if a shard's size on disk has changed. "), gettext_noop("Enables shard cache expiration if a shard's size on disk has "
gettext_noop("When appending to an existing shard, old data may still be cached on " "changed."),
"other workers. This configuration entry activates automatic " gettext_noop("When appending to an existing shard, old data may still be cached "
"on other workers. This configuration entry activates automatic "
"expiration, but should not be used with manual updates to shards."), "expiration, but should not be used with manual updates to shards."),
&ExpireCachedShards, &ExpireCachedShards,
false, false,
@ -488,6 +489,7 @@ RegisterCitusConfigVariables(void)
/* warn about config items in the citusdb namespace that are not registered above */ /* warn about config items in the citusdb namespace that are not registered above */
EmitWarningsOnPlaceholders("citusdb"); EmitWarningsOnPlaceholders("citusdb");
/* Also warn about citus namespace, as that's a very likely misspelling */ /* Also warn about citus namespace, as that's a very likely misspelling */
EmitWarningsOnPlaceholders("citus"); EmitWarningsOnPlaceholders("citus");
} }
@ -515,8 +517,10 @@ NormalizeWorkerListPath(void)
{ {
absoluteFileName = malloc(strlen(DataDir) + strlen(WORKER_LIST_FILENAME) + 2); absoluteFileName = malloc(strlen(DataDir) + strlen(WORKER_LIST_FILENAME) + 2);
if (absoluteFileName == NULL) if (absoluteFileName == NULL)
{
ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory"))); errmsg("out of memory")));
}
sprintf(absoluteFileName, "%s/%s", DataDir, WORKER_LIST_FILENAME); sprintf(absoluteFileName, "%s/%s", DataDir, WORKER_LIST_FILENAME);
} }
@ -530,6 +534,7 @@ NormalizeWorkerListPath(void)
"environment variable.\n", progname, ConfigFileName))); "environment variable.\n", progname, ConfigFileName)));
} }
SetConfigOption("citusdb.worker_list_file", absoluteFileName, PGC_POSTMASTER, PGC_S_OVERRIDE); SetConfigOption("citusdb.worker_list_file", absoluteFileName, PGC_POSTMASTER,
PGC_S_OVERRIDE);
free(absoluteFileName); free(absoluteFileName);
} }

View File

@ -273,9 +273,13 @@ GetRangeTblKind(RangeTblEntry *rte)
case RTE_JOIN: case RTE_JOIN:
case RTE_VALUES: case RTE_VALUES:
case RTE_CTE: case RTE_CTE:
{
rteKind = (CitusRTEKind) rte->rtekind; rteKind = (CitusRTEKind) rte->rtekind;
break; break;
}
case RTE_FUNCTION: case RTE_FUNCTION:
{
/* /*
* Extract extra data - correct even if a plain RTE_FUNCTION, not * Extract extra data - correct even if a plain RTE_FUNCTION, not
* an extended one, ExtractRangeTblExtraData handles that case * an extended one, ExtractRangeTblExtraData handles that case
@ -284,6 +288,7 @@ GetRangeTblKind(RangeTblEntry *rte)
ExtractRangeTblExtraData(rte, &rteKind, NULL, NULL, NULL); ExtractRangeTblExtraData(rte, &rteKind, NULL, NULL, NULL);
break; break;
} }
}
return rteKind; return rteKind;
} }

View File

@ -447,22 +447,36 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
switch (attributeForm->attstorage) switch (attributeForm->attstorage)
{ {
case 'p': case 'p':
{
storageName = "PLAIN"; storageName = "PLAIN";
break; break;
}
case 'e': case 'e':
{
storageName = "EXTERNAL"; storageName = "EXTERNAL";
break; break;
}
case 'm': case 'm':
{
storageName = "MAIN"; storageName = "MAIN";
break; break;
}
case 'x': case 'x':
{
storageName = "EXTENDED"; storageName = "EXTENDED";
break; break;
}
default: default:
{
ereport(ERROR, (errmsg("unrecognized storage type: %c", ereport(ERROR, (errmsg("unrecognized storage type: %c",
attributeForm->attstorage))); attributeForm->attstorage)));
break; break;
} }
}
appendStringInfo(&statement, "ALTER COLUMN %s ", appendStringInfo(&statement, "ALTER COLUMN %s ",
quote_identifier(attributeName)); quote_identifier(attributeName));

View File

@ -87,6 +87,7 @@ IsDistributedTable(Oid relationId)
return cacheEntry->isDistributedTable; return cacheEntry->isDistributedTable;
} }
/* /*
* LoadShardInterval reads shard metadata for given shardId from pg_dist_shard, * LoadShardInterval reads shard metadata for given shardId from pg_dist_shard,
* and converts min/max values in these metadata to their properly typed datum * and converts min/max values in these metadata to their properly typed datum
@ -139,6 +140,7 @@ LoadShardInterval(uint64 shardId)
return shardInterval; return shardInterval;
} }
/* /*
* DistributedTableCacheEntry looks up a pg_dist_partition entry for a * DistributedTableCacheEntry looks up a pg_dist_partition entry for a
* relation. * relation.

View File

@ -22,7 +22,8 @@
#include "distributed/multi_resowner.h" #include "distributed/multi_resowner.h"
typedef struct JobDirectoryEntry { typedef struct JobDirectoryEntry
{
ResourceOwner owner; ResourceOwner owner;
uint64 jobId; uint64 jobId;
} JobDirectoryEntry; } JobDirectoryEntry;
@ -91,15 +92,17 @@ ResourceOwnerEnlargeJobDirectories(ResourceOwner owner)
if (RegisteredJobDirectories == NULL) if (RegisteredJobDirectories == NULL)
{ {
newMax = 16; newMax = 16;
RegisteredJobDirectories = (JobDirectoryEntry *) RegisteredJobDirectories =
MemoryContextAlloc(TopMemoryContext, newMax * sizeof(JobDirectoryEntry)); (JobDirectoryEntry *) MemoryContextAlloc(TopMemoryContext,
newMax * sizeof(JobDirectoryEntry));
NumAllocatedJobDirectories = newMax; NumAllocatedJobDirectories = newMax;
} }
else if (NumRegisteredJobDirectories + 1 > NumAllocatedJobDirectories) else if (NumRegisteredJobDirectories + 1 > NumAllocatedJobDirectories)
{ {
newMax = NumAllocatedJobDirectories * 2; newMax = NumAllocatedJobDirectories * 2;
RegisteredJobDirectories = (JobDirectoryEntry *) RegisteredJobDirectories =
repalloc(RegisteredJobDirectories, newMax * sizeof(JobDirectoryEntry)); (JobDirectoryEntry *) repalloc(RegisteredJobDirectories,
newMax * sizeof(JobDirectoryEntry));
NumAllocatedJobDirectories = newMax; NumAllocatedJobDirectories = newMax;
} }
} }
@ -135,7 +138,8 @@ ResourceOwnerForgetJobDirectory(ResourceOwner owner, uint64 jobId)
/* move all later entries one up */ /* move all later entries one up */
while (jobIndex < lastJobIndex) while (jobIndex < lastJobIndex)
{ {
RegisteredJobDirectories[jobIndex] = RegisteredJobDirectories[jobIndex + 1]; RegisteredJobDirectories[jobIndex] =
RegisteredJobDirectories[jobIndex + 1];
jobIndex++; jobIndex++;
} }
NumRegisteredJobDirectories = lastJobIndex; NumRegisteredJobDirectories = lastJobIndex;

View File

@ -494,6 +494,7 @@ TrackerDelayLoop(void)
} }
} }
/* ------------------------------------------------------------ /* ------------------------------------------------------------
* Signal handling and shared hash initialization functions follow * Signal handling and shared hash initialization functions follow
* ------------------------------------------------------------ * ------------------------------------------------------------
@ -579,8 +580,8 @@ TaskTrackerShmemInit(void)
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
/* allocate struct containing task tracker related shared state */ /* allocate struct containing task tracker related shared state */
WorkerTasksSharedState = (WorkerTasksSharedStateData *) WorkerTasksSharedState =
ShmemInitStruct("Worker Task Control", (WorkerTasksSharedStateData *) ShmemInitStruct("Worker Task Control",
sizeof(WorkerTasksSharedStateData), sizeof(WorkerTasksSharedStateData),
&alreadyInitialized); &alreadyInitialized);
@ -607,6 +608,7 @@ TaskTrackerShmemInit(void)
} }
} }
/* ------------------------------------------------------------ /* ------------------------------------------------------------
* Task scheduling and management functions follow * Task scheduling and management functions follow
* ------------------------------------------------------------ * ------------------------------------------------------------

View File

@ -331,7 +331,7 @@ UpdateTask(WorkerTask *workerTask, char *taskCallString)
if (taskStatus == TASK_SUCCEEDED || taskStatus == TASK_CANCEL_REQUESTED || if (taskStatus == TASK_SUCCEEDED || taskStatus == TASK_CANCEL_REQUESTED ||
taskStatus == TASK_CANCELED) taskStatus == TASK_CANCELED)
{ {
; /* nothing to do */ /* nothing to do */
} }
else if (taskStatus == TASK_PERMANENTLY_FAILED) else if (taskStatus == TASK_PERMANENTLY_FAILED)
{ {

View File

@ -53,11 +53,14 @@ static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
static void DeleteFile(const char *filename); static void DeleteFile(const char *filename);
static void FetchTableCommon(text *tableName, uint64 remoteTableSize, static void FetchTableCommon(text *tableName, uint64 remoteTableSize,
ArrayType *nodeNameObject, ArrayType *nodePortObject, ArrayType *nodeNameObject, ArrayType *nodePortObject,
bool (*FetchTableFunction) (const char *, uint32, StringInfo)); bool (*FetchTableFunction)(const char *, uint32,
StringInfo));
static uint64 LocalTableSize(Oid relationId); static uint64 LocalTableSize(Oid relationId);
static uint64 ExtractShardId(StringInfo tableName); static uint64 ExtractShardId(StringInfo tableName);
static bool FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName); static bool FetchRegularTable(const char *nodeName, uint32 nodePort,
static bool FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName); StringInfo tableName);
static bool FetchForeignTable(const char *nodeName, uint32 nodePort,
StringInfo tableName);
static List * TableDDLCommandList(const char *nodeName, uint32 nodePort, static List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
StringInfo tableName); StringInfo tableName);
static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort, static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort,
@ -309,7 +312,7 @@ ReceiveRegularFile(const char *nodeName, uint32 nodePort,
} }
else if (copyStatus == CLIENT_COPY_MORE) else if (copyStatus == CLIENT_COPY_MORE)
{ {
; /* remote node will continue to send more data */ /* remote node will continue to send more data */
} }
else else
{ {

View File

@ -256,8 +256,8 @@ JobSchemaName(uint64 jobId)
*/ */
#ifdef HAVE_INTTYPES_H #ifdef HAVE_INTTYPES_H
StringInfo jobSchemaName = makeStringInfo(); StringInfo jobSchemaName = makeStringInfo();
appendStringInfo(jobSchemaName, "%s%0*"PRIu64, appendStringInfo(jobSchemaName, "%s%0*" PRIu64, JOB_SCHEMA_PREFIX,
JOB_SCHEMA_PREFIX, MIN_JOB_DIRNAME_WIDTH, jobId); MIN_JOB_DIRNAME_WIDTH, jobId);
#else #else
StringInfo jobSchemaName = makeStringInfo(); StringInfo jobSchemaName = makeStringInfo();
appendStringInfo(jobSchemaName, "%s%0*llu", appendStringInfo(jobSchemaName, "%s%0*llu",

View File

@ -24,7 +24,8 @@ extern char *pg_get_indexclusterdef_string(Oid indexRelationId);
/* Function declarations for version dependent PostgreSQL ruleutils functions */ /* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer); extern void pg_get_query_def(Query *query, StringInfo buffer);
extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, StringInfo buffer); extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, StringInfo
buffer);
extern char * generate_relation_name(Oid relid, List *namespaces); extern char * generate_relation_name(Oid relid, List *namespaces);

View File

@ -38,7 +38,6 @@ typedef struct ShardInterval
Datum minValue; /* a shard's typed min value datum */ Datum minValue; /* a shard's typed min value datum */
Datum maxValue; /* a shard's typed max value datum */ Datum maxValue; /* a shard's typed max value datum */
uint64 shardId; uint64 shardId;
} ShardInterval; } ShardInterval;
@ -52,7 +51,6 @@ typedef struct ShardPlacement
RelayFileState shardState; RelayFileState shardState;
char *nodeName; char *nodeName;
uint32 nodePort; uint32 nodePort;
} ShardPlacement; } ShardPlacement;

View File

@ -49,10 +49,10 @@
#define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq" #define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq"
/* Remote call definitions to help with data staging and deletion */ /* Remote call definitions to help with data staging and deletion */
#define WORKER_APPLY_SHARD_DDL_COMMAND "SELECT worker_apply_shard_ddl_command \ #define WORKER_APPLY_SHARD_DDL_COMMAND \
("UINT64_FORMAT", %s)" "SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)"
#define WORKER_APPEND_TABLE_TO_SHARD "SELECT worker_append_table_to_shard \ #define WORKER_APPEND_TABLE_TO_SHARD \
(%s, %s, %s, %u)" "SELECT worker_append_table_to_shard (%s, %s, %s, %u)"
#define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s" #define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s"
#define SHARD_MAX_VALUE_QUERY "SELECT max(%s) FROM %s" #define SHARD_MAX_VALUE_QUERY "SELECT max(%s) FROM %s"
#define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size('%s')" #define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size('%s')"
@ -67,7 +67,6 @@ typedef enum
SHARD_PLACEMENT_INVALID_FIRST = 0, SHARD_PLACEMENT_INVALID_FIRST = 0,
SHARD_PLACEMENT_LOCAL_NODE_FIRST = 1, SHARD_PLACEMENT_LOCAL_NODE_FIRST = 1,
SHARD_PLACEMENT_ROUND_ROBIN = 2 SHARD_PLACEMENT_ROUND_ROBIN = 2
} ShardPlacementPolicyType; } ShardPlacementPolicyType;

View File

@ -24,6 +24,7 @@
#define INVALID_TASK_ID 0 #define INVALID_TASK_ID 0
#if (PG_VERSION_NUM >= 90500) #if (PG_VERSION_NUM >= 90500)
/* reserved alias name for UPSERTs */ /* reserved alias name for UPSERTs */
#define UPSERT_ALIAS "citus_table_alias" #define UPSERT_ALIAS "citus_table_alias"
#endif #endif

View File

@ -29,7 +29,6 @@ typedef enum
CLIENT_CONNECTION_BAD = 1, CLIENT_CONNECTION_BAD = 1,
CLIENT_CONNECTION_BUSY = 2, CLIENT_CONNECTION_BUSY = 2,
CLIENT_CONNECTION_READY = 3 CLIENT_CONNECTION_READY = 3
} ConnectStatus; } ConnectStatus;
@ -40,7 +39,6 @@ typedef enum
CLIENT_RESULT_UNAVAILABLE = 1, CLIENT_RESULT_UNAVAILABLE = 1,
CLIENT_RESULT_BUSY = 2, CLIENT_RESULT_BUSY = 2,
CLIENT_RESULT_READY = 3 CLIENT_RESULT_READY = 3
} ResultStatus; } ResultStatus;
@ -51,7 +49,6 @@ typedef enum
CLIENT_QUERY_FAILED = 1, CLIENT_QUERY_FAILED = 1,
CLIENT_QUERY_DONE = 2, CLIENT_QUERY_DONE = 2,
CLIENT_QUERY_COPY = 3 CLIENT_QUERY_COPY = 3
} QueryStatus; } QueryStatus;
@ -62,7 +59,6 @@ typedef enum
CLIENT_COPY_MORE = 1, CLIENT_COPY_MORE = 1,
CLIENT_COPY_FAILED = 2, CLIENT_COPY_FAILED = 2,
CLIENT_COPY_DONE = 3 CLIENT_COPY_DONE = 3
} CopyStatus; } CopyStatus;
@ -73,7 +69,6 @@ typedef enum
CLIENT_BATCH_QUERY_FAILED = 1, CLIENT_BATCH_QUERY_FAILED = 1,
CLIENT_BATCH_QUERY_CONTINUE = 2, CLIENT_BATCH_QUERY_CONTINUE = 2,
CLIENT_BATCH_QUERY_DONE = 3 CLIENT_BATCH_QUERY_DONE = 3
} BatchQueryStatus; } BatchQueryStatus;

View File

@ -40,7 +40,6 @@ typedef enum JoinRuleType
* RuleNameArray. * RuleNameArray.
*/ */
JOIN_RULE_LAST JOIN_RULE_LAST
} JoinRuleType; } JoinRuleType;
@ -53,7 +52,6 @@ typedef struct TableEntry
{ {
Oid relationId; Oid relationId;
uint32 rangeTableId; uint32 rangeTableId;
} TableEntry; } TableEntry;
@ -72,7 +70,6 @@ typedef struct JoinOrderNode
char partitionMethod; char partitionMethod;
List *joinClauseList; /* not relevant for the first table */ List *joinClauseList; /* not relevant for the first table */
List *shardIntervalList; List *shardIntervalList;
} JoinOrderNode; } JoinOrderNode;

View File

@ -55,7 +55,6 @@ typedef enum
AGGREGATE_SUM = 4, AGGREGATE_SUM = 4,
AGGREGATE_COUNT = 5, AGGREGATE_COUNT = 5,
AGGREGATE_ARRAY_AGG = 6 AGGREGATE_ARRAY_AGG = 6
} AggregateType; } AggregateType;
@ -69,7 +68,6 @@ typedef enum
PUSH_DOWN_VALID = 1, PUSH_DOWN_VALID = 1,
PUSH_DOWN_NOT_VALID = 2, PUSH_DOWN_NOT_VALID = 2,
PUSH_DOWN_SPECIAL_CONDITIONS = 3 PUSH_DOWN_SPECIAL_CONDITIONS = 3
} PushDownStatus; } PushDownStatus;
@ -82,7 +80,6 @@ typedef enum
PULL_UP_INVALID_FIRST = 0, PULL_UP_INVALID_FIRST = 0,
PULL_UP_VALID = 1, PULL_UP_VALID = 1,
PULL_UP_NOT_VALID = 2 PULL_UP_NOT_VALID = 2
} PullUpStatus; } PullUpStatus;
@ -97,8 +94,10 @@ typedef enum
* Please note that the order of elements in this array is tied to the order of * Please note that the order of elements in this array is tied to the order of
* values in the preceding AggregateType enum. This order needs to be preserved. * values in the preceding AggregateType enum. This order needs to be preserved.
*/ */
static const char * const AggregateNames[] = { "invalid", "avg", "min", "max", static const char *const AggregateNames[] = {
"sum", "count", "array_agg" }; "invalid", "avg", "min", "max", "sum",
"count", "array_agg"
};
/* Config variable managed via guc.c */ /* Config variable managed via guc.c */

View File

@ -40,8 +40,8 @@ typedef struct MultiNode
CitusNodeTag type; CitusNodeTag type;
struct MultiNode *parentNode; struct MultiNode *parentNode;
/* child node(s) are defined in unary and binary nodes */
/* child node(s) are defined in unary and binary nodes */
} MultiNode; } MultiNode;
@ -51,7 +51,6 @@ typedef struct MultiUnaryNode
MultiNode node; MultiNode node;
struct MultiNode *childNode; struct MultiNode *childNode;
} MultiUnaryNode; } MultiUnaryNode;
@ -62,7 +61,6 @@ typedef struct MultiBinaryNode
struct MultiNode *leftChildNode; struct MultiNode *leftChildNode;
struct MultiNode *rightChildNode; struct MultiNode *rightChildNode;
} MultiBinaryNode; } MultiBinaryNode;
@ -73,7 +71,6 @@ typedef struct MultiBinaryNode
typedef struct MultiTreeRoot typedef struct MultiTreeRoot
{ {
MultiUnaryNode unaryNode; MultiUnaryNode unaryNode;
} MultiTreeRoot; } MultiTreeRoot;
@ -91,7 +88,6 @@ typedef struct MultiTable
Alias *alias; Alias *alias;
Alias *referenceNames; Alias *referenceNames;
Query *subquery; /* this field is only valid for non-relation subquery types */ Query *subquery; /* this field is only valid for non-relation subquery types */
} MultiTable; } MultiTable;
@ -100,7 +96,6 @@ typedef struct MultiProject
{ {
MultiUnaryNode unaryNode; MultiUnaryNode unaryNode;
List *columnList; List *columnList;
} MultiProject; } MultiProject;
@ -112,7 +107,6 @@ typedef struct MultiProject
typedef struct MultiCollect typedef struct MultiCollect
{ {
MultiUnaryNode unaryNode; MultiUnaryNode unaryNode;
} MultiCollect; } MultiCollect;
@ -125,7 +119,6 @@ typedef struct MultiSelect
{ {
MultiUnaryNode unaryNode; MultiUnaryNode unaryNode;
List *selectClauseList; List *selectClauseList;
} MultiSelect; } MultiSelect;
@ -140,7 +133,6 @@ typedef struct MultiJoin
List *joinClauseList; List *joinClauseList;
JoinRuleType joinRuleType; JoinRuleType joinRuleType;
JoinType joinType; JoinType joinType;
} MultiJoin; } MultiJoin;
@ -150,7 +142,6 @@ typedef struct MultiPartition
MultiUnaryNode unaryNode; MultiUnaryNode unaryNode;
Var *partitionColumn; Var *partitionColumn;
uint32 splitPointTableId; uint32 splitPointTableId;
} MultiPartition; } MultiPartition;
@ -158,7 +149,6 @@ typedef struct MultiPartition
typedef struct MultiCartesianProduct typedef struct MultiCartesianProduct
{ {
MultiBinaryNode binaryNode; MultiBinaryNode binaryNode;
} MultiCartesianProduct; } MultiCartesianProduct;
@ -183,7 +173,6 @@ typedef struct MultiExtendedOp
List *sortClauseList; List *sortClauseList;
Node *limitCount; Node *limitCount;
Node *limitOffset; Node *limitOffset;
} MultiExtendedOp; } MultiExtendedOp;

View File

@ -40,7 +40,8 @@
(" UINT64_FORMAT ", %d, %s, '%s', %d, %d)" (" UINT64_FORMAT ", %d, %s, '%s', %d, %d)"
#define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \ #define MERGE_FILES_INTO_TABLE_COMMAND "SELECT worker_merge_files_into_table \
(" UINT64_FORMAT ", %d, '%s', '%s')" (" UINT64_FORMAT ", %d, '%s', '%s')"
#define MERGE_FILES_AND_RUN_QUERY_COMMAND "SELECT worker_merge_files_and_run_query(" UINT64_FORMAT ", %d, '%s', '%s')" #define MERGE_FILES_AND_RUN_QUERY_COMMAND \
"SELECT worker_merge_files_and_run_query(" UINT64_FORMAT ", %d, '%s', '%s')"
typedef enum CitusRTEKind typedef enum CitusRTEKind
@ -62,7 +63,6 @@ typedef enum
PARTITION_INVALID_FIRST = 0, PARTITION_INVALID_FIRST = 0,
RANGE_PARTITION_TYPE = 1, RANGE_PARTITION_TYPE = 1,
HASH_PARTITION_TYPE = 2 HASH_PARTITION_TYPE = 2
} PartitionType; } PartitionType;
@ -77,7 +77,6 @@ typedef enum
MAP_OUTPUT_FETCH_TASK = 5, MAP_OUTPUT_FETCH_TASK = 5,
MERGE_FETCH_TASK = 6, MERGE_FETCH_TASK = 6,
MODIFY_TASK = 7 MODIFY_TASK = 7
} TaskType; } TaskType;
@ -88,7 +87,6 @@ typedef enum
TASK_ASSIGNMENT_GREEDY = 1, TASK_ASSIGNMENT_GREEDY = 1,
TASK_ASSIGNMENT_ROUND_ROBIN = 2, TASK_ASSIGNMENT_ROUND_ROBIN = 2,
TASK_ASSIGNMENT_FIRST_REPLICA = 3 TASK_ASSIGNMENT_FIRST_REPLICA = 3
} TaskAssignmentPolicyType; } TaskAssignmentPolicyType;
@ -99,7 +97,6 @@ typedef enum
JOIN_MAP_MERGE_JOB = 1, JOIN_MAP_MERGE_JOB = 1,
SUBQUERY_MAP_MERGE_JOB = 2, SUBQUERY_MAP_MERGE_JOB = 2,
TOP_LEVEL_WORKER_JOB = 3 TOP_LEVEL_WORKER_JOB = 3
} BoundaryNodeJobType; } BoundaryNodeJobType;
@ -133,7 +130,6 @@ typedef struct MapMergeJob
ShardInterval **sortedShardIntervalArray; /* only applies to range partitioning */ ShardInterval **sortedShardIntervalArray; /* only applies to range partitioning */
List *mapTaskList; List *mapTaskList;
List *mergeTaskList; List *mergeTaskList;
} MapMergeJob; } MapMergeJob;
@ -164,7 +160,6 @@ typedef struct Task
uint64 shardId; /* only applies to shard fetch tasks */ uint64 shardId; /* only applies to shard fetch tasks */
TaskExecution *taskExecution; /* used by task tracker executor */ TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */ bool upsertQuery; /* only applies to modify tasks */
} Task; } Task;
@ -177,7 +172,6 @@ typedef struct RangeTableFragment
CitusRTEKind fragmentType; CitusRTEKind fragmentType;
void *fragmentReference; void *fragmentReference;
uint32 rangeTableId; uint32 rangeTableId;
} RangeTableFragment; } RangeTableFragment;
@ -190,7 +184,6 @@ typedef struct JoinSequenceNode
{ {
uint32 rangeTableId; uint32 rangeTableId;
int32 joiningRangeTableId; int32 joiningRangeTableId;
} JoinSequenceNode; } JoinSequenceNode;
@ -203,7 +196,6 @@ typedef struct MultiPlan
Job *workerJob; Job *workerJob;
Query *masterQuery; Query *masterQuery;
char *masterTableName; char *masterTableName;
} MultiPlan; } MultiPlan;

View File

@ -60,7 +60,6 @@ typedef enum
EXEC_TASK_TRACKER_FAILED = 14, EXEC_TASK_TRACKER_FAILED = 14,
EXEC_SOURCE_TASK_TRACKER_RETRY = 15, EXEC_SOURCE_TASK_TRACKER_RETRY = 15,
EXEC_SOURCE_TASK_TRACKER_FAILED = 16 EXEC_SOURCE_TASK_TRACKER_FAILED = 16
} TaskExecStatus; } TaskExecStatus;
@ -74,7 +73,6 @@ typedef enum
EXEC_TRANSMIT_TRACKER_RETRY = 4, EXEC_TRANSMIT_TRACKER_RETRY = 4,
EXEC_TRANSMIT_TRACKER_FAILED = 5, EXEC_TRANSMIT_TRACKER_FAILED = 5,
EXEC_TRANSMIT_DONE = 6 EXEC_TRANSMIT_DONE = 6
} TransmitExecStatus; } TransmitExecStatus;
@ -86,7 +84,6 @@ typedef enum
TRACKER_CONNECT_POLL = 2, TRACKER_CONNECT_POLL = 2,
TRACKER_CONNECTED = 3, TRACKER_CONNECTED = 3,
TRACKER_CONNECTION_FAILED = 4 TRACKER_CONNECTION_FAILED = 4
} TrackerStatus; } TrackerStatus;
@ -97,7 +94,6 @@ typedef enum
MULTI_EXECUTOR_REAL_TIME = 1, MULTI_EXECUTOR_REAL_TIME = 1,
MULTI_EXECUTOR_TASK_TRACKER = 2, MULTI_EXECUTOR_TASK_TRACKER = 2,
MULTI_EXECUTOR_ROUTER = 3 MULTI_EXECUTOR_ROUTER = 3
} MultiExecutorType; } MultiExecutorType;
@ -107,7 +103,6 @@ typedef enum
CONNECT_ACTION_NONE = 0, CONNECT_ACTION_NONE = 0,
CONNECT_ACTION_OPENED = 1, CONNECT_ACTION_OPENED = 1,
CONNECT_ACTION_CLOSED = 2 CONNECT_ACTION_CLOSED = 2
} ConnectAction; } ConnectAction;
@ -132,7 +127,6 @@ struct TaskExecution
uint32 querySourceNodeIndex; /* only applies to map fetch tasks */ uint32 querySourceNodeIndex; /* only applies to map fetch tasks */
int32 dataFetchTaskIndex; int32 dataFetchTaskIndex;
uint32 failureCount; uint32 failureCount;
}; };
@ -147,7 +141,6 @@ typedef struct TrackerTaskState
uint32 taskId; uint32 taskId;
TaskStatus status; TaskStatus status;
StringInfo taskAssignmentQuery; StringInfo taskAssignmentQuery;
} TrackerTaskState; } TrackerTaskState;
@ -171,7 +164,6 @@ typedef struct TaskTracker
int32 currentTaskIndex; int32 currentTaskIndex;
bool connectionBusy; bool connectionBusy;
TrackerTaskState *connectionBusyOnTask; TrackerTaskState *connectionBusyOnTask;
} TaskTracker; } TaskTracker;
@ -184,7 +176,6 @@ typedef struct WorkerNodeState
uint32 workerPort; uint32 workerPort;
char workerName[WORKER_LENGTH]; char workerName[WORKER_LENGTH];
uint32 openConnectionCount; uint32 openConnectionCount;
} WorkerNodeState; } WorkerNodeState;

View File

@ -35,7 +35,6 @@ typedef enum
FILE_CACHED = 2, FILE_CACHED = 2,
FILE_INACTIVE = 3, FILE_INACTIVE = 3,
FILE_TO_DELETE = 4 FILE_TO_DELETE = 4
} RelayFileState; } RelayFileState;

View File

@ -29,6 +29,7 @@ typedef enum AdvisoryLocktagClass
/* values defined in postgres' lockfuncs.c */ /* values defined in postgres' lockfuncs.c */
ADV_LOCKTAG_CLASS_INT64 = 1, ADV_LOCKTAG_CLASS_INT64 = 1,
ADV_LOCKTAG_CLASS_INT32 = 2, ADV_LOCKTAG_CLASS_INT32 = 2,
/* CitusDB lock types */ /* CitusDB lock types */
ADV_LOCKTAG_CLASS_CITUS_SHARD_METADATA = 4, ADV_LOCKTAG_CLASS_CITUS_SHARD_METADATA = 4,
ADV_LOCKTAG_CLASS_CITUS_SHARD = 5, ADV_LOCKTAG_CLASS_CITUS_SHARD = 5,

View File

@ -63,7 +63,6 @@ typedef enum
* TASK_STATUS_LAST, should never have their numbers changed. * TASK_STATUS_LAST, should never have their numbers changed.
*/ */
TASK_STATUS_LAST TASK_STATUS_LAST
} TaskStatus; } TaskStatus;
@ -85,7 +84,6 @@ typedef struct WorkerTask
char databaseName[NAMEDATALEN]; /* name to use for local backend connection */ char databaseName[NAMEDATALEN]; /* name to use for local backend connection */
int32 connectionId; /* connection id to local backend */ int32 connectionId; /* connection id to local backend */
uint32 failureCount; /* number of task failures */ uint32 failureCount; /* number of task failures */
} WorkerTask; } WorkerTask;
@ -97,6 +95,7 @@ typedef struct WorkerTasksSharedStateData
{ {
/* Hash table shared by the task tracker and task tracker protocol functions */ /* Hash table shared by the task tracker and task tracker protocol functions */
HTAB *taskHash; HTAB *taskHash;
/* Lock protecting workerNodesHash */ /* Lock protecting workerNodesHash */
LWLock *taskHashLock; LWLock *taskHashLock;
} WorkerTasksSharedStateData; } WorkerTasksSharedStateData;

View File

@ -48,7 +48,6 @@ typedef struct WorkerNode
char workerRack[WORKER_LENGTH]; /* node's network location */ char workerRack[WORKER_LENGTH]; /* node's network location */
bool inWorkerFile; /* is node in current membership file? */ bool inWorkerFile; /* is node in current membership file? */
} WorkerNode; } WorkerNode;

View File

@ -65,7 +65,6 @@ typedef struct RangePartitionContext
FmgrInfo *comparisonFunction; FmgrInfo *comparisonFunction;
Datum *splitPointArray; Datum *splitPointArray;
int32 splitPointCount; int32 splitPointCount;
} RangePartitionContext; } RangePartitionContext;
@ -77,7 +76,6 @@ typedef struct HashPartitionContext
{ {
FmgrInfo *hashFunction; FmgrInfo *hashFunction;
uint32 partitionCount; uint32 partitionCount;
} HashPartitionContext; } HashPartitionContext;
@ -114,7 +112,6 @@ typedef struct FileOutputStream
File fileDescriptor; File fileDescriptor;
StringInfo fileBuffer; StringInfo fileBuffer;
StringInfo filePath; StringInfo filePath;
} FileOutputStream; } FileOutputStream;