From a14739f80815c1e861b8ae74a40d65b52c5856f2 Mon Sep 17 00:00:00 2001
From: Onur Tirtir
Date: Fri, 13 Mar 2020 15:39:32 +0300
Subject: [PATCH] Local execution of ddl/drop/truncate commands (#3514)

* reimplement ExecuteUtilityTaskListWithoutResults for local utility command execution
* introduce new functions for local execution of utility commands
* change ErrorIfTransactionAccessedPlacementsLocally logic for local utility command execution
* enable local execution for TRUNCATE command on distributed & reference tables
* update existing tests for local utility command execution
* enable local execution for DDL commands on distributed & reference tables
* enable local execution for DROP command on distributed & reference tables
* add normalization rules for cascaded commands
* add new tests for local utility command execution
---
 .../distributed/commands/utility_hook.c       |  28 +-
 src/backend/distributed/commands/vacuum.c     |   6 +-
 .../distributed/executor/adaptive_executor.c  |  56 +-
 .../distributed/executor/local_executor.c     | 141 +++-
 .../distributed/executor/multi_executor.c     |  16 +
 .../master/master_delete_protocol.c           |  40 +-
 .../distributed/master/master_truncate.c      |   8 +-
 src/include/distributed/local_executor.h      |   1 +
 src/include/distributed/multi_executor.h      |   5 +-
 src/test/regress/bin/normalize.sed            |   4 +
 .../expected/coordinator_shouldhaveshards.out |   5 +-
 .../expected/local_shard_execution.out        |  47 +-
 .../local_shard_utility_command_execution.out | 731 ++++++++++++++++++
 .../expected/multi_mx_add_coordinator.out     |   1 +
 .../multi_mx_insert_select_repartition.out    |  29 +
 .../multi_mx_truncate_from_worker.out         |   6 +
 ...licate_reference_tables_to_coordinator.out |   1 +
 src/test/regress/multi_schedule               |   3 +-
 .../regress/sql/local_shard_execution.sql     |   5 +-
 .../local_shard_utility_command_execution.sql | 332 ++++++++
 .../multi_mx_insert_select_repartition.sql    |  16 +
 21 files changed, 1436 insertions(+), 45 deletions(-)
 create mode 100644 src/test/regress/expected/local_shard_utility_command_execution.out
 create mode 100644 src/test/regress/sql/local_shard_utility_command_execution.sql

diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c
index 7ed7a1928..976f24cb7 100644
--- a/src/backend/distributed/commands/utility_hook.c
+++ b/src/backend/distributed/commands/utility_hook.c
@@ -684,16 +684,27 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 
 	EnsureCoordinator();
 
-	if (ddlJob->targetRelationId != InvalidOid)
+	Oid targetRelationId = ddlJob->targetRelationId;
+
+	if (OidIsValid(targetRelationId))
 	{
 		/*
-		 * Only for ddlJobs that are targetting a relation (table) we want to sync its
-		 * metadata and verify some properties around the table.
+		 * Only for ddlJobs that target a relation (table) do we want to sync
+		 * its metadata and verify some properties of the table.
 		 */
-		shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
-		EnsurePartitionTableNotReplicated(ddlJob->targetRelationId);
+		shouldSyncMetadata = ShouldSyncTableMetadata(targetRelationId);
+		EnsurePartitionTableNotReplicated(targetRelationId);
 	}
 
+	/*
+	 * If it is a local placement of a distributed table or a reference table,
+	 * then execute the DDL command locally.
+	 * We set localExecutionSupported to true regardless of whether the DDL
+	 * command is run on a distributed table, as
+	 * ExecuteUtilityTaskListWithoutResults already identifies the DDL tasks
+	 * that do not access any of the local placements.
+	 */
+	bool localExecutionSupported = true;
 
 	if (!ddlJob->concurrentIndexCmd)
 	{
@@ -715,8 +726,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 			SendCommandToWorkersWithMetadata((char *) ddlJob->commandString);
 		}
 
-		/* use adaptive executor when enabled */
-		ExecuteUtilityTaskListWithoutResults(ddlJob->taskList);
+		ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, localExecutionSupported);
 	}
 	else
 	{
@@ -727,8 +737,8 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 
 		PG_TRY();
 		{
-			/* use adaptive executor when enabled */
-			ExecuteUtilityTaskListWithoutResults(ddlJob->taskList);
+			ExecuteUtilityTaskListWithoutResults(ddlJob->taskList,
+												 localExecutionSupported);
 
 			if (shouldSyncMetadata)
 			{
diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c
index 62bc45537..ec3b9c77d 100644
--- a/src/backend/distributed/commands/vacuum.c
+++ b/src/backend/distributed/commands/vacuum.c
@@ -108,8 +108,10 @@ PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
 			List *vacuumColumnList = VacuumColumnList(vacuumStmt, relationIndex);
 			List *taskList = VacuumTaskList(relationId, vacuumParams, vacuumColumnList);
 
-			/* use adaptive executor when enabled */
-			ExecuteUtilityTaskListWithoutResults(taskList);
+			/* local execution is not implemented for VACUUM commands */
+			bool localExecutionSupported = false;
+
+			ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported);
 			executedVacuumCount++;
 		}
 		relationIndex++;
diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 39e071f2d..503500560 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -815,13 +815,50 @@ AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution)
 
 /*
  * ExecuteUtilityTaskListWithoutResults is a wrapper around executing task
- * list for utility commands. It simply calls in adaptive executor's task
- * execution function.
+ * list for utility commands. For remote tasks, it simply calls into the
+ * adaptive executor's task execution function. For local tasks (if any), it
+ * kicks off process utility via CitusProcessUtility. As some local utility
+ * commands can trigger UDF calls, this function also processes those UDF
+ * calls locally.
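+ * For example, a shard-level DDL task's query string is typically a UDF
+ * wrapper of the form
+ *   SELECT worker_apply_shard_ddl_command (shardid, 'schema', 'ALTER TABLE ...;');
+ * (see the regression test output below), which is why the local execution
+ * path must also handle SELECT statements that wrap such UDF calls.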
  */
 void
-ExecuteUtilityTaskListWithoutResults(List *taskList)
+ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupported)
 {
-	ExecuteTaskList(ROW_MODIFY_NONE, taskList, MaxAdaptiveExecutorPoolSize);
+	RowModifyLevel rowModifyLevel = ROW_MODIFY_NONE;
+
+	List *localTaskList = NIL;
+	List *remoteTaskList = NIL;
+
+	/*
+	 * If localExecutionSupported is set to true, divide the tasks into local
+	 * and remote ones, and execute the local tasks.
+	 */
+	if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
+	{
+		/*
+		 * Whether we are executing a utility command or a UDF call triggered
+		 * by such a command, it has to be a modifying one.
+		 */
+		bool readOnlyPlan = false;
+
+		/* set local (if any) & remote tasks */
+		ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
+								   &remoteTaskList);
+
+		/* execute local tasks */
+		ExecuteLocalUtilityTaskList(localTaskList);
+	}
+	else
+	{
+		/* all tasks should be executed via remote connections */
+		remoteTaskList = taskList;
+	}
+
+	/* execute remote tasks if any */
+	if (list_length(remoteTaskList) > 0)
+	{
+		ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize);
+	}
 }
 
 
@@ -900,10 +937,15 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
 	ParamListInfo paramListInfo = NULL;
 
 	/*
-	 * The code-paths that rely on this function do not know how to execute
-	 * commands locally.
+	 * If the current transaction accessed local placements and the task list
+	 * includes tasks that should be executed locally (i.e., tasks accessing
+	 * any of the local placements), then we should error out as it would
+	 * cause inconsistencies between the remote connection and local execution.
 	 */
-	ErrorIfTransactionAccessedPlacementsLocally();
+	if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList))
+	{
+		ErrorIfTransactionAccessedPlacementsLocally();
+	}
 
 	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
 	{
diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index 3451994c9..7003ea0b3 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -67,9 +67,6 @@
  * use local query execution since local execution is sequential. Basically,
  * we do not want to lose parallelism across local tasks by switching to local
  * execution.
- * - The local execution currently only supports queries. In other words, any
- * utility commands like TRUNCATE, fails if the command is executed after a local
- * execution inside a transaction block.
  * - The local execution cannot be mixed with the executors other than adaptive,
  * namely task-tracker executor.
  * - Related with the previous item, COPY command cannot be mixed with local
@@ -79,6 +76,7 @@
 #include "postgres.h"
 #include "miscadmin.h"
 
+#include "distributed/commands/utility_hook.h"
 #include "distributed/citus_custom_scan.h"
 #include "distributed/citus_ruleutils.h"
 #include "distributed/deparse_shard_query.h"
@@ -90,6 +88,7 @@
 #include "distributed/relation_access_tracking.h"
 #include "distributed/remote_commands.h" /* to access LogRemoteCommands */
 #include "distributed/transaction_management.h"
+#include "distributed/worker_protocol.h"
 #include "executor/tstoreReceiver.h"
 #include "executor/tuptable.h"
 #if PG_VERSION_NUM >= 120000
@@ -118,6 +117,8 @@ static void LogLocalCommand(Task *task);
 static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
 											   Oid **parameterTypes,
 											   const char ***parameterValues);
+static void LocallyExecuteUtilityTask(const char *utilityCommand);
+static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
 
 
 /*
@@ -245,6 +246,98 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
 }
 
 
+/*
+ * ExecuteLocalUtilityTaskList executes a list of tasks locally. This function
+ * also logs a local execution notice for each task and sets
+ * TransactionAccessedLocalPlacement to true for the next set of possible
+ * queries & commands within the current transaction block. See the comment in
+ * the function.
+ */
+void
+ExecuteLocalUtilityTaskList(List *localTaskList)
+{
+	Task *localTask = NULL;
+
+	foreach_ptr(localTask, localTaskList)
+	{
+		const char *localTaskQueryCommand = TaskQueryString(localTask);
+
+		/* we do not expect tasks with INVALID_SHARD_ID for utility commands */
+		Assert(localTask->anchorShardId != INVALID_SHARD_ID);
+
+		Assert(TaskAccessesLocalNode(localTask));
+
+		/*
+		 * We should register the access to the local placement to force the
+		 * local execution of the following commands within the current
+		 * transaction.
+		 */
+		TransactionAccessedLocalPlacement = true;
+
+		LogLocalCommand(localTask);
+
+		LocallyExecuteUtilityTask(localTaskQueryCommand);
+	}
+}
+
+
+/*
+ * LocallyExecuteUtilityTask executes the given local task query in the current
+ * session.
+ */
+static void
+LocallyExecuteUtilityTask(const char *localTaskQueryCommand)
+{
+	RawStmt *localTaskRawStmt = (RawStmt *) ParseTreeRawStmt(localTaskQueryCommand);
+
+	Node *localTaskRawParseTree = localTaskRawStmt->stmt;
+
+	/*
+	 * The query passed to this function is mostly a utility command to be
+	 * executed locally. However, some utility commands trigger UDF calls
+	 * (e.g. worker_apply_shard_ddl_command) to execute commands in a generic
+	 * way. As we support local execution of utility commands, we should
+	 * process those UDF calls locally as well. In that case, we simply
+	 * execute the query implying the UDF call in the conditional block below.
+	 */
+	if (IsA(localTaskRawParseTree, SelectStmt))
+	{
+		/* we have no external parameters to rewrite the UDF call RawStmt */
+		Query *localUdfTaskQuery =
+			RewriteRawQueryStmt(localTaskRawStmt, localTaskQueryCommand, NULL, 0);
+
+		LocallyExecuteUdfTaskQuery(localUdfTaskQuery);
+	}
+	else
+	{
+		/*
+		 * If it is a regular utility command or a SELECT query with non-UDF
+		 * targets, then we should execute it locally via process utility.
+		 *
+		 * If it is a regular utility command, CitusProcessUtility is the
+		 * appropriate function to process that command. However, if it's
+		 * a SELECT query with non-UDF targets, CitusProcessUtility would
+		 * error out as we are not expecting such SELECT queries triggered
+		 * by utility commands.
+		 */
+		CitusProcessUtility(localTaskRawParseTree, localTaskQueryCommand,
+							PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
+	}
+}
+
+
+/*
+ * LocallyExecuteUdfTaskQuery executes the given UDF command locally. A local
+ * UDF command is simply a "SELECT udf_call()" query, and so it cannot be
+ * executed via process utility.
+ */
+static void
+LocallyExecuteUdfTaskQuery(Query *localUdfTaskQuery)
+{
+	/* we do not expect any results */
+	ExecuteQueryIntoDestReceiver(localUdfTaskQuery, NULL, None_Receiver);
+}
+
+
 /*
  * LogLocalCommand logs commands executed locally on this node. Although we're
  * talking about local execution, the function relies on citus.log_remote_commands
@@ -437,16 +530,33 @@ ShouldExecuteTasksLocally(List *taskList)
 
 	if (TransactionAccessedLocalPlacement)
 	{
+		bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false;
+
 		/*
 		 * For various reasons, including the transaction visibility
 		 * rules (e.g., read-your-own-writes), we have to use local
 		 * execution again if it has already happened within this
 		 * transaction block.
-		 *
+		 */
+		isValidLocalExecutionPath = IsMultiStatementTransaction() ||
+									InCoordinatedTransaction();
+
+		/*
+		 * In some cases, such as when a single command leads to a local
+		 * command execution followed by remote task (list) execution, we
+		 * still expect the remote execution to first try local execution
+		 * as TransactionAccessedLocalPlacement is set by the local execution.
+		 * The remote execution shouldn't create any local tasks, as the local
+		 * execution should have executed all the local tasks; we ensure that
+		 * here.
+		 */
+		isValidLocalExecutionPath |= !AnyTaskAccessesLocalNode(taskList);
+
+		/*
 		 * We might error out later in the execution if it is not suitable
 		 * to execute the tasks locally.
 		 */
-		Assert(IsMultiStatementTransaction() || InCoordinatedTransaction());
+		Assert(isValidLocalExecutionPath);
 
 		/*
 		 * TODO: A future improvement could be to keep track of which placements
@@ -497,6 +607,27 @@ ShouldExecuteTasksLocally(List *taskList)
 }
 
 
+/*
+ * AnyTaskAccessesLocalNode returns true if any task within the task list
+ * accesses the local node.
+ */
+bool
+AnyTaskAccessesLocalNode(List *taskList)
+{
+	Task *task = NULL;
+
+	foreach_ptr(task, taskList)
+	{
+		if (TaskAccessesLocalNode(task))
+		{
+			return true;
+		}
+	}
+
+	return false;
+}
+
+
 /*
  * TaskAccessesLocalNode returns true if any placements of the task reside on
  * the node that we're executing the query.
diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c
index 59826fe64..82d177054 100644
--- a/src/backend/distributed/executor/multi_executor.c
+++ b/src/backend/distributed/executor/multi_executor.c
@@ -607,6 +607,22 @@ Query *
 ParseQueryString(const char *queryString, Oid *paramOids, int numParams)
 {
 	RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
+
+	/* rewrite the parsed RawStmt to produce a Query */
+	Query *query = RewriteRawQueryStmt(rawStmt, queryString, paramOids, numParams);
+
+	return query;
+}
+
+
+/*
+ * RewriteRawQueryStmt rewrites the given parsed RawStmt according to the other
+ * parameters and returns a Query struct.
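+ * Splitting this rewrite step out of ParseQueryString lets callers that
+ * already hold a parsed RawStmt (e.g. LocallyExecuteUtilityTask rewriting a
+ * UDF call statement) produce a Query without reparsing the command string.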
+ */
+Query *
+RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, Oid *paramOids, int
+					numParams)
+{
 	List *queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, paramOids,
 												 numParams, NULL);
 
diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c
index 1f8fe2a2c..ad82e01e8 100644
--- a/src/backend/distributed/master/master_delete_protocol.c
+++ b/src/backend/distributed/master/master_delete_protocol.c
@@ -396,6 +396,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
 
 	List *dropTaskList = DropTaskList(relationId, schemaName, relationName,
 									  deletableShardIntervalList);
+	bool shouldExecuteTasksLocally = ShouldExecuteTasksLocally(dropTaskList);
 
 	Task *task = NULL;
 	foreach_ptr(task, dropTaskList)
@@ -423,14 +424,39 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
 			continue;
 		}
 
-		const char *dropShardPlacementCommand = TaskQueryString(task);
-		ExecuteDropShardPlacementCommandRemotely(shardPlacement,
-												 relationName,
-												 dropShardPlacementCommand);
-
-		if (isLocalShardPlacement)
+		/*
+		 * If it is a local placement of a distributed table or a reference table,
+		 * then execute the DROP command locally.
+		 */
+		if (isLocalShardPlacement && shouldExecuteTasksLocally)
 		{
-			TransactionConnectedToLocalGroup = true;
+			List *singleTaskList = list_make1(task);
+
+			ExecuteLocalUtilityTaskList(singleTaskList);
+		}
+		else
+		{
+			/*
+			 * Either it was not a local placement, or we could not use local
+			 * execution even though it was a local placement. The latter is
+			 * possibly because, in the current transaction, some commands or
+			 * queries also connected to the local group.
+			 *
+			 * Regardless of whether the node is a remote node or the current
+			 * node, try to open a new connection (or use an existing one) to
+			 * that node to drop the shard placement over that remote
+			 * connection.
+			 */
+			const char *dropShardPlacementCommand = TaskQueryString(task);
+			ExecuteDropShardPlacementCommandRemotely(shardPlacement,
+													 relationName,
+													 dropShardPlacementCommand);
+
+			if (isLocalShardPlacement)
+			{
+				TransactionConnectedToLocalGroup = true;
+			}
 		}
 
 		DeleteShardPlacementRow(shardPlacementId);
diff --git a/src/backend/distributed/master/master_truncate.c b/src/backend/distributed/master/master_truncate.c
index d6d4d56e0..4c31a1b97 100644
--- a/src/backend/distributed/master/master_truncate.c
+++ b/src/backend/distributed/master/master_truncate.c
@@ -73,7 +73,13 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
 	{
 		List *taskList = TruncateTaskList(relationId);
 
-		ExecuteUtilityTaskListWithoutResults(taskList);
+		/*
+		 * If it is a local placement of a distributed table or a reference
+		 * table, then execute the TRUNCATE command locally.
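+		 * As in the DDL code path (see ExecuteDistributedDDLJob), it is safe
+		 * to set this flag even when there are no local placements, since
+		 * ExecuteUtilityTaskListWithoutResults checks ShouldExecuteTasksLocally
+		 * before doing any local execution.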
+ */ + bool localExecutionSupported = true; + + ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported); } PG_RETURN_DATUM(PointerGetDatum(NULL)); diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 98e69bf97..8b11e096c 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -22,6 +22,7 @@ extern bool TransactionConnectedToLocalGroup; /* extern function declarations */ extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); +extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList); extern bool ShouldExecuteTasksLocally(List *taskList); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index d1167b243..448158d59 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -84,7 +84,8 @@ extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskL TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning); -extern void ExecuteUtilityTaskListWithoutResults(List *taskList); +extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool + localExecutionSupported); extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize); extern bool IsCitusCustomState(PlanState *planState); @@ -94,6 +95,8 @@ extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *worker extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescriptor, Tuplestorestate *tupstore); extern Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams); +extern Query * RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, + Oid *paramOids, int numParams); extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params, DestReceiver *dest); diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 64d398028..3e1b2d856 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -60,6 +60,10 @@ s/(job_[0-9]+\/task_[0-9]+\/p_[0-9]+\.)[0-9]+/\1xxxx/g # isolation_ref2ref_foreign_keys s/"(ref_table_[0-9]_|ref_table_[0-9]_value_fkey_)[0-9]+"/"\1xxxxxxx"/g +# commands cascading to shard relations +s/(NOTICE: .*_)[0-9]{5,}( CASCADE)/\1xxxxx\2/g +s/(NOTICE: [a-z]+ cascades to table ".*)_[0-9]{5,}"/\1_xxxxx"/g + # Line info varies between versions /^LINE [0-9]+:.*$/d /^ *\^$/d diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 54571f077..0af6f175a 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -107,9 +107,8 @@ NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshar (1 row) ALTER TABLE test DROP COLUMN z; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503000, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;') 
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503003, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;')
 ROLLBACK;
 BEGIN;
 ALTER TABLE test DROP COLUMN z;
diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out
index c604a3c76..e926dd31e 100644
--- a/src/test/regress/expected/local_shard_execution.out
+++ b/src/test/regress/expected/local_shard_execution.out
@@ -362,8 +362,7 @@ COPY second_distributed_table FROM STDIN WITH CSV;
 -- (a) Unless the first query is a local query, always use distributed execution.
 -- (b) If the executor has used local execution, it has to use local execution
 --     for the remaining of the transaction block. If that's not possible, the
---     executor has to error out (e.g., TRUNCATE is a utility command and we
---     currently do not support local execution of utility commands)
+--     executor has to error out
 -- rollback should be able to rollback local execution
 BEGIN;
 	INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
@@ -592,7 +591,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
 (1 row)
 
 ROLLBACK;
--- a local query is followed by a command that cannot be executed locally
+-- a local query followed by a TRUNCATE command can be executed locally
 BEGIN;
 	SELECT count(*) FROM distributed_table WHERE key = 1;
 NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
  count
 ---------------------------------------------------------------------
      1
 (1 row)
 
 	TRUNCATE distributed_table CASCADE;
 NOTICE: truncate cascades to table "second_distributed_table"
-ERROR: cannot execute command because a local execution has accessed a placement in the transaction
-DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
-HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
 ROLLBACK;
 -- a local query is followed by a command that cannot be executed locally
 BEGIN;
@@ -899,6 +901,17 @@ WHERE
 -- get ready for the next commands
 TRUNCATE reference_table, distributed_table, second_distributed_table;
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.reference_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "distributed_table_xxxxx"
+NOTICE: truncate cascades to table "distributed_table_xxxxx"
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
 -- local execution of returning of reference tables
 INSERT INTO reference_table VALUES (1),(2),(3),(4),(5),(6) RETURNING *;
 NOTICE: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 AS citus_table_alias (key) VALUES (1), (2), (3), (4), (5), (6) RETURNING citus_table_alias.key
@@ -1324,6 +1337,17 @@ ROLLBACK;
 TRUNCATE reference_table CASCADE;
 NOTICE: truncate cascades to table "distributed_table"
 NOTICE: truncate cascades to table "second_distributed_table"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.reference_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "distributed_table_xxxxx"
+NOTICE: truncate cascades to table "distributed_table_xxxxx"
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
 INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
 INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
 -- show that both local, and mixed local-distributed executions
@@ -1593,6 +1617,17 @@ COMMIT;
 TRUNCATE reference_table CASCADE;
 NOTICE: truncate cascades to table "distributed_table"
 NOTICE: truncate cascades to table "second_distributed_table"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.reference_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "distributed_table_xxxxx"
+NOTICE: truncate cascades to table "distributed_table_xxxxx"
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
 -- load some data on a remote shard
 INSERT INTO reference_table (key) VALUES (2);
 NOTICE: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (2)
diff --git a/src/test/regress/expected/local_shard_utility_command_execution.out b/src/test/regress/expected/local_shard_utility_command_execution.out
new file mode 100644
index 000000000..42ee966e4
--- /dev/null
+++ b/src/test/regress/expected/local_shard_utility_command_execution.out
@@ -0,0 +1,731 @@
+-- This test file includes tests for local execution of utility commands.
+-- For now, this file includes tests only for local execution of
+-- `TRUNCATE/DROP/DDL` commands for all kinds of distributed tables from
+-- the coordinator node having regular distributed tables' shards
+-- (shouldHaveShards = on) and having reference table placements in it.
+\set VERBOSITY terse
+SET citus.next_shard_id TO 1500000;
+SET citus.shard_replication_factor TO 1;
+SET citus.enable_local_execution TO ON;
+SET citus.shard_COUNT TO 32;
+SET citus.log_local_commands TO ON;
+CREATE SCHEMA local_commands_test_schema;
+SET search_path TO local_commands_test_schema;
+-- let the coordinator have distributed table shards/placements
+set client_min_messages to ERROR;
+SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
+ ?column?
+---------------------------------------------------------------------
+        1
+(1 row)
+
+RESET client_min_messages;
+SELECT master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
+ master_set_node_property
+---------------------------------------------------------------------
+ 
+(1 row)
+
+---------------------------------------------------------------------
+------ local execution of TRUNCATE ------
+---------------------------------------------------------------------
+CREATE TABLE ref_table (a int primary key);
+SELECT create_reference_table('ref_table');
+ create_reference_table
+---------------------------------------------------------------------
+ 
+(1 row)
+
+CREATE TABLE dist_table(a int);
+SELECT create_distributed_table('dist_table', 'a', colocate_with:='none');
+ create_distributed_table
+---------------------------------------------------------------------
+ 
+(1 row)
+
+ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a);
+-- insert some data
+INSERT INTO ref_table VALUES(1);
+NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (1)
+INSERT INTO dist_table VALUES(1);
+-- Currently, we support local execution of TRUNCATE commands for all kinds
+-- of distributed tables. Hence, cascading to distributed tables wouldn't be
+-- a problem even in the case that the coordinator has some local distributed
+-- table shards.
+TRUNCATE ref_table CASCADE;
+NOTICE: truncate cascades to table "dist_table"
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.ref_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM ref_table, dist_table;
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+-- insert some data
+INSERT INTO ref_table VALUES(1);
+NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (1)
+INSERT INTO dist_table VALUES(1);
+-- As SELECT accesses local placements of the reference table, TRUNCATE would
+-- also be forced to local execution even if they operate on different tables.
+BEGIN;
+	SELECT COUNT(*) FROM ref_table;
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500000 ref_table
+ count
+---------------------------------------------------------------------
+     1
+(1 row)
+
+	TRUNCATE dist_table;
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+COMMIT;
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM dist_table;
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+-- insert some data
+INSERT INTO ref_table VALUES(2);
+NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (2)
+INSERT INTO dist_table VALUES(2);
+NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.dist_table_1500025 (a) VALUES (2)
+-- However, as SELECT would access local placements via remote connections
+-- for regular distributed tables, TRUNCATE would also be executed remotely.
+BEGIN;
+	SELECT COUNT(*) FROM dist_table;
+ count
+---------------------------------------------------------------------
+     1
+(1 row)
+
+	TRUNCATE dist_table;
+COMMIT;
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM dist_table;
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+-- insert some data
+INSERT INTO ref_table VALUES(3);
+NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (3)
+INSERT INTO dist_table VALUES(3);
+NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.dist_table_1500016 (a) VALUES (3)
+-- TRUNCATE on dist_table (note that, again, there is no cascade here) would
+-- just be handled via remote executions even on its local shards
+TRUNCATE dist_table;
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM dist_table;
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+-- insert some data
+INSERT INTO ref_table VALUES(4);
+NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (4)
+-- However, creating a distributed table is handled via remote connections.
+-- Hence, the commands following it (INSERT & TRUNCATE) would also be
+-- handled remotely.
+BEGIN;
+	CREATE TABLE ref_table_1(a int);
+	SELECT create_reference_table('ref_table_1');
+ create_reference_table
+---------------------------------------------------------------------
+ 
+(1 row)
+
+	-- insert some data
+	INSERT INTO ref_table_1 VALUES(5);
+	TRUNCATE ref_table_1;
+COMMIT;
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM ref_table_1;
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1_1500033 ref_table_1
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+-- However, as SELECT would access local placements via remote parallel
+-- connections for regular distributed tables, the TRUNCATE below would
+-- error out
+BEGIN;
+	SELECT COUNT(*) FROM dist_table;
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+	TRUNCATE ref_table CASCADE;
+NOTICE: truncate cascades to table "dist_table"
+ERROR: cannot execute DDL on reference table "ref_table" because there was a parallel SELECT access to distributed table "dist_table" in the same transaction
+COMMIT;
+-- as we do not support local ANALYZE execution yet, the block below would error out
+BEGIN;
+	TRUNCATE ref_table CASCADE;
+NOTICE: truncate cascades to table "dist_table"
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.ref_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+	ANALYZE ref_table;
+ERROR: cannot execute command because a local execution has accessed a placement in the transaction
+COMMIT;
+-- insert some data
+INSERT INTO ref_table VALUES(7);
+NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (7)
+INSERT INTO dist_table VALUES(7);
+-- we can TRUNCATE those two tables within the same command
+TRUNCATE ref_table, dist_table;
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.ref_table_xxxxx CASCADE
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: truncate cascades to table "dist_table_xxxxx"
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM ref_table, dist_table;
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+---------------------------------------------------------------------
+------ local execution of DROP ------
+---------------------------------------------------------------------
+-- dropping just the referenced table would error out as dist_table references it
+DROP TABLE ref_table;
+ERROR: cannot drop table ref_table because other objects depend on it
+-- drop those two tables via remote execution
+DROP TABLE ref_table, dist_table;
+-- drop the other standalone table locally
+DROP TABLE ref_table_1;
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.ref_table_1_xxxxx CASCADE
+-- show that the DROP commands are successful
+SELECT tablename FROM pg_tables where schemaname='local_commands_test_schema' ORDER BY tablename;
+ tablename
+---------------------------------------------------------------------
+(0 rows)
+
+CREATE TABLE ref_table (a int primary key);
+SELECT create_reference_table('ref_table');
+ create_reference_table
+---------------------------------------------------------------------
+ 
+(1 row)
+
+-- We execute the SELECT command within the block below locally.
+-- Hence, we should execute the DROP command locally as well.
+BEGIN; + SELECT COUNT(*) FROM ref_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500034 ref_table + count +--------------------------------------------------------------------- + 0 +(1 row) + + DROP TABLE ref_table; +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.ref_table_xxxxx CASCADE +COMMIT; +CREATE TABLE ref_table (a int primary key); +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist_table(a int); +SELECT create_distributed_table('dist_table', 'a', colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a); +-- show that DROP command is rollback'd successfully (should print 1) +SELECT 1 FROM pg_tables where tablename='dist_table'; + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- As SELECT will be executed remotely, the DROP command should also be executed +-- remotely to prevent possible self-deadlocks & inconsistencies. +-- FIXME: we have a known bug for below case described in +-- https://github.com/citusdata/citus/issues/3526. Hence, commented out as it could +-- randomly fall into distributed deadlocks +--BEGIN; +-- SELECT COUNT(*) FROM dist_table; +-- DROP TABLE dist_table; +--END; +-- As SELECT will be executed remotely, the DROP command below should also be +-- executed remotely. +CREATE TABLE another_dist_table(a int); +SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +BEGIN; + SELECT COUNT(*) FROM another_dist_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + + DROP TABLE another_dist_table; +COMMIT; +-- show that DROP command is committed successfully +SELECT 1 FROM pg_tables where tablename='another_dist_table'; + ?column? +--------------------------------------------------------------------- +(0 rows) + +-- below DROP will be executed remotely. +DROP TABLE dist_table; +-- show that DROP command is successfull +SELECT 1 FROM pg_tables where tablename='dist_table'; + ?column? 
+--------------------------------------------------------------------- +(0 rows) + +CREATE TABLE dist_table(a int); +SELECT create_distributed_table('dist_table', 'a', colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a); +-- as SELECT on ref_table will be executed locally, the SELECT and DROP following +-- it would also be executed locally +BEGIN; + SELECT COUNT(*) FROM ref_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500035 ref_table + count +--------------------------------------------------------------------- + 0 +(1 row) + + DROP TABLE dist_table CASCADE; +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +ROLLBACK; +-- show that DROP command is rollback'd successfully (should print 1) +SELECT 1 FROM pg_tables where tablename='dist_table'; + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +--------------------------------------------------------------------- +------ local execution of DDL commands ------ +--------------------------------------------------------------------- +-- try some complicated CASCADE cases along with DDL commands +CREATE TABLE ref_table_1(a int primary key); +SELECT create_reference_table('ref_table_1'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- below block should execute successfully +BEGIN; + SELECT COUNT(*) FROM ref_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500035 ref_table + count +--------------------------------------------------------------------- + 0 +(1 row) + + -- as SELECT above runs locally and as now we support local execution of DDL commands, + -- below DDL should be able to define foreign key constraint successfully + ALTER TABLE ref_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table_1(a); +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500035, 'local_commands_test_schema', 1500132, 'local_commands_test_schema', 'ALTER TABLE ref_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table_1(a);') + -- insert some data + INSERT INTO ref_table_1 VALUES (1); +NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1_1500132 (a) VALUES (1) + INSERT INTO ref_table_1 VALUES (2); +NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1_1500132 (a) VALUES (2) + INSERT INTO ref_table VALUES (1); +NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500035 (a) VALUES (1) + -- chain foreign key constraints + -- local execution should be observed here as well + ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a); +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500100, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500103, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500106, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500109, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500112, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500115, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') +NOTICE: executing the command locally: SELECT 
worker_apply_inter_shard_ddl_command (1500118, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500121, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500124, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500127, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500130, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);') + INSERT INTO dist_table VALUES (1); + DELETE FROM ref_table_1 WHERE a=2; +NOTICE: executing the command locally: DELETE FROM local_commands_test_schema.ref_table_1_1500132 ref_table_1 WHERE (a OPERATOR(pg_catalog.=) 2) + -- add another column to dist_table + -- note that we execute below DDL locally as well + ALTER TABLE ref_table ADD b int; +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500035, 'local_commands_test_schema', 'ALTER TABLE ref_table ADD b int;') + -- define self reference + ALTER TABLE ref_table ADD CONSTRAINT fkey2 FOREIGN KEY(b) REFERENCES ref_table(a); +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500035, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE ref_table ADD CONSTRAINT fkey2 FOREIGN KEY(b) REFERENCES ref_table(a);') + SELECT COUNT(*) FROM ref_table_1, ref_table, dist_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500100 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500103 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500106 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500109 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500112 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true 
+NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500115 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500118 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500121 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500124 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500127 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500130 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true + count +--------------------------------------------------------------------- + 1 +(1 row) + + -- observe DROP on a self-referencing table also works + DROP TABLE ref_table_1, ref_table, dist_table; +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.ref_table_xxxxx CASCADE +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.ref_table_1_xxxxx CASCADE + -- show that DROP command is successfull + SELECT tablename FROM pg_tables where schemaname='local_commands_test_schema' ORDER BY tablename; + tablename 
+---------------------------------------------------------------------
+(0 rows)
+
+ROLLBACK;
+-- add another column to dist_table (should be executed remotely)
+ALTER TABLE dist_table ADD b int;
+CREATE SCHEMA foo_schema;
+-- As SELECT will be executed remotely, the ALTER TABLE SET SCHEMA command should also be executed remotely
+BEGIN;
+ SELECT COUNT(*) FROM dist_table;
+ count
+---------------------------------------------------------------------
+ 0
+(1 row)
+
+ ALTER TABLE dist_table SET SCHEMA foo_schema;
+ -- show that ALTER TABLE SET SCHEMA is successful
+ SELECT tablename FROM pg_tables where schemaname='foo_schema' ORDER BY tablename;
+ tablename
+---------------------------------------------------------------------
+ dist_table
+(1 row)
+
+ROLLBACK;
+-- However, the ALTER TABLE SET SCHEMA command below will be executed locally
+BEGIN;
+ ALTER TABLE ref_table SET SCHEMA foo_schema;
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500035, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.ref_table SET SCHEMA foo_schema;')
+ -- show that ALTER TABLE SET SCHEMA is successful
+ SELECT tablename FROM pg_tables where schemaname='foo_schema' ORDER BY tablename;
+ tablename
+---------------------------------------------------------------------
+ ref_table
+ ref_table_1500035
+(2 rows)
+
+ROLLBACK;
+-- Try a bunch of commands and expect failure at SELECT create_distributed_table
+BEGIN;
+ -- here this SELECT will force the whole block to use local execution
+ SELECT COUNT(*) FROM ref_table;
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500035 ref_table
+ count
+---------------------------------------------------------------------
+ 0
+(1 row)
+
+ -- execute a bunch of DDL & DROP commands successfully
+ ALTER TABLE dist_table ADD column c int;
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500100, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500103, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500106, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500109, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500112, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500115, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500118, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500121, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500124, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500127, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500130, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
+ ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500100, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500103, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500106, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500109, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500112, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500115, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500118, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500121, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500124, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500127, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500130, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
+ -- as we create the table via remote connections, the following SELECT create_distributed_table
+ -- would error out
+ CREATE TABLE another_dist_table(a int);
+ SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
+ERROR: cannot execute command because a local execution has accessed a placement in the transaction
+COMMIT;
+---------------------------------------------------------------------
+------------ partitioned tables -------------
+---------------------------------------------------------------------
+-- test combination of TRUNCATE & DROP & DDL commands with partitioned tables as well
+CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
+CREATE TABLE partitioning_test_2012 PARTITION OF partitioning_test FOR VALUES FROM ('2012-06-06') TO ('2012-08-08');
+CREATE TABLE partitioning_test_2013 PARTITION OF partitioning_test FOR VALUES FROM ('2013-06-06') TO ('2013-07-07');
+-- load some data
+INSERT INTO partitioning_test VALUES (5, '2012-06-06');
+INSERT INTO partitioning_test VALUES (6, '2012-07-07');
+INSERT INTO partitioning_test VALUES (5, '2013-06-06');
+SELECT create_distributed_table('partitioning_test', 'id', colocate_with:='dist_table');
+NOTICE: Copying data from local table...
+NOTICE: Copying data from local table...
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- all commands below should be executed via local execution due to SELECT on ref_table
+BEGIN;
+ SELECT * from ref_table;
+NOTICE: executing the command locally: SELECT a FROM local_commands_test_schema.ref_table_1500035 ref_table
+ a
+---------------------------------------------------------------------
+(0 rows)
+
+ INSERT INTO partitioning_test VALUES (7, '2012-07-07');
+ SELECT COUNT(*) FROM partitioning_test;
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500165 partitioning_test WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500168 partitioning_test WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500171 partitioning_test WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500174 partitioning_test WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500177 partitioning_test WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500180 partitioning_test WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500183 partitioning_test WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500186 partitioning_test WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500189 partitioning_test WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500192 partitioning_test WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500195 partitioning_test WHERE true
+ count
+---------------------------------------------------------------------
+ 4
+(1 row)
+
+ -- execute a bunch of DDL & DROP commands successfully
+ ALTER TABLE partitioning_test ADD column c int;
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500165, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500168, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500171, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500174, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500177, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500180, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500183, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500186, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500189, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500192, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500195, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
+ TRUNCATE partitioning_test;
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
+ DROP TABLE partitioning_test;
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: table "partitioning_test_2012_1500197" does not exist, skipping
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: table "partitioning_test_2012_1500200" does not exist, skipping
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: table "partitioning_test_2012_1500203" does not exist, skipping
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: table "partitioning_test_2012_1500206" does not exist, skipping
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: table "partitioning_test_2012_1500209" does not exist, skipping
+NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
+NOTICE: table "partitioning_test_2012_1500212" does not exist, skipping
"partitioning_test_2012_1500212" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE +NOTICE: table "partitioning_test_2012_1500215" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE +NOTICE: table "partitioning_test_2012_1500218" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE +NOTICE: table "partitioning_test_2012_1500221" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE +NOTICE: table "partitioning_test_2012_1500224" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE +NOTICE: table "partitioning_test_2012_1500227" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500229" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500232" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500235" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500238" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500241" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500244" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500247" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500250" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500253" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500256" does not exist, skipping +NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE +NOTICE: table "partitioning_test_2013_1500259" does not exist, skipping +ROLLBACK; +-- below should be executed via remote connections +TRUNCATE partitioning_test; +DROP TABLE partitioning_test; +-- cleanup at exit +DROP SCHEMA local_commands_test_schema CASCADE; +NOTICE: drop cascades to 16 other objects +DROP SCHEMA foo_schema; +SELECT 1 FROM master_set_node_property('localhost', :master_port, 
+ ?column?
+---------------------------------------------------------------------
+ 1
+(1 row)
+
diff --git a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out
index 8f2d99482..9cf20707f 100644
--- a/src/test/regress/expected/multi_mx_add_coordinator.out
+++ b/src/test/regress/expected/multi_mx_add_coordinator.out
@@ -129,6 +129,7 @@ DEBUG: Plan is router executable
 NOTICE: executing the command locally: INSERT INTO mx_add_coordinator.ref_7000000 (a) VALUES (1)
 -- get it ready for the next executions
 TRUNCATE ref;
+NOTICE: executing the command locally: TRUNCATE TABLE mx_add_coordinator.ref_xxxxx CASCADE
 -- test that changes from a metadata node is reflected in the coordinator placement
 \c - - - :worker_1_port
 SET search_path TO mx_add_coordinator,public;
diff --git a/src/test/regress/expected/multi_mx_insert_select_repartition.out b/src/test/regress/expected/multi_mx_insert_select_repartition.out
index 5cbf44293..d61e046f2 100644
--- a/src/test/regress/expected/multi_mx_insert_select_repartition.out
+++ b/src/test/regress/expected/multi_mx_insert_select_repartition.out
@@ -77,6 +77,35 @@ EXPLAIN (costs off) INSERT INTO target_table SELECT a, max(b) FROM source_table
 (10 rows)
 
 INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
+SET citus.log_local_commands to on;
+-- INSERT .. SELECT via repartitioning is not yet supported after a local execution,
+-- hence the two blocks below should fail
+BEGIN;
+ select count(*) from source_table WHERE a = 1;
+NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE (a OPERATOR(pg_catalog.=) 1)
+ count
+---------------------------------------------------------------------
+ 4
+(1 row)
+
+ insert into target_table SELECT a*2 FROM source_table;
+ERROR: cannot execute command because a local execution has accessed a placement in the transaction
+DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
+HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
+ROLLBACK;
+BEGIN;
+ select count(*) from source_table WHERE a = 1;
+NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE (a OPERATOR(pg_catalog.=) 1)
+ count
+---------------------------------------------------------------------
+ 4
+(1 row)
+
+ insert into target_table SELECT a FROM source_table LIMIT 10;
+ERROR: cannot execute command because a local execution has accessed a placement in the transaction
+DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
+HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
+ROLLBACK;
 \c - - - :master_port
 SET search_path TO multi_mx_insert_select_repartition;
 SELECT * FROM target_table ORDER BY a;
diff --git a/src/test/regress/expected/multi_mx_truncate_from_worker.out b/src/test/regress/expected/multi_mx_truncate_from_worker.out
index dc255f01c..abb59b761 100644
--- a/src/test/regress/expected/multi_mx_truncate_from_worker.out
+++ b/src/test/regress/expected/multi_mx_truncate_from_worker.out
@@ -76,6 +76,9 @@ INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000
 -- now, show that TRUNCATE CASCADE works expected from the worker
 TRUNCATE "refer'ence_table" CASCADE;
 NOTICE: truncate cascades to table "on_update_fkey_table"
to table "on_update_fkey_table" +NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx" +NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx" +NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx" SELECT count(*) FROM on_update_fkey_table; count --------------------------------------------------------------------- @@ -96,6 +99,9 @@ ROLLBACK; BEGIN; TRUNCATE "refer'ence_table" CASCADE; NOTICE: truncate cascades to table "on_update_fkey_table" +NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx" +NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx" +NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx" ROLLBACK; -- test with sequential mode and CASCADE BEGIN; diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index aae9ad9cc..c0292606c 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -344,6 +344,7 @@ $Q$); -- verify that we can drop columns from reference tables replicated to the coordinator -- see https://github.com/citusdata/citus/issues/3279 ALTER TABLE squares DROP COLUMN b; +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (8000000, 'replicate_ref_to_coordinator', 'ALTER TABLE squares DROP COLUMN b;') -- clean-up SET client_min_messages TO ERROR; DROP SCHEMA replicate_ref_to_coordinator CASCADE; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 76b4a112b..21d1ff915 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -51,7 +51,7 @@ test: multi_row_insert test: insert_select_connection_leak # --------- -# at the end of the regression tests regaring recursively planned modifications +# at the end of the regression tests regarding recursively planned modifications # ensure that we don't leak any intermediate results # This test should not run in parallel with any other tests # --------- @@ -282,6 +282,7 @@ test: multi_reference_table test: foreign_key_to_reference_table test: replicate_reference_tables_to_coordinator test: coordinator_shouldhaveshards +test: local_shard_utility_command_execution test: remove_coordinator diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 78505463c..b5ba49caa 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -234,8 +234,7 @@ COPY second_distributed_table FROM STDIN WITH CSV; -- (a) Unless the first query is a local query, always use distributed execution. -- (b) If the executor has used local execution, it has to use local execution -- for the remaining of the transaction block. 
- -- executor has to error out (e.g., TRUNCATE is a utility command and we
- -- currently do not support local execution of utility commands)
+ -- executor has to error out
 
 -- rollback should be able to rollback local execution
 BEGIN;
@@ -347,7 +346,7 @@ BEGIN;
 SELECT count(*) FROM distributed_table WHERE key = 500;
 ROLLBACK;
 
--- a local query is followed by a command that cannot be executed locally
+-- a local query followed by a TRUNCATE command can be executed locally
 BEGIN;
 SELECT count(*) FROM distributed_table WHERE key = 1;
 TRUNCATE distributed_table CASCADE;
diff --git a/src/test/regress/sql/local_shard_utility_command_execution.sql b/src/test/regress/sql/local_shard_utility_command_execution.sql
new file mode 100644
index 000000000..00ba3c00d
--- /dev/null
+++ b/src/test/regress/sql/local_shard_utility_command_execution.sql
@@ -0,0 +1,332 @@
+-- This test file includes tests for local execution of utility commands.
+-- For now, this file includes tests only for local execution of
+-- `TRUNCATE/DROP/DDL` commands for all kinds of distributed tables from
+-- the coordinator node having regular distributed tables' shards
+-- (shouldHaveShards = on) and having reference table placements in it.
+
+\set VERBOSITY terse
+
+SET citus.next_shard_id TO 1500000;
+SET citus.shard_replication_factor TO 1;
+SET citus.enable_local_execution TO ON;
+SET citus.shard_count TO 32;
+SET citus.log_local_commands TO ON;
+
+CREATE SCHEMA local_commands_test_schema;
+SET search_path TO local_commands_test_schema;
+
+-- let coordinator have distributed table shards/placements
+set client_min_messages to ERROR;
+SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
+RESET client_min_messages;
+
+SELECT master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
+
+-----------------------------------------
+------ local execution of TRUNCATE ------
+-----------------------------------------
+
+CREATE TABLE ref_table (a int primary key);
+SELECT create_reference_table('ref_table');
+
+CREATE TABLE dist_table(a int);
+SELECT create_distributed_table('dist_table', 'a', colocate_with:='none');
+
+ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a);
+
+-- insert some data
+INSERT INTO ref_table VALUES(1);
+INSERT INTO dist_table VALUES(1);
+
+-- Currently, we support local execution of TRUNCATE commands for all kinds of
+-- distributed tables. Hence, cascading to distributed tables wouldn't be a
+-- problem even when the coordinator has some local distributed table shards.
+TRUNCATE ref_table CASCADE;
+
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM ref_table, dist_table;
+
+-- insert some data
+INSERT INTO ref_table VALUES(1);
+INSERT INTO dist_table VALUES(1);
+
+-- As SELECT accesses local placements of the reference table, TRUNCATE would
+-- also be forced to local execution even if they operate on different tables.
+BEGIN;
+ SELECT COUNT(*) FROM ref_table;
+ TRUNCATE dist_table;
+COMMIT;
+
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM dist_table;
+
+-- insert some data
+INSERT INTO ref_table VALUES(2);
+INSERT INTO dist_table VALUES(2);
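The transaction blocks in this section all exercise the pinning rule stated in local_shard_execution.sql above: the first statement that touches a placement decides how the rest of the block runs. A condensed sketch of the two patterns, not part of the test file and assuming the ref_table/dist_table definitions above:

    BEGIN;
        SELECT COUNT(*) FROM ref_table;   -- reads the single local placement, pins local execution
        TRUNCATE dist_table;              -- therefore logged as "executing the command locally: TRUNCATE ..."
    COMMIT;

    BEGIN;
        SELECT COUNT(*) FROM dist_table;  -- parallel read over remote connections
        TRUNCATE dist_table;              -- therefore stays on remote connections
    COMMIT;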
+-- However, as SELECT would access local placements via remote connections
+-- for regular distributed tables, TRUNCATE would also be executed remotely.
+BEGIN;
+ SELECT COUNT(*) FROM dist_table;
+ TRUNCATE dist_table;
+COMMIT;
+
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM dist_table;
+
+-- insert some data
+INSERT INTO ref_table VALUES(3);
+INSERT INTO dist_table VALUES(3);
+
+-- TRUNCATE on dist_table (note: again no cascade here) would
+-- just be handled via remote executions even on its local shards
+TRUNCATE dist_table;
+
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM dist_table;
+
+-- insert some data
+INSERT INTO ref_table VALUES(4);
+
+-- However, creating a dist. table is handled by remote connections.
+-- Hence, the commands following it (INSERT & TRUNCATE) would also be
+-- handled remotely.
+BEGIN;
+ CREATE TABLE ref_table_1(a int);
+ SELECT create_reference_table('ref_table_1');
+
+ -- insert some data
+ INSERT INTO ref_table_1 VALUES(5);
+
+ TRUNCATE ref_table_1;
+COMMIT;
+
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM ref_table_1;
+
+-- However, as SELECT would access local placements via remote parallel
+-- connections for regular distributed tables, the TRUNCATE below would
+-- error out
+BEGIN;
+ SELECT COUNT(*) FROM dist_table;
+ TRUNCATE ref_table CASCADE;
+COMMIT;
+
+-- as we do not support local ANALYZE execution yet, the block below would error out
+BEGIN;
+ TRUNCATE ref_table CASCADE;
+ ANALYZE ref_table;
+COMMIT;
+
+-- insert some data
+INSERT INTO ref_table VALUES(7);
+INSERT INTO dist_table VALUES(7);
+
+-- we can TRUNCATE those two tables within the same command
+TRUNCATE ref_table, dist_table;
+
+-- show that TRUNCATE is successful
+SELECT COUNT(*) FROM ref_table, dist_table;
+
+-------------------------------------
+------ local execution of DROP ------
+-------------------------------------
+
+-- dropping just the referenced table would error out as dist_table references it
+DROP TABLE ref_table;
+
+-- drop those two tables via remote execution
+DROP TABLE ref_table, dist_table;
+
+-- drop the other standalone table locally
+DROP TABLE ref_table_1;
+
+-- show that DROP commands are successful
+SELECT tablename FROM pg_tables where schemaname='local_commands_test_schema' ORDER BY tablename;
+
+CREATE TABLE ref_table (a int primary key);
+SELECT create_reference_table('ref_table');
+
+-- We execute the SELECT command within the below block locally.
+-- Hence we should execute the DROP command locally as well.
+BEGIN;
+ SELECT COUNT(*) FROM ref_table;
+ DROP TABLE ref_table;
+COMMIT;
+
+CREATE TABLE ref_table (a int primary key);
+SELECT create_reference_table('ref_table');
+
+CREATE TABLE dist_table(a int);
+SELECT create_distributed_table('dist_table', 'a', colocate_with:='none');
+
+ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a);
+
+-- show that the DROP command is rolled back successfully (should print 1)
+SELECT 1 FROM pg_tables where tablename='dist_table';
+
+-- As SELECT will be executed remotely, the DROP command should also be executed
+-- remotely to prevent possible self-deadlocks & inconsistencies.
+-- FIXME: we have a known bug for the below case, described in
+-- https://github.com/citusdata/citus/issues/3526. Hence, commented out as it could
+-- randomly fall into distributed deadlocks
+--BEGIN;
+-- SELECT COUNT(*) FROM dist_table;
+-- DROP TABLE dist_table;
+--END;
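Since the DROP tests above hinge on which placements are local to the coordinator, it can help to check that directly while tracing them; a minimal sketch (not part of the test file) using the long-standing Citus metadata views:

    SELECT shardid, nodename, nodeport
    FROM pg_dist_shard_placement
    WHERE shardid IN (SELECT shardid
                      FROM pg_dist_shard
                      WHERE logicalrelid = 'dist_table'::regclass)
    ORDER BY shardid;

With shouldhaveshards enabled earlier in the file, a subset of the returned rows is expected to carry the coordinator's port; those are the placements the "executing the command locally" notices refer to.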
+
+-- As SELECT will be executed remotely, the DROP command below should also be
+-- executed remotely.
+CREATE TABLE another_dist_table(a int);
+SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
+
+BEGIN;
+ SELECT COUNT(*) FROM another_dist_table;
+ DROP TABLE another_dist_table;
+COMMIT;
+
+-- show that DROP command is committed successfully
+SELECT 1 FROM pg_tables where tablename='another_dist_table';
+
+-- below DROP will be executed remotely.
+DROP TABLE dist_table;
+
+-- show that DROP command is successful
+SELECT 1 FROM pg_tables where tablename='dist_table';
+
+CREATE TABLE dist_table(a int);
+SELECT create_distributed_table('dist_table', 'a', colocate_with:='none');
+
+ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a);
+
+-- as SELECT on ref_table will be executed locally, the SELECT and DROP following
+-- it would also be executed locally
+BEGIN;
+ SELECT COUNT(*) FROM ref_table;
+ DROP TABLE dist_table CASCADE;
+ROLLBACK;
+
+-- show that the DROP command is rolled back successfully (should print 1)
+SELECT 1 FROM pg_tables where tablename='dist_table';
+
+---------------------------------------------
+------ local execution of DDL commands ------
+---------------------------------------------
+
+-- try some complicated CASCADE cases along with DDL commands
+
+CREATE TABLE ref_table_1(a int primary key);
+SELECT create_reference_table('ref_table_1');
+
+-- below block should execute successfully
+BEGIN;
+ SELECT COUNT(*) FROM ref_table;
+
+ -- as SELECT above runs locally and as now we support local execution of DDL commands,
+ -- the DDL below should be able to define the foreign key constraint successfully
+ ALTER TABLE ref_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table_1(a);
+
+ -- insert some data
+ INSERT INTO ref_table_1 VALUES (1);
+ INSERT INTO ref_table_1 VALUES (2);
+ INSERT INTO ref_table VALUES (1);
+
+ -- chain foreign key constraints
+ -- local execution should be observed here as well
+ ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);
+
+ INSERT INTO dist_table VALUES (1);
+
+ DELETE FROM ref_table_1 WHERE a=2;
+
+ -- add another column to ref_table
+ -- note that we execute the DDL below locally as well
+ ALTER TABLE ref_table ADD b int;
+
+ -- define self reference
+ ALTER TABLE ref_table ADD CONSTRAINT fkey2 FOREIGN KEY(b) REFERENCES ref_table(a);
+
+ SELECT COUNT(*) FROM ref_table_1, ref_table, dist_table;
+
+ -- observe DROP on a self-referencing table also works
+ DROP TABLE ref_table_1, ref_table, dist_table;
+
+ -- show that DROP command is successful
+ SELECT tablename FROM pg_tables where schemaname='local_commands_test_schema' ORDER BY tablename;
+ROLLBACK;
+
+-- add another column to dist_table (should be executed remotely)
+ALTER TABLE dist_table ADD b int;
+
+CREATE SCHEMA foo_schema;
+
+-- As SELECT will be executed remotely, the ALTER TABLE SET SCHEMA command should also be executed remotely
+BEGIN;
+ SELECT COUNT(*) FROM dist_table;
+
+ ALTER TABLE dist_table SET SCHEMA foo_schema;
+
+ -- show that ALTER TABLE SET SCHEMA is successful
+ SELECT tablename FROM pg_tables where schemaname='foo_schema' ORDER BY tablename;
+ROLLBACK;
+
+-- However, the ALTER TABLE SET SCHEMA command below will be executed locally
+BEGIN;
+ ALTER TABLE ref_table SET SCHEMA foo_schema;
+
+ -- show that ALTER TABLE SET SCHEMA is successful
+ SELECT tablename FROM pg_tables where schemaname='foo_schema' ORDER BY tablename;
+ROLLBACK;
+
+-- Try a bunch of commands and expect failure at SELECT create_distributed_table
+BEGIN;
+ -- here this SELECT will force the whole block to use local execution
+ SELECT COUNT(*) FROM ref_table;
+
+ -- execute a bunch of DDL & DROP commands successfully
+ ALTER TABLE dist_table ADD column c int;
+ ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;
+
+ -- as we create the table via remote connections, the following SELECT create_distributed_table
+ -- would error out
+ CREATE TABLE another_dist_table(a int);
+ SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
+COMMIT;
+
+---------------------------------------------
+------------ partitioned tables -------------
+---------------------------------------------
+
+-- test combination of TRUNCATE & DROP & DDL commands with partitioned tables as well
+CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
+
+CREATE TABLE partitioning_test_2012 PARTITION OF partitioning_test FOR VALUES FROM ('2012-06-06') TO ('2012-08-08');
+CREATE TABLE partitioning_test_2013 PARTITION OF partitioning_test FOR VALUES FROM ('2013-06-06') TO ('2013-07-07');
+
+-- load some data
+INSERT INTO partitioning_test VALUES (5, '2012-06-06');
+INSERT INTO partitioning_test VALUES (6, '2012-07-07');
+INSERT INTO partitioning_test VALUES (5, '2013-06-06');
+
+SELECT create_distributed_table('partitioning_test', 'id', colocate_with:='dist_table');
+
+-- all commands below should be executed via local execution due to SELECT on ref_table
+BEGIN;
+ SELECT * from ref_table;
+ INSERT INTO partitioning_test VALUES (7, '2012-07-07');
+ SELECT COUNT(*) FROM partitioning_test;
+
+ -- execute a bunch of DDL & DROP commands successfully
+ ALTER TABLE partitioning_test ADD column c int;
+ TRUNCATE partitioning_test;
+ DROP TABLE partitioning_test;
+ROLLBACK;
+
+-- below should be executed via remote connections
+TRUNCATE partitioning_test;
+DROP TABLE partitioning_test;
+
+-- cleanup at exit
+DROP SCHEMA local_commands_test_schema CASCADE;
+DROP SCHEMA foo_schema;
+SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
diff --git a/src/test/regress/sql/multi_mx_insert_select_repartition.sql b/src/test/regress/sql/multi_mx_insert_select_repartition.sql
index 845b8b0c3..e71da4a4f 100644
--- a/src/test/regress/sql/multi_mx_insert_select_repartition.sql
+++ b/src/test/regress/sql/multi_mx_insert_select_repartition.sql
@@ -41,6 +41,21 @@ SET search_path TO multi_mx_insert_select_repartition;
 EXPLAIN (costs off) INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
 INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
 
+SET citus.log_local_commands to on;
+
+-- INSERT .. SELECT via repartitioning is not yet supported after a local execution,
+-- hence the two blocks below should fail
+
+BEGIN;
+ select count(*) from source_table WHERE a = 1;
+ insert into target_table SELECT a*2 FROM source_table;
+ROLLBACK;
+
+BEGIN;
+ select count(*) from source_table WHERE a = 1;
+ insert into target_table SELECT a FROM source_table LIMIT 10;
+ROLLBACK;
+
 \c - - - :master_port
 SET search_path TO multi_mx_insert_select_repartition;
 SELECT * FROM target_table ORDER BY a;
@@ -48,3 +63,4 @@ SELECT * FROM target_table ORDER BY a;
 RESET client_min_messages;
 \set VERBOSITY terse
 DROP SCHEMA multi_mx_insert_select_repartition CASCADE;
+
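When a repartitioned INSERT .. SELECT genuinely has to follow a local read, the HINT in the expected output above points at the escape hatch; a minimal sketch (hypothetical, not part of the test files) of that workaround:

    BEGIN;
        SET LOCAL citus.enable_local_execution TO OFF;
        select count(*) from source_table WHERE a = 1;           -- now runs over a remote connection
        insert into target_table SELECT a*2 FROM source_table;   -- no prior local access to conflict with
    ROLLBACK;

Because the GUC is set with SET LOCAL, local execution is only disabled for that transaction block.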