Merge branch 'master' into velioglu/table_wo_seq_prototype

velioglu/wo_seq_test_1
Burak Velioglu 2022-01-11 11:35:32 +03:00
commit 2b513c4100
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
50 changed files with 886 additions and 665 deletions

View File

@ -24,6 +24,7 @@
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/function_call_delegation.h"
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
@ -46,9 +47,10 @@
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/syscache.h" #include "utils/syscache.h"
static bool CallFuncExprRemotely(CallStmt *callStmt,
DistObjectCacheEntry *procedure, /* global variable tracking whether we are in a delegated procedure call */
FuncExpr *funcExpr, DestReceiver *dest); bool InDelegatedProcedureCall = false;
/* /*
* CallDistributedProcedureRemotely calls a stored procedure on the worker if possible. * CallDistributedProcedureRemotely calls a stored procedure on the worker if possible.
@ -61,28 +63,21 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId, DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId,
functionId, 0); functionId, 0);
if (procedure == NULL || !procedure->isDistributed)
/*
* If procedure is not distributed or already delegated from another
* node, do not call the procedure remotely.
*/
if (procedure == NULL || !procedure->isDistributed ||
IsCitusInitiatedRemoteBackend())
{ {
return false; return false;
} }
return CallFuncExprRemotely(callStmt, procedure, funcExpr, dest); if (IsCitusInitiatedRemoteBackend())
} {
/*
* We are in a citus-initiated backend handling a CALL to a distributed
* procedure. That means that this is the delegated call.
*/
InDelegatedProcedureCall = true;
return false;
}
/*
* CallFuncExprRemotely calls a procedure of function on the worker if possible.
*/
static bool
CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
FuncExpr *funcExpr, DestReceiver *dest)
{
if (IsMultiStatementTransaction()) if (IsMultiStatementTransaction())
{ {
ereport(DEBUG1, (errmsg("cannot push down CALL in multi-statement transaction"))); ereport(DEBUG1, (errmsg("cannot push down CALL in multi-statement transaction")));
@ -102,6 +97,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
"be constant expressions"))); "be constant expressions")));
return false; return false;
} }
CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId); CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId);
Var *partitionColumn = distTable->partitionColumn; Var *partitionColumn = distTable->partitionColumn;
bool colocatedWithReferenceTable = false; bool colocatedWithReferenceTable = false;

View File

@ -129,6 +129,7 @@ static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
TupleTableSlot *slot, TupleTableSlot *slot,
EState *estate); EState *estate);
static void ErrorIfTemporaryTable(Oid relationId); static void ErrorIfTemporaryTable(Oid relationId);
static void ErrorIfForeignTable(Oid relationOid);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table); PG_FUNCTION_INFO_V1(master_create_distributed_table);
@ -333,6 +334,7 @@ EnsureCitusTableCanBeCreated(Oid relationOid)
EnsureRelationExists(relationOid); EnsureRelationExists(relationOid);
EnsureTableOwner(relationOid); EnsureTableOwner(relationOid);
ErrorIfTemporaryTable(relationOid); ErrorIfTemporaryTable(relationOid);
ErrorIfForeignTable(relationOid);
/* /*
* We should do this check here since the codes in the following lines rely * We should do this check here since the codes in the following lines rely
@ -1882,3 +1884,22 @@ DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
return false; return false;
} }
/*
* ErrorIfForeignTable errors out if the relation with given relationOid
* is a foreign table.
*/
static void
ErrorIfForeignTable(Oid relationOid)
{
if (IsForeignTable(relationOid))
{
char *relname = get_rel_name(relationOid);
char *qualifiedRelname = generate_qualified_relation_name(relationOid);
ereport(ERROR, (errmsg("foreign tables cannot be distributed"),
(errhint("Can add foreign table \"%s\" to metadata by running: "
"SELECT citus_add_local_table_to_metadata($$%s$$);",
relname, qualifiedRelname))));
}
}

View File

@ -404,7 +404,12 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort)
ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands);
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommands); /* send commands to new workers, the current user should a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName,
nodePort,
CurrentUserName(),
ddlCommands);
} }

View File

@ -3340,6 +3340,7 @@ InitializeCopyShardState(CopyShardState *shardState,
{ {
ListCell *placementCell = NULL; ListCell *placementCell = NULL;
int failedPlacementCount = 0; int failedPlacementCount = 0;
bool hasRemoteCopy = false;
MemoryContext localContext = MemoryContext localContext =
AllocSetContextCreateExtended(CurrentMemoryContext, AllocSetContextCreateExtended(CurrentMemoryContext,
@ -3383,6 +3384,8 @@ InitializeCopyShardState(CopyShardState *shardState,
continue; continue;
} }
hasRemoteCopy = true;
MultiConnection *connection = MultiConnection *connection =
CopyGetPlacementConnection(connectionStateHash, placement, CopyGetPlacementConnection(connectionStateHash, placement,
colocatedIntermediateResult); colocatedIntermediateResult);
@ -3427,6 +3430,11 @@ InitializeCopyShardState(CopyShardState *shardState,
ereport(ERROR, (errmsg("could not connect to any active placements"))); ereport(ERROR, (errmsg("could not connect to any active placements")));
} }
if (hasRemoteCopy)
{
EnsureRemoteTaskExecutionAllowed();
}
/* /*
* We just error out and code execution should never reach to this * We just error out and code execution should never reach to this
* point. This is the case for all tables. * point. This is the case for all tables.

View File

@ -54,7 +54,6 @@
/* controlled via GUC, should be accessed via GetEnableLocalReferenceForeignKeys() */ /* controlled via GUC, should be accessed via GetEnableLocalReferenceForeignKeys() */
bool EnableLocalReferenceForeignKeys = true; bool EnableLocalReferenceForeignKeys = true;
/* Local functions forward declarations for unsupported command checks */ /* Local functions forward declarations for unsupported command checks */
static void PostprocessCreateTableStmtForeignKeys(CreateStmt *createStatement); static void PostprocessCreateTableStmtForeignKeys(CreateStmt *createStatement);
static void PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, static void PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement,
@ -1786,6 +1785,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
{ {
return NIL; return NIL;
} }
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
ddlJob->targetRelationId = relationId; ddlJob->targetRelationId = relationId;

View File

@ -33,7 +33,9 @@
#include "access/attnum.h" #include "access/attnum.h"
#include "access/heapam.h" #include "access/heapam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#if PG_VERSION_NUM < 140000
#include "access/xact.h" #include "access/xact.h"
#endif
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
@ -52,7 +54,9 @@
#include "distributed/local_executor.h" #include "distributed/local_executor.h"
#include "distributed/maintenanced.h" #include "distributed/maintenanced.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#if PG_VERSION_NUM < 140000
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#endif
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h" #include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
@ -91,6 +95,9 @@ static void ProcessUtilityInternal(PlannedStmt *pstmt,
struct QueryEnvironment *queryEnv, struct QueryEnvironment *queryEnv,
DestReceiver *dest, DestReceiver *dest,
QueryCompletionCompat *completionTag); QueryCompletionCompat *completionTag);
#if PG_VERSION_NUM >= 140000
static void set_indexsafe_procflags(void);
#endif
static char * SetSearchPathToCurrentSearchPathCommand(void); static char * SetSearchPathToCurrentSearchPathCommand(void);
static char * CurrentSearchPath(void); static char * CurrentSearchPath(void);
static void IncrementUtilityHookCountersIfNecessary(Node *parsetree); static void IncrementUtilityHookCountersIfNecessary(Node *parsetree);
@ -227,10 +234,20 @@ multi_ProcessUtility(PlannedStmt *pstmt,
params, queryEnv, dest, completionTag); params, queryEnv, dest, completionTag);
StoredProcedureLevel -= 1; StoredProcedureLevel -= 1;
if (InDelegatedProcedureCall && StoredProcedureLevel == 0)
{
InDelegatedProcedureCall = false;
}
} }
PG_CATCH(); PG_CATCH();
{ {
StoredProcedureLevel -= 1; StoredProcedureLevel -= 1;
if (InDelegatedProcedureCall && StoredProcedureLevel == 0)
{
InDelegatedProcedureCall = false;
}
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
@ -1108,9 +1125,35 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
/* /*
* Start a new transaction to make sure CONCURRENTLY commands * Start a new transaction to make sure CONCURRENTLY commands
* on localhost do not block waiting for this transaction to finish. * on localhost do not block waiting for this transaction to finish.
*
* In addition to doing that, we also need to tell other backends
* --including the ones spawned for connections opened to localhost to
* build indexes on shards of this relation-- that concurrent index
* builds can safely ignore us.
*
* Normally, DefineIndex() only does that if index doesn't have any
* predicates (i.e.: where clause) and no index expressions at all.
* However, now that we already called standard process utility,
* index build on the shell table is finished anyway.
*
* The reason behind doing so is that we cannot guarantee not
* grabbing any snapshots via adaptive executor, and the backends
* creating indexes on local shards (if any) might block on waiting
* for current xact of the current backend to finish, which would
* cause self deadlocks that are not detectable.
*/ */
if (ddlJob->startNewTransaction) if (ddlJob->startNewTransaction)
{ {
#if PG_VERSION_NUM < 140000
/*
* Older versions of postgres doesn't have PROC_IN_SAFE_IC flag
* so we cannot use set_indexsafe_procflags in those versions.
*
* For this reason, we do our best to ensure not grabbing any
* snapshots later in the executor.
*/
/* /*
* If cache is not populated, system catalog lookups will cause * If cache is not populated, system catalog lookups will cause
* the xmin of current backend to change. Then the last phase * the xmin of current backend to change. Then the last phase
@ -1131,8 +1174,34 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
* will already be in the hash table, hence we won't be holding any snapshots. * will already be in the hash table, hence we won't be holding any snapshots.
*/ */
WarmUpConnParamsHash(); WarmUpConnParamsHash();
#endif
/*
* Since it is not certain whether the code-path that we followed
* until reaching here caused grabbing any snapshots or not, we
* need to pop the active snapshot if we had any, to ensure not
* leaking any snapshots.
*
* For example, EnsureCoordinator might return without grabbing
* any snapshots if we didn't receive any invalidation messages
* but the otherwise is also possible.
*/
if (ActiveSnapshotSet())
{
PopActiveSnapshot();
}
CommitTransactionCommand(); CommitTransactionCommand();
StartTransactionCommand(); StartTransactionCommand();
#if PG_VERSION_NUM >= 140000
/*
* Tell other backends to ignore us, even if we grab any
* snapshots via adaptive executor.
*/
set_indexsafe_procflags();
#endif
} }
MemoryContext savedContext = CurrentMemoryContext; MemoryContext savedContext = CurrentMemoryContext;
@ -1195,6 +1264,33 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
} }
#if PG_VERSION_NUM >= 140000
/*
* set_indexsafe_procflags sets PROC_IN_SAFE_IC flag in MyProc->statusFlags.
*
* The flag is reset automatically at transaction end, so it must be set
* for each transaction.
*
* Copied from pg/src/backend/commands/indexcmds.c
* Also see pg commit c98763bf51bf610b3ee7e209fc76c3ff9a6b3163.
*/
static void
set_indexsafe_procflags(void)
{
Assert(MyProc->xid == InvalidTransactionId &&
MyProc->xmin == InvalidTransactionId);
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
MyProc->statusFlags |= PROC_IN_SAFE_IC;
ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
LWLockRelease(ProcArrayLock);
}
#endif
/* /*
* CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements * CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements
* of shards of a distributed table. The command to be applied is generated by the * of shards of a distributed table. The command to be applied is generated by the

View File

@ -36,6 +36,7 @@
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h" #include "distributed/worker_log_messages.h"
#include "mb/pg_wchar.h" #include "mb/pg_wchar.h"
#include "pg_config.h"
#include "portability/instr_time.h" #include "portability/instr_time.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
@ -56,9 +57,7 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static void StartConnectionEstablishment(MultiConnection *connectionn, static void StartConnectionEstablishment(MultiConnection *connectionn,
ConnectionHashKey *key); ConnectionHashKey *key);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
#ifdef USE_ASSERT_CHECKING static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections);
static void AssertSingleMetadataConnectionExists(dlist_head *connections);
#endif
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int static bool ShouldShutdownConnection(MultiConnection *connection, const int
@ -420,6 +419,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
static MultiConnection * static MultiConnection *
FindAvailableConnection(dlist_head *connections, uint32 flags) FindAvailableConnection(dlist_head *connections, uint32 flags)
{ {
List *metadataConnectionCandidateList = NIL;
dlist_iter iter; dlist_iter iter;
dlist_foreach(iter, connections) dlist_foreach(iter, connections)
{ {
@ -473,52 +474,40 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
{ {
/* /*
* The caller requested a metadata connection, and this is not the * The caller requested a metadata connection, and this is not the
* metadata connection. Still, this is a candidate for becoming a
* metadata connection. * metadata connection.
*/ */
metadataConnectionCandidateList =
lappend(metadataConnectionCandidateList, connection);
continue; continue;
} }
else
{
/*
* Now that we found metadata connection. We do some sanity
* checks.
*/
#ifdef USE_ASSERT_CHECKING
AssertSingleMetadataConnectionExists(connections);
#endif
/*
* Connection is in use for an ongoing operation. Metadata
* connection cannot be claimed exclusively.
*/
if (connection->claimedExclusively)
{
ereport(ERROR, (errmsg("metadata connections cannot be "
"claimed exclusively")));
}
}
return connection; return connection;
} }
if ((flags & REQUIRE_METADATA_CONNECTION) && !dlist_is_empty(connections)) if ((flags & REQUIRE_METADATA_CONNECTION) &&
list_length(metadataConnectionCandidateList) > 0)
{ {
/* /*
* Caller asked a metadata connection, and we couldn't find in the * Caller asked a metadata connection, and we couldn't find a connection
* above list. So, we pick the first connection as the metadata * that has already been used for metadata operations.
* connection. *
* So, we pick the first connection as the metadata connection.
*/ */
MultiConnection *metadataConnection = MultiConnection *metadataConnection =
dlist_container(MultiConnection, connectionNode, linitial(metadataConnectionCandidateList);
dlist_head_node(connections));
Assert(!metadataConnection->claimedExclusively);
/* remember that we use this connection for metadata operations */ /* remember that we use this connection for metadata operations */
metadataConnection->useForMetadataOperations = true; metadataConnection->useForMetadataOperations = true;
#ifdef USE_ASSERT_CHECKING /*
AssertSingleMetadataConnectionExists(connections); * We cannot have multiple metadata connections. If we see
#endif * this error, it is likely that there is a bug in connection
* management.
*/
ErrorIfMultipleMetadataConnectionExists(connections);
return metadataConnection; return metadataConnection;
} }
@ -527,14 +516,12 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
} }
#ifdef USE_ASSERT_CHECKING
/* /*
* AssertSingleMetadataConnectionExists throws an error if the * ErrorIfMultipleMetadataConnectionExists throws an error if the
* input connection dlist contains more than one metadata connections. * input connection dlist contains more than one metadata connections.
*/ */
static void static void
AssertSingleMetadataConnectionExists(dlist_head *connections) ErrorIfMultipleMetadataConnectionExists(dlist_head *connections)
{ {
bool foundMetadataConnection = false; bool foundMetadataConnection = false;
dlist_iter iter; dlist_iter iter;
@ -556,9 +543,6 @@ AssertSingleMetadataConnectionExists(dlist_head *connections)
} }
#endif /* USE_ASSERT_CHECKING */
/* /*
* CloseAllConnectionsAfterTransaction sets the forceClose flag of all the * CloseAllConnectionsAfterTransaction sets the forceClose flag of all the
* connections. This is mainly done when citus.node_conninfo changes. * connections. This is mainly done when citus.node_conninfo changes.
@ -1259,6 +1243,8 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key
} }
#if PG_VERSION_NUM < 140000
/* /*
* WarmUpConnParamsHash warms up the ConnParamsHash by loading all the * WarmUpConnParamsHash warms up the ConnParamsHash by loading all the
* conn params for active primary nodes. * conn params for active primary nodes.
@ -1280,6 +1266,9 @@ WarmUpConnParamsHash(void)
} }
#endif
/* /*
* FindOrCreateConnParamsEntry searches ConnParamsHash for the given key, * FindOrCreateConnParamsEntry searches ConnParamsHash for the given key,
* if it is not found, it is created. * if it is not found, it is created.

View File

@ -1290,6 +1290,12 @@ StartDistributedExecution(DistributedExecution *execution)
*/ */
RecordParallelRelationAccessForTaskList(execution->remoteAndLocalTaskList); RecordParallelRelationAccessForTaskList(execution->remoteAndLocalTaskList);
} }
/* make sure we are not doing remote execution from within a task */
if (execution->remoteTaskList != NIL)
{
EnsureRemoteTaskExecutionAllowed();
}
} }

View File

@ -21,9 +21,11 @@
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/commands/multi_copy.h" #include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/function_call_delegation.h"
#include "distributed/insert_select_executor.h" #include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/combine_query_planner.h" #include "distributed/combine_query_planner.h"
@ -719,3 +721,46 @@ ExecutorBoundParams(void)
Assert(ExecutorLevel > 0); Assert(ExecutorLevel > 0);
return executorBoundParams; return executorBoundParams;
} }
/*
* EnsureRemoteTaskExecutionAllowed ensures that we do not perform remote
* execution from within a task. That could happen when the user calls
* a function in a query that gets pushed down to the worker, and the
* function performs a query on a distributed table.
*/
void
EnsureRemoteTaskExecutionAllowed(void)
{
if (!InTaskExecution())
{
/* we are not within a task, distributed execution is allowed */
return;
}
ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a "
"shard")));
}
/*
* InTaskExecution determines whether we are currently in a task execution.
*/
bool
InTaskExecution(void)
{
if (LocalExecutorLevel > 0)
{
/* in a local task */
return true;
}
/*
* Normally, any query execution within a citus-initiated backend
* is considered a task execution, but an exception is when we
* are in a delegated function/procedure call.
*/
return IsCitusInitiatedRemoteBackend() &&
!InDelegatedFunctionCall &&
!InDelegatedProcedureCall;
}

View File

@ -992,10 +992,11 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode)
ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands);
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
/* send commands to new workers*/ /* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName, SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName,
newWorkerNode->workerPort, newWorkerNode->workerPort,
CitusExtensionOwnerName(), CurrentUserName(),
ddlCommands); ddlCommands);
} }
} }
@ -1208,6 +1209,17 @@ ActivateNode(char *nodeName, int nodePort)
{ {
bool isActive = true; bool isActive = true;
/*
* We currently require the object propagation to happen via superuser,
* see #5139. While activating a node, we sync both metadata and object
* propagation.
*
* In order to have a fully transactional semantics with add/activate
* node operations, we require superuser. Note that for creating
* non-owned objects, we already require a superuser connection.
* By ensuring the current user to be a superuser, we can guarantee
* to send all commands within the same remote transaction.
*/
EnsureSuperUser(); EnsureSuperUser();
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */

View File

@ -23,6 +23,7 @@
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/cte_inline.h" #include "distributed/cte_inline.h"
#include "distributed/function_call_delegation.h" #include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
@ -71,7 +72,8 @@ static uint64 NextPlanId = 1;
/* keep track of planner call stack levels */ /* keep track of planner call stack levels */
int PlannerLevel = 0; int PlannerLevel = 0;
static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable);
static bool IsUpdateOrDelete(Query *query); static bool IsUpdateOrDelete(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt( static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext); DistributedPlanningContext *planContext);
@ -123,6 +125,7 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext
int rteIdCounter); int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList); static RTEListProperties * GetRTEListProperties(List *rangeTableList);
static List * TranslatedVars(PlannerInfo *root, int relationIndex); static List * TranslatedVars(PlannerInfo *root, int relationIndex);
static void WarnIfListHasForeignDistributedTable(List *rangeTableList);
/* Distributed planner hook */ /* Distributed planner hook */
@ -149,10 +152,18 @@ distributed_planner(Query *parse,
} }
else if (CitusHasBeenLoaded()) else if (CitusHasBeenLoaded())
{ {
needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList); bool maybeHasForeignDistributedTable = false;
needsDistributedPlanning =
ListContainsDistributedTableRTE(rangeTableList,
&maybeHasForeignDistributedTable);
if (needsDistributedPlanning) if (needsDistributedPlanning)
{ {
fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue); fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
if (maybeHasForeignDistributedTable)
{
WarnIfListHasForeignDistributedTable(rangeTableList);
}
} }
} }
@ -311,17 +322,19 @@ NeedsDistributedPlanning(Query *query)
List *allRTEs = ExtractRangeTableEntryList(query); List *allRTEs = ExtractRangeTableEntryList(query);
return ListContainsDistributedTableRTE(allRTEs); return ListContainsDistributedTableRTE(allRTEs, NULL);
} }
/* /*
* ListContainsDistributedTableRTE gets a list of range table entries * ListContainsDistributedTableRTE gets a list of range table entries
* and returns true if there is at least one distributed relation range * and returns true if there is at least one distributed relation range
* table entry in the list. * table entry in the list. The boolean maybeHasForeignDistributedTable
* variable is set to true if the list contains a foreign table.
*/ */
static bool static bool
ListContainsDistributedTableRTE(List *rangeTableList) ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable)
{ {
ListCell *rangeTableCell = NULL; ListCell *rangeTableCell = NULL;
@ -336,6 +349,12 @@ ListContainsDistributedTableRTE(List *rangeTableList)
if (IsCitusTable(rangeTableEntry->relid)) if (IsCitusTable(rangeTableEntry->relid))
{ {
if (maybeHasForeignDistributedTable != NULL &&
IsForeignTable(rangeTableEntry->relid))
{
*maybeHasForeignDistributedTable = true;
}
return true; return true;
} }
} }
@ -2408,3 +2427,37 @@ GetRTEListProperties(List *rangeTableList)
return rteListProperties; return rteListProperties;
} }
/*
* WarnIfListHasForeignDistributedTable iterates the given list and logs a WARNING
* if the given relation is a distributed foreign table.
* We do that because now we only support Citus Local Tables for foreign tables.
*/
static void
WarnIfListHasForeignDistributedTable(List *rangeTableList)
{
static bool DistributedForeignTableWarningPrompted = false;
RangeTblEntry *rangeTableEntry = NULL;
foreach_ptr(rangeTableEntry, rangeTableList)
{
if (DistributedForeignTableWarningPrompted)
{
return;
}
Oid relationId = rangeTableEntry->relid;
if (IsForeignTable(relationId) && IsCitusTable(relationId) &&
!IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
DistributedForeignTableWarningPrompted = true;
ereport(WARNING, (errmsg(
"support for distributed foreign tables are deprecated, "
"please use Citus managed local tables"),
(errdetail(
"Foreign tables can be added to metadata using UDF: "
"citus_add_local_table_to_metadata()"))));
}
}
}

View File

@ -58,6 +58,10 @@ struct ParamWalkerContext
static bool contain_param_walker(Node *node, void *context); static bool contain_param_walker(Node *node, void *context);
/* global variable keeping track of whether we are in a delegated function call */
bool InDelegatedFunctionCall = false;
/* /*
* contain_param_walker scans node for Param nodes. * contain_param_walker scans node for Param nodes.
* Ignore the return value, instead check context afterwards. * Ignore the return value, instead check context afterwards.
@ -112,15 +116,6 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
} }
int32 localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
if (localGroupId != COORDINATOR_GROUP_ID && IsCitusInitiatedRemoteBackend())
{
/*
* Do not delegate from workers if it is initiated by Citus already.
* It means that this function has already been delegated to this node.
*/
return NULL;
}
if (localGroupId == GROUP_ID_UPGRADING) if (localGroupId == GROUP_ID_UPGRADING)
{ {
/* do not delegate while upgrading */ /* do not delegate while upgrading */
@ -218,6 +213,27 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
ereport(DEBUG4, (errmsg("function is distributed"))); ereport(DEBUG4, (errmsg("function is distributed")));
} }
if (IsCitusInitiatedRemoteBackend())
{
/*
* We are planning a call to a distributed function within a Citus backend,
* that means that this is the delegated call.
*/
InDelegatedFunctionCall = true;
return NULL;
}
if (localGroupId != COORDINATOR_GROUP_ID)
{
/*
* We are calling a distributed function on a worker node. We currently
* only delegate from the coordinator.
*
* TODO: remove this restriction.
*/
return NULL;
}
/* /*
* Cannot delegate functions for INSERT ... SELECT func(), since they require * Cannot delegate functions for INSERT ... SELECT func(), since they require
* coordinated transactions. * coordinated transactions.

View File

@ -113,6 +113,12 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
elog(ERROR, "failed to connect to %s:%d", nodeNameString, (int) nodePort); elog(ERROR, "failed to connect to %s:%d", nodeNameString, (int) nodePort);
} }
/* pretend we are a regular client to avoid citus-initiated backend checks */
const char *setAppName =
"SET application_name TO run_commands_on_session_level_connection_to_node";
ExecuteCriticalRemoteCommand(singleConnection, setAppName);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -23,6 +23,7 @@
#include "distributed/citus_safe_lib.h" #include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/distributed_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/function_call_delegation.h"
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/intermediate_results.h" #include "distributed/intermediate_results.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
@ -550,6 +551,7 @@ ResetGlobalVariables()
ShouldCoordinatedTransactionUse2PC = false; ShouldCoordinatedTransactionUse2PC = false;
TransactionModifiedNodeMetadata = false; TransactionModifiedNodeMetadata = false;
MetadataSyncOnCommit = false; MetadataSyncOnCommit = false;
InDelegatedFunctionCall = false;
ResetWorkerErrorIndication(); ResetWorkerErrorIndication();
} }

View File

@ -45,8 +45,9 @@ static StringInfo CopyShardPlacementToWorkerNodeQuery(
ShardPlacement *sourceShardPlacement, ShardPlacement *sourceShardPlacement,
WorkerNode *workerNode, WorkerNode *workerNode,
char transferMode); char transferMode);
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, static void ReplicateReferenceTableShardToNode(ShardInterval *shardInterval,
int nodePort); char *nodeName,
int nodePort);
static bool AnyRelationsModifiedInTransaction(List *relationIdList); static bool AnyRelationsModifiedInTransaction(List *relationIdList);
static List * ReplicatedMetadataSyncedDistributedTableList(void); static List * ReplicatedMetadataSyncedDistributedTableList(void);
@ -335,7 +336,8 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
* table. * table.
*/ */
static void static void
ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) ReplicateReferenceTableShardToNode(ShardInterval *shardInterval, char *nodeName,
int nodePort)
{ {
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
@ -350,7 +352,6 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
nodeName, nodePort); nodeName, nodePort);
if (targetPlacement != NULL) if (targetPlacement != NULL)
{ {
if (targetPlacement->shardState == SHARD_STATE_ACTIVE) if (targetPlacement->shardState == SHARD_STATE_ACTIVE)
@ -368,8 +369,11 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
get_rel_name(shardInterval->relationId), nodeName, get_rel_name(shardInterval->relationId), nodeName,
nodePort))); nodePort)));
EnsureNoModificationsHaveBeenDone(); /* send commands to new workers, the current user should be a superuser */
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommandList); Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort,
CurrentUserName(),
ddlCommandList);
int32 groupId = GroupForNode(nodeName, nodePort); int32 groupId = GroupForNode(nodeName, nodePort);
uint64 placementId = GetNextPlacementId(); uint64 placementId = GetNextPlacementId();
@ -586,7 +590,7 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
LockShardDistributionMetadata(shardId, ExclusiveLock); LockShardDistributionMetadata(shardId, ExclusiveLock);
ReplicateShardToNode(shardInterval, nodeName, nodePort); ReplicateReferenceTableShardToNode(shardInterval, nodeName, nodePort);
} }
/* create foreign constraints between reference tables */ /* create foreign constraints between reference tables */
@ -594,7 +598,11 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
{ {
List *commandList = CopyShardForeignConstraintCommandList(shardInterval); List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), commandList); /* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort,
CurrentUserName(),
commandList);
} }
} }
} }

View File

@ -18,6 +18,7 @@
#include "tcop/utility.h" #include "tcop/utility.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/function_call_delegation.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
@ -37,6 +38,7 @@ extern bool EnableAlterRolePropagation;
extern bool EnableAlterRoleSetPropagation; extern bool EnableAlterRoleSetPropagation;
extern bool EnableAlterDatabaseOwner; extern bool EnableAlterDatabaseOwner;
extern int UtilityHookLevel; extern int UtilityHookLevel;
extern bool InDelegatedProcedureCall;
/* /*

View File

@ -16,6 +16,7 @@
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/remote_transaction.h" #include "distributed/remote_transaction.h"
#include "lib/ilist.h" #include "lib/ilist.h"
#include "pg_config.h"
#include "portability/instr_time.h" #include "portability/instr_time.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
@ -283,5 +284,7 @@ extern void MarkConnectionConnected(MultiConnection *connection);
extern double MillisecondsPassedSince(instr_time moment); extern double MillisecondsPassedSince(instr_time moment);
extern long MillisecondsToTimeout(instr_time start, long msAfterStart); extern long MillisecondsToTimeout(instr_time start, long msAfterStart);
#if PG_VERSION_NUM < 140000
extern void WarmUpConnParamsHash(void); extern void WarmUpConnParamsHash(void);
#endif
#endif /* CONNECTION_MANAGMENT_H */ #endif /* CONNECTION_MANAGMENT_H */

View File

@ -15,6 +15,14 @@
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
/*
* These flags keep track of whether the process is currently in a delegated
* function or procedure call.
*/
extern bool InDelegatedFunctionCall;
extern bool InDelegatedProcedureCall;
PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext); PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext);

View File

@ -149,6 +149,8 @@ extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
const char ***parameterValues, bool const char ***parameterValues, bool
useOriginalCustomTypeOids); useOriginalCustomTypeOids);
extern ParamListInfo ExecutorBoundParams(void); extern ParamListInfo ExecutorBoundParams(void);
extern void EnsureRemoteTaskExecutionAllowed(void);
extern bool InTaskExecution(void);
#endif /* MULTI_EXECUTOR_H */ #endif /* MULTI_EXECUTOR_H */

View File

@ -392,6 +392,13 @@ BEGIN
RETURN NEW; RETURN NEW;
END; END;
$insert_100$ LANGUAGE plpgsql; $insert_100$ LANGUAGE plpgsql;
CREATE TABLE local_table (value int);
CREATE FUNCTION insert_100_local() RETURNS trigger AS $insert_100$
BEGIN
INSERT INTO local_table VALUES (100);
RETURN NEW;
END;
$insert_100$ LANGUAGE plpgsql;
BEGIN; BEGIN;
CREATE TRIGGER insert_100_trigger CREATE TRIGGER insert_100_trigger
AFTER TRUNCATE ON another_citus_local_table AFTER TRUNCATE ON another_citus_local_table
@ -416,6 +423,7 @@ NOTICE: executing the command locally: SELECT value FROM citus_local_table_trig
(2 rows) (2 rows)
ROLLBACK; ROLLBACK;
-- cannot perform remote execution from a trigger on a Citus local table
BEGIN; BEGIN;
-- update should actually update something to test ON UPDATE CASCADE logic -- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600); INSERT INTO another_citus_local_table VALUES (600);
@ -436,11 +444,70 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
FOR EACH STATEMENT EXECUTE FUNCTION insert_100();') FOR EACH STATEMENT EXECUTE FUNCTION insert_100();')
UPDATE another_citus_local_table SET value=value-1;; UPDATE another_citus_local_table SET value=value-1;;
NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1)
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100) ERROR: cannot execute a distributed query from a query on a shard
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100) ROLLBACK;
-- can perform regular execution from a trigger on a Citus local table
BEGIN;
-- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600);
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.another_citus_local_table_1507009 (value) VALUES (600)
INSERT INTO citus_local_table VALUES (600);
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.citus_local_table_1507001 (value) VALUES (600)
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507009, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();')
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507001, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();')
UPDATE another_citus_local_table SET value=value-1;;
NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1)
-- we should see two rows with "100" -- we should see two rows with "100"
SELECT * FROM reference_table; SELECT * FROM local_table;
NOTICE: executing the command locally: SELECT value FROM citus_local_table_triggers.reference_table_1507010 reference_table value
---------------------------------------------------------------------
100
100
(2 rows)
ROLLBACK;
-- can perform local execution from a trigger on a Citus local table
BEGIN;
SELECT citus_add_local_table_to_metadata('local_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600);
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.another_citus_local_table_1507009 (value) VALUES (600)
INSERT INTO citus_local_table VALUES (600);
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.citus_local_table_1507001 (value) VALUES (600)
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507009, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();')
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507001, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();')
UPDATE another_citus_local_table SET value=value-1;;
NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1)
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.local_table_1507011 (value) VALUES (100)
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.local_table_1507011 (value) VALUES (100)
-- we should see two rows with "100"
SELECT * FROM local_table;
NOTICE: executing the command locally: SELECT value FROM citus_local_table_triggers.local_table_1507011 local_table
value value
--------------------------------------------------------------------- ---------------------------------------------------------------------
100 100
@ -456,11 +523,11 @@ CREATE TABLE par_another_citus_local_table_1 PARTITION OF par_another_citus_loca
ALTER TABLE par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val); ALTER TABLE par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val);
ALTER TABLE par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val) ON UPDATE CASCADE; ALTER TABLE par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val) ON UPDATE CASCADE;
SELECT citus_add_local_table_to_metadata('par_another_citus_local_table', cascade_via_foreign_keys=>true); SELECT citus_add_local_table_to_metadata('par_another_citus_local_table', cascade_via_foreign_keys=>true);
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507011, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_another_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507012, 'citus_local_table_triggers', 1507013, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_another_citus_local_table_1 FOR VALUES FROM (1) TO (10000);')
NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('citus_local_table_triggers.par_another_citus_local_table_val_key_1507011'::regclass, 'citus_local_table_triggers.par_another_citus_local_table_1_1507012', 'par_another_citus_local_table_1_val_key_1507012')$$) NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('citus_local_table_triggers.par_another_citus_local_table_val_key_1507012'::regclass, 'citus_local_table_triggers.par_another_citus_local_table_1_1507013', 'par_another_citus_local_table_1_val_key_1507013')$$)
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507013, 'citus_local_table_triggers', 1507014, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507014, 'citus_local_table_triggers', 1507015, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_citus_local_table_1 FOR VALUES FROM (1) TO (10000);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507011, 'citus_local_table_triggers', 1507011, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val)') NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507012, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val)')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507013, 'citus_local_table_triggers', 1507011, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val) ON UPDATE CASCADE') NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507014, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val) ON UPDATE CASCADE')
citus_add_local_table_to_metadata citus_add_local_table_to_metadata
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -489,7 +556,7 @@ BEGIN;
TRUNCATE par_another_citus_local_table CASCADE; TRUNCATE par_another_citus_local_table CASCADE;
NOTICE: truncate cascades to table "par_citus_local_table" NOTICE: truncate cascades to table "par_citus_local_table"
NOTICE: truncate cascades to table "par_citus_local_table_1" NOTICE: truncate cascades to table "par_citus_local_table_1"
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507015 (val) VALUES (100) NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507016 (val) VALUES (100)
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_another_citus_local_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_another_citus_local_table_xxxxx CASCADE
NOTICE: truncate cascades to table "par_citus_local_table_xxxxx" NOTICE: truncate cascades to table "par_citus_local_table_xxxxx"
NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx" NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx"
@ -497,12 +564,12 @@ NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_trigger
NOTICE: truncate cascades to table "par_citus_local_table_xxxxx" NOTICE: truncate cascades to table "par_citus_local_table_xxxxx"
NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx" NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx"
NOTICE: truncate cascades to table "par_another_citus_local_table_xxxxx" NOTICE: truncate cascades to table "par_another_citus_local_table_xxxxx"
NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507015 (val) VALUES (100) NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507016 (val) VALUES (100)
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_1_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_1_xxxxx CASCADE
-- we should see two rows with "100" -- we should see two rows with "100"
SELECT * FROM par_reference_table; SELECT * FROM par_reference_table;
NOTICE: executing the command locally: SELECT val FROM citus_local_table_triggers.par_reference_table_1507015 par_reference_table NOTICE: executing the command locally: SELECT val FROM citus_local_table_triggers.par_reference_table_1507016 par_reference_table
val val
--------------------------------------------------------------------- ---------------------------------------------------------------------
100 100
@ -512,4 +579,4 @@ NOTICE: executing the command locally: SELECT val FROM citus_local_table_trigge
ROLLBACK; ROLLBACK;
-- cleanup at exit -- cleanup at exit
DROP SCHEMA citus_local_table_triggers, "interesting!schema" CASCADE; DROP SCHEMA citus_local_table_triggers, "interesting!schema" CASCADE;
NOTICE: drop cascades to 20 other objects NOTICE: drop cascades to 22 other objects

View File

@ -872,6 +872,18 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_
4 4
(1 row) (1 row)
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
2
(1 row)
DROP TABLE test_table; DROP TABLE test_table;
CREATE TABLE test_table(id int, value_1 int); CREATE TABLE test_table(id int, value_1 int);
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);

View File

@ -289,6 +289,12 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM pg_dist_shard; SELECT count(*) FROM pg_dist_shard;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -421,7 +427,7 @@ COMMIT;
SELECT recover_prepared_transactions(); SELECT recover_prepared_transactions();
recover_prepared_transactions recover_prepared_transactions
--------------------------------------------------------------------- ---------------------------------------------------------------------
4 0
(1 row) (1 row)
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');

View File

@ -12,6 +12,8 @@ SELECT citus.clear_network_traffic();
SET citus.shard_count = 2; SET citus.shard_count = 2;
SET citus.shard_replication_factor = 2; SET citus.shard_replication_factor = 2;
-- this test is designed such that no modification lock is acquired
SET citus.allow_modifications_from_workers_to_replicated_tables TO false;
CREATE TABLE select_test (key int, value text); CREATE TABLE select_test (key int, value text);
SELECT create_distributed_table('select_test', 'key'); SELECT create_distributed_table('select_test', 'key');
create_distributed_table create_distributed_table
@ -60,6 +62,12 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
COMMIT; COMMIT;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
TRUNCATE select_test; TRUNCATE select_test;
-- now the same tests with query cancellation -- now the same tests with query cancellation
-- put data in shard for which mitm node is first placement -- put data in shard for which mitm node is first placement
@ -96,6 +104,12 @@ WHERE shardid IN (
1 1
(1 row) (1 row)
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
TRUNCATE select_test; TRUNCATE select_test;
-- cancel the second query -- cancel the second query
-- error after second SELECT; txn should fail -- error after second SELECT; txn should fail

View File

@ -66,16 +66,10 @@ ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c_2 check(
ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2; ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2;
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2; ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2;
-- trigger test -- trigger test
CREATE TABLE distributed_table(value int); CREATE TABLE table42(value int);
SELECT create_distributed_table('distributed_table', 'value');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$ CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$
BEGIN BEGIN
INSERT INTO distributed_table VALUES (42); INSERT INTO table42 VALUES (42);
RETURN NEW; RETURN NEW;
END; END;
$insert_42$ LANGUAGE plpgsql; $insert_42$ LANGUAGE plpgsql;
@ -85,7 +79,7 @@ FOR EACH ROW EXECUTE FUNCTION insert_42();
-- do the same pattern from the workers as well -- do the same pattern from the workers as well
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99; delete from public.foreign_table_newname where id_test = 99;
select * from distributed_table ORDER BY value; select * from table42 ORDER BY value;
value value
--------------------------------------------------------------------- ---------------------------------------------------------------------
42 42
@ -96,7 +90,7 @@ alter foreign table public.foreign_table_newname disable trigger insert_42_trigg
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99; delete from public.foreign_table_newname where id_test = 99;
-- should not insert again as trigger disabled -- should not insert again as trigger disabled
select * from distributed_table ORDER BY value; select * from table42 ORDER BY value;
value value
--------------------------------------------------------------------- ---------------------------------------------------------------------
42 42
@ -199,36 +193,11 @@ NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table
(1 row) (1 row)
-- both should error out
SELECT create_distributed_table('foreign_table','data'); SELECT create_distributed_table('foreign_table','data');
create_distributed_table ERROR: foreign tables cannot be distributed
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('foreign_table');
NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table
NOTICE: dropping the old foreign_tables_schema_mx.foreign_table
NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('foreign_table'); SELECT create_reference_table('foreign_table');
create_reference_table ERROR: foreign tables cannot be distributed
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('foreign_table');
NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table
NOTICE: dropping the old foreign_tables_schema_mx.foreign_table
NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table
undistribute_table
---------------------------------------------------------------------
(1 row)
INSERT INTO foreign_table_test VALUES (1, 'testt'); INSERT INTO foreign_table_test VALUES (1, 'testt');
SELECT * FROM foreign_table ORDER BY a; SELECT * FROM foreign_table ORDER BY a;
data | a data | a

View File

@ -62,13 +62,6 @@ SELECT create_distributed_table('partitioned_distributed_table', 'a');
CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table; CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table; CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
CREATE FOREIGN TABLE foreign_distributed_table (a int, b int) SERVER fake_fdw_server;
SELECT create_distributed_table('foreign_distributed_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- and insert some data -- and insert some data
INSERT INTO postgres_local_table SELECT * FROM generate_series(0, 5); INSERT INTO postgres_local_table SELECT * FROM generate_series(0, 5);
INSERT INTO partitioned_postgres_local_table SELECT * FROM generate_series(0, 5); INSERT INTO partitioned_postgres_local_table SELECT * FROM generate_series(0, 5);
@ -145,12 +138,6 @@ SELECT * FROM unlogged_distributed_table UNION SELECT 1,1 ORDER BY 1,2;
5 | 6 5 | 6
(7 rows) (7 rows)
SELECT * from foreign_distributed_table UNION SELECT 1,1 ORDER BY 1,2;
a | b
---------------------------------------------------------------------
1 | 1
(1 row)
SELECT 1 UNION SELECT * FROM citus_local_table ORDER BY 1; SELECT 1 UNION SELECT * FROM citus_local_table ORDER BY 1;
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -378,17 +365,6 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
455 455
(1 row) (1 row)
SELECT COUNT(*) FROM
(SELECT *, random() FROM unlogged_distributed_table) AS foo,
(SELECT *, random() FROM foreign_distributed_table) AS bar
WHERE foo.a = bar.b;
DEBUG: generating subplan XXX_1 for subquery SELECT a, b, random() AS random FROM mixed_relkind_tests.foreign_distributed_table
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT unlogged_distributed_table.a, unlogged_distributed_table.b, random() AS random FROM mixed_relkind_tests.unlogged_distributed_table) foo, (SELECT intermediate_result.a, intermediate_result.b, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer, random double precision)) bar WHERE (foo.a OPERATOR(pg_catalog.=) bar.b)
count
---------------------------------------------------------------------
0
(1 row)
UPDATE partitioned_distributed_table SET b = foo.a FROM citus_local_table AS foo; UPDATE partitioned_distributed_table SET b = foo.a FROM citus_local_table AS foo;
DEBUG: Wrapping relation "citus_local_table" "foo" to a subquery DEBUG: Wrapping relation "citus_local_table" "foo" to a subquery
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.citus_local_table foo WHERE true DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.citus_local_table foo WHERE true
@ -486,15 +462,6 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
1014 1014
(1 row) (1 row)
WITH cte_1 AS MATERIALIZED (SELECT * FROM foreign_distributed_table)
SELECT COUNT(*) FROM cte_1 JOIN foreign_distributed_table USING (a);
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.foreign_distributed_table
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 JOIN mixed_relkind_tests.foreign_distributed_table USING (a))
count
---------------------------------------------------------------------
0
(1 row)
WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table)
SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (b); SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (b);
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.partitioned_distributed_table DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.partitioned_distributed_table
@ -658,18 +625,6 @@ $Q$);
Task Count: 4 Task Count: 4
(4 rows) (4 rows)
SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS OFF)
SELECT a, COUNT(*) OVER (PARTITION BY a) FROM foreign_distributed_table ORDER BY 1,2;
$Q$);
coordinator_plan
---------------------------------------------------------------------
Sort
Sort Key: remote_scan.a, remote_scan.count
-> Custom Scan (Citus Adaptive)
Task Count: 4
(4 rows)
-- pull to coordinator WINDOW -- pull to coordinator WINDOW
SELECT public.coordinator_plan($Q$ SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS OFF) EXPLAIN (COSTS OFF)
@ -686,21 +641,6 @@ $Q$);
Task Count: 4 Task Count: 4
(7 rows) (7 rows)
SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS OFF)
SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM foreign_distributed_table ORDER BY 1,2;
$Q$);
coordinator_plan
---------------------------------------------------------------------
Sort
Sort Key: remote_scan.a, (count(*) OVER (?))
-> WindowAgg
-> Sort
Sort Key: remote_scan.worker_column_2
-> Custom Scan (Citus Adaptive)
Task Count: 4
(7 rows)
-- FOR UPDATE -- FOR UPDATE
SELECT * FROM partitioned_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE; SELECT * FROM partitioned_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE;
a | b a | b
@ -737,14 +677,6 @@ BEGIN;
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
COMMIT;
BEGIN;
ALTER TABLE foreign_distributed_table DROP COLUMN b CASCADE;
SELECT * FROM foreign_distributed_table;
a
---------------------------------------------------------------------
(0 rows)
COMMIT; COMMIT;
-- cleanup at exit -- cleanup at exit
DROP SCHEMA mixed_relkind_tests CASCADE; DROP SCHEMA mixed_relkind_tests CASCADE;

View File

@ -256,6 +256,10 @@ SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_
ERROR: permission denied for function master_update_node ERROR: permission denied for function master_update_node
-- try to manipulate node metadata via privileged user -- try to manipulate node metadata via privileged user
SET ROLE node_metadata_user; SET ROLE node_metadata_user;
SET citus.enable_object_propagation TO off; -- prevent master activate node to actually connect for this test
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
BEGIN; BEGIN;
SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
?column? ?column?
@ -263,43 +267,29 @@ SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
1 1
(1 row) (1 row)
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
?column?
----------
1
(1 row)
SELECT 1 FROM master_remove_node('localhost', :worker_2_port); SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
?column? ?column?
---------- ----------
1 1
(1 row) (1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port); SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_1_port);
?column? ?column?
---------- ---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_2_port);
?column?
----------
1 1
(1 row) (1 row)
SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port; SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port;
master_update_node master_update_node
-------------------- ---------------------------------------------------------------------
(0 rows)
(1 row)
SELECT nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeport; SELECT nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeport;
nodename | nodeport | noderole nodename | nodeport | noderole
-----------+----------+----------- -----------+----------+-----------
localhost | 57637 | primary localhost | 57637 | primary
localhost | 57640 | secondary localhost | 57640 | secondary
localhost | 57641 | primary (2 rows)
(3 rows)
ABORT; ABORT;
\c - postgres - :master_port \c - postgres - :master_port

View File

@ -425,14 +425,6 @@ SELECT create_distributed_table('table_range', 'id', 'range');
(1 row) (1 row)
-- test foreign table creation
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
SELECT create_distributed_table('table3_groupD', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- check metadata -- check metadata
SELECT * FROM pg_dist_colocation SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
@ -458,8 +450,7 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
table2_groupc | 6 table2_groupc | 6
table1_groupd | 7 table1_groupd | 7
table2_groupd | 7 table2_groupd | 7
table3_groupd | 7 (8 rows)
(9 rows)
-- check effects of dropping tables -- check effects of dropping tables
DROP TABLE table1_groupA; DROP TABLE table1_groupA;
@ -585,13 +576,12 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
table2_groupc | 6 table2_groupc | 6
table1_groupd | 7 table1_groupd | 7
table2_groupd | 7 table2_groupd | 7
table3_groupd | 7
table1_group_none_1 | 8 table1_group_none_1 | 8
table2_group_none_1 | 8 table2_group_none_1 | 8
table1_group_none_2 | 9 table1_group_none_2 | 9
table1_group_none_3 | 10 table1_group_none_3 | 10
table1_group_default | 11 table1_group_default | 11
(17 rows) (16 rows)
-- check failing colocate_with options -- check failing colocate_with options
CREATE TABLE table_postgresql( id int ); CREATE TABLE table_postgresql( id int );
@ -621,14 +611,14 @@ ERROR: cannot colocate tables table1_groupe and table_bigint
DETAIL: Distribution column types don't match for table1_groupe and table_bigint. DETAIL: Distribution column types don't match for table1_groupe and table_bigint.
-- check worker table schemas -- check worker table schemas
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300062'::regclass; SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300054'::regclass;
Column | Type | Modifiers Column | Type | Modifiers
--------------------------------------------------------------------- ---------------------------------------------------------------------
dummy_column | text | dummy_column | text |
id | integer | id | integer |
(2 rows) (2 rows)
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300064'::regclass; SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300056'::regclass;
Column | Type | Modifiers Column | Type | Modifiers
--------------------------------------------------------------------- ---------------------------------------------------------------------
id | integer | id | integer |
@ -685,8 +675,6 @@ ORDER BY
table1_groupb | table2_groupb | t table1_groupb | table2_groupb | t
table1_groupc | table2_groupc | t table1_groupc | table2_groupc | t
table1_groupd | table2_groupd | t table1_groupd | table2_groupd | t
table1_groupd | table3_groupd | t
table2_groupd | table3_groupd | t
table1_groupe | table2_groupe | t table1_groupe | table2_groupe | t
table1_groupe | table3_groupe | t table1_groupe | table3_groupe | t
table1_groupe | schema_colocation.table4_groupe | t table1_groupe | schema_colocation.table4_groupe | t
@ -699,7 +687,7 @@ ORDER BY
schema_colocation.table4_groupe | table4_groupe | t schema_colocation.table4_groupe | table4_groupe | t
table1_group_none_1 | table2_group_none_1 | t table1_group_none_1 | table2_group_none_1 | t
table1_groupf | table2_groupf | t table1_groupf | table2_groupf | t
(18 rows) (16 rows)
-- check created shards -- check created shards
SELECT SELECT
@ -766,71 +754,55 @@ ORDER BY
table2_groupd | 1300048 | t | 57638 | 1073741824 | 1610612735 table2_groupd | 1300048 | t | 57638 | 1073741824 | 1610612735
table2_groupd | 1300049 | t | 57637 | 1610612736 | 2147483647 table2_groupd | 1300049 | t | 57637 | 1610612736 | 2147483647
table2_groupd | 1300049 | t | 57638 | 1610612736 | 2147483647 table2_groupd | 1300049 | t | 57638 | 1610612736 | 2147483647
table3_groupd | 1300050 | f | 57637 | -2147483648 | -1610612737 table1_groupe | 1300050 | t | 57637 | -2147483648 | -1
table3_groupd | 1300050 | f | 57638 | -2147483648 | -1610612737 table1_groupe | 1300050 | t | 57638 | -2147483648 | -1
table3_groupd | 1300051 | f | 57637 | -1610612736 | -1073741825 table1_groupe | 1300051 | t | 57637 | 0 | 2147483647
table3_groupd | 1300051 | f | 57638 | -1610612736 | -1073741825 table1_groupe | 1300051 | t | 57638 | 0 | 2147483647
table3_groupd | 1300052 | f | 57637 | -1073741824 | -536870913 table2_groupe | 1300052 | t | 57637 | -2147483648 | -1
table3_groupd | 1300052 | f | 57638 | -1073741824 | -536870913 table2_groupe | 1300052 | t | 57638 | -2147483648 | -1
table3_groupd | 1300053 | f | 57637 | -536870912 | -1 table2_groupe | 1300053 | t | 57637 | 0 | 2147483647
table3_groupd | 1300053 | f | 57638 | -536870912 | -1 table2_groupe | 1300053 | t | 57638 | 0 | 2147483647
table3_groupd | 1300054 | f | 57637 | 0 | 536870911 table3_groupe | 1300054 | t | 57637 | -2147483648 | -1
table3_groupd | 1300054 | f | 57638 | 0 | 536870911 table3_groupe | 1300054 | t | 57638 | -2147483648 | -1
table3_groupd | 1300055 | f | 57637 | 536870912 | 1073741823 table3_groupe | 1300055 | t | 57637 | 0 | 2147483647
table3_groupd | 1300055 | f | 57638 | 536870912 | 1073741823 table3_groupe | 1300055 | t | 57638 | 0 | 2147483647
table3_groupd | 1300056 | f | 57637 | 1073741824 | 1610612735 schema_colocation.table4_groupe | 1300056 | t | 57637 | -2147483648 | -1
table3_groupd | 1300056 | f | 57638 | 1073741824 | 1610612735 schema_colocation.table4_groupe | 1300056 | t | 57638 | -2147483648 | -1
table3_groupd | 1300057 | f | 57637 | 1610612736 | 2147483647 schema_colocation.table4_groupe | 1300057 | t | 57637 | 0 | 2147483647
table3_groupd | 1300057 | f | 57638 | 1610612736 | 2147483647 schema_colocation.table4_groupe | 1300057 | t | 57638 | 0 | 2147483647
table1_groupe | 1300058 | t | 57637 | -2147483648 | -1 table1_group_none_1 | 1300058 | t | 57637 | -2147483648 | -1
table1_groupe | 1300058 | t | 57638 | -2147483648 | -1 table1_group_none_1 | 1300058 | t | 57638 | -2147483648 | -1
table1_groupe | 1300059 | t | 57637 | 0 | 2147483647 table1_group_none_1 | 1300059 | t | 57637 | 0 | 2147483647
table1_groupe | 1300059 | t | 57638 | 0 | 2147483647 table1_group_none_1 | 1300059 | t | 57638 | 0 | 2147483647
table2_groupe | 1300060 | t | 57637 | -2147483648 | -1 table2_group_none_1 | 1300060 | t | 57637 | -2147483648 | -1
table2_groupe | 1300060 | t | 57638 | -2147483648 | -1 table2_group_none_1 | 1300060 | t | 57638 | -2147483648 | -1
table2_groupe | 1300061 | t | 57637 | 0 | 2147483647 table2_group_none_1 | 1300061 | t | 57637 | 0 | 2147483647
table2_groupe | 1300061 | t | 57638 | 0 | 2147483647 table2_group_none_1 | 1300061 | t | 57638 | 0 | 2147483647
table3_groupe | 1300062 | t | 57637 | -2147483648 | -1 table1_group_none_2 | 1300062 | t | 57637 | -2147483648 | -1
table3_groupe | 1300062 | t | 57638 | -2147483648 | -1 table1_group_none_2 | 1300062 | t | 57638 | -2147483648 | -1
table3_groupe | 1300063 | t | 57637 | 0 | 2147483647 table1_group_none_2 | 1300063 | t | 57637 | 0 | 2147483647
table3_groupe | 1300063 | t | 57638 | 0 | 2147483647 table1_group_none_2 | 1300063 | t | 57638 | 0 | 2147483647
schema_colocation.table4_groupe | 1300064 | t | 57637 | -2147483648 | -1 table4_groupe | 1300064 | t | 57637 | -2147483648 | -1
schema_colocation.table4_groupe | 1300064 | t | 57638 | -2147483648 | -1 table4_groupe | 1300064 | t | 57638 | -2147483648 | -1
schema_colocation.table4_groupe | 1300065 | t | 57637 | 0 | 2147483647 table4_groupe | 1300065 | t | 57637 | 0 | 2147483647
schema_colocation.table4_groupe | 1300065 | t | 57638 | 0 | 2147483647 table4_groupe | 1300065 | t | 57638 | 0 | 2147483647
table1_group_none_1 | 1300066 | t | 57637 | -2147483648 | -1 table1_group_none_3 | 1300066 | t | 57637 | -2147483648 | -715827884
table1_group_none_1 | 1300066 | t | 57638 | -2147483648 | -1 table1_group_none_3 | 1300066 | t | 57638 | -2147483648 | -715827884
table1_group_none_1 | 1300067 | t | 57637 | 0 | 2147483647 table1_group_none_3 | 1300067 | t | 57637 | -715827883 | 715827881
table1_group_none_1 | 1300067 | t | 57638 | 0 | 2147483647 table1_group_none_3 | 1300067 | t | 57638 | -715827883 | 715827881
table2_group_none_1 | 1300068 | t | 57637 | -2147483648 | -1 table1_group_none_3 | 1300068 | t | 57637 | 715827882 | 2147483647
table2_group_none_1 | 1300068 | t | 57638 | -2147483648 | -1 table1_group_none_3 | 1300068 | t | 57638 | 715827882 | 2147483647
table2_group_none_1 | 1300069 | t | 57637 | 0 | 2147483647 table1_group_default | 1300069 | t | 57637 | -2147483648 | -715827884
table2_group_none_1 | 1300069 | t | 57638 | 0 | 2147483647 table1_group_default | 1300069 | t | 57638 | -2147483648 | -715827884
table1_group_none_2 | 1300070 | t | 57637 | -2147483648 | -1 table1_group_default | 1300070 | t | 57637 | -715827883 | 715827881
table1_group_none_2 | 1300070 | t | 57638 | -2147483648 | -1 table1_group_default | 1300070 | t | 57638 | -715827883 | 715827881
table1_group_none_2 | 1300071 | t | 57637 | 0 | 2147483647 table1_group_default | 1300071 | t | 57637 | 715827882 | 2147483647
table1_group_none_2 | 1300071 | t | 57638 | 0 | 2147483647 table1_group_default | 1300071 | t | 57638 | 715827882 | 2147483647
table4_groupe | 1300072 | t | 57637 | -2147483648 | -1
table4_groupe | 1300072 | t | 57638 | -2147483648 | -1
table4_groupe | 1300073 | t | 57637 | 0 | 2147483647
table4_groupe | 1300073 | t | 57638 | 0 | 2147483647
table1_group_none_3 | 1300074 | t | 57637 | -2147483648 | -715827884
table1_group_none_3 | 1300074 | t | 57638 | -2147483648 | -715827884
table1_group_none_3 | 1300075 | t | 57637 | -715827883 | 715827881
table1_group_none_3 | 1300075 | t | 57638 | -715827883 | 715827881
table1_group_none_3 | 1300076 | t | 57637 | 715827882 | 2147483647
table1_group_none_3 | 1300076 | t | 57638 | 715827882 | 2147483647
table1_group_default | 1300077 | t | 57637 | -2147483648 | -715827884
table1_group_default | 1300077 | t | 57638 | -2147483648 | -715827884
table1_group_default | 1300078 | t | 57637 | -715827883 | 715827881
table1_group_default | 1300078 | t | 57638 | -715827883 | 715827881
table1_group_default | 1300079 | t | 57637 | 715827882 | 2147483647
table1_group_default | 1300079 | t | 57638 | 715827882 | 2147483647
table1_groupf | 1300080 | t | 57637 | | table1_groupf | 1300080 | t | 57637 | |
table1_groupf | 1300080 | t | 57638 | | table1_groupf | 1300080 | t | 57638 | |
table2_groupf | 1300081 | t | 57637 | | table2_groupf | 1300081 | t | 57637 | |
table2_groupf | 1300081 | t | 57638 | | table2_groupf | 1300081 | t | 57638 | |
(108 rows) (92 rows)
-- reset colocation ids to test update_distributed_table_colocation -- reset colocation ids to test update_distributed_table_colocation
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;
@ -862,7 +834,7 @@ ERROR: cannot colocate tables table1_groupd and table1_groupb
DETAIL: Shard counts don't match for table1_groupd and table1_groupb. DETAIL: Shard counts don't match for table1_groupd and table1_groupb.
SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupE'); SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupE');
ERROR: cannot colocate tables table1_groupe and table1_groupb ERROR: cannot colocate tables table1_groupe and table1_groupb
DETAIL: Shard 1300058 of table1_groupe and shard xxxxx of table1_groupb have different number of shard placements. DETAIL: Shard 1300050 of table1_groupe and shard xxxxx of table1_groupb have different number of shard placements.
SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupF'); SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupF');
ERROR: relation table1_groupf should be a hash distributed table ERROR: relation table1_groupf should be a hash distributed table
SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupD'); SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupD');
@ -1355,4 +1327,3 @@ DROP TABLE range_table;
DROP TABLE none; DROP TABLE none;
DROP TABLE ref; DROP TABLE ref;
DROP TABLE local_table; DROP TABLE local_table;
DROP FOREIGN TABLE table3_groupD CASCADE;

View File

@ -149,44 +149,6 @@ SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'throwaway%' AND relkind = 'r';
0 0
(1 row) (1 row)
-- test foreign table creation
CREATE FOREIGN TABLE foreign_table_to_distribute
(
name text,
id bigint
)
SERVER fake_fdw_server;
SET citus.shard_count TO 16;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('foreign_table_to_distribute', 'id', 'hash');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard
WHERE logicalrelid = 'foreign_table_to_distribute'::regclass
ORDER BY (shardminvalue::integer) ASC;
shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
f | -2147483648 | -1879048193
f | -1879048192 | -1610612737
f | -1610612736 | -1342177281
f | -1342177280 | -1073741825
f | -1073741824 | -805306369
f | -805306368 | -536870913
f | -536870912 | -268435457
f | -268435456 | -1
f | 0 | 268435455
f | 268435456 | 536870911
f | 536870912 | 805306367
f | 805306368 | 1073741823
f | 1073741824 | 1342177279
f | 1342177280 | 1610612735
f | 1610612736 | 1879048191
f | 1879048192 | 2147483647
(16 rows)
-- test shard creation using weird shard count -- test shard creation using weird shard count
CREATE TABLE weird_shard_count CREATE TABLE weird_shard_count
( (
@ -216,11 +178,3 @@ SELECT shardmaxvalue::integer - shardminvalue::integer AS shard_size
613566759 613566759
(7 rows) (7 rows)
-- cleanup foreign table, related shards and shard placements
DELETE FROM pg_dist_shard_placement
WHERE shardid IN (SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'foreign_table_to_distribute'::regclass);
DELETE FROM pg_dist_shard
WHERE logicalrelid = 'foreign_table_to_distribute'::regclass;
DELETE FROM pg_dist_partition
WHERE logicalrelid = 'foreign_table_to_distribute'::regclass;

View File

@ -166,62 +166,12 @@ SELECT master_get_table_ddl_events('fiddly_table');
ALTER TABLE public.fiddly_table OWNER TO postgres ALTER TABLE public.fiddly_table OWNER TO postgres
(3 rows) (3 rows)
-- test foreign tables using fake FDW
CREATE FOREIGN TABLE foreign_table (
id bigint not null,
full_name text not null default ''
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
SELECT create_distributed_table('foreign_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER FOREIGN TABLE foreign_table rename to renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890;
NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456"
ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 rename full_name to rename_name;
NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456"
ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 alter rename_name type char(8);
NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456"
\c - - :public_worker_1_host :worker_1_port
select table_name, column_name, data_type
from information_schema.columns
where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id'
order by table_name;
table_name | column_name | data_type
---------------------------------------------------------------------
renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610008 | rename_name | character
renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610009 | rename_name | character
renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610010 | rename_name | character
renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610011 | rename_name | character
(4 rows)
\c - - :master_host :master_port
SELECT master_get_table_ddl_events('renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890');
master_get_table_ddl_events
---------------------------------------------------------------------
CREATE FOREIGN TABLE public.renamed_foreign_table_with_long_name_12345678901234567890123456 (id bigint NOT NULL, rename_name character(8) DEFAULT ''::text NOT NULL) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true')
ALTER TABLE public.renamed_foreign_table_with_long_name_12345678901234567890123456 OWNER TO postgres
(2 rows)
-- propagating views is not supported -- propagating views is not supported
CREATE VIEW local_view AS SELECT * FROM simple_table; CREATE VIEW local_view AS SELECT * FROM simple_table;
SELECT master_get_table_ddl_events('local_view'); SELECT master_get_table_ddl_events('local_view');
ERROR: local_view is not a regular, foreign or partitioned table ERROR: local_view is not a regular, foreign or partitioned table
-- clean up -- clean up
DROP VIEW IF EXISTS local_view; DROP VIEW IF EXISTS local_view;
DROP FOREIGN TABLE IF EXISTS renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890;
NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456"
\c - - :public_worker_1_host :worker_1_port
select table_name, column_name, data_type
from information_schema.columns
where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id'
order by table_name;
table_name | column_name | data_type
---------------------------------------------------------------------
(0 rows)
\c - - :master_host :master_port
DROP TABLE IF EXISTS simple_table, not_null_table, column_constraint_table, DROP TABLE IF EXISTS simple_table, not_null_table, column_constraint_table,
table_constraint_table, default_value_table, pkey_table, table_constraint_table, default_value_table, pkey_table,
unique_table, clustered_table, fiddly_table; unique_table, clustered_table, fiddly_table;

View File

@ -131,6 +131,12 @@ BEGIN
y := x; y := x;
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$; END;$$;
CREATE PROCEDURE mx_call_proc_copy(x int)
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_call.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
END;$$;
-- Test that undistributed procedures have no issue executing -- Test that undistributed procedures have no issue executing
call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc(2, 0);
y y
@ -144,6 +150,7 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
F | S F | S
(1 row) (1 row)
call multi_mx_call.mx_call_proc_copy(2);
-- Same for unqualified names -- Same for unqualified names
call mx_call_proc(2, 0); call mx_call_proc(2, 0);
y y
@ -176,6 +183,12 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca
(1 row) (1 row)
select create_distributed_function('mx_call_proc_copy(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- We still don't route them to the workers, because they aren't -- We still don't route them to the workers, because they aren't
-- colocated with any distributed tables. -- colocated with any distributed tables.
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
@ -206,6 +219,12 @@ DEBUG: stored procedure does not have co-located tables
F | S F | S
(1 row) (1 row)
call multi_mx_call.mx_call_proc_copy(2);
DEBUG: stored procedure does not have co-located tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CONTEXT: SQL statement "INSERT INTO multi_mx_call.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s"
PL/pgSQL function mx_call_proc_copy(integer) line XX at SQL statement
-- Mark them as colocated with a table. Now we should route them to workers. -- Mark them as colocated with a table. Now we should route them to workers.
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
colocate_proc_with_table colocate_proc_with_table
@ -225,6 +244,12 @@ select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table
(1 row) (1 row)
select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0);
colocate_proc_with_table
---------------------------------------------------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc(2, 0);
DEBUG: pushing down the procedure DEBUG: pushing down the procedure
y y
@ -253,6 +278,8 @@ DEBUG: pushing down the procedure
S | S S | S
(1 row) (1 row)
call mx_call_proc_copy(2);
DEBUG: pushing down the procedure
-- Test implicit cast of int to bigint -- Test implicit cast of int to bigint
call mx_call_proc_bigint(4, 2); call mx_call_proc_bigint(4, 2);
DEBUG: pushing down the procedure DEBUG: pushing down the procedure
@ -398,18 +425,51 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
CALL multi_mx_call.mx_call_proc_tx(20); CALL multi_mx_call.mx_call_proc_tx(20);
DEBUG: pushing down the procedure DEBUG: pushing down the procedure
SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
id | val id | val
--------------------------------------------------------------------- ---------------------------------------------------------------------
3 | 1 3 | 1
3 | 5 3 | 5
4 | 5 4 | 5
6 | 5 6 | 5
9 | 2 9 | 2
10 | -2 10 | -2
11 | 3 11 | 3
20 | -2 20 | -2
21 | 3 21 | 3
(9 rows) 100 | 98
100 | 98
100 | 98
101 | 99
101 | 99
101 | 99
102 | 100
102 | 100
102 | 100
103 | 101
103 | 101
103 | 101
104 | 102
104 | 102
104 | 102
105 | 103
105 | 103
105 | 103
106 | 104
106 | 104
106 | 104
107 | 105
107 | 105
107 | 105
108 | 106
108 | 106
108 | 106
109 | 107
109 | 107
109 | 107
110 | 108
110 | 108
110 | 108
(42 rows)
-- Show that function delegation works from worker nodes as well -- Show that function delegation works from worker nodes as well
\c - - - :worker_1_port \c - - - :worker_1_port
@ -539,4 +599,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line XX at assignment
reset client_min_messages; reset client_min_messages;
\set VERBOSITY terse \set VERBOSITY terse
drop schema multi_mx_call cascade; drop schema multi_mx_call cascade;
NOTICE: drop cascades to 13 other objects NOTICE: drop cascades to 14 other objects

View File

@ -131,6 +131,12 @@ BEGIN
y := x; y := x;
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$; END;$$;
CREATE PROCEDURE mx_call_proc_copy(x int)
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_call.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
END;$$;
-- Test that undistributed procedures have no issue executing -- Test that undistributed procedures have no issue executing
call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc(2, 0);
y y
@ -144,6 +150,7 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A');
F | S F | S
(1 row) (1 row)
call multi_mx_call.mx_call_proc_copy(2);
-- Same for unqualified names -- Same for unqualified names
call mx_call_proc(2, 0); call mx_call_proc(2, 0);
y y
@ -176,6 +183,12 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca
(1 row) (1 row)
select create_distributed_function('mx_call_proc_copy(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
-- We still don't route them to the workers, because they aren't -- We still don't route them to the workers, because they aren't
-- colocated with any distributed tables. -- colocated with any distributed tables.
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
@ -206,6 +219,12 @@ DEBUG: stored procedure does not have co-located tables
F | S F | S
(1 row) (1 row)
call multi_mx_call.mx_call_proc_copy(2);
DEBUG: stored procedure does not have co-located tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CONTEXT: SQL statement "INSERT INTO multi_mx_call.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s"
PL/pgSQL function mx_call_proc_copy(integer) line XX at SQL statement
-- Mark them as colocated with a table. Now we should route them to workers. -- Mark them as colocated with a table. Now we should route them to workers.
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
colocate_proc_with_table colocate_proc_with_table
@ -225,6 +244,12 @@ select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table
(1 row) (1 row)
select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0);
colocate_proc_with_table
---------------------------------------------------------------------
(1 row)
call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc(2, 0);
DEBUG: pushing down the procedure DEBUG: pushing down the procedure
y y
@ -253,6 +278,8 @@ DEBUG: pushing down the procedure
S | S S | S
(1 row) (1 row)
call mx_call_proc_copy(2);
DEBUG: pushing down the procedure
-- Test implicit cast of int to bigint -- Test implicit cast of int to bigint
call mx_call_proc_bigint(4, 2); call mx_call_proc_bigint(4, 2);
DEBUG: pushing down the procedure DEBUG: pushing down the procedure
@ -398,18 +425,51 @@ DETAIL: A distributed function is created. To make sure subsequent commands see
CALL multi_mx_call.mx_call_proc_tx(20); CALL multi_mx_call.mx_call_proc_tx(20);
DEBUG: pushing down the procedure DEBUG: pushing down the procedure
SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val;
id | val id | val
--------------------------------------------------------------------- ---------------------------------------------------------------------
3 | 1 3 | 1
3 | 5 3 | 5
4 | 5 4 | 5
6 | 5 6 | 5
9 | 2 9 | 2
10 | -2 10 | -2
11 | 3 11 | 3
20 | -2 20 | -2
21 | 3 21 | 3
(9 rows) 100 | 98
100 | 98
100 | 98
101 | 99
101 | 99
101 | 99
102 | 100
102 | 100
102 | 100
103 | 101
103 | 101
103 | 101
104 | 102
104 | 102
104 | 102
105 | 103
105 | 103
105 | 103
106 | 104
106 | 104
106 | 104
107 | 105
107 | 105
107 | 105
108 | 106
108 | 106
108 | 106
109 | 107
109 | 107
109 | 107
110 | 108
110 | 108
110 | 108
(42 rows)
-- Show that function delegation works from worker nodes as well -- Show that function delegation works from worker nodes as well
\c - - - :worker_1_port \c - - - :worker_1_port
@ -539,4 +599,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line XX at assignment
reset client_min_messages; reset client_min_messages;
\set VERBOSITY terse \set VERBOSITY terse
drop schema multi_mx_call cascade; drop schema multi_mx_call cascade;
NOTICE: drop cascades to 13 other objects NOTICE: drop cascades to 14 other objects

View File

@ -83,6 +83,16 @@ BEGIN
y := x; y := x;
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$; END;$$;
-- function which internally uses COPY protocol without remote execution
CREATE FUNCTION mx_call_func_copy(x int)
RETURNS bool
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
RETURN true;
END;$$;
-- Test that undistributed functions have no issue executing -- Test that undistributed functions have no issue executing
select multi_mx_function_call_delegation.mx_call_func(2, 0); select multi_mx_function_call_delegation.mx_call_func(2, 0);
mx_call_func mx_call_func
@ -96,6 +106,9 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
(F,S) (F,S)
(1 row) (1 row)
select multi_mx_function_call_delegation.mx_call_copy(2);
ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
select squares(4); select squares(4);
squares squares
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -131,6 +144,12 @@ select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_ca
(1 row) (1 row)
select create_distributed_function('mx_call_func_copy(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
select create_distributed_function('squares(int)'); select create_distributed_function('squares(int)');
create_distributed_function create_distributed_function
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -604,20 +623,6 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
29 29
(1 row) (1 row)
select mx_call_func(2, 0) from mx_call_dist_table_1;
mx_call_func
---------------------------------------------------------------------
28
28
28
28
28
28
28
28
28
(9 rows)
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
@ -647,6 +652,24 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
29 | 27 29 | 27
(1 row) (1 row)
-- we do not delegate the call, but do push down the query
-- that result in remote execution from workers
select mx_call_func(id, 0) from mx_call_dist_table_1;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment
while executing command on localhost:xxxxx
select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment
while executing command on localhost:xxxxx
select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement
while executing command on localhost:xxxxx
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
DEBUG: not pushing down function calls in a multi-statement transaction DEBUG: not pushing down function calls in a multi-statement transaction
CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)"
@ -725,7 +748,12 @@ HINT: Connect to the coordinator and run it again.
-- show that functions can be delegated from worker nodes -- show that functions can be delegated from worker nodes
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
SELECT mx_call_func(2, 0); SELECT mx_call_func(2, 0);
DEBUG: pushing down the function call DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer
CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
mx_call_func mx_call_func
--------------------------------------------------------------------- ---------------------------------------------------------------------
28 28
@ -736,4 +764,4 @@ SET search_path TO multi_mx_function_call_delegation, public;
RESET client_min_messages; RESET client_min_messages;
\set VERBOSITY terse \set VERBOSITY terse
DROP SCHEMA multi_mx_function_call_delegation CASCADE; DROP SCHEMA multi_mx_function_call_delegation CASCADE;
NOTICE: drop cascades to 14 other objects NOTICE: drop cascades to 15 other objects

View File

@ -83,6 +83,16 @@ BEGIN
y := x; y := x;
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$; END;$$;
-- function which internally uses COPY protocol without remote execution
CREATE FUNCTION mx_call_func_copy(x int)
RETURNS bool
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
RETURN true;
END;$$;
-- Test that undistributed functions have no issue executing -- Test that undistributed functions have no issue executing
select multi_mx_function_call_delegation.mx_call_func(2, 0); select multi_mx_function_call_delegation.mx_call_func(2, 0);
mx_call_func mx_call_func
@ -96,6 +106,9 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
(F,S) (F,S)
(1 row) (1 row)
select multi_mx_function_call_delegation.mx_call_copy(2);
ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
select squares(4); select squares(4);
squares squares
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -131,6 +144,12 @@ select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_ca
(1 row) (1 row)
select create_distributed_function('mx_call_func_copy(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
select create_distributed_function('squares(int)'); select create_distributed_function('squares(int)');
create_distributed_function create_distributed_function
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -604,20 +623,6 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
29 29
(1 row) (1 row)
select mx_call_func(2, 0) from mx_call_dist_table_1;
mx_call_func
---------------------------------------------------------------------
28
28
28
28
28
28
28
28
28
(9 rows)
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
@ -647,6 +652,24 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
29 | 27 29 | 27
(1 row) (1 row)
-- we do not delegate the call, but do push down the query
-- that result in remote execution from workers
select mx_call_func(id, 0) from mx_call_dist_table_1;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment
while executing command on localhost:xxxxx
select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment
while executing command on localhost:xxxxx
select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3;
ERROR: cannot execute a distributed query from a query on a shard
CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s"
PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement
while executing command on localhost:xxxxx
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
DEBUG: not pushing down function calls in a multi-statement transaction DEBUG: not pushing down function calls in a multi-statement transaction
CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)"
@ -725,7 +748,12 @@ HINT: Connect to the coordinator and run it again.
-- show that functions can be delegated from worker nodes -- show that functions can be delegated from worker nodes
SET client_min_messages TO DEBUG1; SET client_min_messages TO DEBUG1;
SELECT mx_call_func(2, 0); SELECT mx_call_func(2, 0);
DEBUG: pushing down the function call DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint)))
CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)"
PL/pgSQL function mx_call_func(integer,integer) line XX at assignment
mx_call_func mx_call_func
--------------------------------------------------------------------- ---------------------------------------------------------------------
28 28
@ -736,4 +764,4 @@ SET search_path TO multi_mx_function_call_delegation, public;
RESET client_min_messages; RESET client_min_messages;
\set VERBOSITY terse \set VERBOSITY terse
DROP SCHEMA multi_mx_function_call_delegation CASCADE; DROP SCHEMA multi_mx_function_call_delegation CASCADE;
NOTICE: drop cascades to 14 other objects NOTICE: drop cascades to 15 other objects

View File

@ -92,30 +92,3 @@ SELECT * FROM customer_engagements;
1 | 03-01-2015 | third event 1 | 03-01-2015 | third event
(3 rows) (3 rows)
-- now do the same test over again with a foreign table
CREATE FOREIGN TABLE remote_engagements (
id integer,
created_at date,
event_data text
) SERVER fake_fdw_server;
-- distribute the table
-- create a single shard on the first worker
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('remote_engagements', 'id', 'hash');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- get the newshardid
SELECT shardid as remotenewshardid FROM pg_dist_shard WHERE logicalrelid = 'remote_engagements'::regclass
\gset
-- now, update the second placement as unhealthy
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AND groupid = :worker_2_group;
-- oops! we don't support repairing shards backed by foreign tables
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: cannot repair shard
DETAIL: Table remote_engagements is a foreign table. Repairing shards backed by foreign tables is not supported.
-- clean-up
DROP FOREIGN TABLE remote_engagements CASCADE;

View File

@ -128,45 +128,6 @@ SELECT undistribute_table('referencing_table');
ERROR: cannot complete operation because table referencing_table has a foreign key ERROR: cannot complete operation because table referencing_table has a foreign key
HINT: Use cascade option to undistribute all the relations involved in a foreign key relationship with undistribute_table.referencing_table by executing SELECT undistribute_table($$undistribute_table.referencing_table$$, cascade_via_foreign_keys=>true) HINT: Use cascade option to undistribute all the relations involved in a foreign key relationship with undistribute_table.referencing_table by executing SELECT undistribute_table($$undistribute_table.referencing_table$$, cascade_via_foreign_keys=>true)
DROP TABLE referenced_table, referencing_table; DROP TABLE referenced_table, referencing_table;
-- test distributed foreign tables
-- we expect errors
-- and we need metadata sync off for foreign tables
SELECT stop_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';
NOTICE: dropping metadata on the node (localhost,57638)
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(2 rows)
CREATE FOREIGN TABLE foreign_table (
id bigint not null,
full_name text not null default ''
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
SELECT create_distributed_table('foreign_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT undistribute_table('foreign_table');
NOTICE: creating a new table for undistribute_table.foreign_table
NOTICE: dropping the old undistribute_table.foreign_table
NOTICE: renaming the new table to undistribute_table.foreign_table
undistribute_table
---------------------------------------------------------------------
(1 row)
DROP FOREIGN TABLE foreign_table;
SELECT start_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';
start_metadata_sync_to_node
---------------------------------------------------------------------
(2 rows)
-- test partitioned tables -- test partitioned tables
CREATE TABLE partitioned_table (id INT, a INT) PARTITION BY RANGE (id); CREATE TABLE partitioned_table (id INT, a INT) PARTITION BY RANGE (id);
CREATE TABLE partitioned_table_1_5 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (5); CREATE TABLE partitioned_table_1_5 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (5);

View File

@ -16,12 +16,9 @@ test: failure_add_disable_node
test: failure_copy_to_reference test: failure_copy_to_reference
test: failure_copy_on_hash test: failure_copy_on_hash
test: failure_create_reference_table test: failure_create_reference_table
test: check_mx
test: turn_mx_off
test: failure_create_distributed_table_non_empty test: failure_create_distributed_table_non_empty
test: failure_create_table test: failure_create_table
test: failure_single_select test: failure_single_select
test: turn_mx_on
test: failure_multi_shard_update_delete test: failure_multi_shard_update_delete
test: failure_cte_subquery test: failure_cte_subquery

View File

@ -268,6 +268,15 @@ BEGIN
END; END;
$insert_100$ LANGUAGE plpgsql; $insert_100$ LANGUAGE plpgsql;
CREATE TABLE local_table (value int);
CREATE FUNCTION insert_100_local() RETURNS trigger AS $insert_100$
BEGIN
INSERT INTO local_table VALUES (100);
RETURN NEW;
END;
$insert_100$ LANGUAGE plpgsql;
BEGIN; BEGIN;
CREATE TRIGGER insert_100_trigger CREATE TRIGGER insert_100_trigger
AFTER TRUNCATE ON another_citus_local_table AFTER TRUNCATE ON another_citus_local_table
@ -282,7 +291,7 @@ BEGIN;
SELECT * FROM reference_table; SELECT * FROM reference_table;
ROLLBACK; ROLLBACK;
-- cannot perform remote execution from a trigger on a Citus local table
BEGIN; BEGIN;
-- update should actually update something to test ON UPDATE CASCADE logic -- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600); INSERT INTO another_citus_local_table VALUES (600);
@ -296,9 +305,47 @@ BEGIN;
AFTER UPDATE ON citus_local_table AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100(); FOR EACH STATEMENT EXECUTE FUNCTION insert_100();
UPDATE another_citus_local_table SET value=value-1;;
ROLLBACK;
-- can perform regular execution from a trigger on a Citus local table
BEGIN;
-- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600);
INSERT INTO citus_local_table VALUES (600);
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
UPDATE another_citus_local_table SET value=value-1;; UPDATE another_citus_local_table SET value=value-1;;
-- we should see two rows with "100" -- we should see two rows with "100"
SELECT * FROM reference_table; SELECT * FROM local_table;
ROLLBACK;
-- can perform local execution from a trigger on a Citus local table
BEGIN;
SELECT citus_add_local_table_to_metadata('local_table');
-- update should actually update something to test ON UPDATE CASCADE logic
INSERT INTO another_citus_local_table VALUES (600);
INSERT INTO citus_local_table VALUES (600);
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();
UPDATE another_citus_local_table SET value=value-1;;
-- we should see two rows with "100"
SELECT * FROM local_table;
ROLLBACK; ROLLBACK;
-- test on partitioned citus local tables -- test on partitioned citus local tables

View File

@ -304,6 +304,10 @@ BEGIN;
SELECT create_distributed_table('test_table', 'id'); SELECT create_distributed_table('test_table', 'id');
COMMIT; COMMIT;
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
SELECT citus.mitmproxy('conn.allow()');
SELECT recover_prepared_transactions();
DROP TABLE test_table; DROP TABLE test_table;
CREATE TABLE test_table(id int, value_1 int); CREATE TABLE test_table(id int, value_1 int);
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
@ -314,7 +318,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").cancel(' || pg_backend_pi
BEGIN; BEGIN;
SELECT create_distributed_table('test_table', 'id'); SELECT create_distributed_table('test_table', 'id');
COMMIT; COMMIT;
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
DROP TABLE test_table; DROP TABLE test_table;
CREATE TABLE test_table(id int, value_1 int); CREATE TABLE test_table(id int, value_1 int);

View File

@ -99,6 +99,7 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="PREPARE TRANSACTION").ki
SELECT create_distributed_table('test_table','id',colocate_with=>'temp_table'); SELECT create_distributed_table('test_table','id',colocate_with=>'temp_table');
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_shard; SELECT count(*) FROM pg_dist_shard;
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'failure_create_table' and table_name LIKE 'test_table%' ORDER BY 1$$); SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'failure_create_table' and table_name LIKE 'test_table%' ORDER BY 1$$);

View File

@ -4,6 +4,9 @@ SELECT citus.clear_network_traffic();
SET citus.shard_count = 2; SET citus.shard_count = 2;
SET citus.shard_replication_factor = 2; SET citus.shard_replication_factor = 2;
-- this test is designed such that no modification lock is acquired
SET citus.allow_modifications_from_workers_to_replicated_tables TO false;
CREATE TABLE select_test (key int, value text); CREATE TABLE select_test (key int, value text);
SELECT create_distributed_table('select_test', 'key'); SELECT create_distributed_table('select_test', 'key');
@ -23,6 +26,8 @@ INSERT INTO select_test VALUES (3, 'more data');
SELECT * FROM select_test WHERE key = 3; SELECT * FROM select_test WHERE key = 3;
COMMIT; COMMIT;
SELECT citus.mitmproxy('conn.allow()');
TRUNCATE select_test; TRUNCATE select_test;
-- now the same tests with query cancellation -- now the same tests with query cancellation
@ -47,6 +52,8 @@ SELECT DISTINCT shardstate FROM pg_dist_shard_placement
WHERE shardid IN ( WHERE shardid IN (
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'select_test'::regclass SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'select_test'::regclass
); );
SELECT citus.mitmproxy('conn.allow()');
TRUNCATE select_test; TRUNCATE select_test;
-- cancel the second query -- cancel the second query

View File

@ -65,17 +65,15 @@ ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2;
ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2; ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2;
-- trigger test -- trigger test
CREATE TABLE distributed_table(value int); CREATE TABLE table42(value int);
SELECT create_distributed_table('distributed_table', 'value');
CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$ CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$
BEGIN BEGIN
INSERT INTO distributed_table VALUES (42); INSERT INTO table42 VALUES (42);
RETURN NEW; RETURN NEW;
END; END;
$insert_42$ LANGUAGE plpgsql; $insert_42$ LANGUAGE plpgsql;
CREATE TRIGGER insert_42_trigger CREATE TRIGGER insert_42_trigger
AFTER DELETE ON public.foreign_table_newname AFTER DELETE ON public.foreign_table_newname
FOR EACH ROW EXECUTE FUNCTION insert_42(); FOR EACH ROW EXECUTE FUNCTION insert_42();
@ -83,14 +81,14 @@ FOR EACH ROW EXECUTE FUNCTION insert_42();
-- do the same pattern from the workers as well -- do the same pattern from the workers as well
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99; delete from public.foreign_table_newname where id_test = 99;
select * from distributed_table ORDER BY value; select * from table42 ORDER BY value;
-- disable trigger -- disable trigger
alter foreign table public.foreign_table_newname disable trigger insert_42_trigger; alter foreign table public.foreign_table_newname disable trigger insert_42_trigger;
INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); INSERT INTO public.foreign_table_newname VALUES (99, 'test_2');
delete from public.foreign_table_newname where id_test = 99; delete from public.foreign_table_newname where id_test = 99;
-- should not insert again as trigger disabled -- should not insert again as trigger disabled
select * from distributed_table ORDER BY value; select * from table42 ORDER BY value;
DROP TRIGGER insert_42_trigger ON public.foreign_table_newname; DROP TRIGGER insert_42_trigger ON public.foreign_table_newname;
@ -135,10 +133,10 @@ ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'postgre
-- test undistributing -- test undistributing
DELETE FROM foreign_table; DELETE FROM foreign_table;
SELECT undistribute_table('foreign_table'); SELECT undistribute_table('foreign_table');
-- both should error out
SELECT create_distributed_table('foreign_table','data'); SELECT create_distributed_table('foreign_table','data');
SELECT undistribute_table('foreign_table');
SELECT create_reference_table('foreign_table'); SELECT create_reference_table('foreign_table');
SELECT undistribute_table('foreign_table');
INSERT INTO foreign_table_test VALUES (1, 'testt'); INSERT INTO foreign_table_test VALUES (1, 'testt');
SELECT * FROM foreign_table ORDER BY a; SELECT * FROM foreign_table ORDER BY a;

View File

@ -47,9 +47,6 @@ SELECT create_distributed_table('partitioned_distributed_table', 'a');
CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table; CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table; CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table;
CREATE FOREIGN TABLE foreign_distributed_table (a int, b int) SERVER fake_fdw_server;
SELECT create_distributed_table('foreign_distributed_table', 'a');
-- and insert some data -- and insert some data
INSERT INTO postgres_local_table SELECT * FROM generate_series(0, 5); INSERT INTO postgres_local_table SELECT * FROM generate_series(0, 5);
INSERT INTO partitioned_postgres_local_table SELECT * FROM generate_series(0, 5); INSERT INTO partitioned_postgres_local_table SELECT * FROM generate_series(0, 5);
@ -65,7 +62,6 @@ SELECT * FROM partitioned_distributed_table UNION SELECT 1, * FROM postgres_loca
SELECT * FROM partitioned_distributed_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2; SELECT * FROM partitioned_distributed_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2;
SELECT *, 1 FROM postgres_local_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2; SELECT *, 1 FROM postgres_local_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2;
SELECT * FROM unlogged_distributed_table UNION SELECT 1,1 ORDER BY 1,2; SELECT * FROM unlogged_distributed_table UNION SELECT 1,1 ORDER BY 1,2;
SELECT * from foreign_distributed_table UNION SELECT 1,1 ORDER BY 1,2;
SELECT 1 UNION SELECT * FROM citus_local_table ORDER BY 1; SELECT 1 UNION SELECT * FROM citus_local_table ORDER BY 1;
SELECT * FROM view_on_part_dist UNION SELECT 1,1 ORDER BY 1,2; SELECT * FROM view_on_part_dist UNION SELECT 1,1 ORDER BY 1,2;
@ -117,11 +113,6 @@ SELECT COUNT(*) FROM
(SELECT *, random() FROM partitioned_distributed_table) AS bar (SELECT *, random() FROM partitioned_distributed_table) AS bar
WHERE foo.a = bar.b; WHERE foo.a = bar.b;
SELECT COUNT(*) FROM
(SELECT *, random() FROM unlogged_distributed_table) AS foo,
(SELECT *, random() FROM foreign_distributed_table) AS bar
WHERE foo.a = bar.b;
UPDATE partitioned_distributed_table SET b = foo.a FROM citus_local_table AS foo; UPDATE partitioned_distributed_table SET b = foo.a FROM citus_local_table AS foo;
UPDATE partitioned_distributed_table SET b = foo.a FROM postgres_local_table AS foo; UPDATE partitioned_distributed_table SET b = foo.a FROM postgres_local_table AS foo;
UPDATE partitioned_distributed_table SET a = foo.a FROM postgres_local_table AS foo WHERE foo.a = partitioned_distributed_table.a; UPDATE partitioned_distributed_table SET a = foo.a FROM postgres_local_table AS foo WHERE foo.a = partitioned_distributed_table.a;
@ -161,9 +152,6 @@ WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table)
WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table)
SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (a); SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (a);
WITH cte_1 AS MATERIALIZED (SELECT * FROM foreign_distributed_table)
SELECT COUNT(*) FROM cte_1 JOIN foreign_distributed_table USING (a);
WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table)
SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (b); SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (b);
@ -245,22 +233,12 @@ EXPLAIN (COSTS OFF)
SELECT a, COUNT(*) OVER (PARTITION BY a) FROM partitioned_distributed_table ORDER BY 1,2; SELECT a, COUNT(*) OVER (PARTITION BY a) FROM partitioned_distributed_table ORDER BY 1,2;
$Q$); $Q$);
SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS OFF)
SELECT a, COUNT(*) OVER (PARTITION BY a) FROM foreign_distributed_table ORDER BY 1,2;
$Q$);
-- pull to coordinator WINDOW -- pull to coordinator WINDOW
SELECT public.coordinator_plan($Q$ SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS OFF) EXPLAIN (COSTS OFF)
SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM partitioned_distributed_table ORDER BY 1,2; SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM partitioned_distributed_table ORDER BY 1,2;
$Q$); $Q$);
SELECT public.coordinator_plan($Q$
EXPLAIN (COSTS OFF)
SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM foreign_distributed_table ORDER BY 1,2;
$Q$);
-- FOR UPDATE -- FOR UPDATE
SELECT * FROM partitioned_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE; SELECT * FROM partitioned_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE;
SELECT * FROM unlogged_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE; SELECT * FROM unlogged_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE;
@ -276,10 +254,5 @@ BEGIN;
SELECT * FROM partitioned_distributed_table; SELECT * FROM partitioned_distributed_table;
COMMIT; COMMIT;
BEGIN;
ALTER TABLE foreign_distributed_table DROP COLUMN b CASCADE;
SELECT * FROM foreign_distributed_table;
COMMIT;
-- cleanup at exit -- cleanup at exit
DROP SCHEMA mixed_relkind_tests CASCADE; DROP SCHEMA mixed_relkind_tests CASCADE;

View File

@ -122,9 +122,14 @@ SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_
-- show that non-admin role can not activate a node -- show that non-admin role can not activate a node
SET ROLE node_metadata_user; SET ROLE node_metadata_user;
SET citus.enable_object_propagation TO off; -- prevent master activate node to actually connect for this test
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
BEGIN; BEGIN;
SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
SELECT 1 FROM master_activate_node('localhost', :worker_2_port); SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_1_port);
SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeport;
ABORT; ABORT;
\c - postgres - :master_port \c - postgres - :master_port

View File

@ -202,10 +202,6 @@ SELECT create_distributed_table('table_append', 'id', 'append');
CREATE TABLE table_range ( id int ); CREATE TABLE table_range ( id int );
SELECT create_distributed_table('table_range', 'id', 'range'); SELECT create_distributed_table('table_range', 'id', 'range');
-- test foreign table creation
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
SELECT create_distributed_table('table3_groupD', 'id');
-- check metadata -- check metadata
SELECT * FROM pg_dist_colocation SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
@ -290,8 +286,8 @@ CREATE TABLE table_bigint ( id bigint );
SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE');
-- check worker table schemas -- check worker table schemas
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300062'::regclass; SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300054'::regclass;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300064'::regclass; SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300056'::regclass;
\c - - - :master_port \c - - - :master_port
SET citus.next_shard_id TO 1300080; SET citus.next_shard_id TO 1300080;

View File

@ -106,22 +106,6 @@ SELECT sort_names('sumedh', 'jason', 'ozgun');
SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'throwaway%' AND relkind = 'r'; SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'throwaway%' AND relkind = 'r';
-- test foreign table creation
CREATE FOREIGN TABLE foreign_table_to_distribute
(
name text,
id bigint
)
SERVER fake_fdw_server;
SET citus.shard_count TO 16;
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('foreign_table_to_distribute', 'id', 'hash');
SELECT shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard
WHERE logicalrelid = 'foreign_table_to_distribute'::regclass
ORDER BY (shardminvalue::integer) ASC;
-- test shard creation using weird shard count -- test shard creation using weird shard count
CREATE TABLE weird_shard_count CREATE TABLE weird_shard_count
( (
@ -137,14 +121,3 @@ SELECT shardmaxvalue::integer - shardminvalue::integer AS shard_size
FROM pg_dist_shard FROM pg_dist_shard
WHERE logicalrelid = 'weird_shard_count'::regclass WHERE logicalrelid = 'weird_shard_count'::regclass
ORDER BY shardminvalue::integer ASC; ORDER BY shardminvalue::integer ASC;
-- cleanup foreign table, related shards and shard placements
DELETE FROM pg_dist_shard_placement
WHERE shardid IN (SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'foreign_table_to_distribute'::regclass);
DELETE FROM pg_dist_shard
WHERE logicalrelid = 'foreign_table_to_distribute'::regclass;
DELETE FROM pg_dist_partition
WHERE logicalrelid = 'foreign_table_to_distribute'::regclass;

View File

@ -116,25 +116,6 @@ ALTER TABLE fiddly_table
SELECT master_get_table_ddl_events('fiddly_table'); SELECT master_get_table_ddl_events('fiddly_table');
-- test foreign tables using fake FDW
CREATE FOREIGN TABLE foreign_table (
id bigint not null,
full_name text not null default ''
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
SELECT create_distributed_table('foreign_table', 'id');
ALTER FOREIGN TABLE foreign_table rename to renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890;
ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 rename full_name to rename_name;
ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 alter rename_name type char(8);
\c - - :public_worker_1_host :worker_1_port
select table_name, column_name, data_type
from information_schema.columns
where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id'
order by table_name;
\c - - :master_host :master_port
SELECT master_get_table_ddl_events('renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890');
-- propagating views is not supported -- propagating views is not supported
CREATE VIEW local_view AS SELECT * FROM simple_table; CREATE VIEW local_view AS SELECT * FROM simple_table;
@ -142,13 +123,6 @@ SELECT master_get_table_ddl_events('local_view');
-- clean up -- clean up
DROP VIEW IF EXISTS local_view; DROP VIEW IF EXISTS local_view;
DROP FOREIGN TABLE IF EXISTS renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890;
\c - - :public_worker_1_host :worker_1_port
select table_name, column_name, data_type
from information_schema.columns
where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id'
order by table_name;
\c - - :master_host :master_port
DROP TABLE IF EXISTS simple_table, not_null_table, column_constraint_table, DROP TABLE IF EXISTS simple_table, not_null_table, column_constraint_table,
table_constraint_table, default_value_table, pkey_table, table_constraint_table, default_value_table, pkey_table,
unique_table, clustered_table, fiddly_table; unique_table, clustered_table, fiddly_table;

View File

@ -100,9 +100,18 @@ BEGIN
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$; END;$$;
CREATE PROCEDURE mx_call_proc_copy(x int)
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_call.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
END;$$;
-- Test that undistributed procedures have no issue executing -- Test that undistributed procedures have no issue executing
call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc(2, 0);
call multi_mx_call.mx_call_proc_custom_types('S', 'A'); call multi_mx_call.mx_call_proc_custom_types('S', 'A');
call multi_mx_call.mx_call_proc_copy(2);
-- Same for unqualified names -- Same for unqualified names
call mx_call_proc(2, 0); call mx_call_proc(2, 0);
@ -112,6 +121,7 @@ call mx_call_proc_custom_types('S', 'A');
select create_distributed_function('mx_call_proc(int,int)'); select create_distributed_function('mx_call_proc(int,int)');
select create_distributed_function('mx_call_proc_bigint(bigint,bigint)'); select create_distributed_function('mx_call_proc_bigint(bigint,bigint)');
select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)'); select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)');
select create_distributed_function('mx_call_proc_copy(int)');
-- We still don't route them to the workers, because they aren't -- We still don't route them to the workers, because they aren't
-- colocated with any distributed tables. -- colocated with any distributed tables.
@ -119,16 +129,19 @@ SET client_min_messages TO DEBUG1;
call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc(2, 0);
call mx_call_proc_bigint(4, 2); call mx_call_proc_bigint(4, 2);
call multi_mx_call.mx_call_proc_custom_types('S', 'A'); call multi_mx_call.mx_call_proc_custom_types('S', 'A');
call multi_mx_call.mx_call_proc_copy(2);
-- Mark them as colocated with a table. Now we should route them to workers. -- Mark them as colocated with a table. Now we should route them to workers.
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1);
select colocate_proc_with_table('mx_call_proc_bigint', 'mx_call_dist_table_bigint'::regclass, 1); select colocate_proc_with_table('mx_call_proc_bigint', 'mx_call_dist_table_bigint'::regclass, 1);
select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1);
select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0);
call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc(2, 0);
call multi_mx_call.mx_call_proc_custom_types('S', 'A'); call multi_mx_call.mx_call_proc_custom_types('S', 'A');
call mx_call_proc(2, 0); call mx_call_proc(2, 0);
call mx_call_proc_custom_types('S', 'A'); call mx_call_proc_custom_types('S', 'A');
call mx_call_proc_copy(2);
-- Test implicit cast of int to bigint -- Test implicit cast of int to bigint
call mx_call_proc_bigint(4, 2); call mx_call_proc_bigint(4, 2);

View File

@ -67,9 +67,21 @@ BEGIN
x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group);
END;$$; END;$$;
-- function which internally uses COPY protocol without remote execution
CREATE FUNCTION mx_call_func_copy(x int)
RETURNS bool
LANGUAGE plpgsql AS $$
BEGIN
INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1
SELECT s,s FROM generate_series(100, 110) s;
RETURN true;
END;$$;
-- Test that undistributed functions have no issue executing -- Test that undistributed functions have no issue executing
select multi_mx_function_call_delegation.mx_call_func(2, 0); select multi_mx_function_call_delegation.mx_call_func(2, 0);
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
select multi_mx_function_call_delegation.mx_call_copy(2);
select squares(4); select squares(4);
-- Same for unqualified name -- Same for unqualified name
@ -79,6 +91,7 @@ select mx_call_func(2, 0);
select create_distributed_function('mx_call_func(int,int)'); select create_distributed_function('mx_call_func(int,int)');
select create_distributed_function('mx_call_func_bigint(bigint,bigint)'); select create_distributed_function('mx_call_func_bigint(bigint,bigint)');
select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)');
select create_distributed_function('mx_call_func_copy(int)');
select create_distributed_function('squares(int)'); select create_distributed_function('squares(int)');
@ -249,10 +262,15 @@ select mx_call_func(floor(random())::int, 2);
-- test forms we don't distribute -- test forms we don't distribute
select * from mx_call_func(2, 0); select * from mx_call_func(2, 0);
select mx_call_func(2, 0) from mx_call_dist_table_1;
select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; select mx_call_func(2, 0) where mx_call_func(0, 2) = 0;
select mx_call_func(2, 0), mx_call_func(0, 2); select mx_call_func(2, 0), mx_call_func(0, 2);
-- we do not delegate the call, but do push down the query
-- that result in remote execution from workers
select mx_call_func(id, 0) from mx_call_dist_table_1;
select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3;
select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3;
DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DO $$ BEGIN perform mx_call_func_tbl(40); END; $$;
SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val; SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val;

View File

@ -80,29 +80,3 @@ UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid AND grou
-- get the data from the second placement -- get the data from the second placement
SELECT * FROM customer_engagements; SELECT * FROM customer_engagements;
-- now do the same test over again with a foreign table
CREATE FOREIGN TABLE remote_engagements (
id integer,
created_at date,
event_data text
) SERVER fake_fdw_server;
-- distribute the table
-- create a single shard on the first worker
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('remote_engagements', 'id', 'hash');
-- get the newshardid
SELECT shardid as remotenewshardid FROM pg_dist_shard WHERE logicalrelid = 'remote_engagements'::regclass
\gset
-- now, update the second placement as unhealthy
UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AND groupid = :worker_2_group;
-- oops! we don't support repairing shards backed by foreign tables
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
-- clean-up
DROP FOREIGN TABLE remote_engagements CASCADE;

View File

@ -52,20 +52,6 @@ SELECT undistribute_table('referencing_table');
DROP TABLE referenced_table, referencing_table; DROP TABLE referenced_table, referencing_table;
-- test distributed foreign tables
-- we expect errors
-- and we need metadata sync off for foreign tables
SELECT stop_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';
CREATE FOREIGN TABLE foreign_table (
id bigint not null,
full_name text not null default ''
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
SELECT create_distributed_table('foreign_table', 'id');
SELECT undistribute_table('foreign_table');
DROP FOREIGN TABLE foreign_table;
SELECT start_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';
-- test partitioned tables -- test partitioned tables
CREATE TABLE partitioned_table (id INT, a INT) PARTITION BY RANGE (id); CREATE TABLE partitioned_table (id INT, a INT) PARTITION BY RANGE (id);
CREATE TABLE partitioned_table_1_5 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (5); CREATE TABLE partitioned_table_1_5 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (5);