diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 1a204cfe4..ef7fd887d 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -26,6 +26,7 @@ #include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" #include "distributed/adaptive_executor.h" +#include "distributed/reference_table_utils.h" #include "distributed/remote_commands.h" #include "distributed/shard_pruning.h" #include "distributed/tuple_destination.h" @@ -85,13 +86,6 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, return false; } - if (procedure->distributionArgIndex < 0 || - procedure->distributionArgIndex >= list_length(funcExpr->args)) - { - ereport(DEBUG1, (errmsg("cannot push down invalid distribution_argument_index"))); - return false; - } - if (contain_volatile_functions((Node *) funcExpr->args)) { ereport(DEBUG1, (errmsg("arguments in a distributed stored procedure must " @@ -101,51 +95,35 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId); Var *partitionColumn = distTable->partitionColumn; + bool colocatedWithReferenceTable = false; if (partitionColumn == NULL) { /* This can happen if colocated with a reference table. Punt for now. 
*/ ereport(DEBUG1, (errmsg( - "cannot push down CALL for reference tables"))); + "will push down CALL for reference tables"))); + colocatedWithReferenceTable = true; + Assert(IsReferenceTable(colocatedRelationId)); + } + + ShardPlacement *placement = NULL; + if (colocatedWithReferenceTable) + { + placement = ShardPlacementForFunctionColocatedWithReferenceTable(distTable); + } + else + { + placement = + ShardPlacementForFunctionColocatedWithDistTable(procedure, funcExpr, + partitionColumn, distTable, + NULL); + } + + /* return if we could not find a placement */ + if (placement == NULL) + { return false; } - Node *partitionValueNode = (Node *) list_nth(funcExpr->args, - procedure->distributionArgIndex); - partitionValueNode = strip_implicit_coercions(partitionValueNode); - if (!IsA(partitionValueNode, Const)) - { - ereport(DEBUG1, (errmsg("distribution argument value must be a constant"))); - return false; - } - - Const *partitionValue = (Const *) partitionValueNode; - if (partitionValue->consttype != partitionColumn->vartype) - { - bool missingOk = false; - - partitionValue = - TransformPartitionRestrictionValue(partitionColumn, partitionValue, - missingOk); - } - - Datum partitionValueDatum = partitionValue->constvalue; - ShardInterval *shardInterval = FindShardInterval(partitionValueDatum, distTable); - if (shardInterval == NULL) - { - ereport(DEBUG1, (errmsg("cannot push down call, failed to find shard interval"))); - return false; - } - - List *placementList = ActiveShardPlacementList(shardInterval->shardId); - if (list_length(placementList) != 1) - { - /* punt on this for now */ - ereport(DEBUG1, (errmsg( - "cannot push down CALL for replicated distributed tables"))); - return false; - } - - ShardPlacement *placement = (ShardPlacement *) linitial(placementList); WorkerNode *workerNode = FindWorkerNode(placement->nodeName, placement->nodePort); if (workerNode == NULL || !workerNode->hasMetadata || !workerNode->metadataSynced) { @@ -175,7 +153,7 @@ 
CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, task->dependentTaskList = NIL; task->anchorShardId = placement->shardId; task->relationShardList = NIL; - task->taskPlacementList = placementList; + task->taskPlacementList = list_make1(placement); /* * We are delegating the distributed transaction to the worker, so we diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index c27d8f671..963d04808 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -46,6 +46,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_executor.h" #include "distributed/namespace_utils.h" +#include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/version_compat.h" #include "distributed/worker_create_or_replace.h" @@ -89,6 +90,18 @@ static ObjectAddress FunctionToObjectAddress(ObjectType objectType, static void ErrorIfUnsupportedAlterFunctionStmt(AlterFunctionStmt *stmt); static void ErrorIfFunctionDependsOnExtension(const ObjectAddress *functionAddress); static char * quote_qualified_func_name(Oid funcOid); +static void DistributeFunctionWithDistributionArgument(RegProcedure funcOid, + char *distributionArgumentName, + Oid distributionArgumentOid, + char *colocateWithTableName, + const ObjectAddress * + functionAddress); +static void DistributeFunctionColocatedWithDistributedTable(RegProcedure funcOid, + char *colocateWithTableName, + const ObjectAddress * + functionAddress); +static void DistributeFunctionColocatedWithReferenceTable(const + ObjectAddress *functionAddress); PG_FUNCTION_INFO_V1(create_distributed_function); @@ -109,9 +122,8 @@ create_distributed_function(PG_FUNCTION_ARGS) StringInfoData ddlCommand = { 0 }; ObjectAddress functionAddress = { 0 }; - int distributionArgumentIndex = -1; Oid distributionArgumentOid = InvalidOid; - int colocationId = -1; + bool 
colocatedWithReferenceTable = false; char *distributionArgumentName = NULL; char *colocateWithTableName = NULL; @@ -150,6 +162,13 @@ create_distributed_function(PG_FUNCTION_ARGS) { colocateWithText = PG_GETARG_TEXT_P(2); colocateWithTableName = text_to_cstring(colocateWithText); + + /* check if the colocation belongs to a reference table */ + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0) + { + Oid colocationRelationId = ResolveRelationId(colocateWithText, false); + colocatedWithReferenceTable = IsReferenceTable(colocationRelationId); + } } EnsureCoordinator(); @@ -174,53 +193,117 @@ create_distributed_function(PG_FUNCTION_ARGS) MarkObjectDistributed(&functionAddress); - if (distributionArgumentName == NULL) + if (distributionArgumentName != NULL) { - /* cannot provide colocate_with without distribution_arg_name */ - if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0) - { - char *functionName = get_func_name(funcOid); - - - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot distribute the function \"%s\" since the " - "distribution argument is not valid ", functionName), - errhint("To provide \"colocate_with\" option, the" - " distribution argument parameter should also " - "be provided"))); - } - - /* set distribution argument and colocationId to NULL */ - UpdateFunctionDistributionInfo(&functionAddress, NULL, NULL); + DistributeFunctionWithDistributionArgument(funcOid, distributionArgumentName, + distributionArgumentOid, + colocateWithTableName, + &functionAddress); } - else if (distributionArgumentName != NULL) + else if (!colocatedWithReferenceTable) { - /* get the argument index, or error out if we cannot find a valid index */ - distributionArgumentIndex = - GetDistributionArgIndex(funcOid, distributionArgumentName, - &distributionArgumentOid); - - /* get the colocation id, or error out if we cannot find an appropriate one */ - colocationId = - GetFunctionColocationId(funcOid, 
colocateWithTableName, - distributionArgumentOid); - - /* if provided, make sure to record the distribution argument and colocationId */ - UpdateFunctionDistributionInfo(&functionAddress, &distributionArgumentIndex, - &colocationId); - - /* - * Once we have at least one distributed function/procedure with distribution - * argument, we sync the metadata to nodes so that the function/procedure - * delegation can be handled locally on the nodes. - */ - TriggerSyncMetadataToPrimaryNodes(); + DistributeFunctionColocatedWithDistributedTable(funcOid, colocateWithTableName, + &functionAddress); + } + else if (colocatedWithReferenceTable) + { + DistributeFunctionColocatedWithReferenceTable(&functionAddress); } PG_RETURN_VOID(); } +/* + * DistributeFunctionWithDistributionArgument updates pg_dist_object records for + * a function/procedure that has a distribution argument, and triggers metadata + * sync so that the functions can be delegated on workers. + */ +static void +DistributeFunctionWithDistributionArgument(RegProcedure funcOid, + char *distributionArgumentName, + Oid distributionArgumentOid, + char *colocateWithTableName, + const ObjectAddress *functionAddress) +{ + /* get the argument index, or error out if we cannot find a valid index */ + int distributionArgumentIndex = + GetDistributionArgIndex(funcOid, distributionArgumentName, + &distributionArgumentOid); + + /* get the colocation id, or error out if we cannot find an appropriate one */ + int colocationId = + GetFunctionColocationId(funcOid, colocateWithTableName, + distributionArgumentOid); + + /* record the distribution argument and colocationId */ + UpdateFunctionDistributionInfo(functionAddress, &distributionArgumentIndex, + &colocationId); + + /* + * Once we have at least one distributed function/procedure with distribution + * argument, we sync the metadata to nodes so that the function/procedure + * delegation can be handled locally on the nodes. 
+ */ + TriggerSyncMetadataToPrimaryNodes(); +} + + +/* + * DistributeFunctionColocatedWithDistributedTable updates pg_dist_object records for + * a function/procedure that is colocated with a distributed table. + */ +static void +DistributeFunctionColocatedWithDistributedTable(RegProcedure funcOid, + char *colocateWithTableName, + const ObjectAddress *functionAddress) +{ + /* + * cannot provide colocate_with without distribution_arg_name when the function + * is not colocated with a reference table + */ + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0) + { + char *functionName = get_func_name(funcOid); + + + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot distribute the function \"%s\" since the " + "distribution argument is not valid ", functionName), + errhint("To provide \"colocate_with\" option with a" + " distributed table, the distribution argument" + " parameter should also be provided"))); + } + + /* set distribution argument and colocationId to NULL */ + UpdateFunctionDistributionInfo(functionAddress, NULL, NULL); +} + + +/* + * DistributeFunctionColocatedWithReferenceTable updates pg_dist_object records for + * a function/procedure that is colocated with a reference table. + */ +static void +DistributeFunctionColocatedWithReferenceTable(const ObjectAddress *functionAddress) +{ + /* get the reference table colocation id */ + int colocationId = CreateReferenceTableColocationId(); + + /* set distribution argument to NULL and colocationId to the reference table colocation id */ + int *distributionArgumentIndex = NULL; + UpdateFunctionDistributionInfo(functionAddress, distributionArgumentIndex, + &colocationId); + + /* + * Once we have at least one distributed function/procedure that reads + * from a reference table, we sync the metadata to nodes so that the + * function/procedure delegation can be handled locally on the nodes. 
+ */ + TriggerSyncMetadataToPrimaryNodes(); +} + + /* * CreateFunctionDDLCommandsIdempotent returns a list of DDL statements (const char *) to be * executed on a node to recreate the function addressed by the functionAddress. @@ -419,7 +502,8 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp char sourceDistributionMethod = sourceTableEntry->partitionMethod; char sourceReplicationModel = sourceTableEntry->replicationModel; - if (sourceDistributionMethod != DISTRIBUTE_BY_HASH) + if (sourceDistributionMethod != DISTRIBUTE_BY_HASH && + sourceDistributionMethod != DISTRIBUTE_BY_NONE) { char *functionName = get_func_name(functionOid); char *sourceRelationName = get_rel_name(sourceRelationId); @@ -427,8 +511,21 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot colocate function \"%s\" and table \"%s\" because " "colocate_with option is only supported for hash " - "distributed tables.", functionName, - sourceRelationName))); + "distributed tables and reference tables.", + functionName, sourceRelationName))); + } + + if (sourceDistributionMethod == DISTRIBUTE_BY_NONE && + distributionColumnType != InvalidOid) + { + char *functionName = get_func_name(functionOid); + char *sourceRelationName = get_rel_name(sourceRelationId); + + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot colocate function \"%s\" and table \"%s\" because " + "distribution arguments are not supported when " + "colocating with reference tables.", + functionName, sourceRelationName))); } if (sourceReplicationModel != REPLICATION_MODEL_STREAMING) diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 53774bc09..e2ba1cbcb 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -407,6 +407,31 @@ 
CompareShardPlacementsByWorker(const void *leftElement, const void *rightElement } +/* + * CompareShardPlacementsByGroupId compares two shard placements by their + * group id. + */ +int +CompareShardPlacementsByGroupId(const void *leftElement, const void *rightElement) +{ + const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement); + const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement); + + if (leftPlacement->groupId > rightPlacement->groupId) + { + return 1; + } + else if (leftPlacement->groupId < rightPlacement->groupId) + { + return -1; + } + else + { + return 0; + } +} + + /* * TableShardReplicationFactor returns the current replication factor of the * given relation by looking into shard placements. It errors out if there diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 7f4d678b1..533a26689 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -28,6 +28,7 @@ #include "distributed/insert_select_planner.h" #include "distributed/metadata_utility.h" #include "distributed/coordinator_protocol.h" +#include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" @@ -58,6 +59,7 @@ struct ParamWalkerContext static bool contain_param_walker(Node *node, void *context); + /* * contain_param_walker scans node for Param nodes. * Ignore the return value, instead check context afterwards. 
@@ -105,10 +107,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) FuncExpr *funcExpr = NULL; DistObjectCacheEntry *procedure = NULL; Oid colocatedRelationId = InvalidOid; - Const *partitionValue = NULL; - Datum partitionValueDatum = 0; - ShardInterval *shardInterval = NULL; - List *placementList = NIL; + bool colocatedWithReferenceTable = false; CitusTableCacheEntry *distTable = NULL; Var *partitionColumn = NULL; ShardPlacement *placement = NULL; @@ -246,13 +245,6 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) return NULL; } - if (procedure->distributionArgIndex < 0 || - procedure->distributionArgIndex >= list_length(funcExpr->args)) - { - ereport(DEBUG1, (errmsg("function call does not have a distribution argument"))); - return NULL; - } - if (contain_volatile_functions((Node *) funcExpr->args)) { ereport(DEBUG1, (errmsg("arguments in a distributed function must " @@ -271,30 +263,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) partitionColumn = distTable->partitionColumn; if (partitionColumn == NULL) { - /* This can happen if colocated with a reference table. Punt for now. 
*/ - ereport(DEBUG1, (errmsg( - "cannnot push down function call for reference tables"))); - return NULL; - } - - partitionValue = (Const *) list_nth(funcExpr->args, procedure->distributionArgIndex); - - if (IsA(partitionValue, Param)) - { - Param *partitionParam = (Param *) partitionValue; - - if (partitionParam->paramkind == PARAM_EXTERN) - { - /* Don't log a message, we should end up here again without a parameter */ - DissuadePlannerFromUsingPlan(planContext->plan); - return NULL; - } - } - - if (!IsA(partitionValue, Const)) - { - ereport(DEBUG1, (errmsg("distribution argument value must be a constant"))); - return NULL; + colocatedWithReferenceTable = true; } /* @@ -308,33 +277,24 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) return NULL; } - partitionValueDatum = partitionValue->constvalue; - - if (partitionValue->consttype != partitionColumn->vartype) + if (colocatedWithReferenceTable) { - bool missingOk = false; - partitionValue = - TransformPartitionRestrictionValue(partitionColumn, partitionValue, - missingOk); + placement = ShardPlacementForFunctionColocatedWithReferenceTable(distTable); + } + else + { + placement = ShardPlacementForFunctionColocatedWithDistTable(procedure, funcExpr, + partitionColumn, + distTable, + planContext->plan); } - shardInterval = FindShardInterval(partitionValueDatum, distTable); - if (shardInterval == NULL) + /* return if we could not find a placement */ + if (placement == NULL) { - ereport(DEBUG1, (errmsg("cannot push down call, failed to find shard interval"))); - return NULL; + return NULL; } - placementList = ActiveShardPlacementList(shardInterval->shardId); - if (list_length(placementList) != 1) - { - /* punt on this for now */ - ereport(DEBUG1, (errmsg( - "cannot push down function call for replicated distributed tables"))); - return NULL; - } - - placement = (ShardPlacement *) linitial(placementList); workerNode = FindWorkerNode(placement->nodeName, placement->nodePort); if (workerNode == NULL 
|| !workerNode->hasMetadata || !workerNode->metadataSynced) @@ -364,9 +324,9 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) task = CitusMakeNode(Task); task->taskType = READ_TASK; - task->taskPlacementList = placementList; + task->taskPlacementList = list_make1(placement); SetTaskQueryIfShouldLazyDeparse(task, planContext->query); - task->anchorShardId = shardInterval->shardId; + task->anchorShardId = placement->shardId; task->replicationModel = distTable->replicationModel; job = CitusMakeNode(Job); @@ -384,3 +344,104 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) return FinalizePlan(planContext->plan, distributedPlan); } + + +/* + * ShardPlacementForFunctionColocatedWithDistTable decides on a placement + * for delegating a procedure call that accesses a distributed table. + */ +ShardPlacement * +ShardPlacementForFunctionColocatedWithDistTable(DistObjectCacheEntry *procedure, + FuncExpr *funcExpr, + Var *partitionColumn, + CitusTableCacheEntry *cacheEntry, + PlannedStmt *plan) +{ + if (procedure->distributionArgIndex < 0 || + procedure->distributionArgIndex >= list_length(funcExpr->args)) + { + ereport(DEBUG1, (errmsg("cannot push down invalid distribution_argument_index"))); + return NULL; + } + + Node *partitionValueNode = (Node *) list_nth(funcExpr->args, + procedure->distributionArgIndex); + partitionValueNode = strip_implicit_coercions(partitionValueNode); + + if (IsA(partitionValueNode, Param)) + { + Param *partitionParam = (Param *) partitionValueNode; + + if (partitionParam->paramkind == PARAM_EXTERN) + { + /* Don't log a message, we should end up here again without a parameter */ + DissuadePlannerFromUsingPlan(plan); + return NULL; + } + } + + if (!IsA(partitionValueNode, Const)) + { + ereport(DEBUG1, (errmsg("distribution argument value must be a constant"))); + return NULL; + } + + Const *partitionValue = (Const *) partitionValueNode; + + if (partitionValue->consttype != partitionColumn->vartype) + { + bool 
missingOk = false; + partitionValue = + TransformPartitionRestrictionValue(partitionColumn, partitionValue, + missingOk); + } + + Datum partitionValueDatum = partitionValue->constvalue; + ShardInterval *shardInterval = FindShardInterval(partitionValueDatum, cacheEntry); + if (shardInterval == NULL) + { + ereport(DEBUG1, (errmsg("cannot push down call, failed to find shard interval"))); + return NULL; + } + + List *placementList = ActiveShardPlacementList(shardInterval->shardId); + if (list_length(placementList) != 1) + { + /* punt on this for now */ + ereport(DEBUG1, (errmsg( + "cannot push down function call for replicated distributed tables"))); + return NULL; + } + + return linitial(placementList); +} + + +/* + * ShardPlacementForFunctionColocatedWithReferenceTable decides on a placement for delegating + * a function call that reads from a reference table. + * + * If citus.task_assignment_policy is set to round-robin, we assign a different placement + * on consecutive runs. Otherwise the function returns the first placement available. 
+ */ +ShardPlacement * +ShardPlacementForFunctionColocatedWithReferenceTable(CitusTableCacheEntry *cacheEntry) +{ + const ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[0]; + const uint64 referenceTableShardId = shardInterval->shardId; + + /* Get the list of active shard placements ordered by the groupid */ + List *placementList = ActiveShardPlacementList(referenceTableShardId); + placementList = SortList(placementList, CompareShardPlacementsByGroupId); + + /* do not try to delegate to coordinator even if it is in metadata */ + placementList = RemoveCoordinatorPlacementIfNotSingleNode(placementList); + + if (TaskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN) + { + /* reorder the placement list */ + placementList = RoundRobinReorder(placementList); + } + + return (ShardPlacement *) linitial(placementList); +} diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 42f69cc89..f1efcef02 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1,3 +1,4 @@ + /*------------------------------------------------------------------------- * * multi_router_planner.c @@ -164,7 +165,6 @@ static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved); static bool RowLocksOnRelations(Node *node, List **rtiLockList); -static List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType taskAssignmentPolicy, @@ -1836,7 +1836,7 @@ ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, * If the list has a single element or no placements on the coordinator, the list * returned is unmodified. 
*/ -static List * +List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList) { ListCell *placementCell = NULL; diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 9e28dadd8..4c4f8b0b0 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -149,6 +149,14 @@ extern bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray, Oid shardIntervalCollation, FmgrInfo *shardIntervalSortCompareFunction); +extern ShardPlacement * ShardPlacementForFunctionColocatedWithReferenceTable( + CitusTableCacheEntry *cacheEntry); +extern ShardPlacement * ShardPlacementForFunctionColocatedWithDistTable( + DistObjectCacheEntry *procedure, FuncExpr *funcExpr, Var *partitionColumn, + CitusTableCacheEntry + *cacheEntry, + PlannedStmt *plan); + extern bool CitusHasBeenLoaded(void); extern bool CheckCitusVersion(int elevel); extern bool CheckAvailableVersion(int elevel); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 84b6ddf82..9c5019af4 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -114,6 +114,7 @@ extern List * AllShardPlacementsOnNodeGroup(int32 groupId); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery); +extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, @@ -161,6 +162,8 @@ extern Datum StringToDatum(char *inputString, Oid dataType); extern char * DatumToString(Datum datum, Oid dataType); extern int CompareShardPlacementsByWorker(const void *leftElement, const void *rightElement); +extern int CompareShardPlacementsByGroupId(const void *leftElement, + 
const void *rightElement); extern ShardInterval * DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray, Oid intervalTypeId, diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 71973f2cb..7cfbcbbdb 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -686,20 +686,11 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val -- colocate_with cannot be used without distribution key SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', colocate_with:='replicated_table_func_test_2'); ERROR: cannot distribute the function "eq_with_param_names" since the distribution argument is not valid -HINT: To provide "colocate_with" option, the distribution argument parameter should also be provided +HINT: To provide "colocate_with" option with a distributed table, the distribution argument parameter should also be provided -- a function cannot be colocated with a local table CREATE TABLE replicated_table_func_test_3 (a macaddr8); SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3'); ERROR: relation replicated_table_func_test_3 is not distributed --- a function cannot be colocated with a reference table -SELECT create_reference_table('replicated_table_func_test_3'); - create_reference_table ---------------------------------------------------------------------- - -(1 row) - -SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3'); -ERROR: cannot colocate function "eq_with_param_names" and table "replicated_table_func_test_3" because colocate_with option is only supported for hash distributed tables. 
-- finally, colocate the function with a distributed table SET citus.shard_replication_factor TO 1; CREATE TABLE replicated_table_func_test_4 (a macaddr); @@ -743,6 +734,22 @@ WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass t (1 row) +-- a function cannot be colocated with a reference table when a distribution column is provided +SELECT create_reference_table('replicated_table_func_test_3'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3'); +ERROR: cannot colocate function "eq_with_param_names" and table "replicated_table_func_test_3" because distribution arguments are not supported when colocating with reference tables. +-- a function can be colocated with a reference table when the distribution argument is omitted +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', colocate_with:='replicated_table_func_test_3'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + -- function with a macaddr8 dist. arg can be colocated with macaddr -- column of a distributed table. 
In general, if there is a coercion -- path, we rely on postgres for implicit coersions, and users for explicit coersions diff --git a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out index 7b2024deb..a3cd98351 100644 --- a/src/test/regress/expected/multi_mx_add_coordinator.out +++ b/src/test/regress/expected/multi_mx_add_coordinator.out @@ -37,7 +37,7 @@ SELECT verify_metadata('localhost', :worker_1_port), t | t (1 row) -CREATE TABLE ref(a int); +CREATE TABLE ref(groupid int); SELECT create_reference_table('ref'); create_reference_table --------------------------------------------------------------------- @@ -92,6 +92,54 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM mx_add_coo 0 (1 row) +-- test that distributed functions also use local execution +CREATE OR REPLACE FUNCTION my_group_id() +RETURNS void +LANGUAGE plpgsql +SET search_path FROM CURRENT +AS $$ +DECLARE + gid int; +BEGIN + SELECT groupid INTO gid + FROM pg_dist_local_group; + + INSERT INTO mx_add_coordinator.ref(groupid) VALUES (gid); +END; +$$; +SELECT create_distributed_function('my_group_id()', colocate_with := 'ref'); +DEBUG: switching to sequential query execution mode +DETAIL: A distributed function is created. 
To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT my_group_id(); +DEBUG: pushing down the function call + my_group_id +--------------------------------------------------------------------- + +(1 row) + +SELECT my_group_id(); +DEBUG: pushing down the function call + my_group_id +--------------------------------------------------------------------- + +(1 row) + +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +NOTICE: executing the command locally: SELECT DISTINCT groupid FROM mx_add_coordinator.ref_7000000 ref ORDER BY groupid + groupid +--------------------------------------------------------------------- + 14 +(1 row) + +TRUNCATE TABLE ref; +NOTICE: executing the command locally: TRUNCATE TABLE mx_add_coordinator.ref_xxxxx CASCADE -- for round-robin policy, always go to workers SET citus.task_assignment_policy TO "round-robin"; SELECT count(*) FROM ref; @@ -118,13 +166,47 @@ DEBUG: Creating router plan 0 (1 row) +SELECT my_group_id(); +DEBUG: pushing down the function call + my_group_id +--------------------------------------------------------------------- + +(1 row) + +SELECT my_group_id(); +DEBUG: pushing down the function call + my_group_id +--------------------------------------------------------------------- + +(1 row) + +SELECT my_group_id(); +DEBUG: pushing down the function call + my_group_id +--------------------------------------------------------------------- + +(1 row) + +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan + groupid +--------------------------------------------------------------------- + 14 + 18 +(2 rows) + +TRUNCATE TABLE ref; +NOTICE: executing the command locally: TRUNCATE 
TABLE mx_add_coordinator.ref_xxxxx CASCADE -- modifications always go through local shard as well as remote ones INSERT INTO ref VALUES (1); DEBUG: Creating router plan -NOTICE: executing the command locally: INSERT INTO mx_add_coordinator.ref_7000000 (a) VALUES (1) +NOTICE: executing the command locally: INSERT INTO mx_add_coordinator.ref_7000000 (groupid) VALUES (1) -- get it ready for the next executions TRUNCATE ref; NOTICE: executing the command locally: TRUNCATE TABLE mx_add_coordinator.ref_xxxxx CASCADE +ALTER TABLE ref RENAME COLUMN groupid TO a; +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (7000000, 'mx_add_coordinator', 'ALTER TABLE ref RENAME COLUMN groupid TO a;') -- test that changes from a metadata node is reflected in the coordinator placement \c - - - :worker_1_port SET search_path TO mx_add_coordinator,public; @@ -183,9 +265,7 @@ SELECT verify_metadata('localhost', :worker_1_port), t | t (1 row) +SET client_min_messages TO error; DROP SCHEMA mx_add_coordinator CASCADE; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to table ref -drop cascades to table ref_7000000 SET search_path TO DEFAULT; RESET client_min_messages; diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out index 968324bf4..0453bb752 100644 --- a/src/test/regress/expected/multi_mx_call.out +++ b/src/test/regress/expected/multi_mx_call.out @@ -59,6 +59,56 @@ select create_distributed_table('mx_call_dist_table_enum', 'key'); (1 row) insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); +-- test that a distributed function can be colocated with a reference table +CREATE TABLE ref(groupid int); +SELECT create_reference_table('ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE PROCEDURE my_group_id_proc() +LANGUAGE plpgsql +SET search_path FROM CURRENT +AS $$ +DECLARE + gid int; +BEGIN + 
SELECT groupid INTO gid + FROM pg_dist_local_group; + + INSERT INTO ref(groupid) VALUES (gid); +END; +$$; +SELECT create_distributed_function('my_group_id_proc()', colocate_with := 'ref'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CALL my_group_id_proc(); +CALL my_group_id_proc(); +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; + groupid +--------------------------------------------------------------------- + 14 +(1 row) + +TRUNCATE TABLE ref; +-- test round robin task assignment policy uses different workers on consecutive procedure calls. +SET citus.task_assignment_policy TO 'round-robin'; +CALL my_group_id_proc(); +CALL my_group_id_proc(); +CALL my_group_id_proc(); +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; + groupid +--------------------------------------------------------------------- + 14 + 18 +(2 rows) + +TRUNCATE TABLE ref; +RESET citus.task_assignment_policy; CREATE PROCEDURE mx_call_proc(x int, INOUT y int) LANGUAGE plpgsql AS $$ BEGIN @@ -281,24 +331,19 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment 29 (1 row) --- We don't currently support colocating with reference tables -select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); +-- We support colocating with reference tables +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, NULL); colocate_proc_with_table --------------------------------------------------------------------- (1 row) call multi_mx_call.mx_call_proc(2, 0); -DEBUG: cannot push down CALL for reference tables -DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join 
multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment +DEBUG: will push down CALL for reference tables +DEBUG: pushing down the procedure y --------------------------------------------------------------------- - 29 + 28 (1 row) -- We don't currently support colocating with replicated tables @@ -309,7 +354,7 @@ select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_replica'::re (1 row) call multi_mx_call.mx_call_proc(2, 0); -DEBUG: cannot push down CALL for replicated distributed tables +DEBUG: cannot push down function call for replicated distributed tables DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_call.mx_call_dist_table_1 t1 join multi_mx_call.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment @@ -485,4 +530,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line 8 at assignment reset client_min_messages; \set VERBOSITY terse drop schema multi_mx_call cascade; -NOTICE: drop cascades to 11 other objects +NOTICE: drop cascades to 13 other objects diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out 
index 9702e9779..10e5bf231 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -280,7 +280,7 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass (1 row) select mx_call_func(2, 0); -DEBUG: function call does not have a distribution argument +DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment @@ -299,7 +299,7 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_1'::regclass (1 row) select mx_call_func(2, 0); -DEBUG: function call does not have a distribution argument +DEBUG: cannot push down invalid distribution_argument_index DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment @@ -319,16 +319,10 @@ select colocate_proc_with_table('mx_call_func', 'mx_call_dist_table_ref'::regcla (1 row) select mx_call_func(2, 0); -DEBUG: cannnot push down function call for reference tables -DEBUG: 
generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (3 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) -CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" -PL/pgSQL function mx_call_func(integer,integer) line 8 at assignment +DEBUG: pushing down the function call mx_call_func --------------------------------------------------------------------- - 29 + 28 (1 row) -- We don't currently support colocating with replicated tables diff --git a/src/test/regress/expected/multi_mx_function_table_reference.out b/src/test/regress/expected/multi_mx_function_table_reference.out index 0abf6ca1a..2dce631d3 100644 --- a/src/test/regress/expected/multi_mx_function_table_reference.out +++ b/src/test/regress/expected/multi_mx_function_table_reference.out @@ -82,11 +82,89 @@ SELECT run_command_on_workers($$SELECT atttypmod FROM pg_attribute WHERE attnum (localhost,57638,t,262152) (2 rows) +-- test that a distributed function can be colocated with a reference table +CREATE TABLE ref(groupid int); +SELECT create_reference_table('ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE 
FUNCTION my_group_id() +RETURNS void +LANGUAGE plpgsql +SET search_path FROM CURRENT +AS $$ +DECLARE + gid int; +BEGIN + SELECT groupid INTO gid + FROM pg_dist_local_group; + + INSERT INTO ref(groupid) VALUES (gid); +END; +$$; +SELECT create_distributed_function('my_group_id()', colocate_with := 'ref'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT my_group_id(); + my_group_id +--------------------------------------------------------------------- + +(1 row) + +SELECT my_group_id(); + my_group_id +--------------------------------------------------------------------- + +(1 row) + +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; + groupid +--------------------------------------------------------------------- + 14 +(1 row) + +TRUNCATE TABLE ref; +-- test round robin task assignment policy uses different workers on consecutive function calls. +SET citus.task_assignment_policy TO 'round-robin'; +SELECT my_group_id(); + my_group_id +--------------------------------------------------------------------- + +(1 row) + +SELECT my_group_id(); + my_group_id +--------------------------------------------------------------------- + +(1 row) + +SELECT my_group_id(); + my_group_id +--------------------------------------------------------------------- + +(1 row) + +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; + groupid +--------------------------------------------------------------------- + 14 + 18 +(2 rows) + +TRUNCATE TABLE ref; -- clean up after testing +RESET citus.task_assignment_policy; DROP SCHEMA function_table_reference CASCADE; -NOTICE: drop cascades to 2 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table zoop_table drop cascades to function zoop(integer) +drop cascades to table ref +drop cascades to function my_group_id() -- make sure the worker is added at the end irregardless of anything failing to not make -- subsequent tests fail as well. 
All artifacts created during this test should have been -- dropped by the drop cascade above. diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index 57ec04ed7..cc688c4ef 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -398,10 +398,6 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', colo CREATE TABLE replicated_table_func_test_3 (a macaddr8); SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3'); --- a function cannot be colocated with a reference table -SELECT create_reference_table('replicated_table_func_test_3'); -SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3'); - -- finally, colocate the function with a distributed table SET citus.shard_replication_factor TO 1; CREATE TABLE replicated_table_func_test_4 (a macaddr); @@ -423,6 +419,13 @@ FROM pg_dist_partition, citus.pg_dist_object as objects WHERE pg_dist_partition.logicalrelid = 'replicated_table_func_test_4'::regclass AND objects.objid = 'eq_with_param_names(macaddr, macaddr)'::regprocedure; +-- a function cannot be colocated with a reference table when a distribution column is provided +SELECT create_reference_table('replicated_table_func_test_3'); +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', 'val1', colocate_with:='replicated_table_func_test_3'); + +-- a function can be colocated with a reference table when the distribution argument is omitted +SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', colocate_with:='replicated_table_func_test_3'); + -- function with a macaddr8 dist. arg can be colocated with macaddr -- column of a distributed table. 
In general, if there is a coercion -- path, we rely on postgres for implicit coersions, and users for explicit coersions diff --git a/src/test/regress/sql/multi_mx_add_coordinator.sql b/src/test/regress/sql/multi_mx_add_coordinator.sql index 4b7821672..a8bab6ce0 100644 --- a/src/test/regress/sql/multi_mx_add_coordinator.sql +++ b/src/test/regress/sql/multi_mx_add_coordinator.sql @@ -21,7 +21,7 @@ SELECT wait_until_metadata_sync(30000); SELECT verify_metadata('localhost', :worker_1_port), verify_metadata('localhost', :worker_2_port); -CREATE TABLE ref(a int); +CREATE TABLE ref(groupid int); SELECT create_reference_table('ref'); -- alter role from mx worker isn't propagated @@ -44,17 +44,44 @@ SET client_min_messages TO DEBUG; SELECT count(*) FROM ref; SELECT count(*) FROM ref; +-- test that distributed functions also use local execution +CREATE OR REPLACE FUNCTION my_group_id() +RETURNS void +LANGUAGE plpgsql +SET search_path FROM CURRENT +AS $$ +DECLARE + gid int; +BEGIN + SELECT groupid INTO gid + FROM pg_dist_local_group; + + INSERT INTO mx_add_coordinator.ref(groupid) VALUES (gid); +END; +$$; +SELECT create_distributed_function('my_group_id()', colocate_with := 'ref'); +SELECT my_group_id(); +SELECT my_group_id(); +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; +TRUNCATE TABLE ref; -- for round-robin policy, always go to workers SET citus.task_assignment_policy TO "round-robin"; SELECT count(*) FROM ref; SELECT count(*) FROM ref; SELECT count(*) FROM ref; +SELECT my_group_id(); +SELECT my_group_id(); +SELECT my_group_id(); +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; +TRUNCATE TABLE ref; + -- modifications always go through local shard as well as remote ones INSERT INTO ref VALUES (1); -- get it ready for the next executions TRUNCATE ref; +ALTER TABLE ref RENAME COLUMN groupid TO a; -- test that changes from a metadata node is reflected in the coordinator placement \c - - - :worker_1_port @@ -86,6 +113,7 @@ SELECT wait_until_metadata_sync(30000); SELECT 
verify_metadata('localhost', :worker_1_port), verify_metadata('localhost', :worker_2_port); +SET client_min_messages TO error; DROP SCHEMA mx_add_coordinator CASCADE; SET search_path TO DEFAULT; RESET client_min_messages; diff --git a/src/test/regress/sql/multi_mx_call.sql b/src/test/regress/sql/multi_mx_call.sql index 1cf074f15..b39965b99 100644 --- a/src/test/regress/sql/multi_mx_call.sql +++ b/src/test/regress/sql/multi_mx_call.sql @@ -40,6 +40,40 @@ create table mx_call_dist_table_enum(id int, key mx_call_enum); select create_distributed_table('mx_call_dist_table_enum', 'key'); insert into mx_call_dist_table_enum values (1,'S'),(2,'A'),(3,'D'),(4,'F'); +-- test that a distributed function can be colocated with a reference table +CREATE TABLE ref(groupid int); +SELECT create_reference_table('ref'); + +CREATE OR REPLACE PROCEDURE my_group_id_proc() +LANGUAGE plpgsql +SET search_path FROM CURRENT +AS $$ +DECLARE + gid int; +BEGIN + SELECT groupid INTO gid + FROM pg_dist_local_group; + + INSERT INTO ref(groupid) VALUES (gid); +END; +$$; + +SELECT create_distributed_function('my_group_id_proc()', colocate_with := 'ref'); + +CALL my_group_id_proc(); +CALL my_group_id_proc(); +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; +TRUNCATE TABLE ref; + +-- test round robin task assignment policy uses different workers on consecutive procedure calls. 
+SET citus.task_assignment_policy TO 'round-robin'; +CALL my_group_id_proc(); +CALL my_group_id_proc(); +CALL my_group_id_proc(); +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; +TRUNCATE TABLE ref; + +RESET citus.task_assignment_policy; CREATE PROCEDURE mx_call_proc(x int, INOUT y int) LANGUAGE plpgsql AS $$ @@ -120,8 +154,8 @@ call multi_mx_call.mx_call_proc(2, 0); select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 2); call multi_mx_call.mx_call_proc(2, 0); --- We don't currently support colocating with reference tables -select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, 1); +-- We support colocating with reference tables +select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_ref'::regclass, NULL); call multi_mx_call.mx_call_proc(2, 0); -- We don't currently support colocating with replicated tables diff --git a/src/test/regress/sql/multi_mx_function_table_reference.sql b/src/test/regress/sql/multi_mx_function_table_reference.sql index 59647e8f5..e6efaff15 100644 --- a/src/test/regress/sql/multi_mx_function_table_reference.sql +++ b/src/test/regress/sql/multi_mx_function_table_reference.sql @@ -50,7 +50,42 @@ SELECT public.wait_until_metadata_sync(30000); -- see numerictypmodin in postgres for how typmod is derived SELECT run_command_on_workers($$SELECT atttypmod FROM pg_attribute WHERE attnum = 2 AND attrelid = (SELECT typrelid FROM pg_type WHERE typname = 'zoop_table');$$); +-- test that a distributed function can be colocated with a reference table +CREATE TABLE ref(groupid int); +SELECT create_reference_table('ref'); + +CREATE OR REPLACE FUNCTION my_group_id() +RETURNS void +LANGUAGE plpgsql +SET search_path FROM CURRENT +AS $$ +DECLARE + gid int; +BEGIN + SELECT groupid INTO gid + FROM pg_dist_local_group; + + INSERT INTO ref(groupid) VALUES (gid); +END; +$$; + +SELECT create_distributed_function('my_group_id()', colocate_with := 'ref'); + +SELECT my_group_id(); +SELECT 
my_group_id(); +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; +TRUNCATE TABLE ref; + +-- test round robin task assignment policy uses different workers on consecutive function calls. +SET citus.task_assignment_policy TO 'round-robin'; +SELECT my_group_id(); +SELECT my_group_id(); +SELECT my_group_id(); +SELECT DISTINCT(groupid) FROM ref ORDER BY 1; +TRUNCATE TABLE ref; + -- clean up after testing +RESET citus.task_assignment_policy; DROP SCHEMA function_table_reference CASCADE; -- make sure the worker is added at the end irregardless of anything failing to not make