(1) Functions will be delegated even when present in the scope of an explicit
BEGIN/COMMIT transaction block or in a UDF calling another UDF.
(2) Prohibit/limit the delegated function from doing a 2PC (or any work on a
remote connection).
(3) Have a safety net to enforce (2), i.e. we should block the connections
from the delegated procedure or make sure that no 2PC happens on the node.
(4) Such delegated functions are restricted to use only the distribution argument
value.

Note: To limit the scope of the project we are considering only Functions(not
procedures) for the initial work.

DESCRIPTION: Introduce a new flag "force_delegation" in create_distributed_function(),
which will allow a function to be delegated in an explicit transaction block.

Fixes #3265

Once the function is delegated to the worker, the following call flow takes place
on that node during planning and execution:

distributed_planner()
TryToDelegateFunctionCall()
CheckDelegatedFunctionExecution()
EnableInForceDelegatedFuncExecution()
Save the distribution argument (Constant)
ExecutorStart()
CitusBeginScan()
IsShardKeyValueAllowed()
Ensure to not use non-distribution argument.

ExecutorRun()
AdaptiveExecutor()
StartDistributedExecution()
EnsureNoRemoteExecutionFromWorkers()
Ensure all the shards are local to the node in the remoteTaskList.
NonPushableInsertSelectExecScan()
InitializeCopyShardState()
EnsureNoRemoteExecutionFromWorkers()
Ensure all the shards are local to the node in the placementList.

This also fixes a minor issue: Properly handle expressions+parameters in distribution arguments
pull/5633/head^2
Teja Mupparti 2022-01-19 10:44:46 -08:00 committed by Teja Mupparti
parent 65ab34810b
commit 54862f8c22
35 changed files with 2885 additions and 157 deletions

View File

@ -91,6 +91,7 @@ static void DistributeFunctionWithDistributionArgument(RegProcedure funcOid,
char *distributionArgumentName,
Oid distributionArgumentOid,
char *colocateWithTableName,
bool *forceDelegationAddress,
const ObjectAddress *
functionAddress);
static void DistributeFunctionColocatedWithDistributedTable(RegProcedure funcOid,
@ -124,6 +125,8 @@ create_distributed_function(PG_FUNCTION_ARGS)
char *distributionArgumentName = NULL;
char *colocateWithTableName = NULL;
bool *forceDelegationAddress = NULL;
bool forceDelegation = false;
/* if called on NULL input, error out */
if (funcOid == InvalidOid)
@ -169,6 +172,17 @@ create_distributed_function(PG_FUNCTION_ARGS)
}
}
/* check if the force_delegation flag is explicitly set (default is NULL) */
if (PG_ARGISNULL(3))
{
forceDelegationAddress = NULL;
}
else
{
forceDelegation = PG_GETARG_BOOL(3);
forceDelegationAddress = &forceDelegation;
}
EnsureCoordinator();
EnsureFunctionOwner(funcOid);
@ -204,6 +218,7 @@ create_distributed_function(PG_FUNCTION_ARGS)
DistributeFunctionWithDistributionArgument(funcOid, distributionArgumentName,
distributionArgumentOid,
colocateWithTableName,
forceDelegationAddress,
&functionAddress);
}
else if (!colocatedWithReferenceTable)
@ -265,6 +280,7 @@ DistributeFunctionWithDistributionArgument(RegProcedure funcOid,
char *distributionArgumentName,
Oid distributionArgumentOid,
char *colocateWithTableName,
bool *forceDelegationAddress,
const ObjectAddress *functionAddress)
{
/* get the argument index, or error out if we cannot find a valid index */
@ -279,7 +295,8 @@ DistributeFunctionWithDistributionArgument(RegProcedure funcOid,
/* record the distribution argument and colocationId */
UpdateFunctionDistributionInfo(functionAddress, &distributionArgumentIndex,
&colocationId);
&colocationId,
forceDelegationAddress);
}
@ -310,7 +327,7 @@ DistributeFunctionColocatedWithDistributedTable(RegProcedure funcOid,
}
/* set distribution argument and colocationId to NULL */
UpdateFunctionDistributionInfo(functionAddress, NULL, NULL);
UpdateFunctionDistributionInfo(functionAddress, NULL, NULL, NULL);
}
@ -327,7 +344,8 @@ DistributeFunctionColocatedWithReferenceTable(const ObjectAddress *functionAddre
/* set distribution argument to NULL and colocationId to the reference table colocation id */
int *distributionArgumentIndex = NULL;
UpdateFunctionDistributionInfo(functionAddress, distributionArgumentIndex,
&colocationId);
&colocationId,
NULL);
}
@ -596,7 +614,8 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp
void
UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
int *distribution_argument_index,
int *colocationId)
int *colocationId,
bool *forceDelegation)
{
const bool indexOK = true;
@ -655,6 +674,18 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
isnull[Anum_pg_dist_object_colocationid - 1] = true;
}
replace[Anum_pg_dist_object_force_delegation - 1] = true;
if (forceDelegation != NULL)
{
values[Anum_pg_dist_object_force_delegation - 1] = BoolGetDatum(
*forceDelegation);
isnull[Anum_pg_dist_object_force_delegation - 1] = false;
}
else
{
isnull[Anum_pg_dist_object_force_delegation - 1] = true;
}
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistObjectRel, &heapTuple->t_self, heapTuple);
@ -672,6 +703,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
List *objectAddressList = list_make1((ObjectAddress *) distAddress);
List *distArgumentIndexList = NIL;
List *colocationIdList = NIL;
List *forceDelegationList = NIL;
if (distribution_argument_index == NULL)
{
@ -691,10 +723,20 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
colocationIdList = list_make1_int(*colocationId);
}
if (forceDelegation == NULL)
{
forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
}
else
{
forceDelegationList = list_make1_int(*forceDelegation);
}
char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
distArgumentIndexList,
colocationIdList);
colocationIdList,
forceDelegationList);
SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand);
}
}

View File

@ -161,9 +161,11 @@
#include "distributed/shared_connection_stats.h"
#include "distributed/subplan_execution.h"
#include "distributed/transaction_management.h"
#include "distributed/transaction_identifier.h"
#include "distributed/tuple_destination.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "distributed/backend_data.h"
#include "lib/ilist.h"
#include "portability/instr_time.h"
#include "storage/fd.h"

View File

@ -34,14 +34,19 @@
#include "distributed/subplan_execution.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_protocol.h"
#include "distributed/colocation_utils.h"
#include "distributed/function_call_delegation.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "optimizer/clauses.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/datum.h"
extern AllowedDistributionColumn AllowedDistributionColumnValue;
/* functions for creating custom scan nodes */
static Node * AdaptiveExecutorCreateScan(CustomScan *scan);
static Node * NonPushableInsertSelectCreateScan(CustomScan *scan);
@ -59,6 +64,8 @@ static DistributedPlan * CopyDistributedPlanWithoutCache(
DistributedPlan *originalDistributedPlan);
static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node);
static void SetJobColocationId(Job *job);
static void EnsureForceDelegationDistributionKey(Job *job);
/* create custom scan methods for all executors */
@ -190,6 +197,17 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
CitusBeginModifyScan(node, estate, eflags);
}
/*
* If the distribution argument of a force_delegation function is set,
* enforce it.
*/
if (AllowedDistributionColumnValue.isActive)
{
Job *workerJob = scanState->distributedPlan->workerJob;
EnsureForceDelegationDistributionKey(workerJob);
}
/*
* In case of a prepared statement, we will see this distributed plan again
* on the next execution with a higher usage counter.
@ -801,3 +819,96 @@ IsCitusCustomScan(Plan *plan)
return true;
}
/*
 * SetJobColocationId derives a single colocation ID for the Job from the
 * Citus tables found in the range table of its query and stores the result
 * in job->colocationId.
 *
 * If every Citus table in the query reports the same colocation ID, that ID
 * is stored. If two tables disagree, or no Citus table is found at all,
 * INVALID_COLOCATION_ID is stored instead. A Job without a partition key
 * value is returned untouched.
 */
static void
SetJobColocationId(Job *job)
{
	if (job->partitionKeyValue == NULL)
	{
		/* no shard key on the Job, leave its colocation ID as it is */
		return;
	}

	uint32 commonColocationId = INVALID_COLOCATION_ID;
	List *rteList = ExtractRangeTableEntryList(job->jobQuery);
	ListCell *rteCell = NULL;

	foreach(rteCell, rteList)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);
		Oid tableId = rte->relid;

		if (!IsCitusTable(tableId))
		{
			/* entries that are not Citus tables do not take part in the check */
			continue;
		}

		uint32 tableColocationId = TableColocationId(tableId);

		if (commonColocationId != INVALID_COLOCATION_ID &&
			commonColocationId != tableColocationId)
		{
			/* a second colocation group showed up, there is no common ID */
			commonColocationId = INVALID_COLOCATION_ID;
			break;
		}

		/* either the first Citus table, or one that agrees with the others */
		commonColocationId = tableColocationId;
	}

	job->colocationId = commonColocationId;
}
/*
 * EnsureForceDelegationDistributionKey raises an error unless the Job's
 * partition key value and colocation ID match the distribution argument that
 * was saved for the force-delegated function, as decided by
 * IsShardKeyValueAllowed().
 *
 * Two kinds of Jobs are let through without a check:
 *  - Jobs flagged as subqueryPushdown, where the check is left to the
 *    subquery;
 *  - Jobs without a partition key value whose query contains no distributed
 *    table (e.g. a query that only reads intermediate results).
 *
 * On a mismatch, an ERROR with ERRCODE_FEATURE_NOT_SUPPORTED is reported.
 */
static void
EnsureForceDelegationDistributionKey(Job *job)
{
	/* If the Job has the subquery, punt the shard-key-check to the subquery */
	if (job->subqueryPushdown)
	{
		return;
	}

	/*
	 * If the query doesn't have shard key, nothing to check, only exception is when
	 * the query doesn't have distributed tables but an RTE with intermediate_results
	 * function (a subquery plan).
	 */
	if (!job->partitionKeyValue)
	{
		bool queryContainsDistributedTable =
			FindNodeMatchingCheckFunction((Node *) job->jobQuery, IsDistributedTableRTE);

		if (!queryContainsDistributedTable)
		{
			return;
		}
	}

	/* We should match both the key and the colocation ID */
	SetJobColocationId(job);

	if (!IsShardKeyValueAllowed(job->partitionKeyValue, job->colocationId))
	{
		/*
		 * force_delegation is an argument of create_distributed_function(),
		 * so that is the UDF the hint has to point the user to.
		 */
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg(
							"queries must filter by the distribution argument in the same "
							"colocation group when using the forced function pushdown"),
						errhint(
							"consider disabling forced delegation through "
							"create_distributed_function(..., force_delegation := false)")));
	}
}

View File

@ -37,6 +37,7 @@
#include "distributed/version_compat.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_protocol.h"
#include "distributed/function_call_delegation.h"
#include "executor/execdebug.h"
#include "commands/copy.h"
#include "nodes/execnodes.h"
@ -234,6 +235,15 @@ CitusExecutorRun(QueryDesc *queryDesc,
* transactions.
*/
CitusTableCacheFlushInvalidatedEntries();
/*
* Within a 2PC, when a function is delegated to a remote node, we pin
* the distribution argument as the shard key for all the SQL in the
* function's block. The restriction is imposed to not to access other
* nodes from the current node and violate the transactional integrity
* of the 2PC. Now that the query is ending, reset the shard key to NULL.
*/
ResetAllowedShardKeyValue();
}
}
PG_CATCH();
@ -246,6 +256,15 @@ CitusExecutorRun(QueryDesc *queryDesc,
executorBoundParams = savedBoundParams;
ExecutorLevel--;
if (ExecutorLevel == 0 && PlannerLevel == 0)
{
/*
* In case of an exception, reset the pinned shard-key, for more
* details see the function header.
*/
ResetAllowedShardKeyValue();
}
PG_RE_THROW();
}
PG_END_TRY();
@ -761,6 +780,6 @@ InTaskExecution(void)
* are in a delegated function/procedure call.
*/
return IsCitusInitiatedRemoteBackend() &&
!InDelegatedFunctionCall &&
!InTopLevelDelegatedFunctionCall &&
!InDelegatedProcedureCall;
}

View File

@ -178,11 +178,13 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
List *objectAddressList = list_make1((ObjectAddress *) distAddress);
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);
char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
distArgumetIndexList,
colocationIdList);
colocationIdList,
forceDelegationList);
SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand);
}
}

View File

@ -1363,6 +1363,9 @@ LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid)
1]);
cacheEntry->colocationId =
DatumGetInt32(datumArray[Anum_pg_dist_object_colocationid - 1]);
cacheEntry->forceDelegation =
DatumGetBool(datumArray[Anum_pg_dist_object_force_delegation - 1]);
}
else
{

View File

@ -732,6 +732,7 @@ DistributedObjectMetadataSyncCommandList(void)
List *objectAddressList = NIL;
List *distArgumentIndexList = NIL;
List *colocationIdList = NIL;
List *forceDelegationList = NIL;
/* It is not strictly necessary to read the tuples in order.
* However, it is useful to get consistent behavior, both for regression
@ -767,6 +768,14 @@ DistributedObjectMetadataSyncCommandList(void)
&colocationIdIsNull);
int32 colocationId = DatumGetInt32(colocationIdDatum);
bool forceDelegationIsNull = false;
Datum forceDelegationDatum =
heap_getattr(pgDistObjectTup,
Anum_pg_dist_object_force_delegation,
pgDistObjectDesc,
&forceDelegationIsNull);
bool forceDelegation = DatumGetBool(forceDelegationDatum);
objectAddressList = lappend(objectAddressList, address);
if (distributionArgumentIndexIsNull)
@ -789,6 +798,15 @@ DistributedObjectMetadataSyncCommandList(void)
{
colocationIdList = lappend_int(colocationIdList, colocationId);
}
if (forceDelegationIsNull)
{
forceDelegationList = lappend_int(forceDelegationList, NO_FORCE_PUSHDOWN);
}
else
{
forceDelegationList = lappend_int(forceDelegationList, forceDelegation);
}
}
systable_endscan_ordered(pgDistObjectScan);
@ -798,7 +816,8 @@ DistributedObjectMetadataSyncCommandList(void)
char *workerMetadataUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
distArgumentIndexList,
colocationIdList);
colocationIdList,
forceDelegationList);
List *commandList = list_make1(workerMetadataUpdateCommand);
return commandList;
@ -1000,7 +1019,8 @@ NodeListInsertCommand(List *workerNodeList)
char *
MarkObjectsDistributedCreateCommand(List *addresses,
List *distributionArgumentIndexes,
List *colocationIds)
List *colocationIds,
List *forceDelegations)
{
StringInfo insertDistributedObjectsCommand = makeStringInfo();
@ -1009,7 +1029,7 @@ MarkObjectsDistributedCreateCommand(List *addresses,
appendStringInfo(insertDistributedObjectsCommand,
"WITH distributed_object_data(typetext, objnames, "
"objargs, distargumentindex, colocationid) AS (VALUES ");
"objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ");
bool isFirstObject = true;
for (int currentObjectCounter = 0; currentObjectCounter < list_length(addresses);
@ -1019,6 +1039,7 @@ MarkObjectsDistributedCreateCommand(List *addresses,
int distributionArgumentIndex = list_nth_int(distributionArgumentIndexes,
currentObjectCounter);
int colocationId = list_nth_int(colocationIds, currentObjectCounter);
int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter);
List *names = NIL;
List *args = NIL;
char *objectType = NULL;
@ -1074,15 +1095,18 @@ MarkObjectsDistributedCreateCommand(List *addresses,
appendStringInfo(insertDistributedObjectsCommand, "%d, ",
distributionArgumentIndex);
appendStringInfo(insertDistributedObjectsCommand, "%d)",
appendStringInfo(insertDistributedObjectsCommand, "%d, ",
colocationId);
appendStringInfo(insertDistributedObjectsCommand, "%s)",
forceDelegation ? "true" : "false");
}
appendStringInfo(insertDistributedObjectsCommand, ") ");
appendStringInfo(insertDistributedObjectsCommand,
"SELECT citus_internal_add_object_metadata("
"typetext, objnames, objargs, distargumentindex::int, colocationid::int) "
"typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) "
"FROM distributed_object_data;");
return insertDistributedObjectsCommand->data;
@ -1101,6 +1125,7 @@ citus_internal_add_object_metadata(PG_FUNCTION_ARGS)
ArrayType *argsArray = PG_GETARG_ARRAYTYPE_P(2);
int distributionArgumentIndex = PG_GETARG_INT32(3);
int colocationId = PG_GETARG_INT32(4);
bool forceDelegation = PG_GETARG_INT32(5);
if (!ShouldSkipMetadataChecks())
{
@ -1142,9 +1167,14 @@ citus_internal_add_object_metadata(PG_FUNCTION_ARGS)
NULL :
&colocationId;
bool *forceDelegationAddress =
forceDelegation == false ?
NULL :
&forceDelegation;
UpdateFunctionDistributionInfo(&objectAddress,
distributionArgumentIndexAddress,
colocationIdAddress);
colocationIdAddress,
forceDelegationAddress);
}
SetLocalEnableDependencyCreation(prevDependencyCreationValue);

View File

@ -17,7 +17,7 @@
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/metadata_utility.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
@ -26,7 +26,7 @@
#include "distributed/deparse_shard_query.h"
#include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h"
#include "distributed/metadata_utility.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
@ -41,6 +41,7 @@
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/primnodes.h"
#include "nodes/print.h"
#include "optimizer/clauses.h"
#include "parser/parse_coerce.h"
#include "parser/parsetree.h"
@ -55,7 +56,18 @@ struct ParamWalkerContext
ParamKind paramKind;
};
extern AllowedDistributionColumn AllowedDistributionColumnValue;
static bool contain_param_walker(Node *node, void *context);
static void CheckDelegatedFunctionExecution(DistObjectCacheEntry *procedure,
FuncExpr *funcExpr);
static bool IsQuerySimple(Query *query);
static FuncExpr * FunctionInFromClause(List *fromlist, Query *query);
static void EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId);
/* global variable keeping track of whether we are in a delegated function call */
bool InTopLevelDelegatedFunctionCall = false;
/* global variable keeping track of whether we are in a delegated function call */
@ -84,13 +96,12 @@ contain_param_walker(Node *node, void *context)
pwcontext->hasParam = true;
pwcontext->paramKind = paramNode->paramkind;
if (paramNode->paramkind == PARAM_EXEC)
{
return true;
}
return paramNode->paramkind == PARAM_EXEC;
}
else
{
return expression_tree_walker((Node *) node, contain_param_walker, context);
}
return false;
}
@ -106,8 +117,8 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
{
bool colocatedWithReferenceTable = false;
ShardPlacement *placement = NULL;
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
struct ParamWalkerContext walkerParamContext = { 0 };
bool inTransactionBlock = false;
if (!CitusHasBeenLoaded() || !CheckCitusVersion(DEBUG4))
{
@ -147,60 +158,96 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
return NULL;
}
FuncExpr *fromFuncExpr = NULL;
if (joinTree->fromlist != NIL)
{
if (list_length(joinTree->fromlist) != 1)
{
/* e.g. SELECT ... FROM rel1, rel2. */
Assert(list_length(joinTree->fromlist) > 1);
return NULL;
}
/*
* In pg12's planning phase empty FROMs are represented with an RTE_RESULT.
* When we arrive here, standard_planner has already been called which calls
* replace_empty_jointree() which replaces empty fromlist with a list of
* single RTE_RESULT RangleTableRef node.
*/
if (list_length(joinTree->fromlist) == 1)
{
RangeTblRef *reference = linitial(joinTree->fromlist);
RangeTblRef *reference = linitial(joinTree->fromlist);
if (IsA(reference, RangeTblRef))
{
RangeTblEntry *rtentry = rt_fetch(reference->rtindex,
planContext->query->rtable);
if (rtentry->rtekind != RTE_RESULT)
{
/* e.g. SELECT f() FROM rel */
return NULL;
}
}
else
if (IsA(reference, RangeTblRef))
{
RangeTblEntry *rtentry = rt_fetch(reference->rtindex,
planContext->query->rtable);
if (rtentry->rtekind == RTE_FUNCTION)
{
/*
* e.g. IsA(reference, JoinExpr). This is explicit join expressions
* like INNER JOIN, NATURAL JOIN, ...
* Look for a function in the FROM clause.
*/
fromFuncExpr = FunctionInFromClause(joinTree->fromlist,
planContext->query);
}
else if (rtentry->rtekind != RTE_RESULT)
{
/* e.g. SELECT f() FROM rel */
ereport(DEBUG4, (errmsg("FromList item is not empty")));
return NULL;
}
}
else
{
/* e.g. SELECT ... FROM rel1, rel2. */
Assert(list_length(joinTree->fromlist) > 1);
/*
* e.g. IsA(reference, JoinExpr). This is explicit join expressions
* like INNER JOIN, NATURAL JOIN, ...
*/
return NULL;
}
}
FuncExpr *targetFuncExpr = NULL;
List *targetList = planContext->query->targetList;
if (list_length(planContext->query->targetList) != 1)
int targetListLen = list_length(targetList);
if (targetListLen == 1)
{
/* multiple target list items */
TargetEntry *targetEntry = (TargetEntry *) linitial(targetList);
if (IsA(targetEntry->expr, FuncExpr))
{
/* function from the SELECT clause e.g. SELECT fn() FROM */
targetFuncExpr = (FuncExpr *) targetEntry->expr;
}
}
/*
* Look for one of:
* SELECT fn(...);
* SELECT ... FROM fn(...);
*/
FuncExpr *funcExpr = NULL;
if (targetFuncExpr != NULL)
{
if (fromFuncExpr != NULL)
{
/* query is of the form: SELECT fn() FROM fn() */
return NULL;
}
/* query is of the form: SELECT fn(); */
funcExpr = targetFuncExpr;
}
else if (fromFuncExpr != NULL)
{
/* query is of the form: SELECT ... FROM fn(); */
funcExpr = fromFuncExpr;
}
else
{
/* query does not have a function call in SELECT or FROM */
return NULL;
}
TargetEntry *targetEntry = (TargetEntry *) linitial(targetList);
if (!IsA(targetEntry->expr, FuncExpr))
{
/* target list item is not a function call */
return NULL;
}
FuncExpr *funcExpr = (FuncExpr *) targetEntry->expr;
DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId,
funcExpr->funcid, 0);
if (procedure == NULL || !procedure->isDistributed)
@ -215,11 +262,44 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
if (IsCitusInitiatedRemoteBackend())
{
bool isFunctionForceDelegated = procedure->forceDelegation;
/*
* We are planning a call to a distributed function within a Citus backend,
* that means that this is the delegated call.
* that means that this is the delegated call. If the function is forcefully
* delegated, capture the distribution argument.
*/
InDelegatedFunctionCall = true;
if (isFunctionForceDelegated)
{
CheckDelegatedFunctionExecution(procedure, funcExpr);
}
/* Are we planning the top function call? */
if (ExecutorLevel == 0 && PlannerLevel == 1)
{
/*
* InTopLevelDelegatedFunctionCall flag grants the leeway
* to do remote tasks from a delegated function.
*/
if (!isFunctionForceDelegated)
{
/*
* we are planning a regular delegated call, we
* are allowed to do remote execution.
*/
InTopLevelDelegatedFunctionCall = true;
}
else if (!IsMultiStatementTransaction())
{
/*
* we are planning a force-delegated call, we
* are allowed to do remote execution if there
* is no explicit BEGIN-END transaction.
*/
InTopLevelDelegatedFunctionCall = true;
}
}
return NULL;
}
@ -244,12 +324,76 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
return NULL;
}
if (fromFuncExpr && !IsMultiStatementTransaction())
{
/*
* For now, let's not push the function from the FROM clause unless it's in a
* multistatement transaction with the forceDelegation flag ON.
*/
ereport(DEBUG2, (errmsg("function from the FROM clause is not pushed")));
return NULL;
}
/* dissuade the planner from trying a generic plan with parameters */
(void) expression_tree_walker((Node *) funcExpr->args, contain_param_walker,
&walkerParamContext);
if (walkerParamContext.hasParam)
{
if (walkerParamContext.paramKind == PARAM_EXTERN)
{
/* Don't log a message, we should end up here again without a parameter */
DissuadePlannerFromUsingPlan(planContext->plan);
}
else
{
ereport(DEBUG1, (errmsg("arguments in a distributed function must "
"not contain subqueries")));
}
return NULL;
}
if (IsMultiStatementTransaction())
{
/* cannot delegate function calls in a multi-statement transaction */
ereport(DEBUG1, (errmsg("not pushing down function calls in "
"a multi-statement transaction")));
return NULL;
if (!procedure->forceDelegation)
{
/* cannot delegate function calls in a multi-statement transaction */
ereport(DEBUG1, (errmsg("not pushing down function calls in "
"a multi-statement transaction")));
return NULL;
}
else
{
Node *partitionValueNode = (Node *) list_nth(funcExpr->args,
procedure->distributionArgIndex);
if (!IsA(partitionValueNode, Const))
{
ereport(DEBUG1, (errmsg("distribution argument value must be a "
"constant when using force_delegation flag")));
return NULL;
}
/*
* If the expression is simple, such as, SELECT fn() in
* PL/PgSQL code, PL engine is doing simple expression
* evaluation, which can't interpret the CustomScan Node.
* Function from FROM clause is not simple, so it's ok.
*/
if (MaybeExecutingUDF() && IsQuerySimple(planContext->query) && !fromFuncExpr)
{
ereport(DEBUG1, (errmsg("Skipping delegation of function "
"from a PL/PgSQL simple expression")));
return NULL;
}
/*
* When the flag is on, delegate the function call in a multi-statement
* transaction but with restrictions.
*/
ereport(DEBUG1, (errmsg("pushing down function call in "
"a multi-statement transaction")));
inTransactionBlock = true;
}
}
if (contain_volatile_functions((Node *) funcExpr->args))
@ -300,7 +444,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
/* return if we could not find a placement */
if (placement == NULL)
{
return false;
return NULL;
}
WorkerNode *workerNode = FindWorkerNode(placement->nodeName, placement->nodePort);
@ -312,6 +456,12 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
}
else if (workerNode->groupId == GetLocalGroupId())
{
/* If the force_pushdown flag is set, capture the distribution argument */
if (procedure->forceDelegation)
{
CheckDelegatedFunctionExecution(procedure, funcExpr);
}
/*
* Two reasons for this:
* (a) It would lead to infinite recursion as the node would
@ -323,27 +473,28 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
return NULL;
}
(void) expression_tree_walker((Node *) funcExpr->args, contain_param_walker,
&walkerParamContext);
if (walkerParamContext.hasParam)
{
if (walkerParamContext.paramKind == PARAM_EXTERN)
{
/* Don't log a message, we should end up here again without a parameter */
DissuadePlannerFromUsingPlan(planContext->plan);
}
else
{
ereport(DEBUG1, (errmsg("arguments in a distributed function must "
"not contain subqueries")));
}
return NULL;
}
ereport(DEBUG1, (errmsg("pushing down the function call")));
Task *task = CitusMakeNode(Task);
task->taskType = READ_TASK;
/*
* In a multi-statement block the function should be part of the surrounding
* transaction, at this time, not knowing the operations in the function, it
* is safe to assume that it's a write task.
*
* TODO: We should compile the function to see the internals of the function
* and find if this has read-only tasks, does it involve doing a remote task
* or queries involving non-distribution column, etc.
*/
if (inTransactionBlock)
{
task->taskType = MODIFY_TASK;
}
else
{
task->taskType = READ_TASK;
}
task->taskPlacementList = list_make1(placement);
SetTaskQueryIfShouldLazyDeparse(task, planContext->query);
task->anchorShardId = placement->shardId;
@ -354,7 +505,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
job->jobQuery = planContext->query;
job->taskList = list_make1(task);
distributedPlan = CitusMakeNode(DistributedPlan);
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
distributedPlan->workerJob = job;
distributedPlan->combineQuery = NULL;
distributedPlan->expectResults = true;
@ -465,3 +616,184 @@ ShardPlacementForFunctionColocatedWithReferenceTable(CitusTableCacheEntry *cache
return (ShardPlacement *) linitial(placementList);
}
/*
 * CheckDelegatedFunctionExecution inspects the distribution argument of a
 * force-delegated function call and, when that argument is a constant, saves
 * it together with the function's colocation ID through
 * EnableInForceDelegatedFuncExecution().
 *
 * Nothing is saved when the argument is an external parameter or any other
 * non-constant expression.
 *
 * procedure: cache entry of the function, must have forceDelegation set.
 * funcExpr:  the function call whose arguments are examined.
 */
void
CheckDelegatedFunctionExecution(DistObjectCacheEntry *procedure, FuncExpr *funcExpr)
{
	Assert(procedure->forceDelegation);

	Node *distArgNode = (Node *) list_nth(funcExpr->args,
										  procedure->distributionArgIndex);
	Assert(distArgNode);

	/* look through implicit casts so that a coerced constant is still a Const */
	distArgNode = strip_implicit_coercions(distArgNode);

	if (IsA(distArgNode, Param) &&
		((Param *) distArgNode)->paramkind == PARAM_EXTERN)
	{
		/* we should end up here again without a parameter */
		return;
	}

	if (!IsA(distArgNode, Const))
	{
		/* only a constant distribution argument can be pinned */
		return;
	}

	Const *distArgConst = (Const *) distArgNode;

	ereport(DEBUG1, (errmsg("Pushdown argument: %s", pretty_format_node_dump(
								nodeToString(distArgNode)))));
	EnableInForceDelegatedFuncExecution(distArgConst, procedure->colocationId);
}
/*
 * IsQuerySimple returns true when the query uses none of the features listed
 * below: aggregates, window functions, set-returning functions in the target
 * list, sublinks, CTEs, GROUP BY / grouping sets, HAVING, WINDOW, DISTINCT,
 * ORDER BY, LIMIT / OFFSET or set operations.
 *
 * The WHERE and FROM lists are not examined here.
 */
static bool
IsQuerySimple(Query *query)
{
	/* features recorded as boolean flags on the Query */
	bool hasFlaggedFeature = query->hasAggs ||
							 query->hasWindowFuncs ||
							 query->hasTargetSRFs ||
							 query->hasSubLinks;

	/* features recorded as non-empty clauses on the Query */
	bool hasExtraClause = query->cteList ||
						  query->groupClause ||
						  query->groupingSets ||
						  query->havingQual ||
						  query->windowClause ||
						  query->distinctClause ||
						  query->sortClause ||
						  query->limitOffset ||
						  query->limitCount ||
						  query->setOperations;

	return !(hasFlaggedFeature || hasExtraClause);
}
/*
 * FunctionInFromClause returns the function call of a FROM clause that
 * consists of exactly one function range table entry holding exactly one
 * plain FuncExpr, e.g. SELECT ... FROM fn(...).
 *
 * Returns NULL for every other shape of FROM clause.
 */
static FuncExpr *
FunctionInFromClause(List *fromlist, Query *query)
{
	/* we are looking for a single item in the FROM clause */
	if (list_length(fromlist) != 1)
	{
		return NULL;
	}

	RangeTblRef *fromItem = linitial(fromlist);
	if (!IsA(fromItem, RangeTblRef))
	{
		/* not a reference into the range table */
		return NULL;
	}

	RangeTblEntry *rte = rt_fetch(fromItem->rtindex, query->rtable);
	if (rte->rtekind != RTE_FUNCTION || list_length(rte->functions) != 1)
	{
		/* not a function RTE, or not one made of a single function */
		return NULL;
	}

	RangeTblFunction *rangeFunction = (RangeTblFunction *) linitial(rte->functions);
	Node *callExpr = rangeFunction->funcexpr;

	/* anything other than a plain function call is skipped */
	return IsA(callExpr, FuncExpr) ? (FuncExpr *) callExpr : NULL;
}
/*
 * EnableInForceDelegatedFuncExecution records that the current node is
 * executing a force-delegated function call by filling in the global
 * AllowedDistributionColumnValue: a copy of the distribution argument, the
 * colocation ID, and the isActive flag.
 *
 * distArgument: constant distribution argument of the delegated call; it is
 *               copied, the caller keeps ownership of the original.
 * colocationId: colocation ID to be saved next to the argument.
 */
static void
EnableInForceDelegatedFuncExecution(Const *distArgument, uint32 colocationId)
{
	/*
	 * Build the debug message in the caller's memory context; the strings it
	 * needs are temporary and must not pile up in TopTransactionContext.
	 */
	ereport(DEBUG1, (errmsg("Saving Distribution Argument: %s:%d",
							pretty_format_node_dump(nodeToString(distArgument)),
							colocationId)));

	/*
	 * The saved distribution argument need to persist through the life
	 * of the query, both during the planning (where we save) and execution
	 * (where we compare), hence only the copy is made in
	 * TopTransactionContext.
	 */
	MemoryContext oldcontext = MemoryContextSwitchTo(TopTransactionContext);
	AllowedDistributionColumnValue.distributionColumnValue = copyObject(distArgument);
	MemoryContextSwitchTo(oldcontext);

	AllowedDistributionColumnValue.colocationId = colocationId;
	AllowedDistributionColumnValue.isActive = true;
}
/*
* Within a 2PC, when a function is delegated to a remote node, we pin
* the distribution argument as the shard key for all the SQL in the
* function's block. The restriction is imposed to not to access other
* nodes from the current node and violate the transactional integrity of
* the 2PC. Reset the distribution argument value once the function ends.
*/
void
ResetAllowedShardKeyValue(void)
{
if (AllowedDistributionColumnValue.isActive)
{
pfree(AllowedDistributionColumnValue.distributionColumnValue);
AllowedDistributionColumnValue.isActive = false;
}
InTopLevelDelegatedFunctionCall = false;
}
/*
 * IsShardKeyValueAllowed compares the given shard key and colocation ID with
 * the distribution argument and colocation ID saved in
 * AllowedDistributionColumnValue. It returns true only when both match.
 *
 * Expected to be called only while AllowedDistributionColumnValue is active.
 */
bool
IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId)
{
	Assert(AllowedDistributionColumnValue.isActive);

	ereport(DEBUG4, (errmsg("Comparing saved:%s with Shard key: %s colocationid:%d:%d",
							pretty_format_node_dump(
								nodeToString(
									AllowedDistributionColumnValue.distributionColumnValue)),
							pretty_format_node_dump(nodeToString(shardKey)),
							AllowedDistributionColumnValue.colocationId, colocationId)));

	if (AllowedDistributionColumnValue.colocationId != colocationId)
	{
		/* a different colocation group can never be allowed */
		return false;
	}

	/* same colocation group, the decision rests on the key value itself */
	return equal(AllowedDistributionColumnValue.distributionColumnValue, shardKey);
}

View File

@ -2,6 +2,7 @@
-- bump version to 11.0-1
#include "udfs/citus_disable_node/11.0-1.sql"
#include "udfs/create_distributed_function/11.0-1.sql"
#include "udfs/citus_check_connection_to_node/11.0-1.sql"
#include "udfs/citus_check_cluster_node_health/11.0-1.sql"
@ -20,6 +21,7 @@ DROP FUNCTION pg_catalog.master_append_table_to_shard(bigint, text, text, intege
-- all existing citus local tables are auto converted
-- none of the other tables can have auto-converted as true
ALTER TABLE pg_catalog.pg_dist_partition ADD COLUMN autoconverted boolean DEFAULT false;
ALTER TABLE citus.pg_dist_object ADD COLUMN force_delegation bool DEFAULT NULL;
UPDATE pg_catalog.pg_dist_partition SET autoconverted = TRUE WHERE partmethod = 'n' AND repmodel = 's';
REVOKE ALL ON FUNCTION start_metadata_sync_to_node(text, integer) FROM PUBLIC;

View File

@ -1,5 +1,6 @@
-- citus--11.0-1--10.2-4
DROP FUNCTION pg_catalog.create_distributed_function(regprocedure, text, text, bool);
CREATE FUNCTION pg_catalog.master_apply_delete_command(text)
RETURNS integer
LANGUAGE C STRICT
@ -43,7 +44,7 @@ COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport intege
DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer);
DROP FUNCTION pg_catalog.citus_check_cluster_node_health ();
DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer);
DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer, boolean);
DROP FUNCTION pg_catalog.citus_run_local_command(text);
DROP FUNCTION pg_catalog.worker_drop_sequence_dependency(text);
@ -80,3 +81,5 @@ ORDER BY 1,2;
DROP FUNCTION pg_catalog.citus_shards_on_worker();
DROP FUNCTION pg_catalog.citus_shard_indexes_on_worker();
#include "../udfs/create_distributed_function/9.0-1.sql"
ALTER TABLE citus.pg_dist_object DROP COLUMN force_delegation;

View File

@ -3,11 +3,12 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_object_metadata(
objNames text[],
objArgs text[],
distribution_argument_index int,
colocationid int)
colocationid int,
force_delegation bool)
RETURNS void
LANGUAGE C
STRICT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_object_metadata(text,text[],text[],int,int) IS
COMMENT ON FUNCTION pg_catalog.citus_internal_add_object_metadata(text,text[],text[],int,int,bool) IS
'Inserts distributed object into pg_dist_object';

View File

@ -3,11 +3,12 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_object_metadata(
objNames text[],
objArgs text[],
distribution_argument_index int,
colocationid int)
colocationid int,
force_delegation bool)
RETURNS void
LANGUAGE C
STRICT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_add_object_metadata(text,text[],text[],int,int) IS
COMMENT ON FUNCTION pg_catalog.citus_internal_add_object_metadata(text,text[],text[],int,int,bool) IS
'Inserts distributed object into pg_dist_object';

View File

@ -0,0 +1,15 @@
-- Replace the three-argument create_distributed_function() with a version that
-- also accepts force_delegation. The old signature is dropped first; adding a
-- parameter changes the signature, so CREATE OR REPLACE alone would define an
-- additional overload rather than replace the existing function.
DROP FUNCTION pg_catalog.create_distributed_function(regprocedure, text, text);
-- force_delegation defaults to NULL. The function is declared
-- CALLED ON NULL INPUT, so it is still invoked when arguments are NULL.
CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_function(function_name regprocedure,
distribution_arg_name text DEFAULT NULL,
colocate_with text DEFAULT 'default',
force_delegation bool DEFAULT NULL)
RETURNS void
LANGUAGE C CALLED ON NULL INPUT
AS 'MODULE_PATHNAME', $$create_distributed_function$$;
-- Attach the description to the new four-argument signature.
COMMENT ON FUNCTION pg_catalog.create_distributed_function(function_name regprocedure,
distribution_arg_name text,
colocate_with text,
force_delegation bool)
IS 'creates a distributed function';

View File

@ -1,11 +1,15 @@
CREATE OR REPLACE FUNCTION create_distributed_function(function_name regprocedure,
DROP FUNCTION pg_catalog.create_distributed_function(regprocedure, text, text);
CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_function(function_name regprocedure,
distribution_arg_name text DEFAULT NULL,
colocate_with text DEFAULT 'default')
colocate_with text DEFAULT 'default',
force_delegation bool DEFAULT NULL)
RETURNS void
LANGUAGE C CALLED ON NULL INPUT
AS 'MODULE_PATHNAME', $$create_distributed_function$$;
COMMENT ON FUNCTION create_distributed_function(function_name regprocedure,
COMMENT ON FUNCTION pg_catalog.create_distributed_function(function_name regprocedure,
distribution_arg_name text,
colocate_with text)
colocate_with text,
force_delegation bool)
IS 'creates a distributed function';

View File

@ -39,10 +39,13 @@
#include "distributed/subplan_execution.h"
#include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h"
#include "distributed/commands.h"
#include "utils/hsearch.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/datum.h"
#include "storage/fd.h"
#include "nodes/print.h"
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
@ -103,6 +106,12 @@ MemoryContext CommitContext = NULL;
*/
bool ShouldCoordinatedTransactionUse2PC = false;
/*
* Distribution function argument (along with colocationId) when delegated
* using forceDelegation flag.
*/
AllowedDistributionColumn AllowedDistributionColumnValue;
/* if disabled, distributed statements in a function may run as separate transactions */
bool FunctionOpensTransactionBlock = true;
@ -119,10 +128,10 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction
static void AdjustMaxPreparedTransactions(void);
static void PushSubXact(SubTransactionId subId);
static void PopSubXact(SubTransactionId subId);
static bool MaybeExecutingUDF(void);
static void ResetGlobalVariables(void);
static bool SwallowErrors(void (*func)(void));
static void ForceAllInProgressConnectionsToClose(void);
static void EnsurePrepareTransactionIsAllowed(void);
/*
@ -460,12 +469,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_PREPARE:
{
if (InCoordinatedTransaction())
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use 2PC in transactions involving "
"multiple servers")));
}
EnsurePrepareTransactionIsAllowed();
break;
}
}
@ -551,8 +555,9 @@ ResetGlobalVariables()
ShouldCoordinatedTransactionUse2PC = false;
TransactionModifiedNodeMetadata = false;
MetadataSyncOnCommit = false;
InDelegatedFunctionCall = false;
InTopLevelDelegatedFunctionCall = false;
ResetWorkerErrorIndication();
AllowedDistributionColumnValue.isActive = false;
}
@ -786,7 +791,7 @@ IsMultiStatementTransaction(void)
* If the planner is being called from the executor, then we may also be in
* a UDF.
*/
static bool
bool
MaybeExecutingUDF(void)
{
return ExecutorLevel > 1 || (ExecutorLevel == 1 && PlannerLevel > 0);
@ -803,3 +808,31 @@ TriggerMetadataSyncOnCommit(void)
{
MetadataSyncOnCommit = true;
}
/*
 * EnsurePrepareTransactionIsAllowed errors out when the current backend has
 * started a coordinated transaction and then receives a PREPARE event, i.e.
 * is asked to become a participant in a 2PC transaction coordinated by
 * another node.
 *
 * Backends that have not started a coordinated transaction, as well as
 * Citus-initiated remote backends, are let through.
 */
static void
EnsurePrepareTransactionIsAllowed(void)
{
	/*
	 * Short-circuit evaluation keeps the original order of checks: the
	 * backend type is only inspected once we know that a coordinated
	 * transaction is in progress.
	 */
	bool prepareAllowed = !InCoordinatedTransaction() ||
						  IsCitusInitiatedRemoteBackend();

	if (prepareAllowed)
	{
		return;
	}

	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot use 2PC in transactions involving "
						   "multiple servers")));
}

View File

@ -506,7 +506,8 @@ extern char * GenerateBackupNameForProcCollision(const ObjectAddress *address);
extern ObjectWithArgs * ObjectWithArgsFromOid(Oid funcOid);
extern void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
int *distribution_argument_index,
int *colocationId);
int *colocationId,
bool *forceDelegation);
/* vacuum.c - forward declarations */
extern void PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);

View File

@ -19,11 +19,11 @@
* These flags keep track of whether the process is currently in a delegated
* function or procedure call.
*/
extern bool InDelegatedFunctionCall;
extern bool InTopLevelDelegatedFunctionCall;
extern bool InDelegatedProcedureCall;
PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext);
extern void ResetAllowedShardKeyValue(void);
extern bool IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId);
#endif /* FUNCTION_CALL_DELEGATION_H */

View File

@ -16,6 +16,7 @@
#include "catalog/objectaddress.h"
#define INVALID_DISTRIBUTION_ARGUMENT_INDEX -1
#define NO_FORCE_PUSHDOWN 0
extern bool ObjectExists(const ObjectAddress *address);
extern bool CitusExtensionObject(const ObjectAddress *objectAddress);

View File

@ -35,6 +35,7 @@ typedef struct FormData_pg_dist_object
uint32 distribution_argument_index; /* only valid for distributed functions/procedures */
uint32 colocationid; /* only valid for distributed functions/procedures */
boolean forced_pushdown; /* only valid for distributed functions */
#endif
} FormData_pg_dist_object;
@ -49,7 +50,7 @@ typedef FormData_pg_dist_object *Form_pg_dist_object;
* compiler constants for pg_dist_object
* ----------------
*/
#define Natts_pg_dist_object 8
#define Natts_pg_dist_object 9
#define Anum_pg_dist_object_classid 1
#define Anum_pg_dist_object_objid 2
#define Anum_pg_dist_object_objsubid 3
@ -58,5 +59,6 @@ typedef FormData_pg_dist_object *Form_pg_dist_object;
#define Anum_pg_dist_object_object_args 6
#define Anum_pg_dist_object_distribution_argument_index 7
#define Anum_pg_dist_object_colocationid 8
#define Anum_pg_dist_object_force_delegation 9
#endif /* PG_DIST_OBJECT_H */

View File

@ -115,6 +115,7 @@ typedef struct DistObjectCacheEntry
int distributionArgIndex;
int colocationId;
bool forceDelegation;
} DistObjectCacheEntry;
typedef enum

View File

@ -36,7 +36,8 @@ extern List * MetadataCreateCommands(void);
extern List * MetadataDropCommands(void);
extern char * MarkObjectsDistributedCreateCommand(List *addresses,
List *distributionArgumentIndexes,
List *colocationIds);
List *colocationIds,
List *forceDelegations);
extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry);
extern char * DistributionDeleteCommand(const char *schemaName,
const char *tableName);

View File

@ -199,7 +199,6 @@ extern bool IsCitusTableRTE(Node *node);
extern bool IsDistributedOrReferenceTableRTE(Node *node);
extern bool IsDistributedTableRTE(Node *node);
extern bool IsReferenceTableRTE(Node *node);
extern bool QueryContainsDistributedTableRTE(Query *query);
extern bool IsCitusExtraDataContainerRelation(RangeTblEntry *rte);
extern bool ContainsReadIntermediateResultFunction(Node *node);
extern bool ContainsReadIntermediateResultArrayFunction(Node *node);

View File

@ -161,6 +161,7 @@ typedef struct Job
* query.
*/
bool parametersInJobQueryResolved;
uint32 colocationId; /* common colocation group ID of the relations */
} Job;

View File

@ -12,6 +12,12 @@
#include "lib/ilist.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "lib/stringinfo.h"
#include "nodes/primnodes.h"
/* forward declare, to avoid recursive includes */
struct DistObjectCacheEntry;
/* describes what kind of modifications have occurred in the current transaction */
typedef enum
@ -53,6 +59,19 @@ typedef struct SubXactContext
StringInfo setLocalCmds;
} SubXactContext;
/*
 * AllowedDistributionColumn holds the shard key restriction that applies while
 * a function delegated with force_delegation is executing.
 *
 * Such a call enforces the distribution argument along with the colocationId.
 * The colocationId is equally important: it prevents the same partition key
 * value from being used against another distributed table that is not
 * co-located and therefore might be on a different node.
 */
typedef struct AllowedDistributionColumn
{
/* copy of the distribution argument saved for the delegated call */
Const *distributionColumnValue;
/* colocation group that the saved distribution argument belongs to */
uint32 colocationId;
/* true while a value is saved; set back to false when the value is reset */
bool isActive;
} AllowedDistributionColumn;
/*
* GUC that determines whether a SELECT in a transaction block should also run in
* a transaction block on the worker.
@ -100,6 +119,8 @@ extern void Use2PCForCoordinatedTransaction(void);
extern bool GetCoordinatedTransactionShouldUse2PC(void);
extern bool IsMultiStatementTransaction(void);
extern void EnsureDistributedTransactionId(void);
extern bool MaybeExecutingUDF(void);
/* initialization function(s) */
extern void InitializeTransactionManagement(void);

View File

@ -599,7 +599,7 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', dist
SELECT * FROM (SELECT unnest(master_metadata_snapshot()) as metadata_command order by 1) as innerResult WHERE metadata_command like '%distributed_object_data%';
metadata_command
---------------------------------------------------------------------
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', 
ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('server', ARRAY['fake_fdw_server']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0, false), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0, false), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0, false), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50, false), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0, false), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52, false), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52, false), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0, false), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0, false), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0, false), ('function', ARRAY['function_tests', 'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0, false), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0, false), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0, false), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0, false), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0, false), ('sequence', ARRAY['public', 'user_defined_seq']::text[], 
ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('server', ARRAY['fake_fdw_server']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0, false), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
(1 row)
-- valid distribution with distribution_arg_index

File diff suppressed because it is too large Load Diff

View File

@ -556,9 +556,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('non_existing_type', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('non_existing_type', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ERROR: unrecognized object type "non_existing_type"
ROLLBACK;
-- check the sanity of distributionArgumentIndex and colocationId
@ -571,9 +571,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -100, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -100, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ERROR: distribution_argument_index must be between 0 and 100
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -585,9 +585,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -1, -1))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -1, -1, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ERROR: colocationId must be a positive number
ROLLBACK;
-- check with non-existing object
@ -600,9 +600,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('role', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ERROR: role "non_existing_user" does not exist
ROLLBACK;
-- since citus_internal_add_object_metadata is strict function returns NULL
@ -616,9 +616,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], 0, NULL::int))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], 0, NULL::int, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
citus_internal_add_object_metadata
---------------------------------------------------------------------
@ -640,9 +640,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
CREATE TABLE publication_test_table(id int);
CREATE PUBLICATION publication_test FOR TABLE publication_test_table;
SET ROLE metadata_sync_helper_role;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('publication', ARRAY['publication_test']::text[], ARRAY[]::text[], -1, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('publication', ARRAY['publication_test']::text[], ARRAY[]::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ERROR: Object type 29 can not be distributed by Citus
ROLLBACK;
-- Show that citus_internal_add_object_metadata checks the priviliges
@ -659,9 +659,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
AS $$ SELECT $1 $$
LANGUAGE SQL;
SET ROLE metadata_sync_helper_role;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('function', ARRAY['distribution_test_function']::text[], ARRAY['integer']::text[], -1, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('function', ARRAY['distribution_test_function']::text[], ARRAY['integer']::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ERROR: must be owner of function distribution_test_function
ROLLBACK;
-- we do not allow wrong partmethod

View File

@ -997,18 +997,20 @@ SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
function citus_disable_node(text,integer) void |
function create_distributed_function(regprocedure,text,text) void |
function master_append_table_to_shard(bigint,text,text,integer) real |
function master_apply_delete_command(text) integer |
function master_get_table_metadata(text) record |
| function citus_check_cluster_node_health() SETOF record
| function citus_check_connection_to_node(text,integer) boolean
| function citus_disable_node(text,integer,boolean) void
| function citus_internal_add_object_metadata(text,text[],text[],integer,integer) void
| function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void
| function citus_run_local_command(text) void
| function citus_shard_indexes_on_worker() SETOF record
| function citus_shards_on_worker() SETOF record
| function create_distributed_function(regprocedure,text,text,boolean) void
| function worker_drop_sequence_dependency(text) void
(12 rows)
(14 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -8,7 +8,8 @@ SELECT attrelid::regclass, attname, atthasmissing, attmissingval
FROM pg_attribute
WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass,
'pg_dist_rebalance_strategy'::regclass,
'pg_dist_partition'::regclass)
'pg_dist_partition'::regclass,
'citus.pg_dist_object'::regclass)
ORDER BY attrelid, attname;
attrelid | attname | atthasmissing | attmissingval
---------------------------------------------------------------------

View File

@ -48,7 +48,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
(6 rows)
-- this function is dropped in Citus10, added here for tests
@ -106,7 +106,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(19 rows)
@ -133,7 +133,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(20 rows)
@ -161,7 +161,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(20 rows)
@ -195,7 +195,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(20 rows)
@ -222,7 +222,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(20 rows)
@ -1752,7 +1752,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['public', 'mx_test_sequence_0']::text[], ARRAY[]::text[], -1, 0), ('sequence', ARRAY['public', 'mx_test_sequence_1']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_sequence_0']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_sequence_1']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 5, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 5, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 5, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 5, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310020, 1, 0, 1, 100020), (1310021, 1, 0, 5, 100021), (1310022, 1, 0, 1, 100022), (1310023, 1, 0, 5, 100023), (1310024, 1, 0, 1, 100024)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310025, 1, 0, 1, 100025), (1310026, 1, 0, 5, 100026), (1310027, 1, 0, 1, 100027), (1310028, 1, 0, 5, 100028), (1310029, 1, 0, 1, 100029)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;

View File

@ -69,7 +69,7 @@ ORDER BY 1;
function citus_internal.replace_isolation_tester_func()
function citus_internal.restore_isolation_tester_func()
function citus_internal.upgrade_columnar_storage(regclass)
function citus_internal_add_object_metadata(text,text[],text[],integer,integer)
function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean)
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint)
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
@ -120,7 +120,7 @@ ORDER BY 1;
function coord_combine_agg(oid,cstring,anyelement)
function coord_combine_agg_ffunc(internal,oid,cstring,anyelement)
function coord_combine_agg_sfunc(internal,oid,cstring,anyelement)
function create_distributed_function(regprocedure,text,text)
function create_distributed_function(regprocedure,text,text,boolean)
function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
function create_intermediate_result(text,text)
function create_reference_table(regclass)

View File

@ -68,6 +68,7 @@ test: multi_basic_queries cross_join multi_complex_expressions multi_subquery mu
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
test: sql_procedure multi_function_in_join row_types materialized_view
test: multi_subquery_in_where_reference_clause adaptive_executor propagate_set_commands geqo
test: forcedelegation_functions
# this should be run alone as it gets too many clients
test: join_pushdown
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc statement_cancel_error_message

View File

@ -0,0 +1,666 @@
SET citus.log_remote_commands TO OFF;
DROP SCHEMA IF EXISTS forcepushdown_schema CASCADE;
CREATE SCHEMA forcepushdown_schema;
SET search_path TO 'forcepushdown_schema';
SET citus.shard_replication_factor = 1;
SET citus.shard_count = 32;
SET citus.next_shard_id TO 900000;
CREATE TABLE test_forcepushdown(intcol int PRIMARY KEY, data char(50) default 'default');
SELECT create_distributed_table('test_forcepushdown', 'intcol', colocate_with := 'none');
--
-- Table in a different colocation group
--
CREATE TABLE test_forcepushdown_noncolocate(intcol int PRIMARY KEY);
SELECT create_distributed_table('test_forcepushdown_noncolocate', 'intcol', colocate_with := 'none');
-- Inserts the argument `a` as a new row into
-- forcepushdown_schema.test_forcepushdown.
CREATE FUNCTION insert_data(a integer)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (a);
END;
$fn$;
-- Inserts a+1 (a value different from the argument itself) into
-- forcepushdown_schema.test_forcepushdown.
CREATE FUNCTION insert_data_non_distarg(a integer)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (a+1);
END;
$fn$;
-- Sets data = 'non-default' on every row of
-- forcepushdown_schema.test_forcepushdown (the UPDATE has no WHERE clause).
-- The argument `a` is not referenced in the body.
CREATE FUNCTION update_data_nonlocal(a integer)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
UPDATE forcepushdown_schema.test_forcepushdown SET data = 'non-default';
END;
$fn$;
-- Inserts the argument `a` into
-- forcepushdown_schema.test_forcepushdown_noncolocate, a table that was
-- distributed with colocate_with := 'none'.
CREATE FUNCTION insert_data_noncolocation(a int)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
-- Insert into a different table than the function is colocated with
INSERT INTO forcepushdown_schema.test_forcepushdown_noncolocate VALUES (a);
END;
$fn$;
SELECT create_distributed_function(
'insert_data(int)', 'a',
colocate_with := 'test_forcepushdown',
force_delegation := true
);
SELECT create_distributed_function(
'insert_data_non_distarg(int)', 'a',
colocate_with := 'test_forcepushdown',
force_delegation := true
);
SELECT create_distributed_function(
'update_data_nonlocal(int)', 'a',
colocate_with := 'test_forcepushdown',
force_delegation := true
);
SELECT create_distributed_function(
'insert_data_noncolocation(int)', 'a',
colocate_with := 'test_forcepushdown',
force_delegation := true
);
SET client_min_messages TO DEBUG1;
--SET citus.log_remote_commands TO on;
SELECT public.wait_until_metadata_sync(30000);
SELECT 'Transaction with no errors' Testing;
BEGIN;
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (1);
-- This call will insert both the rows locally on the remote worker
SELECT insert_data(2);
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (3);
COMMIT;
SELECT 'Transaction with duplicate error in the remote function' Testing;
BEGIN;
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (4);
-- This call will fail with duplicate error on the remote worker
SELECT insert_data(3);
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (5);
COMMIT;
SELECT 'Transaction with duplicate error in the local statement' Testing;
BEGIN;
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (6);
-- This call will insert both the rows locally on the remote worker
SELECT insert_data(7);
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (8);
-- This will fail
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (8);
COMMIT;
SELECT 'Transaction with function using non-distribution argument' Testing;
BEGIN;
-- This should fail
SELECT insert_data_non_distarg(9);
COMMIT;
SELECT 'Transaction with function doing remote connection' Testing;
BEGIN;
-- This statement will pass
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (11);
-- This call will try to update rows locally and on remote node(s)
SELECT update_data_nonlocal(12);
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (13);
COMMIT;
SELECT 'Transaction with no errors but with a rollback' Testing;
BEGIN;
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (14);
-- This call will insert both the rows locally on the remote worker
SELECT insert_data(15);
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (16);
ROLLBACK;
--
-- Add function with force_delegation := true in the targetList of a query
--
BEGIN;
-- Query gets delegated to the node of the shard xx_900001 for the key=1,
-- and the function inserts value (1+17) locally on the shard xx_900031
SELECT insert_data(intcol+17) from test_forcepushdown where intcol = 1;
-- This will fail with duplicate error as the function already inserted
-- the value(1+17)
SELECT insert_data(18);
COMMIT;
--
-- Access a table with the same shard key as distribution argument but in a
-- different colocation group.
--
BEGIN;
SELECT insert_data_noncolocation(19);
COMMIT;
SELECT insert_data_noncolocation(19);
-- This should have only the first 3 rows as all other transactions were rolled back.
SELECT * FROM forcepushdown_schema.test_forcepushdown ORDER BY 1;
--
-- Nested call: a function distributed without force_delegation calling a function with force_delegation := true
--
CREATE TABLE test_nested (id int, name text);
SELECT create_distributed_table('test_nested','id');
INSERT INTO test_nested VALUES (100,'hundred');
INSERT INTO test_nested VALUES (200,'twohundred');
INSERT INTO test_nested VALUES (300,'threehundred');
INSERT INTO test_nested VALUES (400,'fourhundred');
INSERT INTO test_nested VALUES (512,'fivetwelve');
-- Computes max(id) + 1 (as numeric) over the rows of
-- forcepushdown_schema.test_nested whose id equals the argument, reports the
-- value in a NOTICE, and returns it. The function only reads from the table.
CREATE OR REPLACE FUNCTION inner_force_delegation_function(int)
RETURNS NUMERIC AS $$
DECLARE ret_val NUMERIC;
BEGIN
SELECT max(id)::numeric+1 INTO ret_val FROM forcepushdown_schema.test_nested WHERE id = $1;
RAISE NOTICE 'inner_force_delegation_function():%', ret_val;
RETURN ret_val;
END;
$$ LANGUAGE plpgsql;
-- Calls inner_force_delegation_function(100) from a FROM clause and returns
-- whatever that call produced.
CREATE OR REPLACE FUNCTION func_calls_forcepush_func()
RETURNS NUMERIC AS $$
DECLARE incremented_val NUMERIC;
BEGIN
-- Constant distribution argument
SELECT inner_force_delegation_function INTO incremented_val FROM inner_force_delegation_function(100);
RETURN incremented_val;
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function('func_calls_forcepush_func()');
SELECT create_distributed_function('inner_force_delegation_function(int)', '$1', colocate_with := 'test_nested', force_delegation := true);
SELECT public.wait_until_metadata_sync(30000);
BEGIN;
SELECT func_calls_forcepush_func();
COMMIT;
SELECT func_calls_forcepush_func();
-- Helper that always returns the integer 100.
CREATE OR REPLACE FUNCTION get_val()
RETURNS INT AS $$
BEGIN
RETURN 100::INT;
END;
$$ LANGUAGE plpgsql;
--
-- UDF calling another UDF in a FROM clause
-- fn()
-- {
-- select res into var from fn();
-- }
--
-- Calls inner_force_delegation_function from a FROM clause with a
-- non-constant argument (add_val + 100, where add_val comes from get_val())
-- and returns the result.
CREATE OR REPLACE FUNCTION func_calls_forcepush_func_infrom()
RETURNS NUMERIC AS $$
DECLARE incremented_val NUMERIC;
DECLARE add_val INT;
BEGIN
add_val := get_val();
SELECT inner_force_delegation_function INTO incremented_val FROM inner_force_delegation_function(add_val + 100);
RETURN incremented_val;
END;
$$ LANGUAGE plpgsql;
SELECT func_calls_forcepush_func_infrom();
BEGIN;
SELECT func_calls_forcepush_func_infrom();
COMMIT;
--
-- UDF calling another UDF in the SELECT targetList
-- fn()
-- {
-- select fn() into var;
-- }
--
-- Calls inner_force_delegation_function(100 + 100) in the SELECT target list
-- and returns the result.
-- NOTE(review): add_val is assigned from get_val() but never used afterwards;
-- confirm whether `add_val + 100` was intended here, as in
-- func_calls_forcepush_func_infrom().
CREATE OR REPLACE FUNCTION func_calls_forcepush_func_intarget()
RETURNS NUMERIC AS $$
DECLARE incremented_val NUMERIC;
DECLARE add_val INT;
BEGIN
add_val := get_val();
SELECT inner_force_delegation_function(100 + 100) INTO incremented_val OFFSET 0;
RETURN incremented_val;
END;
$$ LANGUAGE plpgsql;
SELECT func_calls_forcepush_func_intarget();
BEGIN;
SELECT func_calls_forcepush_func_intarget();
COMMIT;
--
-- Recursive function call with force_delegation := true
--
-- Raises a NOTICE with the current input, then calls itself with inp - 1 for
-- as long as inp > 1. Returns the first value that is not greater than 1
-- (an input <= 1 is returned unchanged).
CREATE OR REPLACE FUNCTION test_recursive(inp integer)
RETURNS INT AS $$
DECLARE var INT;
BEGIN
RAISE NOTICE 'input:%', inp;
if (inp > 1) then
inp := inp - 1;
var := forcepushdown_schema.test_recursive(inp);
RETURN var;
else
RETURN inp;
END if;
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function('test_recursive(int)', '$1', colocate_with := 'test_nested', force_delegation := true);
BEGIN;
SELECT test_recursive(5);
END;
--
-- Distributed function gets delegated indirectly (as part of a query)
--
BEGIN;
-- Query lands on the shard with key = 300 (shard __900089) and the function reads locally
SELECT inner_force_delegation_function(id) FROM test_nested WHERE id = 300;
-- Query lands on the shard with key = 300(shard __900089) and the function inserts remotely
SELECT insert_data_non_distarg(id) FROM test_nested WHERE id = 300;
END;
--
-- Non constant distribution arguments
--
-- Param(PARAM_EXEC) node e.g. SELECT fn((SELECT col from test_nested where col=val))
BEGIN;
SELECT inner_force_delegation_function((SELECT id+112 FROM test_nested WHERE id=400));
END;
-- Raises a NOTICE showing both arguments, then returns x + y.
CREATE OR REPLACE FUNCTION test_non_constant(x int, y bigint)
RETURNS int
AS $$
DECLARE
BEGIN
RAISE NOTICE 'test_non_constant: % %', x, y;
RETURN x + y;
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function(
'test_non_constant(int,bigint)',
'$1',
colocate_with := 'test_forcepushdown',
force_delegation := true);
SELECT count(*) FROM test_nested;
-- The NOTICE from test_non_constant should print 99 followed by the count(*) of test_nested
WITH c AS (SELECT count(*) FROM test_nested),
b as (SELECT test_non_constant(99::int, (SELECT COUNT FROM c)))
SELECT COUNT(*) FROM b;
CREATE TABLE emp (
empname text NOT NULL,
salary integer
);
CREATE TABLE emp_audit(
operation char(1) NOT NULL,
stamp timestamp NOT NULL,
userid text NOT NULL,
empname text NOT NULL,
salary integer
);
SELECT create_distributed_table('emp','empname');
SELECT create_distributed_table('emp_audit','empname');
-- Inserts a row (empname, 33) into emp.
CREATE OR REPLACE FUNCTION inner_emp(empname text)
RETURNS void
AS $$
DECLARE
BEGIN
INSERT INTO emp VALUES (empname, 33);
END;
$$ LANGUAGE plpgsql;
-- Calls inner_emp('hello') via PERFORM, discarding the result.
CREATE OR REPLACE FUNCTION outer_emp()
RETURNS void
AS $$
DECLARE
BEGIN
PERFORM inner_emp('hello');
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function('inner_emp(text)','empname', force_delegation := true);
SELECT outer_emp();
SELECT * from emp;
--
-- INSERT..SELECT
--
-- Inserts a+1 into forcepushdown_schema.test_forcepushdown using the
-- INSERT ... SELECT form rather than INSERT ... VALUES.
CREATE FUNCTION insert_select_data(a integer)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
INSERT INTO forcepushdown_schema.test_forcepushdown SELECT(a+1);
END;
$fn$;
SELECT create_distributed_function(
'insert_select_data(int)', 'a',
colocate_with := 'test_forcepushdown',
force_delegation := true
);
-- Function lands on worker1 and issues COPY ... INSERT on the worker2 into the shard_900021
BEGIN;
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (30);
-- This will fail
SELECT insert_select_data(20);
COMMIT;
-- Function lands on worker2 and issues COPY ... INSERT on the same node into the shard_900029
BEGIN;
-- This will pass
SELECT insert_select_data(21);
END;
-- Function lands on worker2 and issues COPY ... INSERT on the worker1 into the shard_900028
BEGIN;
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (30);
-- This will fail
SELECT insert_select_data(22);
END;
-- Function lands on worker1 and issues COPY ... INSERT on the worker2 into the shard_900021
-- This will pass as there is no surrounding transaction
SELECT insert_select_data(20);
-- (21+1) and (20+1) should appear
SELECT * FROM forcepushdown_schema.test_forcepushdown ORDER BY 1;
-- Copies every intcol value from
-- forcepushdown_schema.test_forcepushdown_noncolocate into
-- forcepushdown_schema.test_forcepushdown. The argument `a` is not referenced
-- in the body.
CREATE FUNCTION insert_select_data_nonlocal(a integer)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
INSERT INTO forcepushdown_schema.test_forcepushdown(intcol)
SELECT intcol FROM forcepushdown_schema.test_forcepushdown_noncolocate;
END;
$fn$;
SELECT create_distributed_function(
'insert_select_data_nonlocal(int)', 'a',
colocate_with := 'test_forcepushdown',
force_delegation := true
);
INSERT INTO forcepushdown_schema.test_forcepushdown_noncolocate VALUES (30);
INSERT INTO forcepushdown_schema.test_forcepushdown_noncolocate VALUES (31);
INSERT INTO forcepushdown_schema.test_forcepushdown_noncolocate VALUES (32);
BEGIN;
INSERT INTO forcepushdown_schema.test_forcepushdown VALUES (40);
-- This will fail
SELECT insert_select_data_nonlocal(41);
COMMIT;
-- Above 3 rows (30, 31, 32) should appear now
SELECT insert_select_data_nonlocal(40);
SELECT * FROM forcepushdown_schema.test_forcepushdown ORDER BY 1;
CREATE TABLE test_forcepushdown_char(data char(50) PRIMARY KEY);
SELECT create_distributed_table('test_forcepushdown_char', 'data', colocate_with := 'none');
CREATE TABLE test_forcepushdown_varchar(data varchar PRIMARY KEY);
SELECT create_distributed_table('test_forcepushdown_varchar', 'data', colocate_with := 'none');
CREATE TABLE test_forcepushdown_text(data text PRIMARY KEY);
SELECT create_distributed_table('test_forcepushdown_text', 'data', colocate_with := 'none');
-- Inserts the char(50) argument `a` into
-- forcepushdown_schema.test_forcepushdown_char.
CREATE FUNCTION insert_data_char(a char(50))
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
INSERT INTO forcepushdown_schema.test_forcepushdown_char VALUES (a);
END;
$fn$;
SELECT create_distributed_function(
'insert_data_char(char)', 'a',
colocate_with := 'test_forcepushdown_char',
force_delegation := true
);
-- Inserts the varchar argument `a` into
-- forcepushdown_schema.test_forcepushdown_varchar.
CREATE FUNCTION insert_data_varchar(a varchar)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
INSERT INTO forcepushdown_schema.test_forcepushdown_varchar VALUES (a);
END;
$fn$;
SELECT create_distributed_function(
'insert_data_varchar(varchar)', 'a',
colocate_with := 'test_forcepushdown_varchar',
force_delegation := true
);
-- Inserts the text argument `a` into
-- forcepushdown_schema.test_forcepushdown_text.
CREATE FUNCTION insert_data_text(a text)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
INSERT INTO forcepushdown_schema.test_forcepushdown_text VALUES (a);
END;
$fn$;
SELECT create_distributed_function(
'insert_data_text(text)', 'a',
colocate_with := 'test_forcepushdown_text',
force_delegation := true
);
SELECT insert_data_varchar('VARCHAR');
BEGIN;
SELECT insert_data_varchar('VARCHAR2');
COMMIT;
SELECT insert_data_text('TEXT');
BEGIN;
SELECT insert_data_text('TEXT2');
COMMIT;
-- Char is failing because the datatype is represented differently in
-- PL/pgSQL and in the exec engine.
SELECT insert_data_char('CHAR');
BEGIN;
SELECT insert_data_char('CHAR');
COMMIT;
SELECT * FROM test_forcepushdown_char ORDER BY 1;
SELECT * FROM test_forcepushdown_varchar ORDER BY 1;
SELECT * FROM test_forcepushdown_text ORDER BY 1;
-- Test sub query
CREATE TABLE test_subquery(data int, result int);
SELECT create_distributed_table('test_subquery', 'data', colocate_with := 'none');
CREATE TABLE test_non_colocated(id int);
SELECT create_distributed_table('test_non_colocated', 'id', colocate_with := 'none');
-- Reads `result` from forcepushdown_schema.test_subquery for the row whose
-- `data` equals the value returned by a scalar subquery on the same table
-- filtered by `a`, and reports it in a NOTICE.
CREATE FUNCTION select_data(a integer)
RETURNS void LANGUAGE plpgsql AS $fn$
DECLARE var INT;
BEGIN
SELECT result INTO var FROM forcepushdown_schema.test_subquery WHERE data =
(SELECT data FROM forcepushdown_schema.test_subquery WHERE data = a);
RAISE NOTICE 'Result: %', var;
END;
$fn$;
SELECT create_distributed_function(
'select_data(int)', 'a',
colocate_with := 'test_subquery',
force_delegation := true
);
-- Reads `data` from forcepushdown_schema.test_subquery for the row whose
-- `data` equals the value returned by a scalar subquery on
-- forcepushdown_schema.test_non_colocated filtered by `a`, and reports it in
-- a NOTICE.
CREATE FUNCTION select_data_noncolocate(a integer)
RETURNS void LANGUAGE plpgsql AS $fn$
DECLARE var INT;
BEGIN
-- Key is the same but colocation ID is different
SELECT data INTO var FROM forcepushdown_schema.test_subquery WHERE data =
(SELECT id FROM forcepushdown_schema.test_non_colocated WHERE id = a);
RAISE NOTICE 'Result: %', var;
END;
$fn$;
SELECT create_distributed_function(
'select_data_noncolocate(int)', 'a',
colocate_with := 'test_subquery',
force_delegation := true
);
-- Inserts `a` into forcepushdown_schema.test_subquery inside a
-- data-modifying CTE, selects the RETURNING value from that CTE alone, and
-- reports it in a NOTICE.
CREATE FUNCTION insert_select_data_cte1(a integer)
RETURNS void LANGUAGE plpgsql AS $fn$
DECLARE var INT;
BEGIN
WITH ins AS (INSERT INTO forcepushdown_schema.test_subquery VALUES (a) RETURNING data)
SELECT ins.data INTO var FROM ins;
RAISE NOTICE 'Result: %', var;
END;
$fn$;
SELECT create_distributed_function(
'insert_select_data_cte1(int)', 'a',
colocate_with := 'test_subquery',
force_delegation := true
);
-- Inserts `a` into forcepushdown_schema.test_subquery inside a
-- data-modifying CTE, then selects the RETURNING value while also joining
-- the CTE with test_subquery rows where data = a, and reports it in a NOTICE.
CREATE FUNCTION insert_select_data_cte2(a integer)
RETURNS void LANGUAGE plpgsql AS $fn$
DECLARE var INT;
BEGIN
WITH ins AS (INSERT INTO forcepushdown_schema.test_subquery VALUES (a) RETURNING data)
SELECT ins.data INTO var FROM forcepushdown_schema.test_subquery, ins WHERE forcepushdown_schema.test_subquery.data = a;
RAISE NOTICE 'Result: %', var;
END;
$fn$;
SELECT create_distributed_function(
'insert_select_data_cte2(int)', 'a',
colocate_with := 'test_subquery',
force_delegation := true
);
-- Inserts a+1 into forcepushdown_schema.test_subquery inside a
-- data-modifying CTE, joins the CTE with test_subquery rows where data = a,
-- and reports the RETURNING value in a NOTICE.
CREATE FUNCTION insert_data_cte_nondist(a integer)
RETURNS void LANGUAGE plpgsql AS $fn$
DECLARE var INT;
BEGIN
-- Inserting a non-distribution argument (a+1)
WITH ins AS (INSERT INTO forcepushdown_schema.test_subquery VALUES (a+1) RETURNING data)
SELECT ins.data INTO var FROM forcepushdown_schema.test_subquery, ins WHERE forcepushdown_schema.test_subquery.data = a;
RAISE NOTICE 'Result: %', var;
END;
$fn$;
SELECT create_distributed_function(
'insert_data_cte_nondist(int)', 'a',
colocate_with := 'test_subquery',
force_delegation := true
);
INSERT INTO forcepushdown_schema.test_subquery VALUES(100, -1);
-- This should pass
SELECT select_data(100);
BEGIN;
SELECT select_data(100);
END;
-- This should fail
SELECT select_data_noncolocate(100);
BEGIN;
SELECT select_data_noncolocate(100);
END;
-- This should pass
SELECT insert_select_data_cte1(200);
BEGIN;
SELECT insert_select_data_cte1(200);
COMMIT;
-- This should pass
SELECT insert_select_data_cte2(300);
BEGIN;
SELECT insert_select_data_cte2(300);
COMMIT;
-- This should fail
SELECT insert_data_cte_nondist(400);
BEGIN;
SELECT insert_data_cte_nondist(400);
COMMIT;
-- Rows 100, 200, 300 should be seen
SELECT * FROM forcepushdown_schema.test_subquery ORDER BY 1;
-- Query with targetList greater than 1
-- Function from FROM clause is not delegated outside of a BEGIN (for now)
SELECT 1,2,3 FROM select_data(100);
BEGIN;
-- Function from FROM clause is delegated
SELECT 1,2,3 FROM select_data(100);
END;
-- Test prepared statements
CREATE TABLE table_test_prepare(i int, j bigint);
SELECT create_distributed_table('table_test_prepare', 'i', colocate_with := 'none');
-- NOTE(review): test_prepare(int, int) is not created earlier in this script,
-- so this DROP presumably reports "does not exist"; confirm that the error is
-- intended to be part of the expected output.
DROP FUNCTION test_prepare(int, int);
-- Inserts two rows into forcepushdown_schema.table_test_prepare, (x, y) and
-- (y, x), then returns x + y.
CREATE OR REPLACE FUNCTION test_prepare(x int, y int)
RETURNS bigint
AS $$
DECLARE
BEGIN
INSERT INTO forcepushdown_schema.table_test_prepare VALUES (x, y);
INSERT INTO forcepushdown_schema.table_test_prepare VALUES (y, x);
RETURN x + y;
END;
$$ LANGUAGE plpgsql;
SELECT create_distributed_function('test_prepare(int,int)','x',force_delegation :=true, colocate_with := 'table_test_prepare');
-- NOTE(review): outer_test_prepare(int, int) is not created earlier in this
-- script, so this DROP presumably reports "does not exist"; confirm that the
-- error is intended to be part of the expected output.
DROP FUNCTION outer_test_prepare(int, int);
-- Calls test_prepare twice from FROM clauses, once with (x, y) and once with
-- (x + 1, y + 1), discarding both results via PERFORM.
-- NOTE(review): the local variable `v` is declared but never used.
CREATE OR REPLACE FUNCTION outer_test_prepare(x int, y int)
RETURNS void
AS $$
DECLARE
v int;
BEGIN
PERFORM FROM test_prepare(x, y);
PERFORM 1, 1 + a FROM test_prepare(x + 1, y + 1) a;
END;
$$ LANGUAGE plpgsql;
-- First 5 executions get delegated and succeed
BEGIN;
SELECT outer_test_prepare(1,1);
SELECT outer_test_prepare(1,1);
SELECT outer_test_prepare(1,1);
SELECT outer_test_prepare(1,1);
SELECT outer_test_prepare(1,1);
-- All of the above get delegated, so we should see 5 * 4 rows
SELECT COUNT(*) FROM table_test_prepare;
-- 6th execution will use a generic plan and should still get delegated
SELECT outer_test_prepare(1,1);
SELECT outer_test_prepare(1,1);
END;
-- Fails as expected
SELECT outer_test_prepare(1,2);
SELECT COUNT(*) FROM table_test_prepare;
RESET client_min_messages;
SET citus.log_remote_commands TO off;
DROP SCHEMA forcepushdown_schema CASCADE;

View File

@ -346,9 +346,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('non_existing_type', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('non_existing_type', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ROLLBACK;
-- check the sanity of distributionArgumentIndex and colocationId
@ -356,18 +356,18 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -100, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -100, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -1, -1))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], -1, -1, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ROLLBACK;
-- check with non-existing object
@ -375,9 +375,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('role', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['non_existing_user']::text[], ARRAY[]::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ROLLBACK;
-- since citus_internal_add_object_metadata is a strict function, it returns NULL for a NULL argument
@ -386,9 +386,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
\set VERBOSITY terse
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], 0, NULL::int))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('role', ARRAY['metadata_sync_helper_role']::text[], ARRAY[]::text[], 0, NULL::int, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ROLLBACK;
\c - postgres - :worker_1_port
@ -404,9 +404,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
CREATE PUBLICATION publication_test FOR TABLE publication_test_table;
SET ROLE metadata_sync_helper_role;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('publication', ARRAY['publication_test']::text[], ARRAY[]::text[], -1, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('publication', ARRAY['publication_test']::text[], ARRAY[]::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ROLLBACK;
-- Show that citus_internal_add_object_metadata checks the privileges
@ -420,9 +420,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
LANGUAGE SQL;
SET ROLE metadata_sync_helper_role;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('function', ARRAY['distribution_test_function']::text[], ARRAY['integer']::text[], -1, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation)
AS (VALUES ('function', ARRAY['distribution_test_function']::text[], ARRAY['integer']::text[], -1, 0, false))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) FROM distributed_object_data;
ROLLBACK;
-- we do not allow wrong partmethod

View File

@ -9,5 +9,6 @@ SELECT attrelid::regclass, attname, atthasmissing, attmissingval
FROM pg_attribute
WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass,
'pg_dist_rebalance_strategy'::regclass,
'pg_dist_partition'::regclass)
'pg_dist_partition'::regclass,
'citus.pg_dist_object'::regclass)
ORDER BY attrelid, attname;