Cast the distribution argument value when possible

pull/3026/head
Marco Slot 2019-09-20 13:18:27 +02:00 committed by Philip Dubé
parent c95d46b4f3
commit e269d990c9
3 changed files with 45 additions and 20 deletions

View File

@ -16,6 +16,8 @@
#include "commands/defrem.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
@ -25,6 +27,7 @@
#include "distributed/shard_pruning.h"
#include "distributed/version_compat.h"
#include "distributed/worker_manager.h"
#include "optimizer/clauses.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/primnodes.h"
@ -68,10 +71,11 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
{
Oid colocatedRelationId = InvalidOid;
Const *partitionValue = NULL;
Datum partitionValueDatum = 0;
ShardInterval *shardInterval = NULL;
List *placementList = NIL;
ListCell *argCell = NULL;
DistTableCacheEntry *distTable = NULL;
Var *partitionColumn = NULL;
ShardPlacement *placement = NULL;
WorkerNode *workerNode = NULL;
@ -95,20 +99,42 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
return false;
}
foreach(argCell, funcExpr->args)
if (contain_volatile_functions((Node *) funcExpr->args))
{
Node *argNode = (Node *) lfirst(argCell);
if (!IsA(argNode, Const))
{
ereport(DEBUG2, (errmsg("cannot push down non-constant argument value")));
return false;
}
ereport(DEBUG2, (errmsg("arguments in a distributed stored procedure must "
"be constant expressions")));
return false;
}
distTable = DistributedTableCacheEntry(colocatedRelationId);
partitionColumn = distTable->partitionColumn;
if (partitionColumn == NULL)
{
/* This can happen if colocated with a reference table. Punt for now. */
ereport(DEBUG1, (errmsg(
"cannot push down CALL for reference tables")));
return false;
}
partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex);
distTable = DistributedTableCacheEntry(colocatedRelationId);
shardInterval = FindShardInterval(partitionValue->constvalue, distTable);
if (!IsA(partitionValue, Const))
{
ereport(DEBUG2, (errmsg("distribution argument value must be a constant")));
return false;
}
partitionValueDatum = partitionValue->constvalue;
if (partitionValue->consttype != partitionColumn->vartype)
{
CopyCoercionData coercionData;
ConversionPathForTypes(partitionValue->consttype, partitionColumn->vartype,
&coercionData);
partitionValueDatum = CoerceColumnValue(partitionValueDatum, &coercionData);
}
shardInterval = FindShardInterval(partitionValueDatum, distTable);
if (shardInterval == NULL)
{
ereport(DEBUG2, (errmsg("cannot push down call, failed to find shard interval")));
@ -118,15 +144,14 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
placementList = FinalizedShardPlacementList(shardInterval->shardId);
if (list_length(placementList) != 1)
{
/* punt on reference tables for now */
ereport(DEBUG2, (errmsg(
"cannot push down CALL for reference tables or replicated distributed tables")));
/* punt on this for now */
ereport(DEBUG1, (errmsg(
"cannot push down CALL for replicated distributed tables")));
return false;
}
placement = (ShardPlacement *) linitial(placementList);
workerNode = FindWorkerNode(placement->nodeName, placement->nodePort);
if (workerNode == NULL || !workerNode->hasMetadata || !workerNode->metadataSynced)
{
ereport(DEBUG2, (errmsg("there is no worker node with metadata")));
@ -138,8 +163,8 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
TupleDesc tupleDesc = CallStmtResultDesc(callStmt);
TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDesc,
&TTSOpsMinimalTuple);
Task *task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID;
task->taskId = 0;
task->taskType = DDL_TASK;
@ -150,7 +175,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
task->relationShardList = NIL;
task->taskPlacementList = placementList;
ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, list_make1(task),
ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task),
tupleDesc, tupleStore, true,
MaxAdaptiveExecutorPoolSize);

View File

@ -219,7 +219,6 @@ static int64 RemoteCreateEmptyShard(char *relationName);
static void MasterUpdateShardStatistics(uint64 shardId);
static void RemoteUpdateShardStatistics(uint64 shardId);
static void ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result);
static Oid TypeForColumnName(Oid relationId, TupleDesc tupleDescriptor, char *columnName);
static Oid * TypeArrayFromTupleDescriptor(TupleDesc tupleDescriptor);
static CopyCoercionData * ColumnCoercionPaths(TupleDesc destTupleDescriptor,
@ -228,7 +227,6 @@ static CopyCoercionData * ColumnCoercionPaths(TupleDesc destTupleDescriptor,
Oid *finalColumnTypeArray);
static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray,
bool binaryFormat);
static Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath);
static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName);
@ -1384,7 +1382,7 @@ ReportCopyError(MultiConnection *connection, PGresult *result)
* ConversionPathForTypes fills *result with all the data necessary for converting
* Datums of type inputType to Datums of type destType.
*/
static void
void
ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result)
{
Oid coercionFuncId = InvalidOid;
@ -1743,7 +1741,7 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
* CoerceColumnValue follows the instructions in *coercionPath and uses them to convert
* inputValue into a Datum of the correct type.
*/
static Datum
Datum
CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath)
{
switch (coercionPath->coercionType)

View File

@ -143,6 +143,8 @@ extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
const char *queryString);
extern void CheckCopyPermissions(CopyStmt *copyStatement);
extern bool IsCopyResultStmt(CopyStmt *copyStatement);
extern void ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result);
extern Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath);
#endif /* MULTI_COPY_H */