diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index dda4eb3de..af319f0ce 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -24,6 +24,7 @@ #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" #include "distributed/deparse_shard_query.h" +#include "distributed/function_call_delegation.h" #include "distributed/metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" @@ -46,9 +47,10 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" -static bool CallFuncExprRemotely(CallStmt *callStmt, - DistObjectCacheEntry *procedure, - FuncExpr *funcExpr, DestReceiver *dest); + +/* global variable tracking whether we are in a delegated procedure call */ +bool InDelegatedProcedureCall = false; + /* * CallDistributedProcedureRemotely calls a stored procedure on the worker if possible. @@ -61,28 +63,21 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId, functionId, 0); - - /* - * If procedure is not distributed or already delegated from another - * node, do not call the procedure remotely. - */ - if (procedure == NULL || !procedure->isDistributed || - IsCitusInitiatedRemoteBackend()) + if (procedure == NULL || !procedure->isDistributed) { return false; } - return CallFuncExprRemotely(callStmt, procedure, funcExpr, dest); -} + if (IsCitusInitiatedRemoteBackend()) + { + /* + * We are in a citus-initiated backend handling a CALL to a distributed + * procedure. That means that this is the delegated call. + */ + InDelegatedProcedureCall = true; + return false; + } - -/* - * CallFuncExprRemotely calls a procedure of function on the worker if possible. - */ -static bool -CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, - FuncExpr *funcExpr, DestReceiver *dest) -{ if (IsMultiStatementTransaction()) { ereport(DEBUG1, (errmsg("cannot push down CALL in multi-statement transaction"))); @@ -102,6 +97,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, "be constant expressions"))); return false; } + CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId); Var *partitionColumn = distTable->partitionColumn; bool colocatedWithReferenceTable = false; diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 44d565a52..33fa28e9a 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -129,6 +129,7 @@ static void DoCopyFromLocalTableIntoShards(Relation distributedRelation, TupleTableSlot *slot, EState *estate); static void ErrorIfTemporaryTable(Oid relationId); +static void ErrorIfForeignTable(Oid relationOid); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); @@ -333,6 +334,7 @@ EnsureCitusTableCanBeCreated(Oid relationOid) EnsureRelationExists(relationOid); EnsureTableOwner(relationOid); ErrorIfTemporaryTable(relationOid); + ErrorIfForeignTable(relationOid); /* * We should do this check here since the codes in the following lines rely @@ -1882,3 +1884,22 @@ DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc, return false; } + + +/* + * ErrorIfForeignTable errors out if the relation with given relationOid + * is a foreign table. + */ +static void +ErrorIfForeignTable(Oid relationOid) +{ + if (IsForeignTable(relationOid)) + { + char *relname = get_rel_name(relationOid); + char *qualifiedRelname = generate_qualified_relation_name(relationOid); + ereport(ERROR, (errmsg("foreign tables cannot be distributed"), + (errhint("Can add foreign table \"%s\" to metadata by running: " + "SELECT citus_add_local_table_to_metadata($$%s$$);", + relname, qualifiedRelname)))); + } +} diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index f3f231d8c..80801687d 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -404,7 +404,12 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); - SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommands); + /* send commands to new workers, the current user should a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, + nodePort, + CurrentUserName(), + ddlCommands); } diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index d683a2792..d2d7d9b23 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -3340,6 +3340,7 @@ InitializeCopyShardState(CopyShardState *shardState, { ListCell *placementCell = NULL; int failedPlacementCount = 0; + bool hasRemoteCopy = false; MemoryContext localContext = AllocSetContextCreateExtended(CurrentMemoryContext, @@ -3383,6 +3384,8 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } + hasRemoteCopy = true; + MultiConnection *connection = CopyGetPlacementConnection(connectionStateHash, placement, colocatedIntermediateResult); @@ -3427,6 +3430,11 @@ InitializeCopyShardState(CopyShardState *shardState, ereport(ERROR, (errmsg("could not connect to any active placements"))); } + if (hasRemoteCopy) + { + EnsureRemoteTaskExecutionAllowed(); + } + /* * We just error out and code execution should never reach to this * point. This is the case for all tables. diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 83ae78a91..26338570c 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -54,7 +54,6 @@ /* controlled via GUC, should be accessed via GetEnableLocalReferenceForeignKeys() */ bool EnableLocalReferenceForeignKeys = true; - /* Local functions forward declarations for unsupported command checks */ static void PostprocessCreateTableStmtForeignKeys(CreateStmt *createStatement); static void PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, @@ -1786,6 +1785,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString, { return NIL; } + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); QualifyTreeNode((Node *) stmt); ddlJob->targetRelationId = relationId; diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 672a7ce81..520d7f7fd 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -33,7 +33,9 @@ #include "access/attnum.h" #include "access/heapam.h" #include "access/htup_details.h" +#if PG_VERSION_NUM < 140000 #include "access/xact.h" +#endif #include "catalog/catalog.h" #include "catalog/dependency.h" #include "commands/dbcommands.h" @@ -52,7 +54,9 @@ #include "distributed/local_executor.h" #include "distributed/maintenanced.h" #include "distributed/multi_partitioning_utils.h" +#if PG_VERSION_NUM < 140000 #include "distributed/metadata_cache.h" +#endif #include "distributed/metadata_sync.h" #include "distributed/metadata/distobject.h" #include "distributed/multi_executor.h" @@ -91,6 +95,9 @@ static void ProcessUtilityInternal(PlannedStmt *pstmt, struct QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletionCompat *completionTag); +#if PG_VERSION_NUM >= 140000 +static void set_indexsafe_procflags(void); +#endif static char * SetSearchPathToCurrentSearchPathCommand(void); static char * CurrentSearchPath(void); static void IncrementUtilityHookCountersIfNecessary(Node *parsetree); @@ -227,10 +234,20 @@ multi_ProcessUtility(PlannedStmt *pstmt, params, queryEnv, dest, completionTag); StoredProcedureLevel -= 1; + + if (InDelegatedProcedureCall && StoredProcedureLevel == 0) + { + InDelegatedProcedureCall = false; + } } PG_CATCH(); { StoredProcedureLevel -= 1; + + if (InDelegatedProcedureCall && StoredProcedureLevel == 0) + { + InDelegatedProcedureCall = false; + } PG_RE_THROW(); } PG_END_TRY(); @@ -1108,9 +1125,35 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) /* * Start a new transaction to make sure CONCURRENTLY commands * on localhost do not block waiting for this transaction to finish. + * + * In addition to doing that, we also need to tell other backends + * --including the ones spawned for connections opened to localhost to + * build indexes on shards of this relation-- that concurrent index + * builds can safely ignore us. + * + * Normally, DefineIndex() only does that if index doesn't have any + * predicates (i.e.: where clause) and no index expressions at all. + * However, now that we already called standard process utility, + * index build on the shell table is finished anyway. + * + * The reason behind doing so is that we cannot guarantee not + * grabbing any snapshots via adaptive executor, and the backends + * creating indexes on local shards (if any) might block on waiting + * for current xact of the current backend to finish, which would + * cause self deadlocks that are not detectable. */ if (ddlJob->startNewTransaction) { +#if PG_VERSION_NUM < 140000 + + /* + * Older versions of postgres doesn't have PROC_IN_SAFE_IC flag + * so we cannot use set_indexsafe_procflags in those versions. + * + * For this reason, we do our best to ensure not grabbing any + * snapshots later in the executor. + */ + /* * If cache is not populated, system catalog lookups will cause * the xmin of current backend to change. Then the last phase @@ -1131,8 +1174,34 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) * will already be in the hash table, hence we won't be holding any snapshots. */ WarmUpConnParamsHash(); +#endif + + /* + * Since it is not certain whether the code-path that we followed + * until reaching here caused grabbing any snapshots or not, we + * need to pop the active snapshot if we had any, to ensure not + * leaking any snapshots. + * + * For example, EnsureCoordinator might return without grabbing + * any snapshots if we didn't receive any invalidation messages + * but the otherwise is also possible. + */ + if (ActiveSnapshotSet()) + { + PopActiveSnapshot(); + } + CommitTransactionCommand(); StartTransactionCommand(); + +#if PG_VERSION_NUM >= 140000 + + /* + * Tell other backends to ignore us, even if we grab any + * snapshots via adaptive executor. + */ + set_indexsafe_procflags(); +#endif } MemoryContext savedContext = CurrentMemoryContext; @@ -1195,6 +1264,33 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } +#if PG_VERSION_NUM >= 140000 + +/* + * set_indexsafe_procflags sets PROC_IN_SAFE_IC flag in MyProc->statusFlags. + * + * The flag is reset automatically at transaction end, so it must be set + * for each transaction. + * + * Copied from pg/src/backend/commands/indexcmds.c + * Also see pg commit c98763bf51bf610b3ee7e209fc76c3ff9a6b3163. + */ +static void +set_indexsafe_procflags(void) +{ + Assert(MyProc->xid == InvalidTransactionId && + MyProc->xmin == InvalidTransactionId); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyProc->statusFlags |= PROC_IN_SAFE_IC; + ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; + LWLockRelease(ProcArrayLock); +} + + +#endif + + /* * CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements * of shards of a distributed table. The command to be applied is generated by the diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index dc53cdda5..9ca2cbb96 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -36,6 +36,7 @@ #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" #include "mb/pg_wchar.h" +#include "pg_config.h" #include "portability/instr_time.h" #include "storage/ipc.h" #include "utils/hsearch.h" @@ -56,9 +57,7 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static void StartConnectionEstablishment(MultiConnection *connectionn, ConnectionHashKey *key); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); -#ifdef USE_ASSERT_CHECKING -static void AssertSingleMetadataConnectionExists(dlist_head *connections); -#endif +static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, const int @@ -420,6 +419,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags) { + List *metadataConnectionCandidateList = NIL; + dlist_iter iter; dlist_foreach(iter, connections) { @@ -473,52 +474,40 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) { /* * The caller requested a metadata connection, and this is not the + * metadata connection. Still, this is a candidate for becoming a * metadata connection. */ + metadataConnectionCandidateList = + lappend(metadataConnectionCandidateList, connection); continue; } - else - { - /* - * Now that we found metadata connection. We do some sanity - * checks. - */ - #ifdef USE_ASSERT_CHECKING - AssertSingleMetadataConnectionExists(connections); - #endif - - /* - * Connection is in use for an ongoing operation. Metadata - * connection cannot be claimed exclusively. - */ - if (connection->claimedExclusively) - { - ereport(ERROR, (errmsg("metadata connections cannot be " - "claimed exclusively"))); - } - } return connection; } - if ((flags & REQUIRE_METADATA_CONNECTION) && !dlist_is_empty(connections)) + if ((flags & REQUIRE_METADATA_CONNECTION) && + list_length(metadataConnectionCandidateList) > 0) { /* - * Caller asked a metadata connection, and we couldn't find in the - * above list. So, we pick the first connection as the metadata - * connection. + * Caller asked a metadata connection, and we couldn't find a connection + * that has already been used for metadata operations. + * + * So, we pick the first connection as the metadata connection. */ MultiConnection *metadataConnection = - dlist_container(MultiConnection, connectionNode, - dlist_head_node(connections)); + linitial(metadataConnectionCandidateList); + Assert(!metadataConnection->claimedExclusively); /* remember that we use this connection for metadata operations */ metadataConnection->useForMetadataOperations = true; - #ifdef USE_ASSERT_CHECKING - AssertSingleMetadataConnectionExists(connections); - #endif + /* + * We cannot have multiple metadata connections. If we see + * this error, it is likely that there is a bug in connection + * management. + */ + ErrorIfMultipleMetadataConnectionExists(connections); return metadataConnection; } @@ -527,14 +516,12 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) } -#ifdef USE_ASSERT_CHECKING - /* - * AssertSingleMetadataConnectionExists throws an error if the + * ErrorIfMultipleMetadataConnectionExists throws an error if the * input connection dlist contains more than one metadata connections. */ static void -AssertSingleMetadataConnectionExists(dlist_head *connections) +ErrorIfMultipleMetadataConnectionExists(dlist_head *connections) { bool foundMetadataConnection = false; dlist_iter iter; @@ -556,9 +543,6 @@ AssertSingleMetadataConnectionExists(dlist_head *connections) } -#endif /* USE_ASSERT_CHECKING */ - - /* * CloseAllConnectionsAfterTransaction sets the forceClose flag of all the * connections. This is mainly done when citus.node_conninfo changes. @@ -1259,6 +1243,8 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key } +#if PG_VERSION_NUM < 140000 + /* * WarmUpConnParamsHash warms up the ConnParamsHash by loading all the * conn params for active primary nodes. @@ -1280,6 +1266,9 @@ WarmUpConnParamsHash(void) } +#endif + + /* * FindOrCreateConnParamsEntry searches ConnParamsHash for the given key, * if it is not found, it is created. diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 0e9d96fd5..a90e49ced 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1290,6 +1290,12 @@ StartDistributedExecution(DistributedExecution *execution) */ RecordParallelRelationAccessForTaskList(execution->remoteAndLocalTaskList); } + + /* make sure we are not doing remote execution from within a task */ + if (execution->remoteTaskList != NIL) + { + EnsureRemoteTaskExecutionAllowed(); + } } diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 5a535043d..afde1328c 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -21,9 +21,11 @@ #include "distributed/citus_custom_scan.h" #include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" +#include "distributed/function_call_delegation.h" #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" #include "distributed/listutils.h" +#include "distributed/local_executor.h" #include "distributed/coordinator_protocol.h" #include "distributed/multi_executor.h" #include "distributed/combine_query_planner.h" @@ -719,3 +721,46 @@ ExecutorBoundParams(void) Assert(ExecutorLevel > 0); return executorBoundParams; } + + +/* + * EnsureRemoteTaskExecutionAllowed ensures that we do not perform remote + * execution from within a task. That could happen when the user calls + * a function in a query that gets pushed down to the worker, and the + * function performs a query on a distributed table. + */ +void +EnsureRemoteTaskExecutionAllowed(void) +{ + if (!InTaskExecution()) + { + /* we are not within a task, distributed execution is allowed */ + return; + } + + ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a " + "shard"))); +} + + +/* + * InTaskExecution determines whether we are currently in a task execution. + */ +bool +InTaskExecution(void) +{ + if (LocalExecutorLevel > 0) + { + /* in a local task */ + return true; + } + + /* + * Normally, any query execution within a citus-initiated backend + * is considered a task execution, but an exception is when we + * are in a delegated function/procedure call. + */ + return IsCitusInitiatedRemoteBackend() && + !InDelegatedFunctionCall && + !InDelegatedProcedureCall; +} diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 7acea3623..0a30fdd82 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -992,10 +992,11 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode) ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); - /* send commands to new workers*/ + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName, newWorkerNode->workerPort, - CitusExtensionOwnerName(), + CurrentUserName(), ddlCommands); } } @@ -1208,6 +1209,17 @@ ActivateNode(char *nodeName, int nodePort) { bool isActive = true; + /* + * We currently require the object propagation to happen via superuser, + * see #5139. While activating a node, we sync both metadata and object + * propagation. + * + * In order to have a fully transactional semantics with add/activate + * node operations, we require superuser. Note that for creating + * non-owned objects, we already require a superuser connection. + * By ensuring the current user to be a superuser, we can guarantee + * to send all commands within the same remote transaction. + */ EnsureSuperUser(); /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 639b22f47..b318a3f3c 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -23,6 +23,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commands.h" #include "distributed/cte_inline.h" #include "distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" @@ -71,7 +72,8 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; -static bool ListContainsDistributedTableRTE(List *rangeTableList); +static bool ListContainsDistributedTableRTE(List *rangeTableList, + bool *maybeHasForeignDistributedTable); static bool IsUpdateOrDelete(Query *query); static PlannedStmt * CreateDistributedPlannedStmt( DistributedPlanningContext *planContext); @@ -123,6 +125,7 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext int rteIdCounter); static RTEListProperties * GetRTEListProperties(List *rangeTableList); static List * TranslatedVars(PlannerInfo *root, int relationIndex); +static void WarnIfListHasForeignDistributedTable(List *rangeTableList); /* Distributed planner hook */ @@ -149,10 +152,18 @@ distributed_planner(Query *parse, } else if (CitusHasBeenLoaded()) { - needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList); + bool maybeHasForeignDistributedTable = false; + needsDistributedPlanning = + ListContainsDistributedTableRTE(rangeTableList, + &maybeHasForeignDistributedTable); if (needsDistributedPlanning) { fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue); + + if (maybeHasForeignDistributedTable) + { + WarnIfListHasForeignDistributedTable(rangeTableList); + } } } @@ -311,17 +322,19 @@ NeedsDistributedPlanning(Query *query) List *allRTEs = ExtractRangeTableEntryList(query); - return ListContainsDistributedTableRTE(allRTEs); + return ListContainsDistributedTableRTE(allRTEs, NULL); } /* * ListContainsDistributedTableRTE gets a list of range table entries * and returns true if there is at least one distributed relation range - * table entry in the list. + * table entry in the list. The boolean maybeHasForeignDistributedTable + * variable is set to true if the list contains a foreign table. */ static bool -ListContainsDistributedTableRTE(List *rangeTableList) +ListContainsDistributedTableRTE(List *rangeTableList, + bool *maybeHasForeignDistributedTable) { ListCell *rangeTableCell = NULL; @@ -336,6 +349,12 @@ ListContainsDistributedTableRTE(List *rangeTableList) if (IsCitusTable(rangeTableEntry->relid)) { + if (maybeHasForeignDistributedTable != NULL && + IsForeignTable(rangeTableEntry->relid)) + { + *maybeHasForeignDistributedTable = true; + } + return true; } } @@ -2408,3 +2427,37 @@ GetRTEListProperties(List *rangeTableList) return rteListProperties; } + + +/* + * WarnIfListHasForeignDistributedTable iterates the given list and logs a WARNING + * if the given relation is a distributed foreign table. + * We do that because now we only support Citus Local Tables for foreign tables. + */ +static void +WarnIfListHasForeignDistributedTable(List *rangeTableList) +{ + static bool DistributedForeignTableWarningPrompted = false; + + RangeTblEntry *rangeTableEntry = NULL; + foreach_ptr(rangeTableEntry, rangeTableList) + { + if (DistributedForeignTableWarningPrompted) + { + return; + } + + Oid relationId = rangeTableEntry->relid; + if (IsForeignTable(relationId) && IsCitusTable(relationId) && + !IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + { + DistributedForeignTableWarningPrompted = true; + ereport(WARNING, (errmsg( + "support for distributed foreign tables are deprecated, " + "please use Citus managed local tables"), + (errdetail( + "Foreign tables can be added to metadata using UDF: " + "citus_add_local_table_to_metadata()")))); + } + } +} diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 6618e49e1..39cf334c6 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -58,6 +58,10 @@ struct ParamWalkerContext static bool contain_param_walker(Node *node, void *context); +/* global variable keeping track of whether we are in a delegated function call */ +bool InDelegatedFunctionCall = false; + + /* * contain_param_walker scans node for Param nodes. * Ignore the return value, instead check context afterwards. @@ -112,15 +116,6 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) } int32 localGroupId = GetLocalGroupId(); - if (localGroupId != COORDINATOR_GROUP_ID && IsCitusInitiatedRemoteBackend()) - { - /* - * Do not delegate from workers if it is initiated by Citus already. - * It means that this function has already been delegated to this node. - */ - return NULL; - } - if (localGroupId == GROUP_ID_UPGRADING) { /* do not delegate while upgrading */ @@ -218,6 +213,27 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) ereport(DEBUG4, (errmsg("function is distributed"))); } + if (IsCitusInitiatedRemoteBackend()) + { + /* + * We are planning a call to a distributed function within a Citus backend, + * that means that this is the delegated call. + */ + InDelegatedFunctionCall = true; + return NULL; + } + + if (localGroupId != COORDINATOR_GROUP_ID) + { + /* + * We are calling a distributed function on a worker node. We currently + * only delegate from the coordinator. + * + * TODO: remove this restriction. + */ + return NULL; + } + /* * Cannot delegate functions for INSERT ... SELECT func(), since they require * coordinated transactions. diff --git a/src/backend/distributed/test/run_from_same_connection.c b/src/backend/distributed/test/run_from_same_connection.c index e0b7d806c..3b5f804b4 100644 --- a/src/backend/distributed/test/run_from_same_connection.c +++ b/src/backend/distributed/test/run_from_same_connection.c @@ -113,6 +113,12 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS) elog(ERROR, "failed to connect to %s:%d", nodeNameString, (int) nodePort); } + /* pretend we are a regular client to avoid citus-initiated backend checks */ + const char *setAppName = + "SET application_name TO run_commands_on_session_level_connection_to_node"; + + ExecuteCriticalRemoteCommand(singleConnection, setAppName); + PG_RETURN_VOID(); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index ee9912fe9..4c4958015 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -23,6 +23,7 @@ #include "distributed/citus_safe_lib.h" #include "distributed/connection_management.h" #include "distributed/distributed_planner.h" +#include "distributed/function_call_delegation.h" #include "distributed/hash_helpers.h" #include "distributed/intermediate_results.h" #include "distributed/listutils.h" @@ -550,6 +551,7 @@ ResetGlobalVariables() ShouldCoordinatedTransactionUse2PC = false; TransactionModifiedNodeMetadata = false; MetadataSyncOnCommit = false; + InDelegatedFunctionCall = false; ResetWorkerErrorIndication(); } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index daf218cf7..00cefead2 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -45,8 +45,9 @@ static StringInfo CopyShardPlacementToWorkerNodeQuery( ShardPlacement *sourceShardPlacement, WorkerNode *workerNode, char transferMode); -static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, - int nodePort); +static void ReplicateReferenceTableShardToNode(ShardInterval *shardInterval, + char *nodeName, + int nodePort); static bool AnyRelationsModifiedInTransaction(List *relationIdList); static List * ReplicatedMetadataSyncedDistributedTableList(void); @@ -335,7 +336,8 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) * table. */ static void -ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) +ReplicateReferenceTableShardToNode(ShardInterval *shardInterval, char *nodeName, + int nodePort) { uint64 shardId = shardInterval->shardId; @@ -350,7 +352,6 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, nodeName, nodePort); - if (targetPlacement != NULL) { if (targetPlacement->shardState == SHARD_STATE_ACTIVE) @@ -368,8 +369,11 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) get_rel_name(shardInterval->relationId), nodeName, nodePort))); - EnsureNoModificationsHaveBeenDone(); - SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommandList); + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, + CurrentUserName(), + ddlCommandList); int32 groupId = GroupForNode(nodeName, nodePort); uint64 placementId = GetNextPlacementId(); @@ -586,7 +590,7 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) LockShardDistributionMetadata(shardId, ExclusiveLock); - ReplicateShardToNode(shardInterval, nodeName, nodePort); + ReplicateReferenceTableShardToNode(shardInterval, nodeName, nodePort); } /* create foreign constraints between reference tables */ @@ -594,7 +598,11 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) { List *commandList = CopyShardForeignConstraintCommandList(shardInterval); - SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), commandList); + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, + CurrentUserName(), + commandList); } } } diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 9ead0df8b..1ee18a206 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -18,6 +18,7 @@ #include "tcop/utility.h" #include "distributed/coordinator_protocol.h" +#include "distributed/function_call_delegation.h" #include "distributed/version_compat.h" #include "distributed/worker_transaction.h" @@ -37,6 +38,7 @@ extern bool EnableAlterRolePropagation; extern bool EnableAlterRoleSetPropagation; extern bool EnableAlterDatabaseOwner; extern int UtilityHookLevel; +extern bool InDelegatedProcedureCall; /* diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 68dde4fe3..aca9cee0f 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -16,6 +16,7 @@ #include "distributed/transaction_management.h" #include "distributed/remote_transaction.h" #include "lib/ilist.h" +#include "pg_config.h" #include "portability/instr_time.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -283,5 +284,7 @@ extern void MarkConnectionConnected(MultiConnection *connection); extern double MillisecondsPassedSince(instr_time moment); extern long MillisecondsToTimeout(instr_time start, long msAfterStart); +#if PG_VERSION_NUM < 140000 extern void WarmUpConnParamsHash(void); +#endif #endif /* CONNECTION_MANAGMENT_H */ diff --git a/src/include/distributed/function_call_delegation.h b/src/include/distributed/function_call_delegation.h index 865ac0ce1..7d3c61aea 100644 --- a/src/include/distributed/function_call_delegation.h +++ b/src/include/distributed/function_call_delegation.h @@ -15,6 +15,14 @@ #include "distributed/multi_physical_planner.h" +/* + * These flags keep track of whether the process is currently in a delegated + * function or procedure call. + */ +extern bool InDelegatedFunctionCall; +extern bool InDelegatedProcedureCall; + + PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 143f5a1c7..3648dbc1b 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -149,6 +149,8 @@ extern void ExtractParametersFromParamList(ParamListInfo paramListInfo, const char ***parameterValues, bool useOriginalCustomTypeOids); extern ParamListInfo ExecutorBoundParams(void); +extern void EnsureRemoteTaskExecutionAllowed(void); +extern bool InTaskExecution(void); #endif /* MULTI_EXECUTOR_H */ diff --git a/src/test/regress/expected/citus_local_table_triggers.out b/src/test/regress/expected/citus_local_table_triggers.out index ac6906282..1a269c649 100644 --- a/src/test/regress/expected/citus_local_table_triggers.out +++ b/src/test/regress/expected/citus_local_table_triggers.out @@ -392,6 +392,13 @@ BEGIN RETURN NEW; END; $insert_100$ LANGUAGE plpgsql; +CREATE TABLE local_table (value int); +CREATE FUNCTION insert_100_local() RETURNS trigger AS $insert_100$ +BEGIN + INSERT INTO local_table VALUES (100); + RETURN NEW; +END; +$insert_100$ LANGUAGE plpgsql; BEGIN; CREATE TRIGGER insert_100_trigger AFTER TRUNCATE ON another_citus_local_table @@ -416,6 +423,7 @@ NOTICE: executing the command locally: SELECT value FROM citus_local_table_trig (2 rows) ROLLBACK; +-- cannot perform remote execution from a trigger on a Citus local table BEGIN; -- update should actually update something to test ON UPDATE CASCADE logic INSERT INTO another_citus_local_table VALUES (600); @@ -436,11 +444,70 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 FOR EACH STATEMENT EXECUTE FUNCTION insert_100();') UPDATE another_citus_local_table SET value=value-1;; NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100) -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100) +ERROR: cannot execute a distributed query from a query on a shard +ROLLBACK; +-- can perform regular execution from a trigger on a Citus local table +BEGIN; + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.another_citus_local_table_1507009 (value) VALUES (600) + INSERT INTO citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.citus_local_table_1507001 (value) VALUES (600) + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507009, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507001, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + UPDATE another_citus_local_table SET value=value-1;; +NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) -- we should see two rows with "100" - SELECT * FROM reference_table; -NOTICE: executing the command locally: SELECT value FROM citus_local_table_triggers.reference_table_1507010 reference_table + SELECT * FROM local_table; + value +--------------------------------------------------------------------- + 100 + 100 +(2 rows) + +ROLLBACK; +-- can perform local execution from a trigger on a Citus local table +BEGIN; + SELECT citus_add_local_table_to_metadata('local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.another_citus_local_table_1507009 (value) VALUES (600) + INSERT INTO citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.citus_local_table_1507001 (value) VALUES (600) + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507009, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507001, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + UPDATE another_citus_local_table SET value=value-1;; +NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.local_table_1507011 (value) VALUES (100) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.local_table_1507011 (value) VALUES (100) + -- we should see two rows with "100" + SELECT * FROM local_table; +NOTICE: executing the command locally: SELECT value FROM citus_local_table_triggers.local_table_1507011 local_table value --------------------------------------------------------------------- 100 @@ -456,11 +523,11 @@ CREATE TABLE par_another_citus_local_table_1 PARTITION OF par_another_citus_loca ALTER TABLE par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val); ALTER TABLE par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val) ON UPDATE CASCADE; SELECT citus_add_local_table_to_metadata('par_another_citus_local_table', cascade_via_foreign_keys=>true); -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507011, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_another_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') -NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('citus_local_table_triggers.par_another_citus_local_table_val_key_1507011'::regclass, 'citus_local_table_triggers.par_another_citus_local_table_1_1507012', 'par_another_citus_local_table_1_val_key_1507012')$$) -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507013, 'citus_local_table_triggers', 1507014, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507011, 'citus_local_table_triggers', 1507011, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val)') -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507013, 'citus_local_table_triggers', 1507011, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val) ON UPDATE CASCADE') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507012, 'citus_local_table_triggers', 1507013, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_another_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') +NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('citus_local_table_triggers.par_another_citus_local_table_val_key_1507012'::regclass, 'citus_local_table_triggers.par_another_citus_local_table_1_1507013', 'par_another_citus_local_table_1_val_key_1507013')$$) +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507014, 'citus_local_table_triggers', 1507015, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507012, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val)') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507014, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val) ON UPDATE CASCADE') citus_add_local_table_to_metadata --------------------------------------------------------------------- @@ -489,7 +556,7 @@ BEGIN; TRUNCATE par_another_citus_local_table CASCADE; NOTICE: truncate cascades to table "par_citus_local_table" NOTICE: truncate cascades to table "par_citus_local_table_1" -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507015 (val) VALUES (100) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507016 (val) VALUES (100) NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_another_citus_local_table_xxxxx CASCADE NOTICE: truncate cascades to table "par_citus_local_table_xxxxx" NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx" @@ -497,12 +564,12 @@ NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_trigger NOTICE: truncate cascades to table "par_citus_local_table_xxxxx" NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx" NOTICE: truncate cascades to table "par_another_citus_local_table_xxxxx" -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507015 (val) VALUES (100) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507016 (val) VALUES (100) NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_1_xxxxx CASCADE -- we should see two rows with "100" SELECT * FROM par_reference_table; -NOTICE: executing the command locally: SELECT val FROM citus_local_table_triggers.par_reference_table_1507015 par_reference_table +NOTICE: executing the command locally: SELECT val FROM citus_local_table_triggers.par_reference_table_1507016 par_reference_table val --------------------------------------------------------------------- 100 @@ -512,4 +579,4 @@ NOTICE: executing the command locally: SELECT val FROM citus_local_table_trigge ROLLBACK; -- cleanup at exit DROP SCHEMA citus_local_table_triggers, "interesting!schema" CASCADE; -NOTICE: drop cascades to 20 other objects +NOTICE: drop cascades to 22 other objects diff --git a/src/test/regress/expected/failure_create_distributed_table_non_empty.out b/src/test/regress/expected/failure_create_distributed_table_non_empty.out index 3de2cd874..13f6cdffa 100644 --- a/src/test/regress/expected/failure_create_distributed_table_non_empty.out +++ b/src/test/regress/expected/failure_create_distributed_table_non_empty.out @@ -872,6 +872,18 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_ 4 (1 row) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 2 +(1 row) + DROP TABLE test_table; CREATE TABLE test_table(id int, value_1 int); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); diff --git a/src/test/regress/expected/failure_create_table.out b/src/test/regress/expected/failure_create_table.out index e37060735..14b3daa66 100644 --- a/src/test/regress/expected/failure_create_table.out +++ b/src/test/regress/expected/failure_create_table.out @@ -289,6 +289,12 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 1 +(1 row) + SELECT count(*) FROM pg_dist_shard; count --------------------------------------------------------------------- @@ -421,7 +427,7 @@ COMMIT; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- - 4 + 0 (1 row) SELECT citus.mitmproxy('conn.allow()'); diff --git a/src/test/regress/expected/failure_single_select.out b/src/test/regress/expected/failure_single_select.out index c7ee9d9d1..4cfa1252b 100644 --- a/src/test/regress/expected/failure_single_select.out +++ b/src/test/regress/expected/failure_single_select.out @@ -12,6 +12,8 @@ SELECT citus.clear_network_traffic(); SET citus.shard_count = 2; SET citus.shard_replication_factor = 2; +-- this test is designed such that no modification lock is acquired +SET citus.allow_modifications_from_workers_to_replicated_tables TO false; CREATE TABLE select_test (key int, value text); SELECT create_distributed_table('select_test', 'key'); create_distributed_table @@ -60,6 +62,12 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following This probably means the server terminated abnormally before or while processing the request. COMMIT; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + TRUNCATE select_test; -- now the same tests with query cancellation -- put data in shard for which mitm node is first placement @@ -96,6 +104,12 @@ WHERE shardid IN ( 1 (1 row) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + TRUNCATE select_test; -- cancel the second query -- error after second SELECT; txn should fail diff --git a/src/test/regress/expected/foreign_tables_mx.out b/src/test/regress/expected/foreign_tables_mx.out index 21c7d6c69..17b1c99b8 100644 --- a/src/test/regress/expected/foreign_tables_mx.out +++ b/src/test/regress/expected/foreign_tables_mx.out @@ -66,16 +66,10 @@ ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c_2 check( ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2; ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2; -- trigger test -CREATE TABLE distributed_table(value int); -SELECT create_distributed_table('distributed_table', 'value'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - +CREATE TABLE table42(value int); CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$ BEGIN - INSERT INTO distributed_table VALUES (42); + INSERT INTO table42 VALUES (42); RETURN NEW; END; $insert_42$ LANGUAGE plpgsql; @@ -85,7 +79,7 @@ FOR EACH ROW EXECUTE FUNCTION insert_42(); -- do the same pattern from the workers as well INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; value --------------------------------------------------------------------- 42 @@ -96,7 +90,7 @@ alter foreign table public.foreign_table_newname disable trigger insert_42_trigg INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -- should not insert again as trigger disabled -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; value --------------------------------------------------------------------- 42 @@ -199,36 +193,11 @@ NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table (1 row) +-- both should error out SELECT create_distributed_table('foreign_table','data'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT undistribute_table('foreign_table'); -NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table -NOTICE: dropping the old foreign_tables_schema_mx.foreign_table -NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table - undistribute_table ---------------------------------------------------------------------- - -(1 row) - +ERROR: foreign tables cannot be distributed SELECT create_reference_table('foreign_table'); - create_reference_table ---------------------------------------------------------------------- - -(1 row) - -SELECT undistribute_table('foreign_table'); -NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table -NOTICE: dropping the old foreign_tables_schema_mx.foreign_table -NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table - undistribute_table ---------------------------------------------------------------------- - -(1 row) - +ERROR: foreign tables cannot be distributed INSERT INTO foreign_table_test VALUES (1, 'testt'); SELECT * FROM foreign_table ORDER BY a; data | a diff --git a/src/test/regress/expected/mixed_relkind_tests.out b/src/test/regress/expected/mixed_relkind_tests.out index cb8e50499..20cb6ebac 100644 --- a/src/test/regress/expected/mixed_relkind_tests.out +++ b/src/test/regress/expected/mixed_relkind_tests.out @@ -62,13 +62,6 @@ SELECT create_distributed_table('partitioned_distributed_table', 'a'); CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table; CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table; -CREATE FOREIGN TABLE foreign_distributed_table (a int, b int) SERVER fake_fdw_server; -SELECT create_distributed_table('foreign_distributed_table', 'a'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -- and insert some data INSERT INTO postgres_local_table SELECT * FROM generate_series(0, 5); INSERT INTO partitioned_postgres_local_table SELECT * FROM generate_series(0, 5); @@ -145,12 +138,6 @@ SELECT * FROM unlogged_distributed_table UNION SELECT 1,1 ORDER BY 1,2; 5 | 6 (7 rows) -SELECT * from foreign_distributed_table UNION SELECT 1,1 ORDER BY 1,2; - a | b ---------------------------------------------------------------------- - 1 | 1 -(1 row) - SELECT 1 UNION SELECT * FROM citus_local_table ORDER BY 1; ?column? --------------------------------------------------------------------- @@ -378,17 +365,6 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 455 (1 row) -SELECT COUNT(*) FROM - (SELECT *, random() FROM unlogged_distributed_table) AS foo, - (SELECT *, random() FROM foreign_distributed_table) AS bar -WHERE foo.a = bar.b; -DEBUG: generating subplan XXX_1 for subquery SELECT a, b, random() AS random FROM mixed_relkind_tests.foreign_distributed_table -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT unlogged_distributed_table.a, unlogged_distributed_table.b, random() AS random FROM mixed_relkind_tests.unlogged_distributed_table) foo, (SELECT intermediate_result.a, intermediate_result.b, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer, random double precision)) bar WHERE (foo.a OPERATOR(pg_catalog.=) bar.b) - count ---------------------------------------------------------------------- - 0 -(1 row) - UPDATE partitioned_distributed_table SET b = foo.a FROM citus_local_table AS foo; DEBUG: Wrapping relation "citus_local_table" "foo" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.citus_local_table foo WHERE true @@ -486,15 +462,6 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 1014 (1 row) -WITH cte_1 AS MATERIALIZED (SELECT * FROM foreign_distributed_table) - SELECT COUNT(*) FROM cte_1 JOIN foreign_distributed_table USING (a); -DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.foreign_distributed_table -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 JOIN mixed_relkind_tests.foreign_distributed_table USING (a)) - count ---------------------------------------------------------------------- - 0 -(1 row) - WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (b); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.partitioned_distributed_table @@ -658,18 +625,6 @@ $Q$); Task Count: 4 (4 rows) -SELECT public.coordinator_plan($Q$ -EXPLAIN (COSTS OFF) -SELECT a, COUNT(*) OVER (PARTITION BY a) FROM foreign_distributed_table ORDER BY 1,2; -$Q$); - coordinator_plan ---------------------------------------------------------------------- - Sort - Sort Key: remote_scan.a, remote_scan.count - -> Custom Scan (Citus Adaptive) - Task Count: 4 -(4 rows) - -- pull to coordinator WINDOW SELECT public.coordinator_plan($Q$ EXPLAIN (COSTS OFF) @@ -686,21 +641,6 @@ $Q$); Task Count: 4 (7 rows) -SELECT public.coordinator_plan($Q$ -EXPLAIN (COSTS OFF) -SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM foreign_distributed_table ORDER BY 1,2; -$Q$); - coordinator_plan ---------------------------------------------------------------------- - Sort - Sort Key: remote_scan.a, (count(*) OVER (?)) - -> WindowAgg - -> Sort - Sort Key: remote_scan.worker_column_2 - -> Custom Scan (Citus Adaptive) - Task Count: 4 -(7 rows) - -- FOR UPDATE SELECT * FROM partitioned_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE; a | b @@ -737,14 +677,6 @@ BEGIN; --------------------------------------------------------------------- (0 rows) -COMMIT; -BEGIN; - ALTER TABLE foreign_distributed_table DROP COLUMN b CASCADE; - SELECT * FROM foreign_distributed_table; - a ---------------------------------------------------------------------- -(0 rows) - COMMIT; -- cleanup at exit DROP SCHEMA mixed_relkind_tests CASCADE; diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index d032a9168..8d9907c34 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -256,6 +256,10 @@ SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_ ERROR: permission denied for function master_update_node -- try to manipulate node metadata via privileged user SET ROLE node_metadata_user; +SET citus.enable_object_propagation TO off; -- prevent master activate node to actually connect for this test +SELECT 1 FROM master_add_node('localhost', :worker_2_port); +ERROR: operation is not allowed +HINT: Run the command with a superuser. BEGIN; SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); ?column? @@ -263,43 +267,29 @@ SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); 1 (1 row) -SELECT 1 FROM master_activate_node('localhost', :worker_2_port); - ?column? ----------- - 1 -(1 row) - SELECT 1 FROM master_remove_node('localhost', :worker_2_port); ?column? ---------- 1 (1 row) -SELECT 1 FROM master_add_node('localhost', :worker_2_port); - ?column? ----------- - 1 -(1 row) - -SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_2_port); - ?column? ----------- +SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_1_port); + ?column? +--------------------------------------------------------------------- 1 (1 row) SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port; - master_update_node --------------------- - -(1 row) + master_update_node +--------------------------------------------------------------------- +(0 rows) SELECT nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeport; nodename | nodeport | noderole -----------+----------+----------- localhost | 57637 | primary localhost | 57640 | secondary - localhost | 57641 | primary -(3 rows) +(2 rows) ABORT; \c - postgres - :master_port diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index b4da5f34f..dba77faa8 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -425,14 +425,6 @@ SELECT create_distributed_table('table_range', 'id', 'range'); (1 row) --- test foreign table creation -CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server; -SELECT create_distributed_table('table3_groupD', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -- check metadata SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 @@ -458,8 +450,7 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition table2_groupc | 6 table1_groupd | 7 table2_groupd | 7 - table3_groupd | 7 -(9 rows) +(8 rows) -- check effects of dropping tables DROP TABLE table1_groupA; @@ -585,13 +576,12 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition table2_groupc | 6 table1_groupd | 7 table2_groupd | 7 - table3_groupd | 7 table1_group_none_1 | 8 table2_group_none_1 | 8 table1_group_none_2 | 9 table1_group_none_3 | 10 table1_group_default | 11 -(17 rows) +(16 rows) -- check failing colocate_with options CREATE TABLE table_postgresql( id int ); @@ -621,14 +611,14 @@ ERROR: cannot colocate tables table1_groupe and table_bigint DETAIL: Distribution column types don't match for table1_groupe and table_bigint. -- check worker table schemas \c - - - :worker_1_port -SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300062'::regclass; +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300054'::regclass; Column | Type | Modifiers --------------------------------------------------------------------- dummy_column | text | id | integer | (2 rows) -SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300064'::regclass; +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300056'::regclass; Column | Type | Modifiers --------------------------------------------------------------------- id | integer | @@ -685,8 +675,6 @@ ORDER BY table1_groupb | table2_groupb | t table1_groupc | table2_groupc | t table1_groupd | table2_groupd | t - table1_groupd | table3_groupd | t - table2_groupd | table3_groupd | t table1_groupe | table2_groupe | t table1_groupe | table3_groupe | t table1_groupe | schema_colocation.table4_groupe | t @@ -699,7 +687,7 @@ ORDER BY schema_colocation.table4_groupe | table4_groupe | t table1_group_none_1 | table2_group_none_1 | t table1_groupf | table2_groupf | t -(18 rows) +(16 rows) -- check created shards SELECT @@ -766,71 +754,55 @@ ORDER BY table2_groupd | 1300048 | t | 57638 | 1073741824 | 1610612735 table2_groupd | 1300049 | t | 57637 | 1610612736 | 2147483647 table2_groupd | 1300049 | t | 57638 | 1610612736 | 2147483647 - table3_groupd | 1300050 | f | 57637 | -2147483648 | -1610612737 - table3_groupd | 1300050 | f | 57638 | -2147483648 | -1610612737 - table3_groupd | 1300051 | f | 57637 | -1610612736 | -1073741825 - table3_groupd | 1300051 | f | 57638 | -1610612736 | -1073741825 - table3_groupd | 1300052 | f | 57637 | -1073741824 | -536870913 - table3_groupd | 1300052 | f | 57638 | -1073741824 | -536870913 - table3_groupd | 1300053 | f | 57637 | -536870912 | -1 - table3_groupd | 1300053 | f | 57638 | -536870912 | -1 - table3_groupd | 1300054 | f | 57637 | 0 | 536870911 - table3_groupd | 1300054 | f | 57638 | 0 | 536870911 - table3_groupd | 1300055 | f | 57637 | 536870912 | 1073741823 - table3_groupd | 1300055 | f | 57638 | 536870912 | 1073741823 - table3_groupd | 1300056 | f | 57637 | 1073741824 | 1610612735 - table3_groupd | 1300056 | f | 57638 | 1073741824 | 1610612735 - table3_groupd | 1300057 | f | 57637 | 1610612736 | 2147483647 - table3_groupd | 1300057 | f | 57638 | 1610612736 | 2147483647 - table1_groupe | 1300058 | t | 57637 | -2147483648 | -1 - table1_groupe | 1300058 | t | 57638 | -2147483648 | -1 - table1_groupe | 1300059 | t | 57637 | 0 | 2147483647 - table1_groupe | 1300059 | t | 57638 | 0 | 2147483647 - table2_groupe | 1300060 | t | 57637 | -2147483648 | -1 - table2_groupe | 1300060 | t | 57638 | -2147483648 | -1 - table2_groupe | 1300061 | t | 57637 | 0 | 2147483647 - table2_groupe | 1300061 | t | 57638 | 0 | 2147483647 - table3_groupe | 1300062 | t | 57637 | -2147483648 | -1 - table3_groupe | 1300062 | t | 57638 | -2147483648 | -1 - table3_groupe | 1300063 | t | 57637 | 0 | 2147483647 - table3_groupe | 1300063 | t | 57638 | 0 | 2147483647 - schema_colocation.table4_groupe | 1300064 | t | 57637 | -2147483648 | -1 - schema_colocation.table4_groupe | 1300064 | t | 57638 | -2147483648 | -1 - schema_colocation.table4_groupe | 1300065 | t | 57637 | 0 | 2147483647 - schema_colocation.table4_groupe | 1300065 | t | 57638 | 0 | 2147483647 - table1_group_none_1 | 1300066 | t | 57637 | -2147483648 | -1 - table1_group_none_1 | 1300066 | t | 57638 | -2147483648 | -1 - table1_group_none_1 | 1300067 | t | 57637 | 0 | 2147483647 - table1_group_none_1 | 1300067 | t | 57638 | 0 | 2147483647 - table2_group_none_1 | 1300068 | t | 57637 | -2147483648 | -1 - table2_group_none_1 | 1300068 | t | 57638 | -2147483648 | -1 - table2_group_none_1 | 1300069 | t | 57637 | 0 | 2147483647 - table2_group_none_1 | 1300069 | t | 57638 | 0 | 2147483647 - table1_group_none_2 | 1300070 | t | 57637 | -2147483648 | -1 - table1_group_none_2 | 1300070 | t | 57638 | -2147483648 | -1 - table1_group_none_2 | 1300071 | t | 57637 | 0 | 2147483647 - table1_group_none_2 | 1300071 | t | 57638 | 0 | 2147483647 - table4_groupe | 1300072 | t | 57637 | -2147483648 | -1 - table4_groupe | 1300072 | t | 57638 | -2147483648 | -1 - table4_groupe | 1300073 | t | 57637 | 0 | 2147483647 - table4_groupe | 1300073 | t | 57638 | 0 | 2147483647 - table1_group_none_3 | 1300074 | t | 57637 | -2147483648 | -715827884 - table1_group_none_3 | 1300074 | t | 57638 | -2147483648 | -715827884 - table1_group_none_3 | 1300075 | t | 57637 | -715827883 | 715827881 - table1_group_none_3 | 1300075 | t | 57638 | -715827883 | 715827881 - table1_group_none_3 | 1300076 | t | 57637 | 715827882 | 2147483647 - table1_group_none_3 | 1300076 | t | 57638 | 715827882 | 2147483647 - table1_group_default | 1300077 | t | 57637 | -2147483648 | -715827884 - table1_group_default | 1300077 | t | 57638 | -2147483648 | -715827884 - table1_group_default | 1300078 | t | 57637 | -715827883 | 715827881 - table1_group_default | 1300078 | t | 57638 | -715827883 | 715827881 - table1_group_default | 1300079 | t | 57637 | 715827882 | 2147483647 - table1_group_default | 1300079 | t | 57638 | 715827882 | 2147483647 + table1_groupe | 1300050 | t | 57637 | -2147483648 | -1 + table1_groupe | 1300050 | t | 57638 | -2147483648 | -1 + table1_groupe | 1300051 | t | 57637 | 0 | 2147483647 + table1_groupe | 1300051 | t | 57638 | 0 | 2147483647 + table2_groupe | 1300052 | t | 57637 | -2147483648 | -1 + table2_groupe | 1300052 | t | 57638 | -2147483648 | -1 + table2_groupe | 1300053 | t | 57637 | 0 | 2147483647 + table2_groupe | 1300053 | t | 57638 | 0 | 2147483647 + table3_groupe | 1300054 | t | 57637 | -2147483648 | -1 + table3_groupe | 1300054 | t | 57638 | -2147483648 | -1 + table3_groupe | 1300055 | t | 57637 | 0 | 2147483647 + table3_groupe | 1300055 | t | 57638 | 0 | 2147483647 + schema_colocation.table4_groupe | 1300056 | t | 57637 | -2147483648 | -1 + schema_colocation.table4_groupe | 1300056 | t | 57638 | -2147483648 | -1 + schema_colocation.table4_groupe | 1300057 | t | 57637 | 0 | 2147483647 + schema_colocation.table4_groupe | 1300057 | t | 57638 | 0 | 2147483647 + table1_group_none_1 | 1300058 | t | 57637 | -2147483648 | -1 + table1_group_none_1 | 1300058 | t | 57638 | -2147483648 | -1 + table1_group_none_1 | 1300059 | t | 57637 | 0 | 2147483647 + table1_group_none_1 | 1300059 | t | 57638 | 0 | 2147483647 + table2_group_none_1 | 1300060 | t | 57637 | -2147483648 | -1 + table2_group_none_1 | 1300060 | t | 57638 | -2147483648 | -1 + table2_group_none_1 | 1300061 | t | 57637 | 0 | 2147483647 + table2_group_none_1 | 1300061 | t | 57638 | 0 | 2147483647 + table1_group_none_2 | 1300062 | t | 57637 | -2147483648 | -1 + table1_group_none_2 | 1300062 | t | 57638 | -2147483648 | -1 + table1_group_none_2 | 1300063 | t | 57637 | 0 | 2147483647 + table1_group_none_2 | 1300063 | t | 57638 | 0 | 2147483647 + table4_groupe | 1300064 | t | 57637 | -2147483648 | -1 + table4_groupe | 1300064 | t | 57638 | -2147483648 | -1 + table4_groupe | 1300065 | t | 57637 | 0 | 2147483647 + table4_groupe | 1300065 | t | 57638 | 0 | 2147483647 + table1_group_none_3 | 1300066 | t | 57637 | -2147483648 | -715827884 + table1_group_none_3 | 1300066 | t | 57638 | -2147483648 | -715827884 + table1_group_none_3 | 1300067 | t | 57637 | -715827883 | 715827881 + table1_group_none_3 | 1300067 | t | 57638 | -715827883 | 715827881 + table1_group_none_3 | 1300068 | t | 57637 | 715827882 | 2147483647 + table1_group_none_3 | 1300068 | t | 57638 | 715827882 | 2147483647 + table1_group_default | 1300069 | t | 57637 | -2147483648 | -715827884 + table1_group_default | 1300069 | t | 57638 | -2147483648 | -715827884 + table1_group_default | 1300070 | t | 57637 | -715827883 | 715827881 + table1_group_default | 1300070 | t | 57638 | -715827883 | 715827881 + table1_group_default | 1300071 | t | 57637 | 715827882 | 2147483647 + table1_group_default | 1300071 | t | 57638 | 715827882 | 2147483647 table1_groupf | 1300080 | t | 57637 | | table1_groupf | 1300080 | t | 57638 | | table2_groupf | 1300081 | t | 57637 | | table2_groupf | 1300081 | t | 57638 | | -(108 rows) +(92 rows) -- reset colocation ids to test update_distributed_table_colocation ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; @@ -862,7 +834,7 @@ ERROR: cannot colocate tables table1_groupd and table1_groupb DETAIL: Shard counts don't match for table1_groupd and table1_groupb. SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupE'); ERROR: cannot colocate tables table1_groupe and table1_groupb -DETAIL: Shard 1300058 of table1_groupe and shard xxxxx of table1_groupb have different number of shard placements. +DETAIL: Shard 1300050 of table1_groupe and shard xxxxx of table1_groupb have different number of shard placements. SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupF'); ERROR: relation table1_groupf should be a hash distributed table SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupD'); @@ -1355,4 +1327,3 @@ DROP TABLE range_table; DROP TABLE none; DROP TABLE ref; DROP TABLE local_table; -DROP FOREIGN TABLE table3_groupD CASCADE; diff --git a/src/test/regress/expected/multi_create_shards.out b/src/test/regress/expected/multi_create_shards.out index 122aa2081..b7e0778e6 100644 --- a/src/test/regress/expected/multi_create_shards.out +++ b/src/test/regress/expected/multi_create_shards.out @@ -149,44 +149,6 @@ SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'throwaway%' AND relkind = 'r'; 0 (1 row) --- test foreign table creation -CREATE FOREIGN TABLE foreign_table_to_distribute -( - name text, - id bigint -) -SERVER fake_fdw_server; -SET citus.shard_count TO 16; -SET citus.shard_replication_factor TO 1; -SELECT create_distributed_table('foreign_table_to_distribute', 'id', 'hash'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass - ORDER BY (shardminvalue::integer) ASC; - shardstorage | shardminvalue | shardmaxvalue ---------------------------------------------------------------------- - f | -2147483648 | -1879048193 - f | -1879048192 | -1610612737 - f | -1610612736 | -1342177281 - f | -1342177280 | -1073741825 - f | -1073741824 | -805306369 - f | -805306368 | -536870913 - f | -536870912 | -268435457 - f | -268435456 | -1 - f | 0 | 268435455 - f | 268435456 | 536870911 - f | 536870912 | 805306367 - f | 805306368 | 1073741823 - f | 1073741824 | 1342177279 - f | 1342177280 | 1610612735 - f | 1610612736 | 1879048191 - f | 1879048192 | 2147483647 -(16 rows) - -- test shard creation using weird shard count CREATE TABLE weird_shard_count ( @@ -216,11 +178,3 @@ SELECT shardmaxvalue::integer - shardminvalue::integer AS shard_size 613566759 (7 rows) --- cleanup foreign table, related shards and shard placements -DELETE FROM pg_dist_shard_placement - WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass); -DELETE FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass; -DELETE FROM pg_dist_partition - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass; diff --git a/src/test/regress/expected/multi_generate_ddl_commands.out b/src/test/regress/expected/multi_generate_ddl_commands.out index fa7fdc211..6aae20f9b 100644 --- a/src/test/regress/expected/multi_generate_ddl_commands.out +++ b/src/test/regress/expected/multi_generate_ddl_commands.out @@ -166,62 +166,12 @@ SELECT master_get_table_ddl_events('fiddly_table'); ALTER TABLE public.fiddly_table OWNER TO postgres (3 rows) --- test foreign tables using fake FDW -CREATE FOREIGN TABLE foreign_table ( - id bigint not null, - full_name text not null default '' -) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true'); -SELECT create_distributed_table('foreign_table', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -ALTER FOREIGN TABLE foreign_table rename to renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890; -NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456" -ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 rename full_name to rename_name; -NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456" -ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 alter rename_name type char(8); -NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456" -\c - - :public_worker_1_host :worker_1_port -select table_name, column_name, data_type -from information_schema.columns -where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id' -order by table_name; - table_name | column_name | data_type ---------------------------------------------------------------------- - renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610008 | rename_name | character - renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610009 | rename_name | character - renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610010 | rename_name | character - renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610011 | rename_name | character -(4 rows) - -\c - - :master_host :master_port -SELECT master_get_table_ddl_events('renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890'); - master_get_table_ddl_events ---------------------------------------------------------------------- - CREATE FOREIGN TABLE public.renamed_foreign_table_with_long_name_12345678901234567890123456 (id bigint NOT NULL, rename_name character(8) DEFAULT ''::text NOT NULL) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true') - ALTER TABLE public.renamed_foreign_table_with_long_name_12345678901234567890123456 OWNER TO postgres -(2 rows) - -- propagating views is not supported CREATE VIEW local_view AS SELECT * FROM simple_table; SELECT master_get_table_ddl_events('local_view'); ERROR: local_view is not a regular, foreign or partitioned table -- clean up DROP VIEW IF EXISTS local_view; -DROP FOREIGN TABLE IF EXISTS renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890; -NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456" -\c - - :public_worker_1_host :worker_1_port -select table_name, column_name, data_type -from information_schema.columns -where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id' -order by table_name; - table_name | column_name | data_type ---------------------------------------------------------------------- -(0 rows) - -\c - - :master_host :master_port DROP TABLE IF EXISTS simple_table, not_null_table, column_constraint_table, table_constraint_table, default_value_table, pkey_table, unique_table, clustered_table, fiddly_table; diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out index 7f077c77a..64b033d41 100644 --- a/src/test/regress/expected/multi_mx_call.out +++ b/src/test/regress/expected/multi_mx_call.out @@ -131,6 +131,12 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +CREATE PROCEDURE mx_call_proc_copy(x int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; +END;$$; -- Test that undistributed procedures have no issue executing call multi_mx_call.mx_call_proc(2, 0); y @@ -144,6 +150,7 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A'); F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); -- Same for unqualified names call mx_call_proc(2, 0); y @@ -176,6 +183,12 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_proc_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + -- We still don't route them to the workers, because they aren't -- colocated with any distributed tables. SET client_min_messages TO DEBUG1; @@ -206,6 +219,12 @@ DEBUG: stored procedure does not have co-located tables F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); +DEBUG: stored procedure does not have co-located tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CONTEXT: SQL statement "INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function mx_call_proc_copy(integer) line XX at SQL statement -- Mark them as colocated with a table. Now we should route them to workers. select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); colocate_proc_with_table @@ -225,6 +244,12 @@ select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table (1 row) +select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); DEBUG: pushing down the procedure y @@ -253,6 +278,8 @@ DEBUG: pushing down the procedure S | S (1 row) +call mx_call_proc_copy(2); +DEBUG: pushing down the procedure -- Test implicit cast of int to bigint call mx_call_proc_bigint(4, 2); DEBUG: pushing down the procedure @@ -398,18 +425,51 @@ DETAIL: A distributed function is created. To make sure subsequent commands see CALL multi_mx_call.mx_call_proc_tx(20); DEBUG: pushing down the procedure SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; - id | val + id | val --------------------------------------------------------------------- - 3 | 1 - 3 | 5 - 4 | 5 - 6 | 5 - 9 | 2 - 10 | -2 - 11 | 3 - 20 | -2 - 21 | 3 -(9 rows) + 3 | 1 + 3 | 5 + 4 | 5 + 6 | 5 + 9 | 2 + 10 | -2 + 11 | 3 + 20 | -2 + 21 | 3 + 100 | 98 + 100 | 98 + 100 | 98 + 101 | 99 + 101 | 99 + 101 | 99 + 102 | 100 + 102 | 100 + 102 | 100 + 103 | 101 + 103 | 101 + 103 | 101 + 104 | 102 + 104 | 102 + 104 | 102 + 105 | 103 + 105 | 103 + 105 | 103 + 106 | 104 + 106 | 104 + 106 | 104 + 107 | 105 + 107 | 105 + 107 | 105 + 108 | 106 + 108 | 106 + 108 | 106 + 109 | 107 + 109 | 107 + 109 | 107 + 110 | 108 + 110 | 108 + 110 | 108 +(42 rows) -- Show that function delegation works from worker nodes as well \c - - - :worker_1_port @@ -539,4 +599,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line XX at assignment reset client_min_messages; \set VERBOSITY terse drop schema multi_mx_call cascade; -NOTICE: drop cascades to 13 other objects +NOTICE: drop cascades to 14 other objects diff --git a/src/test/regress/expected/multi_mx_call_0.out b/src/test/regress/expected/multi_mx_call_0.out index 77667f75b..496e735c9 100644 --- a/src/test/regress/expected/multi_mx_call_0.out +++ b/src/test/regress/expected/multi_mx_call_0.out @@ -131,6 +131,12 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +CREATE PROCEDURE mx_call_proc_copy(x int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; +END;$$; -- Test that undistributed procedures have no issue executing call multi_mx_call.mx_call_proc(2, 0); y @@ -144,6 +150,7 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A'); F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); -- Same for unqualified names call mx_call_proc(2, 0); y @@ -176,6 +183,12 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_proc_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + -- We still don't route them to the workers, because they aren't -- colocated with any distributed tables. SET client_min_messages TO DEBUG1; @@ -206,6 +219,12 @@ DEBUG: stored procedure does not have co-located tables F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); +DEBUG: stored procedure does not have co-located tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CONTEXT: SQL statement "INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function mx_call_proc_copy(integer) line XX at SQL statement -- Mark them as colocated with a table. Now we should route them to workers. select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); colocate_proc_with_table @@ -225,6 +244,12 @@ select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table (1 row) +select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); DEBUG: pushing down the procedure y @@ -253,6 +278,8 @@ DEBUG: pushing down the procedure S | S (1 row) +call mx_call_proc_copy(2); +DEBUG: pushing down the procedure -- Test implicit cast of int to bigint call mx_call_proc_bigint(4, 2); DEBUG: pushing down the procedure @@ -398,18 +425,51 @@ DETAIL: A distributed function is created. To make sure subsequent commands see CALL multi_mx_call.mx_call_proc_tx(20); DEBUG: pushing down the procedure SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; - id | val + id | val --------------------------------------------------------------------- - 3 | 1 - 3 | 5 - 4 | 5 - 6 | 5 - 9 | 2 - 10 | -2 - 11 | 3 - 20 | -2 - 21 | 3 -(9 rows) + 3 | 1 + 3 | 5 + 4 | 5 + 6 | 5 + 9 | 2 + 10 | -2 + 11 | 3 + 20 | -2 + 21 | 3 + 100 | 98 + 100 | 98 + 100 | 98 + 101 | 99 + 101 | 99 + 101 | 99 + 102 | 100 + 102 | 100 + 102 | 100 + 103 | 101 + 103 | 101 + 103 | 101 + 104 | 102 + 104 | 102 + 104 | 102 + 105 | 103 + 105 | 103 + 105 | 103 + 106 | 104 + 106 | 104 + 106 | 104 + 107 | 105 + 107 | 105 + 107 | 105 + 108 | 106 + 108 | 106 + 108 | 106 + 109 | 107 + 109 | 107 + 109 | 107 + 110 | 108 + 110 | 108 + 110 | 108 +(42 rows) -- Show that function delegation works from worker nodes as well \c - - - :worker_1_port @@ -539,4 +599,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line XX at assignment reset client_min_messages; \set VERBOSITY terse drop schema multi_mx_call cascade; -NOTICE: drop cascades to 13 other objects +NOTICE: drop cascades to 14 other objects diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 4fd13cee8..e77e0c3b5 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -83,6 +83,16 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +-- function which internally uses COPY protocol without remote execution +CREATE FUNCTION mx_call_func_copy(x int) +RETURNS bool +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; + + RETURN true; +END;$$; -- Test that undistributed functions have no issue executing select multi_mx_function_call_delegation.mx_call_func(2, 0); mx_call_func @@ -96,6 +106,9 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); (F,S) (1 row) +select multi_mx_function_call_delegation.mx_call_copy(2); +ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. select squares(4); squares --------------------------------------------------------------------- @@ -131,6 +144,12 @@ select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_func_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + select create_distributed_function('squares(int)'); create_distributed_function --------------------------------------------------------------------- @@ -604,20 +623,6 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 (1 row) -select mx_call_func(2, 0) from mx_call_dist_table_1; - mx_call_func ---------------------------------------------------------------------- - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 -(9 rows) - select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" @@ -647,6 +652,24 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 | 27 (1 row) +-- we do not delegate the call, but do push down the query +-- that result in remote execution from workers +select mx_call_func(id, 0) from mx_call_dist_table_1; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement +while executing command on localhost:xxxxx DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DEBUG: not pushing down function calls in a multi-statement transaction CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" @@ -725,7 +748,12 @@ HINT: Connect to the coordinator and run it again. -- show that functions can be delegated from worker nodes SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); -DEBUG: pushing down the function call +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 28 @@ -736,4 +764,4 @@ SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_function_call_delegation CASCADE; -NOTICE: drop cascades to 14 other objects +NOTICE: drop cascades to 15 other objects diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out index 70672b455..657183bc2 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation_0.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out @@ -83,6 +83,16 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +-- function which internally uses COPY protocol without remote execution +CREATE FUNCTION mx_call_func_copy(x int) +RETURNS bool +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; + + RETURN true; +END;$$; -- Test that undistributed functions have no issue executing select multi_mx_function_call_delegation.mx_call_func(2, 0); mx_call_func @@ -96,6 +106,9 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); (F,S) (1 row) +select multi_mx_function_call_delegation.mx_call_copy(2); +ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. select squares(4); squares --------------------------------------------------------------------- @@ -131,6 +144,12 @@ select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_func_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + select create_distributed_function('squares(int)'); create_distributed_function --------------------------------------------------------------------- @@ -604,20 +623,6 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 (1 row) -select mx_call_func(2, 0) from mx_call_dist_table_1; - mx_call_func ---------------------------------------------------------------------- - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 -(9 rows) - select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" @@ -647,6 +652,24 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 | 27 (1 row) +-- we do not delegate the call, but do push down the query +-- that result in remote execution from workers +select mx_call_func(id, 0) from mx_call_dist_table_1; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement +while executing command on localhost:xxxxx DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DEBUG: not pushing down function calls in a multi-statement transaction CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" @@ -725,7 +748,12 @@ HINT: Connect to the coordinator and run it again. -- show that functions can be delegated from worker nodes SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); -DEBUG: pushing down the function call +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 28 @@ -736,4 +764,4 @@ SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_function_call_delegation CASCADE; -NOTICE: drop cascades to 14 other objects +NOTICE: drop cascades to 15 other objects diff --git a/src/test/regress/expected/multi_repair_shards.out b/src/test/regress/expected/multi_repair_shards.out index ba4d2b1b0..c7fc05080 100644 --- a/src/test/regress/expected/multi_repair_shards.out +++ b/src/test/regress/expected/multi_repair_shards.out @@ -92,30 +92,3 @@ SELECT * FROM customer_engagements; 1 | 03-01-2015 | third event (3 rows) --- now do the same test over again with a foreign table -CREATE FOREIGN TABLE remote_engagements ( - id integer, - created_at date, - event_data text -) SERVER fake_fdw_server; --- distribute the table --- create a single shard on the first worker -SET citus.shard_count TO 1; -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('remote_engagements', 'id', 'hash'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - --- get the newshardid -SELECT shardid as remotenewshardid FROM pg_dist_shard WHERE logicalrelid = 'remote_engagements'::regclass -\gset --- now, update the second placement as unhealthy -UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AND groupid = :worker_2_group; --- oops! we don't support repairing shards backed by foreign tables -SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); -ERROR: cannot repair shard -DETAIL: Table remote_engagements is a foreign table. Repairing shards backed by foreign tables is not supported. --- clean-up -DROP FOREIGN TABLE remote_engagements CASCADE; diff --git a/src/test/regress/expected/undistribute_table.out b/src/test/regress/expected/undistribute_table.out index c35c12be2..5b646da32 100644 --- a/src/test/regress/expected/undistribute_table.out +++ b/src/test/regress/expected/undistribute_table.out @@ -128,45 +128,6 @@ SELECT undistribute_table('referencing_table'); ERROR: cannot complete operation because table referencing_table has a foreign key HINT: Use cascade option to undistribute all the relations involved in a foreign key relationship with undistribute_table.referencing_table by executing SELECT undistribute_table($$undistribute_table.referencing_table$$, cascade_via_foreign_keys=>true) DROP TABLE referenced_table, referencing_table; --- test distributed foreign tables --- we expect errors --- and we need metadata sync off for foreign tables -SELECT stop_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; -NOTICE: dropping metadata on the node (localhost,57638) -NOTICE: dropping metadata on the node (localhost,57637) - stop_metadata_sync_to_node ---------------------------------------------------------------------- - - -(2 rows) - -CREATE FOREIGN TABLE foreign_table ( - id bigint not null, - full_name text not null default '' -) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true'); -SELECT create_distributed_table('foreign_table', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT undistribute_table('foreign_table'); -NOTICE: creating a new table for undistribute_table.foreign_table -NOTICE: dropping the old undistribute_table.foreign_table -NOTICE: renaming the new table to undistribute_table.foreign_table - undistribute_table ---------------------------------------------------------------------- - -(1 row) - -DROP FOREIGN TABLE foreign_table; -SELECT start_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; - start_metadata_sync_to_node ---------------------------------------------------------------------- - - -(2 rows) - -- test partitioned tables CREATE TABLE partitioned_table (id INT, a INT) PARTITION BY RANGE (id); CREATE TABLE partitioned_table_1_5 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (5); diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 550544a7f..18a45fd26 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -16,12 +16,9 @@ test: failure_add_disable_node test: failure_copy_to_reference test: failure_copy_on_hash test: failure_create_reference_table -test: check_mx -test: turn_mx_off test: failure_create_distributed_table_non_empty test: failure_create_table test: failure_single_select -test: turn_mx_on test: failure_multi_shard_update_delete test: failure_cte_subquery diff --git a/src/test/regress/sql/citus_local_table_triggers.sql b/src/test/regress/sql/citus_local_table_triggers.sql index d091c498b..76b192388 100644 --- a/src/test/regress/sql/citus_local_table_triggers.sql +++ b/src/test/regress/sql/citus_local_table_triggers.sql @@ -268,6 +268,15 @@ BEGIN END; $insert_100$ LANGUAGE plpgsql; +CREATE TABLE local_table (value int); + +CREATE FUNCTION insert_100_local() RETURNS trigger AS $insert_100$ +BEGIN + INSERT INTO local_table VALUES (100); + RETURN NEW; +END; +$insert_100$ LANGUAGE plpgsql; + BEGIN; CREATE TRIGGER insert_100_trigger AFTER TRUNCATE ON another_citus_local_table @@ -282,7 +291,7 @@ BEGIN; SELECT * FROM reference_table; ROLLBACK; - +-- cannot perform remote execution from a trigger on a Citus local table BEGIN; -- update should actually update something to test ON UPDATE CASCADE logic INSERT INTO another_citus_local_table VALUES (600); @@ -296,9 +305,47 @@ BEGIN; AFTER UPDATE ON citus_local_table FOR EACH STATEMENT EXECUTE FUNCTION insert_100(); + UPDATE another_citus_local_table SET value=value-1;; +ROLLBACK; + +-- can perform regular execution from a trigger on a Citus local table +BEGIN; + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); + INSERT INTO citus_local_table VALUES (600); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + UPDATE another_citus_local_table SET value=value-1;; -- we should see two rows with "100" - SELECT * FROM reference_table; + SELECT * FROM local_table; +ROLLBACK; + +-- can perform local execution from a trigger on a Citus local table +BEGIN; + SELECT citus_add_local_table_to_metadata('local_table'); + + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); + INSERT INTO citus_local_table VALUES (600); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + + UPDATE another_citus_local_table SET value=value-1;; + -- we should see two rows with "100" + SELECT * FROM local_table; ROLLBACK; -- test on partitioned citus local tables diff --git a/src/test/regress/sql/failure_create_distributed_table_non_empty.sql b/src/test/regress/sql/failure_create_distributed_table_non_empty.sql index 21350aee5..29dc7a2d7 100644 --- a/src/test/regress/sql/failure_create_distributed_table_non_empty.sql +++ b/src/test/regress/sql/failure_create_distributed_table_non_empty.sql @@ -304,6 +304,10 @@ BEGIN; SELECT create_distributed_table('test_table', 'id'); COMMIT; SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +SELECT citus.mitmproxy('conn.allow()'); +SELECT recover_prepared_transactions(); + DROP TABLE test_table; CREATE TABLE test_table(id int, value_1 int); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); @@ -314,7 +318,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").cancel(' || pg_backend_pi BEGIN; SELECT create_distributed_table('test_table', 'id'); COMMIT; + SELECT citus.mitmproxy('conn.allow()'); + SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; DROP TABLE test_table; CREATE TABLE test_table(id int, value_1 int); diff --git a/src/test/regress/sql/failure_create_table.sql b/src/test/regress/sql/failure_create_table.sql index c10c16c30..a4035b431 100644 --- a/src/test/regress/sql/failure_create_table.sql +++ b/src/test/regress/sql/failure_create_table.sql @@ -99,6 +99,7 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="PREPARE TRANSACTION").ki SELECT create_distributed_table('test_table','id',colocate_with=>'temp_table'); SELECT citus.mitmproxy('conn.allow()'); +SELECT recover_prepared_transactions(); SELECT count(*) FROM pg_dist_shard; SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'failure_create_table' and table_name LIKE 'test_table%' ORDER BY 1$$); diff --git a/src/test/regress/sql/failure_single_select.sql b/src/test/regress/sql/failure_single_select.sql index f39677e1e..9a4a82d12 100644 --- a/src/test/regress/sql/failure_single_select.sql +++ b/src/test/regress/sql/failure_single_select.sql @@ -4,6 +4,9 @@ SELECT citus.clear_network_traffic(); SET citus.shard_count = 2; SET citus.shard_replication_factor = 2; +-- this test is designed such that no modification lock is acquired +SET citus.allow_modifications_from_workers_to_replicated_tables TO false; + CREATE TABLE select_test (key int, value text); SELECT create_distributed_table('select_test', 'key'); @@ -23,6 +26,8 @@ INSERT INTO select_test VALUES (3, 'more data'); SELECT * FROM select_test WHERE key = 3; COMMIT; +SELECT citus.mitmproxy('conn.allow()'); + TRUNCATE select_test; -- now the same tests with query cancellation @@ -47,6 +52,8 @@ SELECT DISTINCT shardstate FROM pg_dist_shard_placement WHERE shardid IN ( SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'select_test'::regclass ); + +SELECT citus.mitmproxy('conn.allow()'); TRUNCATE select_test; -- cancel the second query diff --git a/src/test/regress/sql/foreign_tables_mx.sql b/src/test/regress/sql/foreign_tables_mx.sql index 080f7cff1..fdd391d10 100644 --- a/src/test/regress/sql/foreign_tables_mx.sql +++ b/src/test/regress/sql/foreign_tables_mx.sql @@ -65,17 +65,15 @@ ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2; ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2; -- trigger test -CREATE TABLE distributed_table(value int); -SELECT create_distributed_table('distributed_table', 'value'); +CREATE TABLE table42(value int); CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$ BEGIN - INSERT INTO distributed_table VALUES (42); + INSERT INTO table42 VALUES (42); RETURN NEW; END; $insert_42$ LANGUAGE plpgsql; - CREATE TRIGGER insert_42_trigger AFTER DELETE ON public.foreign_table_newname FOR EACH ROW EXECUTE FUNCTION insert_42(); @@ -83,14 +81,14 @@ FOR EACH ROW EXECUTE FUNCTION insert_42(); -- do the same pattern from the workers as well INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; -- disable trigger alter foreign table public.foreign_table_newname disable trigger insert_42_trigger; INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -- should not insert again as trigger disabled -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; DROP TRIGGER insert_42_trigger ON public.foreign_table_newname; @@ -135,10 +133,10 @@ ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'postgre -- test undistributing DELETE FROM foreign_table; SELECT undistribute_table('foreign_table'); + +-- both should error out SELECT create_distributed_table('foreign_table','data'); -SELECT undistribute_table('foreign_table'); SELECT create_reference_table('foreign_table'); -SELECT undistribute_table('foreign_table'); INSERT INTO foreign_table_test VALUES (1, 'testt'); SELECT * FROM foreign_table ORDER BY a; diff --git a/src/test/regress/sql/mixed_relkind_tests.sql b/src/test/regress/sql/mixed_relkind_tests.sql index e4c7e6624..8e258b7d1 100644 --- a/src/test/regress/sql/mixed_relkind_tests.sql +++ b/src/test/regress/sql/mixed_relkind_tests.sql @@ -47,9 +47,6 @@ SELECT create_distributed_table('partitioned_distributed_table', 'a'); CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table; CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table; -CREATE FOREIGN TABLE foreign_distributed_table (a int, b int) SERVER fake_fdw_server; -SELECT create_distributed_table('foreign_distributed_table', 'a'); - -- and insert some data INSERT INTO postgres_local_table SELECT * FROM generate_series(0, 5); INSERT INTO partitioned_postgres_local_table SELECT * FROM generate_series(0, 5); @@ -65,7 +62,6 @@ SELECT * FROM partitioned_distributed_table UNION SELECT 1, * FROM postgres_loca SELECT * FROM partitioned_distributed_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2; SELECT *, 1 FROM postgres_local_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2; SELECT * FROM unlogged_distributed_table UNION SELECT 1,1 ORDER BY 1,2; -SELECT * from foreign_distributed_table UNION SELECT 1,1 ORDER BY 1,2; SELECT 1 UNION SELECT * FROM citus_local_table ORDER BY 1; SELECT * FROM view_on_part_dist UNION SELECT 1,1 ORDER BY 1,2; @@ -117,11 +113,6 @@ SELECT COUNT(*) FROM (SELECT *, random() FROM partitioned_distributed_table) AS bar WHERE foo.a = bar.b; -SELECT COUNT(*) FROM - (SELECT *, random() FROM unlogged_distributed_table) AS foo, - (SELECT *, random() FROM foreign_distributed_table) AS bar -WHERE foo.a = bar.b; - UPDATE partitioned_distributed_table SET b = foo.a FROM citus_local_table AS foo; UPDATE partitioned_distributed_table SET b = foo.a FROM postgres_local_table AS foo; UPDATE partitioned_distributed_table SET a = foo.a FROM postgres_local_table AS foo WHERE foo.a = partitioned_distributed_table.a; @@ -161,9 +152,6 @@ WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (a); -WITH cte_1 AS MATERIALIZED (SELECT * FROM foreign_distributed_table) - SELECT COUNT(*) FROM cte_1 JOIN foreign_distributed_table USING (a); - WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (b); @@ -245,22 +233,12 @@ EXPLAIN (COSTS OFF) SELECT a, COUNT(*) OVER (PARTITION BY a) FROM partitioned_distributed_table ORDER BY 1,2; $Q$); -SELECT public.coordinator_plan($Q$ -EXPLAIN (COSTS OFF) -SELECT a, COUNT(*) OVER (PARTITION BY a) FROM foreign_distributed_table ORDER BY 1,2; -$Q$); - -- pull to coordinator WINDOW SELECT public.coordinator_plan($Q$ EXPLAIN (COSTS OFF) SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM partitioned_distributed_table ORDER BY 1,2; $Q$); -SELECT public.coordinator_plan($Q$ -EXPLAIN (COSTS OFF) -SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM foreign_distributed_table ORDER BY 1,2; -$Q$); - -- FOR UPDATE SELECT * FROM partitioned_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE; SELECT * FROM unlogged_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE; @@ -276,10 +254,5 @@ BEGIN; SELECT * FROM partitioned_distributed_table; COMMIT; -BEGIN; - ALTER TABLE foreign_distributed_table DROP COLUMN b CASCADE; - SELECT * FROM foreign_distributed_table; -COMMIT; - -- cleanup at exit DROP SCHEMA mixed_relkind_tests CASCADE; diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index ff0b6de46..ca571d954 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -122,9 +122,14 @@ SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_ -- show that non-admin role can not activate a node SET ROLE node_metadata_user; +SET citus.enable_object_propagation TO off; -- prevent master activate node to actually connect for this test +SELECT 1 FROM master_add_node('localhost', :worker_2_port); BEGIN; SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); -SELECT 1 FROM master_activate_node('localhost', :worker_2_port); +SELECT 1 FROM master_remove_node('localhost', :worker_2_port); +SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_1_port); +SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port; +SELECT nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeport; ABORT; \c - postgres - :master_port diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 0c46538aa..48a8a2cd5 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -202,10 +202,6 @@ SELECT create_distributed_table('table_append', 'id', 'append'); CREATE TABLE table_range ( id int ); SELECT create_distributed_table('table_range', 'id', 'range'); --- test foreign table creation -CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server; -SELECT create_distributed_table('table3_groupD', 'id'); - -- check metadata SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 @@ -290,8 +286,8 @@ CREATE TABLE table_bigint ( id bigint ); SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); -- check worker table schemas \c - - - :worker_1_port -SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300062'::regclass; -SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300064'::regclass; +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300054'::regclass; +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300056'::regclass; \c - - - :master_port SET citus.next_shard_id TO 1300080; diff --git a/src/test/regress/sql/multi_create_shards.sql b/src/test/regress/sql/multi_create_shards.sql index 6037fc17b..d7d787e89 100644 --- a/src/test/regress/sql/multi_create_shards.sql +++ b/src/test/regress/sql/multi_create_shards.sql @@ -106,22 +106,6 @@ SELECT sort_names('sumedh', 'jason', 'ozgun'); SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'throwaway%' AND relkind = 'r'; --- test foreign table creation -CREATE FOREIGN TABLE foreign_table_to_distribute -( - name text, - id bigint -) -SERVER fake_fdw_server; - -SET citus.shard_count TO 16; -SET citus.shard_replication_factor TO 1; -SELECT create_distributed_table('foreign_table_to_distribute', 'id', 'hash'); - -SELECT shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass - ORDER BY (shardminvalue::integer) ASC; - -- test shard creation using weird shard count CREATE TABLE weird_shard_count ( @@ -137,14 +121,3 @@ SELECT shardmaxvalue::integer - shardminvalue::integer AS shard_size FROM pg_dist_shard WHERE logicalrelid = 'weird_shard_count'::regclass ORDER BY shardminvalue::integer ASC; - --- cleanup foreign table, related shards and shard placements -DELETE FROM pg_dist_shard_placement - WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass); - -DELETE FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass; - -DELETE FROM pg_dist_partition - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass; diff --git a/src/test/regress/sql/multi_generate_ddl_commands.sql b/src/test/regress/sql/multi_generate_ddl_commands.sql index f46225c73..4237d62b0 100644 --- a/src/test/regress/sql/multi_generate_ddl_commands.sql +++ b/src/test/regress/sql/multi_generate_ddl_commands.sql @@ -116,25 +116,6 @@ ALTER TABLE fiddly_table SELECT master_get_table_ddl_events('fiddly_table'); --- test foreign tables using fake FDW -CREATE FOREIGN TABLE foreign_table ( - id bigint not null, - full_name text not null default '' -) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true'); - -SELECT create_distributed_table('foreign_table', 'id'); -ALTER FOREIGN TABLE foreign_table rename to renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890; -ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 rename full_name to rename_name; -ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 alter rename_name type char(8); -\c - - :public_worker_1_host :worker_1_port -select table_name, column_name, data_type -from information_schema.columns -where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id' -order by table_name; -\c - - :master_host :master_port - -SELECT master_get_table_ddl_events('renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890'); - -- propagating views is not supported CREATE VIEW local_view AS SELECT * FROM simple_table; @@ -142,13 +123,6 @@ SELECT master_get_table_ddl_events('local_view'); -- clean up DROP VIEW IF EXISTS local_view; -DROP FOREIGN TABLE IF EXISTS renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890; -\c - - :public_worker_1_host :worker_1_port -select table_name, column_name, data_type -from information_schema.columns -where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id' -order by table_name; -\c - - :master_host :master_port DROP TABLE IF EXISTS simple_table, not_null_table, column_constraint_table, table_constraint_table, default_value_table, pkey_table, unique_table, clustered_table, fiddly_table; diff --git a/src/test/regress/sql/multi_mx_call.sql b/src/test/regress/sql/multi_mx_call.sql index 7df194ea7..4728b8948 100644 --- a/src/test/regress/sql/multi_mx_call.sql +++ b/src/test/regress/sql/multi_mx_call.sql @@ -100,9 +100,18 @@ BEGIN x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +CREATE PROCEDURE mx_call_proc_copy(x int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; +END;$$; + + -- Test that undistributed procedures have no issue executing call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +call multi_mx_call.mx_call_proc_copy(2); -- Same for unqualified names call mx_call_proc(2, 0); @@ -112,6 +121,7 @@ call mx_call_proc_custom_types('S', 'A'); select create_distributed_function('mx_call_proc(int,int)'); select create_distributed_function('mx_call_proc_bigint(bigint,bigint)'); select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)'); +select create_distributed_function('mx_call_proc_copy(int)'); -- We still don't route them to the workers, because they aren't -- colocated with any distributed tables. @@ -119,16 +129,19 @@ SET client_min_messages TO DEBUG1; call multi_mx_call.mx_call_proc(2, 0); call mx_call_proc_bigint(4, 2); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +call multi_mx_call.mx_call_proc_copy(2); -- Mark them as colocated with a table. Now we should route them to workers. select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); select colocate_proc_with_table('mx_call_proc_bigint', 'mx_call_dist_table_bigint'::regclass, 1); select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); +select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0); call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); call mx_call_proc(2, 0); call mx_call_proc_custom_types('S', 'A'); +call mx_call_proc_copy(2); -- Test implicit cast of int to bigint call mx_call_proc_bigint(4, 2); diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql index b2d26e853..4dfe91322 100644 --- a/src/test/regress/sql/multi_mx_function_call_delegation.sql +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -67,9 +67,21 @@ BEGIN x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +-- function which internally uses COPY protocol without remote execution +CREATE FUNCTION mx_call_func_copy(x int) +RETURNS bool +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; + + RETURN true; +END;$$; + -- Test that undistributed functions have no issue executing select multi_mx_function_call_delegation.mx_call_func(2, 0); select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); +select multi_mx_function_call_delegation.mx_call_copy(2); select squares(4); -- Same for unqualified name @@ -79,6 +91,7 @@ select mx_call_func(2, 0); select create_distributed_function('mx_call_func(int,int)'); select create_distributed_function('mx_call_func_bigint(bigint,bigint)'); select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); +select create_distributed_function('mx_call_func_copy(int)'); select create_distributed_function('squares(int)'); @@ -249,10 +262,15 @@ select mx_call_func(floor(random())::int, 2); -- test forms we don't distribute select * from mx_call_func(2, 0); -select mx_call_func(2, 0) from mx_call_dist_table_1; select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; select mx_call_func(2, 0), mx_call_func(0, 2); +-- we do not delegate the call, but do push down the query +-- that result in remote execution from workers +select mx_call_func(id, 0) from mx_call_dist_table_1; +select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; +select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; + DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val; diff --git a/src/test/regress/sql/multi_repair_shards.sql b/src/test/regress/sql/multi_repair_shards.sql index 98085b3c9..f910585cb 100644 --- a/src/test/regress/sql/multi_repair_shards.sql +++ b/src/test/regress/sql/multi_repair_shards.sql @@ -80,29 +80,3 @@ UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid AND grou -- get the data from the second placement SELECT * FROM customer_engagements; - --- now do the same test over again with a foreign table -CREATE FOREIGN TABLE remote_engagements ( - id integer, - created_at date, - event_data text -) SERVER fake_fdw_server; - --- distribute the table --- create a single shard on the first worker -SET citus.shard_count TO 1; -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('remote_engagements', 'id', 'hash'); - --- get the newshardid -SELECT shardid as remotenewshardid FROM pg_dist_shard WHERE logicalrelid = 'remote_engagements'::regclass -\gset - --- now, update the second placement as unhealthy -UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AND groupid = :worker_2_group; - --- oops! we don't support repairing shards backed by foreign tables -SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); - --- clean-up -DROP FOREIGN TABLE remote_engagements CASCADE; diff --git a/src/test/regress/sql/undistribute_table.sql b/src/test/regress/sql/undistribute_table.sql index 71e9a72d9..01ec4629d 100644 --- a/src/test/regress/sql/undistribute_table.sql +++ b/src/test/regress/sql/undistribute_table.sql @@ -52,20 +52,6 @@ SELECT undistribute_table('referencing_table'); DROP TABLE referenced_table, referencing_table; --- test distributed foreign tables --- we expect errors --- and we need metadata sync off for foreign tables -SELECT stop_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; -CREATE FOREIGN TABLE foreign_table ( - id bigint not null, - full_name text not null default '' -) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true'); -SELECT create_distributed_table('foreign_table', 'id'); -SELECT undistribute_table('foreign_table'); - -DROP FOREIGN TABLE foreign_table; -SELECT start_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; - -- test partitioned tables CREATE TABLE partitioned_table (id INT, a INT) PARTITION BY RANGE (id); CREATE TABLE partitioned_table_1_5 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (5);