diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index b0c980dbe..e569232aa 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -101,17 +101,16 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement); /* Local functions forward declarations for processing distributed table commands */ static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner); -static Node * ProcessIndexStmt(IndexStmt *createIndexStatement, - const char *createIndexCommand, bool isTopLevel); -static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement, - const char *dropIndexCommand, bool isTopLevel); -static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, - const char *alterTableCommand, bool isTopLevel); +static List * PlanIndexStmt(IndexStmt *createIndexStatement, + const char *createIndexCommand); +static List * PlanDropIndexStmt(DropStmt *dropIndexStatement, + const char *dropIndexCommand); +static List * PlanAlterTableStmt(AlterTableStmt *alterTableStatement, + const char *alterTableCommand); static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand); -static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, - const char *alterObjectSchemaCommand, - bool isTopLevel); +static List * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, + const char *alterObjectSchemaCommand); static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand); static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt); static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt); @@ -132,11 +131,7 @@ static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement); /* Local functions forward declarations for helper functions */ static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort); static bool IsAlterTableRenameStmt(RenameStmt *renameStatement); -static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, - bool isTopLevel); -static void ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId, - const char *ddlCommandString, - bool isTopLevel); +static void ExecuteDistributedDDLJob(DDLJob *ddlJob); static void ShowNoticeIfNotUsing2PC(void); static List * DDLTaskList(Oid relationId, const char *commandString); static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, @@ -151,16 +146,13 @@ static bool warnedUserAbout2PC = false; /* - * Utility for handling citus specific concerns around utility statements. - * - * There's two basic types of concerns here: - * 1) Intercept utility statements that run after distributed query - * execution. At this stage, the Create Table command for the master node's - * temporary table has been executed, and this table's relationId is - * visible to us. We can therefore update the relationId in master node's - * select query. - * 2) Handle utility statements on distributed tables that the core code can't - * handle. + * multi_ProcessUtility is the main entry hook for implementing Citus-specific + * utility behavior. Its primary responsibilities are intercepting COPY and DDL + * commands and augmenting the coordinator's command with corresponding tasks + * to be run on worker nodes, after suitably ensuring said commands' options + * are fully supported by Citus. Much of the DDL behavior is toggled by Citus' + * enable_ddl_propagation GUC. In addition to DDL and COPY, utilities such as + * TRUNCATE and VACUUM are also supported. */ void multi_ProcessUtility(Node *parsetree, @@ -173,6 +165,7 @@ multi_ProcessUtility(Node *parsetree, bool commandMustRunAsOwner = false; Oid savedUserId = InvalidOid; int savedSecurityContext = 0; + List *ddlJobs = NIL; if (IsA(parsetree, TransactionStmt)) { @@ -236,6 +229,8 @@ multi_ProcessUtility(Node *parsetree, } } + /* we're mostly in DDL (and VACUUM/TRUNCATE) territory at this point... */ + if (IsA(parsetree, CreateSeqStmt)) { ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree); @@ -251,18 +246,12 @@ multi_ProcessUtility(Node *parsetree, ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree); } - /* - * DDL commands are propagated to workers only if EnableDDLPropagation is - * set to true and the current node is the coordinator - */ + /* only generate worker DDLJobs if propagation is enabled */ if (EnableDDLPropagation) { - bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL); - if (IsA(parsetree, IndexStmt)) { - parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString, - isTopLevel); + ddlJobs = PlanIndexStmt((IndexStmt *) parsetree, queryString); } if (IsA(parsetree, DropStmt)) @@ -270,7 +259,7 @@ multi_ProcessUtility(Node *parsetree, DropStmt *dropStatement = (DropStmt *) parsetree; if (dropStatement->removeType == OBJECT_INDEX) { - parsetree = ProcessDropIndexStmt(dropStatement, queryString, isTopLevel); + ddlJobs = PlanDropIndexStmt(dropStatement, queryString); } } @@ -279,8 +268,7 @@ multi_ProcessUtility(Node *parsetree, AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree; if (alterTableStmt->relkind == OBJECT_TABLE) { - parsetree = ProcessAlterTableStmt(alterTableStmt, queryString, - isTopLevel); + ddlJobs = PlanAlterTableStmt(alterTableStmt, queryString); } } @@ -304,8 +292,7 @@ multi_ProcessUtility(Node *parsetree, if (IsA(parsetree, AlterObjectSchemaStmt)) { AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree; - parsetree = ProcessAlterObjectSchemaStmt(setSchemaStmt, queryString, - isTopLevel); + ddlJobs = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString); } /* @@ -344,7 +331,8 @@ multi_ProcessUtility(Node *parsetree, * Therefore, we check whether the given ALTER TABLE statement is a * FOREIGN KEY constraint and if so disable the validation step. * Note that validation is done on the shard level when DDL - * propagation is enabled. + * propagation is enabled. Unlike the preceeding Plan* calls, the + * following eagerly executes some tasks on workers. */ parsetree = WorkerProcessAlterTableStmt(alterTableStmt, queryString); } @@ -369,13 +357,13 @@ multi_ProcessUtility(Node *parsetree, " necessary users and roles."))); } + /* set user if needed and go ahead and run local utility using standard hook */ if (commandMustRunAsOwner) { GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); } - /* now drop into standard process utility */ standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); @@ -384,7 +372,20 @@ multi_ProcessUtility(Node *parsetree, SetUserIdAndSecContext(savedUserId, savedSecurityContext); } - /* we run VacuumStmt after standard hook to benefit from its checks and locking */ + /* after local command has completed, finish by executing worker DDLJobs, if any */ + if (ddlJobs != NIL) + { + ListCell *ddlJobCell = NULL; + + foreach(ddlJobCell, ddlJobs) + { + DDLJob *ddlJob = (DDLJob *) lfirst(ddlJobCell); + + ExecuteDistributedDDLJob(ddlJob); + } + } + + /* TODO: fold VACUUM's processing into the above block */ if (IsA(parsetree, VacuumStmt)) { VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree; @@ -608,17 +609,18 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR /* - * ProcessIndexStmt processes create index statements for distributed tables. - * The function first checks if the statement belongs to a distributed table - * or not. If it does, then it executes distributed logic for the command. - * - * The function returns the IndexStmt node for the command to be executed on the - * master node table. + * PlanIndexStmt determines whether a given CREATE INDEX statement involves + * a distributed table. If so (and if the statement does not use unsupported + * options), it modifies the input statement to ensure proper execution against + * the master node table and creates a DDLJob to encapsulate information needed + * during the worker node portion of DDL execution before returning that DDLJob + * in a List. If no distributed table is involved, this function returns NIL. */ -static Node * -ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, - bool isTopLevel) +static List * +PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) { + List *ddlJobs = NIL; + /* * We first check whether a distributed relation is affected. For that, we need to * open the relation. To prevent race conditions with later lookups, lock the table, @@ -673,33 +675,32 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand /* if index does not exist, send the command to workers */ if (!OidIsValid(indexRelationId)) { - ExecuteDistributedDDLCommand(relationId, createIndexCommand, isTopLevel); - } - else if (!createIndexStatement->if_not_exists) - { - /* if the index exists and there is no IF NOT EXISTS clause, error */ - ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE), - errmsg("relation \"%s\" already exists", indexName))); + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = relationId; + ddlJob->commandString = createIndexCommand; + ddlJob->taskList = DDLTaskList(relationId, createIndexCommand); + + ddlJobs = list_make1(ddlJob); } } } - return (Node *) createIndexStatement; + return ddlJobs; } /* - * ProcessDropIndexStmt processes drop index statements for distributed tables. - * The function first checks if the statement belongs to a distributed table - * or not. If it does, then it executes distributed logic for the command. - * - * The function returns the DropStmt node for the command to be executed on the - * master node table. + * PlanDropIndexStmt determines whether a given DROP INDEX statement involves + * a distributed table. If so (and if the statement does not use unsupported + * options), it modifies the input statement to ensure proper execution against + * the master node table and creates a DDLJob to encapsulate information needed + * during the worker node portion of DDL execution before returning that DDLJob + * in a List. If no distributed table is involved, this function returns NIL. */ -static Node * -ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, - bool isTopLevel) +static List * +PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) { + List *ddlJobs = NIL; ListCell *dropObjectCell = NULL; Oid distributedIndexId = InvalidOid; Oid distributedRelationId = InvalidOid; @@ -764,28 +765,34 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, if (OidIsValid(distributedIndexId)) { + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); - /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand, isTopLevel); + ddlJob->targetRelationId = distributedRelationId; + ddlJob->commandString = dropIndexCommand; + ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand); + + ddlJobs = list_make1(ddlJob); } - return (Node *) dropIndexStatement; + return ddlJobs; } /* - * ProcessAlterTableStmt processes alter table statements for distributed tables. - * The function first checks if the statement belongs to a distributed table - * or not. If it does, then it executes distributed logic for the command. - * - * The function returns the AlterTableStmt node for the command to be executed on the - * master node table. + * PlanAlterTableStmt determines whether a given ALTER TABLE statement involves + * a distributed table. If so (and if the statement does not use unsupported + * options), it modifies the input statement to ensure proper execution against + * the master node table and creates a DDLJob to encapsulate information needed + * during the worker node portion of DDL execution before returning that DDLJob + * in a List. If no distributed table is involved, this function returns NIL. */ -static Node * -ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, - bool isTopLevel) +static List * +PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand) { + List *ddlJobs = NIL; + DDLJob *ddlJob = NULL; LOCKMODE lockmode = 0; Oid leftRelationId = InvalidOid; Oid rightRelationId = InvalidOid; @@ -796,20 +803,20 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl /* first check whether a distributed relation is affected */ if (alterTableStatement->relation == NULL) { - return (Node *) alterTableStatement; + return NIL; } lockmode = AlterTableGetLockLevel(alterTableStatement->cmds); leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode); if (!OidIsValid(leftRelationId)) { - return (Node *) alterTableStatement; + return NIL; } isDistributedRelation = IsDistributedTable(leftRelationId); if (!isDistributedRelation) { - return (Node *) alterTableStatement; + return NIL; } ErrorIfUnsupportedAlterTableStmt(alterTableStatement); @@ -854,17 +861,25 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl } } + ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = leftRelationId; + ddlJob->commandString = alterTableCommand; + if (rightRelationId) { - ExecuteDistributedForeignKeyCommand(leftRelationId, rightRelationId, - alterTableCommand, isTopLevel); + /* if foreign key related, use specialized task list function ... */ + ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, + alterTableCommand); } else { - ExecuteDistributedDDLCommand(leftRelationId, alterTableCommand, isTopLevel); + /* ... otherwise use standard DDL task list function */ + ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand); } - return (Node *) alterTableStatement; + ddlJobs = list_make1(ddlJob); + + return ddlJobs; } @@ -931,21 +946,21 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, /* - * ProcessAlterObjectSchemaStmt processes ALTER ... SET SCHEMA statements for distributed - * objects. The function first checks if the statement belongs to a distributed objects - * or not. If it does, then it checks whether given object is a table. If it is, we warn - * out, since we do not support ALTER ... SET SCHEMA + * PlanAlterObjectSchemaStmt determines whether a given ALTER ... SET SCHEMA + * statement involves a distributed table and issues a warning if so. Because + * we do not support distributed ALTER ... SET SCHEMA, this function always + * returns NIL. */ -static Node * -ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, - const char *alterObjectSchemaCommand, bool isTopLevel) +static List * +PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, + const char *alterObjectSchemaCommand) { Oid relationId = InvalidOid; bool noWait = false; if (alterObjectSchemaStmt->relation == NULL) { - return (Node *) alterObjectSchemaStmt; + return NIL; } relationId = RangeVarGetRelidExtended(alterObjectSchemaStmt->relation, @@ -956,16 +971,16 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, /* first check whether a distributed relation is affected */ if (!OidIsValid(relationId) || !IsDistributedTable(relationId)) { - return (Node *) alterObjectSchemaStmt; + return NIL; } - /* warn out if a distributed relation is affected */ + /* emit a warning if a distributed relation is affected */ ereport(WARNING, (errmsg("not propagating ALTER ... SET SCHEMA commands to " "worker nodes"), errhint("Connect to worker nodes directly to manually " "change schemas of affected objects."))); - return (Node *) alterObjectSchemaStmt; + return NIL; } @@ -1951,19 +1966,17 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt) /* - * ExecuteDistributedDDLCommand applies a given DDL command to the given - * distributed table in a distributed transaction. If the multi shard commit protocol is + * ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed trans- + * action, including metadata sync if needed. If the multi shard commit protocol is * in its default value of '1pc', then a notice message indicating that '2pc' might be * used for extra safety. In the commit protocol, a BEGIN is sent after connection to * each shard placement and COMMIT/ROLLBACK is handled by * CompleteShardPlacementTransactions function. */ static void -ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, - bool isTopLevel) +ExecuteDistributedDDLJob(DDLJob *ddlJob) { - List *taskList = NIL; - bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId); + bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId); if (XactModificationLevel == XACT_MODIFICATION_DATA) { @@ -1979,60 +1992,10 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, if (shouldSyncMetadata) { SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); - SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); + SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); } - taskList = DDLTaskList(relationId, ddlCommandString); - - ExecuteModifyTasksWithoutResults(taskList); -} - - -/* - * ExecuteDistributedForeignKeyCommand applies a given foreign key command to the given - * distributed table in a distributed transaction. If the multi shard commit protocol is - * in its default value of '1pc', then a notice message indicating that '2pc' might be - * used for extra safety. In the commit protocol, a BEGIN is sent after connection to - * each shard placement and COMMIT/ROLLBACK is handled by - * CompleteShardPlacementTransactions function. - * - * leftRelationId is the relation id of actual distributed table which given foreign key - * command is applied. rightRelationId is the relation id of distributed table which - * foreign key refers to. - */ -static void -ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId, - const char *ddlCommandString, bool isTopLevel) -{ - List *taskList = NIL; - bool shouldSyncMetadata = false; - - if (XactModificationLevel == XACT_MODIFICATION_DATA) - { - ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), - errmsg("distributed DDL commands must not appear within " - "transaction blocks containing single-shard data " - "modifications"))); - } - - EnsureCoordinator(); - ShowNoticeIfNotUsing2PC(); - - /* - * It is sufficient to check only one of the tables for metadata syncing on workers, - * since the colocation of two tables implies that either both or none of them have - * metadata on workers. - */ - shouldSyncMetadata = ShouldSyncTableMetadata(leftRelationId); - if (shouldSyncMetadata) - { - SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); - SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); - } - - taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, ddlCommandString); - - ExecuteModifyTasksWithoutResults(taskList); + ExecuteModifyTasksWithoutResults(ddlJob->taskList); } @@ -2393,15 +2356,16 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) /* - * ReplicateGrantStmt replicates GRANT/REVOKE command to worker nodes if the - * the statement affects distributed tables. + * PlanGrantStmt determines whether a given GRANT/REVOKE statement involves + * a distributed table. If so, it creates DDLJobs to encapsulate information + * needed during the worker node portion of DDL execution before returning the + * DDLJobs in a List. If no distributed table is involved, this returns NIL. * * NB: So far column level privileges are not supported. */ -void -ReplicateGrantStmt(Node *parsetree) +List * +PlanGrantStmt(GrantStmt *grantStmt) { - GrantStmt *grantStmt = (GrantStmt *) parsetree; StringInfoData privsString; StringInfoData granteesString; StringInfoData targetString; @@ -2409,6 +2373,7 @@ ReplicateGrantStmt(Node *parsetree) ListCell *granteeCell = NULL; ListCell *objectCell = NULL; bool isFirst = true; + List *ddlJobs = NIL; initStringInfo(&privsString); initStringInfo(&granteesString); @@ -2422,7 +2387,7 @@ ReplicateGrantStmt(Node *parsetree) if (grantStmt->targtype != ACL_TARGET_OBJECT || grantStmt->objtype != ACL_OBJECT_RELATION) { - return; + return NIL; } /* deparse the privileges */ @@ -2493,7 +2458,7 @@ ReplicateGrantStmt(Node *parsetree) RangeVar *relvar = (RangeVar *) lfirst(objectCell); Oid relOid = RangeVarGetRelid(relvar, NoLock, false); const char *grantOption = ""; - bool isTopLevel = true; + DDLJob *ddlJob = NULL; if (!IsDistributedTable(relOid)) { @@ -2526,7 +2491,15 @@ ReplicateGrantStmt(Node *parsetree) granteesString.data); } - ExecuteDistributedDDLCommand(relOid, ddlString.data, isTopLevel); + ddlJob = palloc0(sizeof(DDLJob)); + ddlJob->targetRelationId = relOid; + ddlJob->commandString = pstrdup(ddlString.data); + ddlJob->taskList = DDLTaskList(relOid, ddlString.data); + + ddlJobs = lappend(ddlJobs, ddlJob); + resetStringInfo(&ddlString); } + + return ddlJobs; } diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index 2b3a7cdf1..143df5d65 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -14,10 +14,23 @@ extern bool EnableDDLPropagation; +/* + * A DDLJob encapsulates the remote tasks and commands needed to process all or + * part of a distributed DDL command. It hold the distributed relation's oid, + * the original DDL command string (for MX DDL propagation), and a task list of + * DDL_TASK-type Tasks to be executed. + */ +typedef struct DDLJob +{ + Oid targetRelationId; /* oid of the target distributed relation */ + const char *commandString; /* initial (coordinator) DDL command string */ + List *taskList; /* worker DDL tasks to execute */ +} DDLJob; + extern void multi_ProcessUtility(Node *parsetree, const char *queryString, ProcessUtilityContext context, ParamListInfo params, DestReceiver *dest, char *completionTag); -extern void ReplicateGrantStmt(Node *parsetree); +extern List * PlanGrantStmt(GrantStmt *grantStmt); #endif /* MULTI_UTILITY_H */ diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out index 100ce2d05..d8c1f58ed 100644 --- a/src/test/regress/expected/multi_index_statements.out +++ b/src/test/regress/expected/multi_index_statements.out @@ -163,14 +163,10 @@ ERROR: creating unique indexes on append-partitioned tables is currently unsupp CREATE INDEX lineitem_orderkey_index ON lineitem (l_orderkey); ERROR: relation "lineitem_orderkey_index" already exists CREATE INDEX try_index ON lineitem USING gist (l_orderkey); -NOTICE: using one-phase commit for distributed DDL commands -HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' ERROR: data type bigint has no default operator class for access method "gist" HINT: You must specify an operator class for the index or define a default operator class for the data type. -CONTEXT: while executing command on localhost:57638 CREATE INDEX try_index ON lineitem (non_existent_column); ERROR: column "non_existent_column" does not exist -CONTEXT: while executing command on localhost:57638 CREATE INDEX ON lineitem (l_orderkey); ERROR: creating index without a name on a distributed table is currently unsupported -- Verify that none of failed indexes got created on the master node @@ -205,6 +201,8 @@ DROP INDEX CONCURRENTLY lineitem_orderkey_index; ERROR: dropping indexes concurrently on distributed tables is currently unsupported -- Verify that we can succesfully drop indexes DROP INDEX lineitem_orderkey_index; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' DROP INDEX lineitem_orderkey_index_new; DROP INDEX lineitem_partkey_desc_index; DROP INDEX lineitem_partial_index; diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index da8ddd2f5..1d96eb681 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -43,9 +43,9 @@ SELECT master_create_worker_shards('lineitem_hash', 2, 1); (1 row) CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate); +DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash" NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash" CREATE TABLE orders_hash ( o_orderkey bigint not null, o_custkey integer not null, diff --git a/src/test/regress/expected/multi_mx_ddl.out b/src/test/regress/expected/multi_mx_ddl.out index 132ed3e72..df9d775ae 100644 --- a/src/test/regress/expected/multi_mx_ddl.out +++ b/src/test/regress/expected/multi_mx_ddl.out @@ -142,9 +142,9 @@ Indexes: \c - - - :master_port SET client_min_messages TO debug2; CREATE INDEX ddl_test_index ON mx_ddl_table(value); +DEBUG: building index "ddl_test_index" on table "mx_ddl_table" NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -DEBUG: building index "ddl_test_index" on table "mx_ddl_table" RESET client_min_messages; DROP INDEX ddl_test_index; -- show that sequences owned by mx tables result in unique values diff --git a/src/test/regress/expected/multi_mx_modifying_xacts.out b/src/test/regress/expected/multi_mx_modifying_xacts.out index cf45bb9e0..afac72f54 100644 --- a/src/test/regress/expected/multi_mx_modifying_xacts.out +++ b/src/test/regress/expected/multi_mx_modifying_xacts.out @@ -183,7 +183,7 @@ ABORT; -- applies to DDL BEGIN; INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); -ALTER TABLE labs_mx ADD COLUMN text motto; +ALTER TABLE labs_mx ADD COLUMN motto text; ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications COMMIT; -- doesn't apply to COPY after modifications diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index b476242b3..1cdc897ab 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -610,9 +610,9 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh \c - - - :master_port ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS non_existent_column; +NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS new_col; -- verify column is dropped \d test_schema_support.nation_hash; @@ -665,9 +665,9 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh \c - - - :master_port SET search_path TO test_schema_support; ALTER TABLE nation_hash DROP COLUMN IF EXISTS non_existent_column; +NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping ALTER TABLE nation_hash DROP COLUMN IF EXISTS new_col; -- verify column is dropped \d test_schema_support.nation_hash; diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index a5165c40a..41a1474ee 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -140,6 +140,9 @@ SELECT * FROM mx_ref_table ORDER BY col_1; \c - - - :master_port DROP TABLE mx_ref_table; +CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1); +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' \c - - - :worker_1_port -- DDL commands \d mx_table @@ -149,8 +152,10 @@ DROP TABLE mx_ref_table; col_1 | integer | col_2 | text | col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass) +Indexes: + "mx_test_uniq_index" UNIQUE, btree (col_1) -CREATE INDEX mx_test_index ON mx_table(col_1); +CREATE INDEX mx_test_index ON mx_table(col_2); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. ALTER TABLE mx_table ADD COLUMN col_4 int; @@ -166,6 +171,8 @@ HINT: Connect to the coordinator and run it again. col_1 | integer | col_2 | text | col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass) +Indexes: + "mx_test_uniq_index" UNIQUE, btree (col_1) -- master_modify_multiple_shards SELECT master_modify_multiple_shards('UPDATE mx_table SET col_2=''none'''); @@ -223,6 +230,9 @@ SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; -- master_remove_node \c - - - :master_port +DROP INDEX mx_test_uniq_index; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' SELECT master_add_node('localhost', 5432); master_add_node -------------------------------------------- diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index b16ab3310..4f9395709 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -261,8 +261,7 @@ ALTER TABLE IF EXISTS non_existent_table ADD COLUMN new_column INTEGER; NOTICE: relation "non_existent_table" does not exist, skipping ALTER TABLE IF EXISTS lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE INTEGER; ALTER TABLE lineitem_alter DROP COLUMN non_existent_column; -ERROR: column "non_existent_column" of relation "lineitem_alter_220000" does not exist -CONTEXT: while executing command on localhost:57638 +ERROR: column "non_existent_column" of relation "lineitem_alter" does not exist ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS non_existent_column; NOTICE: column "non_existent_column" of relation "lineitem_alter" does not exist, skipping ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS int_column2; @@ -360,13 +359,13 @@ DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CON -- types ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type; ERROR: type "non_existent_type" does not exist -CONTEXT: while executing command on localhost:57638 +LINE 1: ALTER TABLE lineitem_alter ADD COLUMN new_column non_existen... + ^ ALTER TABLE lineitem_alter ALTER COLUMN null_column SET NOT NULL; ERROR: column "null_column" contains null values CONTEXT: while executing command on localhost:57638 ALTER TABLE lineitem_alter ALTER COLUMN l_partkey SET DEFAULT 'a'; ERROR: invalid input syntax for integer: "a" -CONTEXT: while executing command on localhost:57638 -- Verify that we error out on statements involving RENAME ALTER TABLE lineitem_alter RENAME TO lineitem_renamed; ERROR: renaming distributed tables or their objects is currently unsupported diff --git a/src/test/regress/sql/multi_mx_modifying_xacts.sql b/src/test/regress/sql/multi_mx_modifying_xacts.sql index aaf25c722..b7b482095 100644 --- a/src/test/regress/sql/multi_mx_modifying_xacts.sql +++ b/src/test/regress/sql/multi_mx_modifying_xacts.sql @@ -154,7 +154,7 @@ ABORT; -- applies to DDL BEGIN; INSERT INTO labs_mx VALUES (6, 'Bell labs_mx'); -ALTER TABLE labs_mx ADD COLUMN text motto; +ALTER TABLE labs_mx ADD COLUMN motto text; COMMIT; -- doesn't apply to COPY after modifications diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index e890354b1..3860e80e0 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -91,11 +91,12 @@ SELECT * FROM mx_ref_table ORDER BY col_1; \c - - - :master_port DROP TABLE mx_ref_table; +CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1); \c - - - :worker_1_port -- DDL commands \d mx_table -CREATE INDEX mx_test_index ON mx_table(col_1); +CREATE INDEX mx_test_index ON mx_table(col_2); ALTER TABLE mx_table ADD COLUMN col_4 int; ALTER TABLE mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col_1) REFERENCES mx_table(col_1); \d mx_table @@ -122,6 +123,7 @@ SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; -- master_remove_node \c - - - :master_port +DROP INDEX mx_test_uniq_index; SELECT master_add_node('localhost', 5432); \c - - - :worker_1_port