From e269d990c97ddac8468861285a5ab7877a3fe957 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 20 Sep 2019 13:18:27 +0200 Subject: [PATCH] Cast the distribution argument value when possible --- src/backend/distributed/commands/call.c | 57 +++++++++++++------ src/backend/distributed/commands/multi_copy.c | 6 +- src/include/distributed/commands/multi_copy.h | 2 + 3 files changed, 45 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 86eef856d..863aded67 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -16,6 +16,8 @@ #include "commands/defrem.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" +#include "distributed/commands/multi_copy.h" +#include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -25,6 +27,7 @@ #include "distributed/shard_pruning.h" #include "distributed/version_compat.h" #include "distributed/worker_manager.h" +#include "optimizer/clauses.h" #include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" #include "nodes/primnodes.h" @@ -68,10 +71,11 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, { Oid colocatedRelationId = InvalidOid; Const *partitionValue = NULL; + Datum partitionValueDatum = 0; ShardInterval *shardInterval = NULL; List *placementList = NIL; - ListCell *argCell = NULL; DistTableCacheEntry *distTable = NULL; + Var *partitionColumn = NULL; ShardPlacement *placement = NULL; WorkerNode *workerNode = NULL; @@ -95,20 +99,42 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, return false; } - foreach(argCell, funcExpr->args) + if (contain_volatile_functions((Node *) funcExpr->args)) { - Node *argNode = (Node *) lfirst(argCell); - if (!IsA(argNode, Const)) - { - ereport(DEBUG2, (errmsg("cannot push down non-constant argument value"))); - return false; - } + ereport(DEBUG2, (errmsg("arguments in a distributed stored procedure must " + "be constant expressions"))); + return false; + } + + distTable = DistributedTableCacheEntry(colocatedRelationId); + partitionColumn = distTable->partitionColumn; + if (partitionColumn == NULL) + { + /* This can happen if colocated with a reference table. Punt for now. */ + ereport(DEBUG1, (errmsg( + "cannot push down CALL for reference tables"))); + return false; } partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex); - distTable = DistributedTableCacheEntry(colocatedRelationId); - shardInterval = FindShardInterval(partitionValue->constvalue, distTable); + if (!IsA(partitionValue, Const)) + { + ereport(DEBUG2, (errmsg("distribution argument value must be a constant"))); + return false; + } + partitionValueDatum = partitionValue->constvalue; + if (partitionValue->consttype != partitionColumn->vartype) + { + CopyCoercionData coercionData; + + ConversionPathForTypes(partitionValue->consttype, partitionColumn->vartype, + &coercionData); + + partitionValueDatum = CoerceColumnValue(partitionValueDatum, &coercionData); + } + + shardInterval = FindShardInterval(partitionValueDatum, distTable); if (shardInterval == NULL) { ereport(DEBUG2, (errmsg("cannot push down call, failed to find shard interval"))); @@ -118,15 +144,14 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, placementList = FinalizedShardPlacementList(shardInterval->shardId); if (list_length(placementList) != 1) { - /* punt on reference tables for now */ - ereport(DEBUG2, (errmsg( - "cannot push down CALL for reference tables or replicated distributed tables"))); + /* punt on this for now */ + ereport(DEBUG1, (errmsg( + "cannot push down CALL for replicated distributed tables"))); return false; } placement = (ShardPlacement *) linitial(placementList); workerNode = FindWorkerNode(placement->nodeName, placement->nodePort); - if (workerNode == NULL || !workerNode->hasMetadata || !workerNode->metadataSynced) { ereport(DEBUG2, (errmsg("there is no worker node with metadata"))); @@ -138,8 +163,8 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, TupleDesc tupleDesc = CallStmtResultDesc(callStmt); TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDesc, &TTSOpsMinimalTuple); - Task *task = CitusMakeNode(Task); + task->jobId = INVALID_JOB_ID; task->taskId = 0; task->taskType = DDL_TASK; @@ -150,7 +175,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, task->relationShardList = NIL; task->taskPlacementList = placementList; - ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, list_make1(task), + ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), tupleDesc, tupleStore, true, MaxAdaptiveExecutorPoolSize); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 64a2e1737..70f67b450 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -219,7 +219,6 @@ static int64 RemoteCreateEmptyShard(char *relationName); static void MasterUpdateShardStatistics(uint64 shardId); static void RemoteUpdateShardStatistics(uint64 shardId); -static void ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result); static Oid TypeForColumnName(Oid relationId, TupleDesc tupleDescriptor, char *columnName); static Oid * TypeArrayFromTupleDescriptor(TupleDesc tupleDescriptor); static CopyCoercionData * ColumnCoercionPaths(TupleDesc destTupleDescriptor, @@ -228,7 +227,6 @@ static CopyCoercionData * ColumnCoercionPaths(TupleDesc destTupleDescriptor, Oid *finalColumnTypeArray); static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray, bool binaryFormat); -static Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath); static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort); static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName); @@ -1384,7 +1382,7 @@ ReportCopyError(MultiConnection *connection, PGresult *result) * ConversionPathForTypes fills *result with all the data necessary for converting * Datums of type inputType to Datums of type destType. */ -static void +void ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result) { Oid coercionFuncId = InvalidOid; @@ -1743,7 +1741,7 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, * CoerceColumnValue follows the instructions in *coercionPath and uses them to convert * inputValue into a Datum of the correct type. */ -static Datum +Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath) { switch (coercionPath->coercionType) diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index 4609100fa..45874c5cf 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -143,6 +143,8 @@ extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString); extern void CheckCopyPermissions(CopyStmt *copyStatement); extern bool IsCopyResultStmt(CopyStmt *copyStatement); +extern void ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result); +extern Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath); #endif /* MULTI_COPY_H */