Local execution of ddl/drop/truncate commands (#3514)

* reimplement ExecuteUtilityTaskListWithoutResults for local utility command execution

* introduce new functions for local execution of utility commands

* change ErrorIfTransactionAccessedPlacementsLocally logic for local utility command execution

* enable local execution for TRUNCATE command on distributed & reference tables

* update existing tests for local utility command execution

* enable local execution for DDL commands on distributed & reference tables

* enable local execution for DROP command on distributed & reference tables

* add normalization rules for cascaded commands

* add new tests for local utility command execution
pull/3614/head
Onur Tirtir 2020-03-13 15:39:32 +03:00 committed by GitHub
parent ca8f7119fe
commit a14739f808
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1436 additions and 45 deletions

View File

@ -684,16 +684,27 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
EnsureCoordinator();
if (ddlJob->targetRelationId != InvalidOid)
Oid targetRelationId = ddlJob->targetRelationId;
if (OidIsValid(targetRelationId))
{
/*
* Only for ddlJobs that are targetting a relation (table) we want to sync its
* metadata and verify some properties around the table.
* Only for ddlJobs that are targetting a relation (table) we want to sync
* its metadata and verify some properties around the table.
*/
shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
EnsurePartitionTableNotReplicated(ddlJob->targetRelationId);
shouldSyncMetadata = ShouldSyncTableMetadata(targetRelationId);
EnsurePartitionTableNotReplicated(targetRelationId);
}
/*
* If it is a local placement of a distributed table or a reference table,
* then execute the DDL command locally.
* Here we set localExecutionSupported to true regardless of whether the
* DDL command is run for/on a distributed table as
* ExecuteUtilityTaskListWithoutResults would already identify those
* DDL tasks not accessing any of the local placements.
*/
bool localExecutionSupported = true;
if (!ddlJob->concurrentIndexCmd)
{
@ -715,8 +726,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
SendCommandToWorkersWithMetadata((char *) ddlJob->commandString);
}
/* use adaptive executor when enabled */
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList);
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList, localExecutionSupported);
}
else
{
@ -727,8 +737,8 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
PG_TRY();
{
/* use adaptive executor when enabled */
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList);
ExecuteUtilityTaskListWithoutResults(ddlJob->taskList,
localExecutionSupported);
if (shouldSyncMetadata)
{

View File

@ -108,8 +108,10 @@ PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
List *vacuumColumnList = VacuumColumnList(vacuumStmt, relationIndex);
List *taskList = VacuumTaskList(relationId, vacuumParams, vacuumColumnList);
/* use adaptive executor when enabled */
ExecuteUtilityTaskListWithoutResults(taskList);
/* local execution is not implemented for VACUUM commands */
bool localExecutionSupported = false;
ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported);
executedVacuumCount++;
}
relationIndex++;

View File

@ -815,13 +815,50 @@ AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution)
/*
* ExecuteUtilityTaskListWithoutResults is a wrapper around executing task
* list for utility commands. It simply calls in adaptive executor's task
* execution function.
* list for utility commands. For remote tasks, it simply calls in adaptive
* executor's task execution function. For local tasks (if any), kicks Process
* Utility via CitusProcessUtility for utility commands. As some local utility
* commands can trigger udf calls, this function also processes those udf calls
* locally.
*/
void
ExecuteUtilityTaskListWithoutResults(List *taskList)
ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupported)
{
ExecuteTaskList(ROW_MODIFY_NONE, taskList, MaxAdaptiveExecutorPoolSize);
RowModifyLevel rowModifyLevel = ROW_MODIFY_NONE;
List *localTaskList = NIL;
List *remoteTaskList = NIL;
/*
* Divide tasks into two if localExecutionSupported is set to true and execute
* the local tasks
*/
if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
{
/*
* Either we are executing a utility command or a UDF call triggered
* by such a command, it has to be a modifying one
*/
bool readOnlyPlan = false;
/* set local (if any) & remote tasks */
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
&remoteTaskList);
/* execute local tasks */
ExecuteLocalUtilityTaskList(localTaskList);
}
else
{
/* all tasks should be executed via remote connections */
remoteTaskList = taskList;
}
/* execute remote tasks if any */
if (list_length(remoteTaskList) > 0)
{
ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize);
}
}
@ -900,10 +937,15 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
ParamListInfo paramListInfo = NULL;
/*
* The code-paths that rely on this function do not know how to execute
* commands locally.
* If current transaction accessed local placements and task list includes
* tasks that should be executed locally (accessing any of the local placements),
* then we should error out as it would cause inconsistencies across the
* remote connection and local execution.
*/
ErrorIfTransactionAccessedPlacementsLocally();
if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList))
{
ErrorIfTransactionAccessedPlacementsLocally();
}
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{

View File

@ -67,9 +67,6 @@
* use local query execution since local execution is sequential. Basically,
* we do not want to lose parallelism across local tasks by switching to local
* execution.
* - The local execution currently only supports queries. In other words, any
* utility commands like TRUNCATE, fails if the command is executed after a local
* execution inside a transaction block.
* - The local execution cannot be mixed with the executors other than adaptive,
* namely task-tracker executor.
* - Related with the previous item, COPY command cannot be mixed with local
@ -79,6 +76,7 @@
#include "postgres.h"
#include "miscadmin.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparse_shard_query.h"
@ -90,6 +88,7 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h" /* to access LogRemoteCommands */
#include "distributed/transaction_management.h"
#include "distributed/worker_protocol.h"
#include "executor/tstoreReceiver.h"
#include "executor/tuptable.h"
#if PG_VERSION_NUM >= 120000
@ -118,6 +117,8 @@ static void LogLocalCommand(Task *task);
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
static void LocallyExecuteUtilityTask(const char *utilityCommand);
static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
/*
@ -245,6 +246,98 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
}
/*
* ExecuteLocalUtilityTaskList executes a list of tasks locally. This function
* also logs local execution notice for each task and sets
* TransactionAccessedLocalPlacement to true for next set of possible queries
* & commands within the current transaction block. See the comment in function.
*/
void
ExecuteLocalUtilityTaskList(List *localTaskList)
{
Task *localTask = NULL;
foreach_ptr(localTask, localTaskList)
{
const char *localTaskQueryCommand = TaskQueryString(localTask);
/* we do not expect tasks with INVALID_SHARD_ID for utility commands */
Assert(localTask->anchorShardId != INVALID_SHARD_ID);
Assert(TaskAccessesLocalNode(localTask));
/*
* We should register the access to local placement to force the local
* execution of the following commands withing the current transaction.
*/
TransactionAccessedLocalPlacement = true;
LogLocalCommand(localTask);
LocallyExecuteUtilityTask(localTaskQueryCommand);
}
}
/*
* LocallyExecuteUtilityTask executes the given local task query in the current
* session.
*/
static void
LocallyExecuteUtilityTask(const char *localTaskQueryCommand)
{
RawStmt *localTaskRawStmt = (RawStmt *) ParseTreeRawStmt(localTaskQueryCommand);
Node *localTaskRawParseTree = localTaskRawStmt->stmt;
/*
* Actually, the query passed to this function would mostly be a
* utility command to be executed locally. However, some utility
* commands do trigger udf calls (e.g worker_apply_shard_ddl_command)
* to execute commands in a generic way. But as we support local
* execution of utility commands, we should also process those udf
* calls locally as well. In that case, we simply execute the query
* implying the udf call in below conditional block.
*/
if (IsA(localTaskRawParseTree, SelectStmt))
{
/* we have no external parameters to rewrite the UDF call RawStmt */
Query *localUdfTaskQuery =
RewriteRawQueryStmt(localTaskRawStmt, localTaskQueryCommand, NULL, 0);
LocallyExecuteUdfTaskQuery(localUdfTaskQuery);
}
else
{
/*
* It is a regular utility command or SELECT query with non-udf,
* targets, then we should execute it locally via process utility.
*
* If it is a regular utility command, CitusProcessUtility is the
* appropriate function to process that command. However, if it's
* a SELECT query with non-udf targets, CitusProcessUtility would
* error out as we are not expecting such SELECT queries triggered
* by utility commands.
*/
CitusProcessUtility(localTaskRawParseTree, localTaskQueryCommand,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
}
}
/*
* LocallyExecuteUdfTaskQuery executes the given udf command locally. Local udf
* command is simply a "SELECT udf_call()" query and so it cannot be executed
* via process utility.
*/
static void
LocallyExecuteUdfTaskQuery(Query *localUdfTaskQuery)
{
/* we do not expect any results */
ExecuteQueryIntoDestReceiver(localUdfTaskQuery, NULL, None_Receiver);
}
/*
* LogLocalCommand logs commands executed locally on this node. Although we're
* talking about local execution, the function relies on citus.log_remote_commands
@ -437,16 +530,33 @@ ShouldExecuteTasksLocally(List *taskList)
if (TransactionAccessedLocalPlacement)
{
bool isValidLocalExecutionPath PG_USED_FOR_ASSERTS_ONLY = false;
/*
* For various reasons, including the transaction visibility
* rules (e.g., read-your-own-writes), we have to use local
* execution again if it has already happened within this
* transaction block.
*
*/
isValidLocalExecutionPath = IsMultiStatementTransaction() ||
InCoordinatedTransaction();
/*
* In some cases, such as when a single command leads to a local
* command execution followed by remote task (list) execution, we
* still expect the remote execution to first try local execution
* as TransactionAccessedLocalPlacement is set by the local execution.
* The remote execution shouldn't create any local tasks as the local
* execution should have executed all the local tasks. And, we are
* ensuring it here.
*/
isValidLocalExecutionPath |= !AnyTaskAccessesLocalNode(taskList);
/*
* We might error out later in the execution if it is not suitable
* to execute the tasks locally.
*/
Assert(IsMultiStatementTransaction() || InCoordinatedTransaction());
Assert(isValidLocalExecutionPath);
/*
* TODO: A future improvement could be to keep track of which placements
@ -497,6 +607,27 @@ ShouldExecuteTasksLocally(List *taskList)
}
/*
* AnyTaskAccessesLocalNode returns true if a task within the task list accesses
* to the local node.
*/
bool
AnyTaskAccessesLocalNode(List *taskList)
{
Task *task = NULL;
foreach_ptr(task, taskList)
{
if (TaskAccessesLocalNode(task))
{
return true;
}
}
return false;
}
/*
* TaskAccessesLocalNode returns true if any placements of the task reside on
* the node that we're executing the query.

View File

@ -607,6 +607,22 @@ Query *
ParseQueryString(const char *queryString, Oid *paramOids, int numParams)
{
RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
/* rewrite the parsed RawStmt to produce a Query */
Query *query = RewriteRawQueryStmt(rawStmt, queryString, paramOids, numParams);
return query;
}
/*
* RewriteRawQueryStmt rewrites the given parsed RawStmt according to the other
* parameters and returns a Query struct.
*/
Query *
RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, Oid *paramOids, int
numParams)
{
List *queryTreeList =
pg_analyze_and_rewrite(rawStmt, queryString, paramOids, numParams, NULL);

View File

@ -396,6 +396,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
List *dropTaskList = DropTaskList(relationId, schemaName, relationName,
deletableShardIntervalList);
bool shouldExecuteTasksLocally = ShouldExecuteTasksLocally(dropTaskList);
Task *task = NULL;
foreach_ptr(task, dropTaskList)
@ -423,14 +424,39 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
continue;
}
const char *dropShardPlacementCommand = TaskQueryString(task);
ExecuteDropShardPlacementCommandRemotely(shardPlacement,
relationName,
dropShardPlacementCommand);
if (isLocalShardPlacement)
/*
* If it is a local placement of a distributed table or a reference table,
* then execute the DROP command locally.
*/
if (isLocalShardPlacement && shouldExecuteTasksLocally)
{
TransactionConnectedToLocalGroup = true;
List *singleTaskList = list_make1(task);
ExecuteLocalUtilityTaskList(singleTaskList);
}
else
{
/*
* Either it was not a local placement or we could not use
* local execution even if it was a local placement.
* If it is the second case, then it is possibly because in
* current transaction, some commands or queries connected
* to local group as well.
*
* Regardless of the node is a remote node or the current node,
* try to open a new connection (or use an existing one) to
* connect to that node to drop the shard placement over that
* remote connection.
*/
const char *dropShardPlacementCommand = TaskQueryString(task);
ExecuteDropShardPlacementCommandRemotely(shardPlacement,
relationName,
dropShardPlacementCommand);
if (isLocalShardPlacement)
{
TransactionConnectedToLocalGroup = true;
}
}
DeleteShardPlacementRow(shardPlacementId);

View File

@ -73,7 +73,13 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
{
List *taskList = TruncateTaskList(relationId);
ExecuteUtilityTaskListWithoutResults(taskList);
/*
* If it is a local placement of a distributed table or a reference table,
* then execute TRUNCATE command locally.
*/
bool localExecutionSupported = true;
ExecuteUtilityTaskListWithoutResults(taskList, localExecutionSupported);
}
PG_RETURN_DATUM(PointerGetDatum(NULL));

View File

@ -22,6 +22,7 @@ extern bool TransactionConnectedToLocalGroup;
/* extern function declarations */
extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
extern void ExecuteLocalUtilityTaskList(List *localTaskList);
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
List **localTaskList, List **remoteTaskList);
extern bool ShouldExecuteTasksLocally(List *taskList);

View File

@ -84,7 +84,8 @@ extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskL
TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore,
bool hasReturning);
extern void ExecuteUtilityTaskListWithoutResults(List *taskList);
extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool
localExecutionSupported);
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int
targetPoolSize);
extern bool IsCitusCustomState(PlanState *planState);
@ -94,6 +95,8 @@ extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *worker
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
tupleDescriptor, Tuplestorestate *tupstore);
extern Query * ParseQueryString(const char *queryString, Oid *paramOids, int numParams);
extern Query * RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString,
Oid *paramOids, int numParams);
extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
params,
DestReceiver *dest);

View File

@ -60,6 +60,10 @@ s/(job_[0-9]+\/task_[0-9]+\/p_[0-9]+\.)[0-9]+/\1xxxx/g
# isolation_ref2ref_foreign_keys
s/"(ref_table_[0-9]_|ref_table_[0-9]_value_fkey_)[0-9]+"/"\1xxxxxxx"/g
# commands cascading to shard relations
s/(NOTICE: .*_)[0-9]{5,}( CASCADE)/\1xxxxx\2/g
s/(NOTICE: [a-z]+ cascades to table ".*)_[0-9]{5,}"/\1_xxxxx"/g
# Line info varies between versions
/^LINE [0-9]+:.*$/d
/^ *\^$/d

View File

@ -107,9 +107,8 @@ NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshar
(1 row)
ALTER TABLE test DROP COLUMN z;
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503000, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503003, 'coordinator_shouldhaveshards', 'ALTER TABLE test DROP COLUMN z;')
ROLLBACK;
BEGIN;
ALTER TABLE test DROP COLUMN z;

View File

@ -362,8 +362,7 @@ COPY second_distributed_table FROM STDIN WITH CSV;
-- (a) Unless the first query is a local query, always use distributed execution.
-- (b) If the executor has used local execution, it has to use local execution
-- for the remaining of the transaction block. If that's not possible, the
-- executor has to error out (e.g., TRUNCATE is a utility command and we
-- currently do not support local execution of utility commands)
-- executor has to error out
-- rollback should be able to rollback local execution
BEGIN;
INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *;
@ -592,7 +591,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
(1 row)
ROLLBACK;
-- a local query is followed by a command that cannot be executed locally
-- a local query followed by TRUNCATE command can be executed locally
BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
@ -603,9 +602,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
TRUNCATE distributed_table CASCADE;
NOTICE: truncate cascades to table "second_distributed_table"
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
ROLLBACK;
-- a local query is followed by a command that cannot be executed locally
BEGIN;
@ -899,6 +901,17 @@ WHERE
-- get ready for the next commands
TRUNCATE reference_table, distributed_table, second_distributed_table;
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.reference_table_xxxxx CASCADE
NOTICE: truncate cascades to table "distributed_table_xxxxx"
NOTICE: truncate cascades to table "distributed_table_xxxxx"
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
-- local execution of returning of reference tables
INSERT INTO reference_table VALUES (1),(2),(3),(4),(5),(6) RETURNING *;
NOTICE: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 AS citus_table_alias (key) VALUES (1), (2), (3), (4), (5), (6) RETURNING citus_table_alias.key
@ -1324,6 +1337,17 @@ ROLLBACK;
TRUNCATE reference_table CASCADE;
NOTICE: truncate cascades to table "distributed_table"
NOTICE: truncate cascades to table "second_distributed_table"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.reference_table_xxxxx CASCADE
NOTICE: truncate cascades to table "distributed_table_xxxxx"
NOTICE: truncate cascades to table "distributed_table_xxxxx"
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
-- show that both local, and mixed local-distributed executions
@ -1593,6 +1617,17 @@ COMMIT;
TRUNCATE reference_table CASCADE;
NOTICE: truncate cascades to table "distributed_table"
NOTICE: truncate cascades to table "second_distributed_table"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.reference_table_xxxxx CASCADE
NOTICE: truncate cascades to table "distributed_table_xxxxx"
NOTICE: truncate cascades to table "distributed_table_xxxxx"
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.distributed_table_xxxxx CASCADE
NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
-- load some data on a remote shard
INSERT INTO reference_table (key) VALUES (2);
NOTICE: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (2)

View File

@ -0,0 +1,731 @@
-- This tests file includes tests for local execution of utility commands.
-- For now, this file includes tests only for local execution of
-- `TRUNCATE/DROP/DDL` commands for all kinds of distributed tables from
-- the coordinator node having regular distributed tables' shards
-- (shouldHaveShards = on) and having reference table placements in it.
\set VERBOSITY terse
SET citus.next_shard_id TO 1500000;
SET citus.shard_replication_factor TO 1;
SET citus.enable_local_execution TO ON;
SET citus.shard_COUNT TO 32;
SET citus.log_local_commands TO ON;
CREATE SCHEMA local_commands_test_schema;
SET search_path TO local_commands_test_schema;
-- let coordinator have distributed table shards/placements
set client_min_messages to ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
RESET client_min_messages;
SELECT master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
master_set_node_property
---------------------------------------------------------------------
(1 row)
---------------------------------------------------------------------
------ local execution of TRUNCATE ------
---------------------------------------------------------------------
CREATE TABLE ref_table (a int primary key);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a', colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a);
-- insert some data
INSERT INTO ref_table VALUES(1);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (1)
INSERT INTO dist_table VALUES(1);
-- Currently, we support local execution of TRUNCATE commands for all kinds
-- Hence, cascading to distributed tables wouldn't be a problem even in the
-- case that coordinator have some local distributed table shards.
TRUNCATE ref_table CASCADE;
NOTICE: truncate cascades to table "dist_table"
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.ref_table_xxxxx CASCADE
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM ref_table, dist_table;
count
---------------------------------------------------------------------
0
(1 row)
-- insert some data
INSERT INTO ref_table VALUES(1);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (1)
INSERT INTO dist_table VALUES(1);
-- As SELECT accesses local placements of reference table, TRUNCATE would also
-- be forced to local execution even if they operate on different tables.
BEGIN;
SELECT COUNT(*) FROM ref_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500000 ref_table
count
---------------------------------------------------------------------
1
(1 row)
TRUNCATE dist_table;
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
COMMIT;
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM dist_table;
count
---------------------------------------------------------------------
0
(1 row)
-- insert some data
INSERT INTO ref_table VALUES(2);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (2)
INSERT INTO dist_table VALUES(2);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.dist_table_1500025 (a) VALUES (2)
-- However, SELECT would access local placements via remote connections
-- for regular distributed tables, TRUNCATE would also be executed remotely.
BEGIN;
SELECT COUNT(*) FROM dist_table;
count
---------------------------------------------------------------------
1
(1 row)
TRUNCATE dist_table;
COMMIT;
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM dist_table;
count
---------------------------------------------------------------------
0
(1 row)
-- insert some data
INSERT INTO ref_table VALUES(3);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (3)
INSERT INTO dist_table VALUES(3);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.dist_table_1500016 (a) VALUES (3)
-- TRUNCATE on dist_table (note that: again no cascade here) would
-- just be handled via remote executions even on its local shards
TRUNCATE dist_table;
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM dist_table;
count
---------------------------------------------------------------------
0
(1 row)
-- insert some data
INSERT INTO ref_table VALUES(4);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (4)
-- However, creating a dist. table is handled by remote connections.
-- Hence, the commands following it (INSERT & TRUNCATE) would also be
-- handled remotely.
BEGIN;
CREATE TABLE ref_table_1(a int);
SELECT create_reference_table('ref_table_1');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- insert some data
INSERT INTO ref_table_1 VALUES(5);
TRUNCATE ref_table_1;
COMMIT;
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM ref_table_1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1_1500033 ref_table_1
count
---------------------------------------------------------------------
0
(1 row)
-- However, as SELECT would access local placements via remote parallel
-- connections for regular distributed tables, below TRUNCATE would error
-- out
BEGIN;
SELECT COUNT(*) FROM dist_table;
count
---------------------------------------------------------------------
0
(1 row)
TRUNCATE ref_table CASCADE;
NOTICE: truncate cascades to table "dist_table"
ERROR: cannot execute DDL on reference table "ref_table" because there was a parallel SELECT access to distributed table "dist_table" in the same transaction
COMMIT;
-- as we do not support local ANALYZE execution yet, below block would error out
BEGIN;
TRUNCATE ref_table CASCADE;
NOTICE: truncate cascades to table "dist_table"
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.ref_table_xxxxx CASCADE
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
ANALYZE ref_table;
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
COMMIT;
-- insert some data
INSERT INTO ref_table VALUES(7);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500000 (a) VALUES (7)
INSERT INTO dist_table VALUES(7);
-- we can TRUNCATE those two tables within the same command
TRUNCATE ref_table, dist_table;
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.ref_table_xxxxx CASCADE
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.dist_table_xxxxx CASCADE
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM ref_table, dist_table;
count
---------------------------------------------------------------------
0
(1 row)
---------------------------------------------------------------------
------ local execution of DROP ------
---------------------------------------------------------------------
-- droping just the referenced table would error out as dist_table references it
DROP TABLE ref_table;
ERROR: cannot drop table ref_table because other objects depend on it
-- drop those two tables via remote execution
DROP TABLE ref_table, dist_table;
-- drop the other standalone table locally
DROP TABLE ref_table_1;
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.ref_table_1_xxxxx CASCADE
-- show that DROP commands are successfull
SELECT tablename FROM pg_tables where schemaname='local_commands_test_schema' ORDER BY tablename;
tablename
---------------------------------------------------------------------
(0 rows)
CREATE TABLE ref_table (a int primary key);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- We execute SELECT command within the below block locally.
-- Hence we should execute the DROP command locally as well.
BEGIN;
SELECT COUNT(*) FROM ref_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500034 ref_table
count
---------------------------------------------------------------------
0
(1 row)
DROP TABLE ref_table;
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.ref_table_xxxxx CASCADE
COMMIT;
CREATE TABLE ref_table (a int primary key);
SELECT create_reference_table('ref_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a', colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a);
-- show that DROP command is rollback'd successfully (should print 1)
SELECT 1 FROM pg_tables where tablename='dist_table';
?column?
---------------------------------------------------------------------
1
(1 row)
-- As SELECT will be executed remotely, the DROP command should also be executed
-- remotely to prevent possible self-deadlocks & inconsistencies.
-- FIXME: we have a known bug for below case described in
-- https://github.com/citusdata/citus/issues/3526. Hence, commented out as it could
-- randomly fall into distributed deadlocks
--BEGIN;
-- SELECT COUNT(*) FROM dist_table;
-- DROP TABLE dist_table;
--END;
-- As SELECT will be executed remotely, the DROP command below should also be
-- executed remotely.
CREATE TABLE another_dist_table(a int);
SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
create_distributed_table
---------------------------------------------------------------------
(1 row)
BEGIN;
SELECT COUNT(*) FROM another_dist_table;
count
---------------------------------------------------------------------
0
(1 row)
DROP TABLE another_dist_table;
COMMIT;
-- show that DROP command is committed successfully
SELECT 1 FROM pg_tables where tablename='another_dist_table';
?column?
---------------------------------------------------------------------
(0 rows)
-- below DROP will be executed remotely.
DROP TABLE dist_table;
-- show that DROP command is successfull
SELECT 1 FROM pg_tables where tablename='dist_table';
?column?
---------------------------------------------------------------------
(0 rows)
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a', colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a);
-- as SELECT on ref_table will be executed locally, the SELECT and DROP following
-- it would also be executed locally
BEGIN;
SELECT COUNT(*) FROM ref_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500035 ref_table
count
---------------------------------------------------------------------
0
(1 row)
DROP TABLE dist_table CASCADE;
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
ROLLBACK;
-- show that DROP command is rollback'd successfully (should print 1)
SELECT 1 FROM pg_tables where tablename='dist_table';
?column?
---------------------------------------------------------------------
1
(1 row)
---------------------------------------------------------------------
------ local execution of DDL commands ------
---------------------------------------------------------------------
-- try some complicated CASCADE cases along with DDL commands
CREATE TABLE ref_table_1(a int primary key);
SELECT create_reference_table('ref_table_1');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- below block should execute successfully
BEGIN;
SELECT COUNT(*) FROM ref_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500035 ref_table
count
---------------------------------------------------------------------
0
(1 row)
-- as SELECT above runs locally and as now we support local execution of DDL commands,
-- below DDL should be able to define foreign key constraint successfully
ALTER TABLE ref_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table_1(a);
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500035, 'local_commands_test_schema', 1500132, 'local_commands_test_schema', 'ALTER TABLE ref_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table_1(a);')
-- insert some data
INSERT INTO ref_table_1 VALUES (1);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1_1500132 (a) VALUES (1)
INSERT INTO ref_table_1 VALUES (2);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1_1500132 (a) VALUES (2)
INSERT INTO ref_table VALUES (1);
NOTICE: executing the command locally: INSERT INTO local_commands_test_schema.ref_table_1500035 (a) VALUES (1)
-- chain foreign key constraints
-- local execution should be observed here as well
ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500100, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500103, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500106, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500109, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500112, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500115, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500118, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500121, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500124, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500127, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500130, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);')
INSERT INTO dist_table VALUES (1);
DELETE FROM ref_table_1 WHERE a=2;
NOTICE: executing the command locally: DELETE FROM local_commands_test_schema.ref_table_1_1500132 ref_table_1 WHERE (a OPERATOR(pg_catalog.=) 2)
-- add another column to dist_table
-- note that we execute below DDL locally as well
ALTER TABLE ref_table ADD b int;
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500035, 'local_commands_test_schema', 'ALTER TABLE ref_table ADD b int;')
-- define self reference
ALTER TABLE ref_table ADD CONSTRAINT fkey2 FOREIGN KEY(b) REFERENCES ref_table(a);
NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1500035, 'local_commands_test_schema', 1500035, 'local_commands_test_schema', 'ALTER TABLE ref_table ADD CONSTRAINT fkey2 FOREIGN KEY(b) REFERENCES ref_table(a);')
SELECT COUNT(*) FROM ref_table_1, ref_table, dist_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500100 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500103 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500106 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500109 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500112 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500115 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500118 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500121 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500124 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500127 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((local_commands_test_schema.dist_table_1500130 dist_table JOIN local_commands_test_schema.ref_table_1_1500132 ref_table_1 ON (true)) JOIN local_commands_test_schema.ref_table_1500035 ref_table ON (true)) WHERE true
count
---------------------------------------------------------------------
1
(1 row)
-- observe DROP on a self-referencing table also works
DROP TABLE ref_table_1, ref_table, dist_table;
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.dist_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.ref_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.ref_table_1_xxxxx CASCADE
-- show that DROP command is successfull
SELECT tablename FROM pg_tables where schemaname='local_commands_test_schema' ORDER BY tablename;
tablename
---------------------------------------------------------------------
(0 rows)
ROLLBACK;
-- add another column to dist_table (should be executed remotely)
ALTER TABLE dist_table ADD b int;
CREATE SCHEMA foo_schema;
-- As SELECT will be executed remotely, ALTER TABLE SET SCHEMA command should alse be executed remotely
BEGIN;
SELECT COUNT(*) FROM dist_table;
count
---------------------------------------------------------------------
0
(1 row)
ALTER TABLE dist_table SET SCHEMA foo_schema;
-- show that ALTER TABLE SET SCHEMA is successfull
SELECT tablename FROM pg_tables where schemaname='foo_schema' ORDER BY tablename;
tablename
---------------------------------------------------------------------
dist_table
(1 row)
ROLLBACK;
-- However, below ALTER TABLE SET SCHEMA command will be executed locally
BEGIN;
ALTER TABLE ref_table SET SCHEMA foo_schema;
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500035, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.ref_table SET SCHEMA foo_schema;')
-- show that ALTER TABLE SET SCHEMA is successfull
SELECT tablename FROM pg_tables where schemaname='foo_schema' ORDER BY tablename;
tablename
---------------------------------------------------------------------
ref_table
ref_table_1500035
(2 rows)
ROLLBACK;
-- Try a bunch of commands and expect failure at SELECT create_distributed_table
BEGIN;
-- here this SELECT will enforce the whole block for local execution
SELECT COUNT(*) FROM ref_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500035 ref_table
count
---------------------------------------------------------------------
0
(1 row)
-- execute bunch of DDL & DROP commands succesfully
ALTER TABLE dist_table ADD column c int;
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500100, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500103, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500106, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500109, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500112, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500115, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500118, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500121, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500124, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500127, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500130, 'local_commands_test_schema', 'ALTER TABLE dist_table ADD column c int;')
ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500100, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500103, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500106, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500109, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500112, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500115, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500118, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500121, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500124, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500127, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500130, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
-- as we create table via remote connections, below SELECT create_distributed_table
-- would error out
CREATE TABLE another_dist_table(a int);
SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
COMMIT;
---------------------------------------------------------------------
------------ partitioned tables -------------
---------------------------------------------------------------------
-- test combination of TRUNCATE & DROP & DDL commands with partitioned tables as well
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
CREATE TABLE partitioning_test_2012 PARTITION OF partitioning_test FOR VALUES FROM ('2012-06-06') TO ('2012-08-08');
CREATE TABLE partitioning_test_2013 PARTITION OF partitioning_test FOR VALUES FROM ('2013-06-06') TO ('2013-07-07');
-- load some data
INSERT INTO partitioning_test VALUES (5, '2012-06-06');
INSERT INTO partitioning_test VALUES (6, '2012-07-07');
INSERT INTO partitioning_test VALUES (5, '2013-06-06');
SELECT create_distributed_table('partitioning_test', 'id', colocate_with:='dist_table');
NOTICE: Copying data from local table...
NOTICE: Copying data from local table...
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- all commands below should be executed via local execution due to SELECT on ref_table
BEGIN;
SELECT * from ref_table;
NOTICE: executing the command locally: SELECT a FROM local_commands_test_schema.ref_table_1500035 ref_table
a
---------------------------------------------------------------------
(0 rows)
INSERT INTO partitioning_test VALUES (7, '2012-07-07');
SELECT COUNT(*) FROM partitioning_test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500165 partitioning_test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500168 partitioning_test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500171 partitioning_test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500174 partitioning_test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500177 partitioning_test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500180 partitioning_test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500183 partitioning_test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500186 partitioning_test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500189 partitioning_test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500192 partitioning_test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.partitioning_test_1500195 partitioning_test WHERE true
count
---------------------------------------------------------------------
4
(1 row)
-- execute bunch of DDL & DROP commands succesfully
ALTER TABLE partitioning_test ADD column c int;
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500165, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500168, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500171, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500174, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500177, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500180, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500183, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500186, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500189, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500192, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500195, 'local_commands_test_schema', 'ALTER TABLE partitioning_test ADD column c int;')
TRUNCATE partitioning_test;
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
DROP TABLE partitioning_test;
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500197" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500200" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500203" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500206" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500209" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500212" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500215" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500218" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500221" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500224" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2012_xxxxx CASCADE
NOTICE: table "partitioning_test_2012_1500227" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500229" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500232" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500235" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500238" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500241" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500244" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500247" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500250" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500253" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500256" does not exist, skipping
NOTICE: executing the command locally: DROP TABLE IF EXISTS local_commands_test_schema.partitioning_test_2013_xxxxx CASCADE
NOTICE: table "partitioning_test_2013_1500259" does not exist, skipping
ROLLBACK;
-- below should be executed via remote connections
TRUNCATE partitioning_test;
DROP TABLE partitioning_test;
-- cleanup at exit
DROP SCHEMA local_commands_test_schema CASCADE;
NOTICE: drop cascades to 16 other objects
DROP SCHEMA foo_schema;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
?column?
---------------------------------------------------------------------
1
(1 row)

View File

@ -129,6 +129,7 @@ DEBUG: Plan is router executable
NOTICE: executing the command locally: INSERT INTO mx_add_coordinator.ref_7000000 (a) VALUES (1)
-- get it ready for the next executions
TRUNCATE ref;
NOTICE: executing the command locally: TRUNCATE TABLE mx_add_coordinator.ref_xxxxx CASCADE
-- test that changes from a metadata node is reflected in the coordinator placement
\c - - - :worker_1_port
SET search_path TO mx_add_coordinator,public;

View File

@ -77,6 +77,35 @@ EXPLAIN (costs off) INSERT INTO target_table SELECT a, max(b) FROM source_table
(10 rows)
INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
SET citus.log_local_commands to on;
-- INSERT .. SELECT via repartitioning is not yet support after a local execution,
-- hence below two blocks should fail
BEGIN;
select count(*) from source_table WHERE a = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE (a OPERATOR(pg_catalog.=) 1)
count
---------------------------------------------------------------------
4
(1 row)
insert into target_table SELECT a*2 FROM source_table;
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
ROLLBACK;
BEGIN;
select count(*) from source_table WHERE a = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE (a OPERATOR(pg_catalog.=) 1)
count
---------------------------------------------------------------------
4
(1 row)
insert into target_table SELECT a FROM source_table LIMIT 10;
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
ROLLBACK;
\c - - - :master_port
SET search_path TO multi_mx_insert_select_repartition;
SELECT * FROM target_table ORDER BY a;

View File

@ -76,6 +76,9 @@ INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000
-- now, show that TRUNCATE CASCADE works expected from the worker
TRUNCATE "refer'ence_table" CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
SELECT count(*) FROM on_update_fkey_table;
count
---------------------------------------------------------------------
@ -96,6 +99,9 @@ ROLLBACK;
BEGIN;
TRUNCATE "refer'ence_table" CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
ROLLBACK;
-- test with sequential mode and CASCADE
BEGIN;

View File

@ -344,6 +344,7 @@ $Q$);
-- verify that we can drop columns from reference tables replicated to the coordinator
-- see https://github.com/citusdata/citus/issues/3279
ALTER TABLE squares DROP COLUMN b;
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (8000000, 'replicate_ref_to_coordinator', 'ALTER TABLE squares DROP COLUMN b;')
-- clean-up
SET client_min_messages TO ERROR;
DROP SCHEMA replicate_ref_to_coordinator CASCADE;

View File

@ -51,7 +51,7 @@ test: multi_row_insert
test: insert_select_connection_leak
# ---------
# at the end of the regression tests regaring recursively planned modifications
# at the end of the regression tests regarding recursively planned modifications
# ensure that we don't leak any intermediate results
# This test should not run in parallel with any other tests
# ---------
@ -282,6 +282,7 @@ test: multi_reference_table
test: foreign_key_to_reference_table
test: replicate_reference_tables_to_coordinator
test: coordinator_shouldhaveshards
test: local_shard_utility_command_execution
test: remove_coordinator

View File

@ -234,8 +234,7 @@ COPY second_distributed_table FROM STDIN WITH CSV;
-- (a) Unless the first query is a local query, always use distributed execution.
-- (b) If the executor has used local execution, it has to use local execution
-- for the remaining of the transaction block. If that's not possible, the
-- executor has to error out (e.g., TRUNCATE is a utility command and we
-- currently do not support local execution of utility commands)
-- executor has to error out
-- rollback should be able to rollback local execution
BEGIN;
@ -347,7 +346,7 @@ BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 500;
ROLLBACK;
-- a local query is followed by a command that cannot be executed locally
-- a local query followed by TRUNCATE command can be executed locally
BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 1;
TRUNCATE distributed_table CASCADE;

View File

@ -0,0 +1,332 @@
-- This tests file includes tests for local execution of utility commands.
-- For now, this file includes tests only for local execution of
-- `TRUNCATE/DROP/DDL` commands for all kinds of distributed tables from
-- the coordinator node having regular distributed tables' shards
-- (shouldHaveShards = on) and having reference table placements in it.
\set VERBOSITY terse
SET citus.next_shard_id TO 1500000;
SET citus.shard_replication_factor TO 1;
SET citus.enable_local_execution TO ON;
SET citus.shard_COUNT TO 32;
SET citus.log_local_commands TO ON;
CREATE SCHEMA local_commands_test_schema;
SET search_path TO local_commands_test_schema;
-- let coordinator have distributed table shards/placements
set client_min_messages to ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
RESET client_min_messages;
SELECT master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
-----------------------------------------
------ local execution of TRUNCATE ------
-----------------------------------------
CREATE TABLE ref_table (a int primary key);
SELECT create_reference_table('ref_table');
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a', colocate_with:='none');
ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a);
-- insert some data
INSERT INTO ref_table VALUES(1);
INSERT INTO dist_table VALUES(1);
-- Currently, we support local execution of TRUNCATE commands for all kinds
-- Hence, cascading to distributed tables wouldn't be a problem even in the
-- case that coordinator have some local distributed table shards.
TRUNCATE ref_table CASCADE;
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM ref_table, dist_table;
-- insert some data
INSERT INTO ref_table VALUES(1);
INSERT INTO dist_table VALUES(1);
-- As SELECT accesses local placements of reference table, TRUNCATE would also
-- be forced to local execution even if they operate on different tables.
BEGIN;
SELECT COUNT(*) FROM ref_table;
TRUNCATE dist_table;
COMMIT;
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM dist_table;
-- insert some data
INSERT INTO ref_table VALUES(2);
INSERT INTO dist_table VALUES(2);
-- However, SELECT would access local placements via remote connections
-- for regular distributed tables, TRUNCATE would also be executed remotely.
BEGIN;
SELECT COUNT(*) FROM dist_table;
TRUNCATE dist_table;
COMMIT;
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM dist_table;
-- insert some data
INSERT INTO ref_table VALUES(3);
INSERT INTO dist_table VALUES(3);
-- TRUNCATE on dist_table (note that: again no cascade here) would
-- just be handled via remote executions even on its local shards
TRUNCATE dist_table;
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM dist_table;
-- insert some data
INSERT INTO ref_table VALUES(4);
-- However, creating a dist. table is handled by remote connections.
-- Hence, the commands following it (INSERT & TRUNCATE) would also be
-- handled remotely.
BEGIN;
CREATE TABLE ref_table_1(a int);
SELECT create_reference_table('ref_table_1');
-- insert some data
INSERT INTO ref_table_1 VALUES(5);
TRUNCATE ref_table_1;
COMMIT;
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM ref_table_1;
-- However, as SELECT would access local placements via remote parallel
-- connections for regular distributed tables, below TRUNCATE would error
-- out
BEGIN;
SELECT COUNT(*) FROM dist_table;
TRUNCATE ref_table CASCADE;
COMMIT;
-- as we do not support local ANALYZE execution yet, below block would error out
BEGIN;
TRUNCATE ref_table CASCADE;
ANALYZE ref_table;
COMMIT;
-- insert some data
INSERT INTO ref_table VALUES(7);
INSERT INTO dist_table VALUES(7);
-- we can TRUNCATE those two tables within the same command
TRUNCATE ref_table, dist_table;
-- show that TRUNCATE is successfull
SELECT COUNT(*) FROM ref_table, dist_table;
-------------------------------------
------ local execution of DROP ------
-------------------------------------
-- droping just the referenced table would error out as dist_table references it
DROP TABLE ref_table;
-- drop those two tables via remote execution
DROP TABLE ref_table, dist_table;
-- drop the other standalone table locally
DROP TABLE ref_table_1;
-- show that DROP commands are successfull
SELECT tablename FROM pg_tables where schemaname='local_commands_test_schema' ORDER BY tablename;
CREATE TABLE ref_table (a int primary key);
SELECT create_reference_table('ref_table');
-- We execute SELECT command within the below block locally.
-- Hence we should execute the DROP command locally as well.
BEGIN;
SELECT COUNT(*) FROM ref_table;
DROP TABLE ref_table;
COMMIT;
CREATE TABLE ref_table (a int primary key);
SELECT create_reference_table('ref_table');
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a', colocate_with:='none');
ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a);
-- show that DROP command is rollback'd successfully (should print 1)
SELECT 1 FROM pg_tables where tablename='dist_table';
-- As SELECT will be executed remotely, the DROP command should also be executed
-- remotely to prevent possible self-deadlocks & inconsistencies.
-- FIXME: we have a known bug for below case described in
-- https://github.com/citusdata/citus/issues/3526. Hence, commented out as it could
-- randomly fall into distributed deadlocks
--BEGIN;
-- SELECT COUNT(*) FROM dist_table;
-- DROP TABLE dist_table;
--END;
-- As SELECT will be executed remotely, the DROP command below should also be
-- executed remotely.
CREATE TABLE another_dist_table(a int);
SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
BEGIN;
SELECT COUNT(*) FROM another_dist_table;
DROP TABLE another_dist_table;
COMMIT;
-- show that DROP command is committed successfully
SELECT 1 FROM pg_tables where tablename='another_dist_table';
-- below DROP will be executed remotely.
DROP TABLE dist_table;
-- show that DROP command is successfull
SELECT 1 FROM pg_tables where tablename='dist_table';
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a', colocate_with:='none');
ALTER TABLE dist_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table(a);
-- as SELECT on ref_table will be executed locally, the SELECT and DROP following
-- it would also be executed locally
BEGIN;
SELECT COUNT(*) FROM ref_table;
DROP TABLE dist_table CASCADE;
ROLLBACK;
-- show that DROP command is rollback'd successfully (should print 1)
SELECT 1 FROM pg_tables where tablename='dist_table';
---------------------------------------------
------ local execution of DDL commands ------
---------------------------------------------
-- try some complicated CASCADE cases along with DDL commands
CREATE TABLE ref_table_1(a int primary key);
SELECT create_reference_table('ref_table_1');
-- below block should execute successfully
BEGIN;
SELECT COUNT(*) FROM ref_table;
-- as SELECT above runs locally and as now we support local execution of DDL commands,
-- below DDL should be able to define foreign key constraint successfully
ALTER TABLE ref_table ADD CONSTRAINT fkey FOREIGN KEY(a) REFERENCES ref_table_1(a);
-- insert some data
INSERT INTO ref_table_1 VALUES (1);
INSERT INTO ref_table_1 VALUES (2);
INSERT INTO ref_table VALUES (1);
-- chain foreign key constraints
-- local execution should be observed here as well
ALTER TABLE dist_table ADD CONSTRAINT fkey1 FOREIGN KEY(a) REFERENCES ref_table(a);
INSERT INTO dist_table VALUES (1);
DELETE FROM ref_table_1 WHERE a=2;
-- add another column to dist_table
-- note that we execute below DDL locally as well
ALTER TABLE ref_table ADD b int;
-- define self reference
ALTER TABLE ref_table ADD CONSTRAINT fkey2 FOREIGN KEY(b) REFERENCES ref_table(a);
SELECT COUNT(*) FROM ref_table_1, ref_table, dist_table;
-- observe DROP on a self-referencing table also works
DROP TABLE ref_table_1, ref_table, dist_table;
-- show that DROP command is successfull
SELECT tablename FROM pg_tables where schemaname='local_commands_test_schema' ORDER BY tablename;
ROLLBACK;
-- add another column to dist_table (should be executed remotely)
ALTER TABLE dist_table ADD b int;
CREATE SCHEMA foo_schema;
-- As SELECT will be executed remotely, ALTER TABLE SET SCHEMA command should alse be executed remotely
BEGIN;
SELECT COUNT(*) FROM dist_table;
ALTER TABLE dist_table SET SCHEMA foo_schema;
-- show that ALTER TABLE SET SCHEMA is successfull
SELECT tablename FROM pg_tables where schemaname='foo_schema' ORDER BY tablename;
ROLLBACK;
-- However, below ALTER TABLE SET SCHEMA command will be executed locally
BEGIN;
ALTER TABLE ref_table SET SCHEMA foo_schema;
-- show that ALTER TABLE SET SCHEMA is successfull
SELECT tablename FROM pg_tables where schemaname='foo_schema' ORDER BY tablename;
ROLLBACK;
-- Try a bunch of commands and expect failure at SELECT create_distributed_table
BEGIN;
-- here this SELECT will enforce the whole block for local execution
SELECT COUNT(*) FROM ref_table;
-- execute bunch of DDL & DROP commands succesfully
ALTER TABLE dist_table ADD column c int;
ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;
-- as we create table via remote connections, below SELECT create_distributed_table
-- would error out
CREATE TABLE another_dist_table(a int);
SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
COMMIT;
---------------------------------------------
------------ partitioned tables -------------
---------------------------------------------
-- test combination of TRUNCATE & DROP & DDL commands with partitioned tables as well
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
CREATE TABLE partitioning_test_2012 PARTITION OF partitioning_test FOR VALUES FROM ('2012-06-06') TO ('2012-08-08');
CREATE TABLE partitioning_test_2013 PARTITION OF partitioning_test FOR VALUES FROM ('2013-06-06') TO ('2013-07-07');
-- load some data
INSERT INTO partitioning_test VALUES (5, '2012-06-06');
INSERT INTO partitioning_test VALUES (6, '2012-07-07');
INSERT INTO partitioning_test VALUES (5, '2013-06-06');
SELECT create_distributed_table('partitioning_test', 'id', colocate_with:='dist_table');
-- all commands below should be executed via local execution due to SELECT on ref_table
BEGIN;
SELECT * from ref_table;
INSERT INTO partitioning_test VALUES (7, '2012-07-07');
SELECT COUNT(*) FROM partitioning_test;
-- execute bunch of DDL & DROP commands succesfully
ALTER TABLE partitioning_test ADD column c int;
TRUNCATE partitioning_test;
DROP TABLE partitioning_test;
ROLLBACK;
-- below should be executed via remote connections
TRUNCATE partitioning_test;
DROP TABLE partitioning_test;
-- cleanup at exit
DROP SCHEMA local_commands_test_schema CASCADE;
DROP SCHEMA foo_schema;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);

View File

@ -41,6 +41,21 @@ SET search_path TO multi_mx_insert_select_repartition;
EXPLAIN (costs off) INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
SET citus.log_local_commands to on;
-- INSERT .. SELECT via repartitioning is not yet support after a local execution,
-- hence below two blocks should fail
BEGIN;
select count(*) from source_table WHERE a = 1;
insert into target_table SELECT a*2 FROM source_table;
ROLLBACK;
BEGIN;
select count(*) from source_table WHERE a = 1;
insert into target_table SELECT a FROM source_table LIMIT 10;
ROLLBACK;
\c - - - :master_port
SET search_path TO multi_mx_insert_select_repartition;
SELECT * FROM target_table ORDER BY a;
@ -48,3 +63,4 @@ SELECT * FROM target_table ORDER BY a;
RESET client_min_messages;
\set VERBOSITY terse
DROP SCHEMA multi_mx_insert_select_repartition CASCADE;