diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 38ac46a2d..af8e384ce 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -463,7 +463,6 @@ GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createInd DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = CreateIndexStmtGetRelationId(createIndexStatement); - ddlJob->concurrentIndexCmd = createIndexStatement->concurrent; ddlJob->startNewTransaction = createIndexStatement->concurrent; ddlJob->commandString = createIndexCommand; ddlJob->taskList = CreateIndexTaskList(createIndexStatement); @@ -598,8 +597,6 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; - ddlJob->concurrentIndexCmd = IsReindexWithParam_compat(reindexStatement, - "concurrently"); ddlJob->startNewTransaction = IsReindexWithParam_compat(reindexStatement, "concurrently"); ddlJob->commandString = reindexCommand; @@ -707,7 +704,6 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand, } ddlJob->targetRelationId = distributedRelationId; - ddlJob->concurrentIndexCmd = dropIndexStatement->concurrent; /* * We do not want DROP INDEX CONCURRENTLY to commit locally before @@ -866,6 +862,7 @@ CreateIndexTaskList(IndexStmt *indexStmt) task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); + task->cannotBeExecutedInTransction = indexStmt->concurrent; taskList = lappend(taskList, task); @@ -910,6 +907,8 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt) task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); + task->cannotBeExecutedInTransction = + IsReindexWithParam_compat(reindexStmt, "concurrently"); taskList = lappend(taskList, task); @@ -1205,6 +1204,7 @@ 
DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); + task->cannotBeExecutedInTransction = dropStmt->concurrent; taskList = lappend(taskList, task); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index eb1bf4d31..575bfa635 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -275,6 +275,7 @@ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) #endif static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName); static void CitusCopyFrom(CopyStmt *copyStatement, QueryCompletionCompat *completionTag); +static void EnsureCopyCanRunOnRelation(Oid relationId); static HTAB * CreateConnectionStateHash(MemoryContext memoryContext); static HTAB * CreateShardStateHash(MemoryContext memoryContext); static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, @@ -392,7 +393,11 @@ CitusCopyFrom(CopyStmt *copyStatement, QueryCompletionCompat *completionTag) } } + Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); + + EnsureCopyCanRunOnRelation(relationId); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); /* disallow modifications to a partition table which have rep. factor > 1 */ @@ -418,6 +423,30 @@ CitusCopyFrom(CopyStmt *copyStatement, QueryCompletionCompat *completionTag) } +/* + * EnsureCopyCanRunOnRelation throws an error if the database is in read-only mode. + */ +static void +EnsureCopyCanRunOnRelation(Oid relationId) +{ + /* first, do the regular check and give consistent errors with regular queries */ + EnsureModificationsCanRunOnRelation(relationId); + + /* + * We use 2PC for all COPY commands. It means that we cannot allow any COPY + * on replicas even if the user allows via WritableStandbyCoordinator GUC. 
+ */ + if (RecoveryInProgress() && WritableStandbyCoordinator) + { + ereport(ERROR, (errmsg("COPY command to Citus tables is not allowed in " + "read-only mode"), + errhint("All COPY commands to citus tables happen via 2PC, " + "and 2PC requires the database to be in a writable state."), + errdetail("the database is read-only"))); + } +} + + /* * CopyToExistingShards implements the COPY table_name FROM ... for hash or * range-partitioned tables where there are already shards into which to copy @@ -2304,11 +2333,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, UseCoordinatedTransaction(); - if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC || - MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - Use2PCForCoordinatedTransaction(); - } + /* all modifications use 2PC */ + Use2PCForCoordinatedTransaction(); /* define how tuples will be serialised */ CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); diff --git a/src/backend/distributed/commands/rename.c b/src/backend/distributed/commands/rename.c index 7b6b71839..ccdcf058f 100644 --- a/src/backend/distributed/commands/rename.c +++ b/src/backend/distributed/commands/rename.c @@ -128,7 +128,6 @@ PreprocessRenameStmt(Node *node, const char *renameCommand, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = tableRelationId; - ddlJob->concurrentIndexCmd = false; ddlJob->commandString = renameCommand; ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand); diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index ababdd1a8..3f14f42db 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -92,7 +92,6 @@ PreprocessCreateStatisticsStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; - ddlJob->concurrentIndexCmd = false; ddlJob->startNewTransaction = false; 
ddlJob->commandString = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -198,7 +197,6 @@ PreprocessDropStatisticsStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; - ddlJob->concurrentIndexCmd = false; ddlJob->startNewTransaction = false; ddlJob->commandString = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -238,7 +236,6 @@ PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; - ddlJob->concurrentIndexCmd = false; ddlJob->startNewTransaction = false; ddlJob->commandString = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -277,7 +274,6 @@ PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; - ddlJob->concurrentIndexCmd = false; ddlJob->startNewTransaction = false; ddlJob->commandString = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -369,7 +365,6 @@ PreprocessAlterStatisticsStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; - ddlJob->concurrentIndexCmd = false; ddlJob->startNewTransaction = false; ddlJob->commandString = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -410,7 +405,6 @@ PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; - ddlJob->concurrentIndexCmd = false; ddlJob->startNewTransaction = false; ddlJob->commandString = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index b45b5fd57..3405b3c3e 100644 --- a/src/backend/distributed/commands/table.c +++ 
b/src/backend/distributed/commands/table.c @@ -1095,7 +1095,6 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, /* fill them here as it is possible to use them in some conditional blocks below */ DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = leftRelationId; - ddlJob->concurrentIndexCmd = false; const char *sqlForTaskList = alterTableCommand; if (deparseAT) @@ -1659,7 +1658,6 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); QualifyTreeNode((Node *) stmt); ddlJob->targetRelationId = relationId; - ddlJob->concurrentIndexCmd = false; ddlJob->commandString = DeparseTreeNode((Node *) stmt); ddlJob->taskList = DDLTaskList(relationId, ddlJob->commandString); return list_make1(ddlJob); diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 990d67ac0..29bc2df01 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -881,7 +881,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) bool localExecutionSupported = true; - if (!ddlJob->concurrentIndexCmd) + if (!TaskListCannotBeExecutedInTransaction(ddlJob->taskList)) { if (shouldSyncMetadata) { @@ -940,10 +940,6 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) StartTransactionCommand(); } - /* save old commit protocol to restore at xact end */ - Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE); - SavedMultiShardCommitProtocol = MultiShardCommitProtocol; - MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE; MemoryContext savedContext = CurrentMemoryContext; PG_TRY(); @@ -1044,7 +1040,6 @@ CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command) DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = relationId; - ddlJob->concurrentIndexCmd = false; ddlJob->commandString = GetTableDDLCommand(command); ddlJob->taskList = taskList; @@ -1295,7 +1290,6 @@ NodeDDLTaskList(TargetWorkerSet 
targets, List *commands) DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = InvalidOid; - ddlJob->concurrentIndexCmd = false; ddlJob->commandString = NULL; ddlJob->taskList = list_make1(task); diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index 3b57c9d36..7f1e04f76 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -170,20 +170,6 @@ ExecuteVacuumOnDistributedTables(VacuumStmt *vacuumStmt, List *relationIdList, { if (IsCitusTable(relationId)) { - /* - * VACUUM commands cannot run inside a transaction block, so we use - * the "bare" commit protocol without BEGIN/COMMIT. However, ANALYZE - * commands can run inside a transaction block. Notice that we do this - * once even if there are multiple distributed tables to be vacuumed. - */ - if (executedVacuumCount == 0 && (vacuumParams.options & VACOPT_VACUUM) != 0) - { - /* save old commit protocol to restore at xact end */ - Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE); - SavedMultiShardCommitProtocol = MultiShardCommitProtocol; - MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE; - } - List *vacuumColumnList = VacuumColumnList(vacuumStmt, relationIndex); List *taskList = VacuumTaskList(relationId, vacuumParams, vacuumColumnList); @@ -304,6 +290,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); + task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM); taskList = lappend(taskList, task); } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 88504986e..ef33a6585 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -633,7 +633,6 @@ static 
void CleanUpSessions(DistributedExecution *execution); static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan); static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution); static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution); -static bool IsMultiShardModification(RowModifyLevel modLevel, List *taskList); static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); static bool DistributedExecutionRequiresRollback(List *taskList); static bool TaskListRequires2PC(List *taskList); @@ -1198,7 +1197,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, return xactProperties; } - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE) + if (TaskListCannotBeExecutedInTransaction(taskList)) { /* * We prefer to error on any failures for CREATE INDEX @@ -1225,15 +1224,6 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, xactProperties.errorOnAnyFailure = true; xactProperties.requires2PC = true; } - else if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC && - IsMultiShardModification(modLevel, taskList)) - { - /* - * Even if we're not using 2PC, we prefer to error out - * on any failures during multi shard modifications/DDLs. - */ - xactProperties.errorOnAnyFailure = true; - } } else if (InCoordinatedTransaction()) { @@ -1326,17 +1316,6 @@ DistributedPlanModifiesDatabase(DistributedPlan *plan) } -/* - * IsMultiShardModification returns true if the task list is a modification - * across shards. - */ -static bool -IsMultiShardModification(RowModifyLevel modLevel, List *taskList) -{ - return list_length(taskList) > 1 && TaskListModifiesDatabase(modLevel, taskList); -} - - /* * TaskListModifiesDatabase is a helper function for DistributedExecutionModifiesDatabase and * DistributedPlanModifiesDatabase. 
@@ -1376,17 +1355,17 @@ DistributedExecutionRequiresRollback(List *taskList) { int taskCount = list_length(taskList); - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE) - { - return false; - } - if (taskCount == 0) { return false; } Task *task = (Task *) linitial(taskList); + if (task->cannotBeExecutedInTransction) + { + /* vacuum, create index concurrently etc. */ + return false; + } bool selectForUpdate = task->relationRowLockList != NIL; if (selectForUpdate) @@ -1456,18 +1435,15 @@ TaskListRequires2PC(List *taskList) } bool multipleTasks = list_length(taskList) > 1; - if (!ReadOnlyTask(task->taskType) && - multipleTasks && MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) + if (!ReadOnlyTask(task->taskType) && multipleTasks) { + /* all multi-shard modifications use 2PC */ return true; } if (task->taskType == DDL_TASK) { - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - return true; - } + return true; } return false; @@ -1499,6 +1475,27 @@ ReadOnlyTask(TaskType taskType) } +/* + * TaskListCannotBeExecutedInTransaction returns true if any of the + * tasks in the input cannot be executed in a transaction. These are + * tasks like VACUUM or CREATE INDEX CONCURRENTLY etc. + */ +bool +TaskListCannotBeExecutedInTransaction(List *taskList) +{ + Task *task = NULL; + foreach_ptr(task, taskList) + { + if (task->cannotBeExecutedInTransction) + { + return true; + } + } + + return false; +} + + /* * SelectForUpdateOnReferenceTable returns true if the input task * contains a FOR UPDATE clause that locks any reference tables. 
@@ -3566,12 +3563,6 @@ HandleMultiConnectionSuccess(WorkerSession *session) static void Activate2PCIfModifyingTransactionExpandsToNewNode(WorkerSession *session) { - if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC) - { - /* we don't need 2PC, so no need to continue */ - return; - } - DistributedExecution *execution = session->workerPool->distributedExecution; if (TransactionModifiedDistributedTable(execution) && DistributedExecutionModifiesDatabase(execution) && diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 0e6e6472c..a84009a1a 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -319,7 +319,7 @@ EnsureModificationsCanRunOnRelation(Oid relationId) return; } - if (!IsCitusTable(relationId)) + if (!OidIsValid(relationId) || !IsCitusTable(relationId)) { /* we are not interested in PG tables */ return; diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index 15192746e..8f71dcd5f 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -242,14 +242,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName, /* DROP table commands are currently only supported from the coordinator */ Assert(localGroupId == COORDINATOR_GROUP_ID); - /* - * At this point we intentionally decided to not use 2PC for reference - * tables - */ - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - Use2PCForCoordinatedTransaction(); - } + Use2PCForCoordinatedTransaction(); List *dropTaskList = DropTaskList(relationId, schemaName, relationName, deletableShardIntervalList); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 39c7ce6c6..fc1f5c0fb 100644 --- a/src/backend/distributed/shared_library_init.c +++ 
b/src/backend/distributed/shared_library_init.c @@ -179,12 +179,6 @@ static const struct config_enum_entry coordinator_aggregation_options[] = { { NULL, 0, false } }; -static const struct config_enum_entry shard_commit_protocol_options[] = { - { "1pc", COMMIT_PROTOCOL_1PC, false }, - { "2pc", COMMIT_PROTOCOL_2PC, false }, - { NULL, 0, false } -}; - static const struct config_enum_entry log_level_options[] = { { "off", CITUS_LOG_LEVEL_OFF, false }, { "debug5", DEBUG5, false}, @@ -1341,20 +1335,6 @@ RegisterCitusConfigVariables(void) GUC_UNIT_MS | GUC_NO_SHOW_ALL, NULL, NULL, NULL); - DefineCustomEnumVariable( - "citus.multi_shard_commit_protocol", - gettext_noop("Sets the commit protocol for commands modifying multiple shards."), - gettext_noop("When a failure occurs during commands that modify multiple " - "shards, two-phase commit is required to ensure data is never lost " - "and this is the default. However, changing to 1pc may give small " - "performance benefits."), - &MultiShardCommitProtocol, - COMMIT_PROTOCOL_2PC, - shard_commit_protocol_options, - PGC_USERSET, - GUC_STANDARD, - NULL, NULL, NULL); - DefineCustomEnumVariable( "citus.multi_shard_modify_mode", gettext_noop("Sets the connection type for multi shard modify queries"), diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 5acd51b9c..191aec8ab 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -46,10 +46,6 @@ CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; -/* GUC, the commit protocol to use for commands affecting more than one connection */ -int MultiShardCommitProtocol = COMMIT_PROTOCOL_2PC; -int SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE; - /* * GUC that determines whether a SELECT in a transaction block should also run in * a transaction block on the worker even if no 
writes have occurred yet. @@ -94,9 +90,9 @@ MemoryContext CommitContext = NULL; /* * Should this coordinated transaction use 2PC? Set by - * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and - * MultiShardCommitProtocol was set to 2PC. But, even if this - * flag is set, the transaction manager is smart enough to only + * CoordinatedTransactionUse2PC(), e.g. if any modification + * is issued and uses 2PC. But, even if this flag is set, + * the transaction manager is smart enough to only * do 2PC on the remote connections that did a modification. * * As a variable name ShouldCoordinatedTransactionUse2PC could @@ -119,7 +115,6 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction SubTransactionId parentSubid, void *arg); /* remaining functions */ -static void ResetShardPlacementTransactionState(void); static void AdjustMaxPreparedTransactions(void); static void PushSubXact(SubTransactionId subId); static void PopSubXact(SubTransactionId subId); @@ -268,13 +263,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) MemoryContext previousContext = CurrentMemoryContext; MemoryContextSwitchTo(CommitContext); - /* - * Call other parts of citus that need to integrate into - * transaction management. Do so before doing other work, so the - * callbacks still can perform work if needed. - */ - ResetShardPlacementTransactionState(); - if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED) { /* handles both already prepared and open transactions */ @@ -321,8 +309,6 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) RemoveIntermediateResultsDirectory(); - ResetShardPlacementTransactionState(); - /* handles both already prepared and open transactions */ if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) { @@ -576,21 +562,6 @@ ResetGlobalVariables() } -/* - * ResetShardPlacementTransactionState performs cleanup after the end of a - * transaction. 
- */ -static void -ResetShardPlacementTransactionState(void) -{ - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE) - { - MultiShardCommitProtocol = SavedMultiShardCommitProtocol; - SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE; - } -} - - /* * CoordinatedSubTransactionCallback is the callback used to implement * distributed ROLLBACK TO SAVEPOINT. diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index fba73445f..605732b65 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -329,6 +329,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_STRING_FIELD(fetchedExplainAnalyzePlan); COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration); COPY_SCALAR_FIELD(isLocalTableModification); + COPY_SCALAR_FIELD(cannotBeExecutedInTransction); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index a3743c281..1aa7d8261 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -538,6 +538,7 @@ OutTask(OUTFUNC_ARGS) WRITE_STRING_FIELD(fetchedExplainAnalyzePlan); WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f"); WRITE_BOOL_FIELD(isLocalTableModification); + WRITE_BOOL_FIELD(cannotBeExecutedInTransction); } diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 22f8a8cf1..2b61532ed 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -48,7 +48,6 @@ extern int UtilityHookLevel; typedef struct DDLJob { Oid targetRelationId; /* oid of the target distributed relation */ - bool concurrentIndexCmd; /* related to a CONCURRENTLY index command? 
*/ /* * Whether to commit and start a new transaction before sending commands diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 0054c958a..14d280e9e 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -142,6 +142,7 @@ extern void SetLocalForceMaxQueryParallelization(void); extern void SortTupleStore(CitusScanState *scanState); extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan); extern bool ReadOnlyTask(TaskType taskType); +extern bool TaskListCannotBeExecutedInTransaction(List *taskList); extern void ExtractParametersFromParamList(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues, bool diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 523f4a754..8757c149d 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -331,6 +331,11 @@ typedef struct Task * isLocalTableModification is true if the task is on modifying a local table. */ bool isLocalTableModification; + + /* + * Vacuum, create/drop/reindex concurrently cannot be executed in a transaction. 
+ */ + bool cannotBeExecutedInTransction; } Task; diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index fdb6be1b7..c3c57f244 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -46,14 +46,6 @@ typedef enum CoordinatedTransactionState } CoordinatedTransactionState; -/* Enumeration that defines the different commit protocols available */ -typedef enum -{ - COMMIT_PROTOCOL_BARE = 0, - COMMIT_PROTOCOL_1PC = 1, - COMMIT_PROTOCOL_2PC = 2 -} CommitProtocolType; - /* Enumeration to keep track of context within nested sub-transactions */ typedef struct SubXactContext { @@ -73,13 +65,6 @@ extern bool SelectOpensTransactionBlock; */ extern bool FunctionOpensTransactionBlock; -/* config variable managed via guc.c */ -extern int MultiShardCommitProtocol; -extern int SingleShardCommitProtocol; - -/* state needed to restore multi-shard commit protocol during VACUUM/ANALYZE */ -extern int SavedMultiShardCommitProtocol; - /* state needed to prevent new connections during modifying transactions */ extern XactModificationType XactModificationLevel; diff --git a/src/test/regress/expected/failure_1pc_copy_append.out b/src/test/regress/expected/failure_1pc_copy_append.out deleted file mode 100644 index 9e86d7098..000000000 --- a/src/test/regress/expected/failure_1pc_copy_append.out +++ /dev/null @@ -1,287 +0,0 @@ -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - --- do not cache any connections -SET citus.max_cached_conns_per_worker TO 1; -SET citus.shard_count = 1; -SET citus.shard_replication_factor = 2; -- one shard per worker -SET citus.multi_shard_commit_protocol TO '1pc'; -SET citus.next_shard_id TO 100400; -ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; -CREATE TABLE copy_test (key int, value int); -SELECT 
create_distributed_table('copy_test', 'key', 'append'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT citus.clear_network_traffic(); - clear_network_traffic ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT count(1) FROM copy_test; - count ---------------------------------------------------------------------- - 4 -(1 row) - ----- all of the following tests test behavior with 2 shard placements ---- -SHOW citus.shard_replication_factor; - citus.shard_replication_factor ---------------------------------------------------------------------- - 2 -(1 row) - ----- kill the connection when we try to create the shard ---- -SELECT citus.mitmproxy('conn.onQuery(query="worker_apply_shard_ddl_command").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-CONTEXT: while executing command on localhost:xxxxx -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; - logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 -(2 rows) - -SELECT count(1) FROM copy_test; - count ---------------------------------------------------------------------- - 4 -(1 row) - ----- kill the connection when we try to start a transaction ---- -SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -CONTEXT: while executing command on localhost:xxxxx -ERROR: failure on connection marked as essential: localhost:xxxxx -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; - logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 -(2 rows) - -SELECT count(1) FROM copy_test; - count ---------------------------------------------------------------------- - 4 -(1 row) - ----- kill the connection when we start the COPY ---- -SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-CONTEXT: while executing command on localhost:xxxxx -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; - logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 -(2 rows) - -SELECT count(1) FROM copy_test; - count ---------------------------------------------------------------------- - 4 -(1 row) - ----- kill the connection when we send the data ---- -SELECT citus.mitmproxy('conn.onCopyData().kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: failed to COPY to shard xxxxx on localhost:xxxxx -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; - logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 -(2 rows) - -SELECT citus.mitmproxy('conn.onQuery(query="SELECT|COPY").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT count(1) FROM copy_test; -WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - 
before or while processing the request. - count ---------------------------------------------------------------------- - 4 -(1 row) - ----- cancel the connection when we send the data ---- -SELECT citus.mitmproxy('conn.onQuery(query="SELECT|COPY").cancel(' || pg_backend_pid() || ')'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: canceling statement due to user request -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; - logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 -(2 rows) - -SELECT count(1) FROM copy_test; -ERROR: canceling statement due to user request ----- kill the connection when we try to get the size of the table ---- -SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -CONTEXT: while executing command on localhost:xxxxx -ERROR: failure on connection marked as essential: localhost:xxxxx -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; - logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 -(2 rows) - -SELECT count(1) FROM copy_test; - count ---------------------------------------------------------------------- - 4 -(1 row) - ----- kill the connection when we try to get the min, max of the table ---- -SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -CONTEXT: while executing command on localhost:xxxxx -ERROR: failure on connection marked as essential: localhost:xxxxx -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; - logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 -(2 rows) - -SELECT count(1) FROM copy_test; - count ---------------------------------------------------------------------- - 4 -(1 row) - ----- kill the connection when we try to COMMIT ---- -SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: failed to commit transaction on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; - logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 - copy_test | 100408 | t | 0 | 3 | 100408 | 1 | 8192 | localhost | 57637 | 112 - copy_test | 100408 | t | 0 | 3 | 100408 | 3 | 8192 | localhost | 9060 | 113 -(4 rows) - -SELECT count(1) FROM copy_test; - count ---------------------------------------------------------------------- - 8 -(1 row) - --- ==== Clean up, we're done here ==== -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -DROP TABLE copy_test; diff --git a/src/test/regress/expected/failure_1pc_copy_hash.out b/src/test/regress/expected/failure_1pc_copy_hash.out deleted file mode 100644 index 227d420e3..000000000 --- a/src/test/regress/expected/failure_1pc_copy_hash.out +++ /dev/null @@ -1,421 +0,0 @@ -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - --- do not cache any connections -SET citus.max_cached_conns_per_worker TO 0; -SET citus.shard_count = 1; -SET citus.shard_replication_factor = 2; -- one shard per worker -SET citus.multi_shard_commit_protocol TO '1pc'; -SET citus.next_shard_id TO 100400; -SET citus.max_cached_conns_per_worker TO 0; -ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; -CREATE TABLE copy_test (key int, value int); -SELECT create_distributed_table('copy_test', 'key'); - 
create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT citus.clear_network_traffic(); - clear_network_traffic ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT count(1) FROM copy_test; - count ---------------------------------------------------------------------- - 4 -(1 row) - --- ==== kill the connection when we try to start a transaction ==== --- the query should abort -SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -CONTEXT: while executing command on localhost:xxxxx -COPY copy_test, line 1: "0, 0" -ERROR: failure on connection marked as essential: localhost:xxxxx -CONTEXT: COPY copy_test, line 1: "0, 0" --- ==== kill the connection when we try to start the COPY ==== --- the query should abort -SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-CONTEXT: while executing command on localhost:xxxxx -COPY copy_test, line 1: "0, 0" --- ==== kill the connection when we first start sending data ==== --- the query should abort -SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: failed to COPY to shard xxxxx on localhost:xxxxx --- ==== kill the connection when the worker confirms it's received the data ==== --- the query should abort -SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: failed to COPY to shard xxxxx on localhost:xxxxx --- ==== kill the connection when we try to send COMMIT ==== --- the query should succeed, and the placement should be marked inactive -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -) AND shardstate = 3; - count ---------------------------------------------------------------------- - 0 -(1 row) - -SELECT count(1) FROM copy_test; - count ---------------------------------------------------------------------- - 4 -(1 row) - -SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: failed to commit transaction on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx --- the shard is marked invalid -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -) AND shardstate = 3; - count ---------------------------------------------------------------------- - 1 -(1 row) - -SELECT count(1) FROM copy_test; - count ---------------------------------------------------------------------- - 8 -(1 row) - --- ==== clean up a little bit before running the next test ==== -UPDATE pg_dist_shard_placement SET shardstate = 1 -WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -); -TRUNCATE copy_test; --- ==== try to COPY invalid data ==== --- here the coordinator actually sends the data, but then unexpectedly closes the --- connection when it notices the data stream is broken. Crucially, it closes the --- connection before sending COMMIT, so no data makes it into the worker. -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV; -ERROR: missing data for column "value" -CONTEXT: COPY copy_test, line 5: "10" --- kill the connection if the coordinator sends COMMIT. 
It doesn't, so nothing changes -SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV; -ERROR: missing data for column "value" -CONTEXT: COPY copy_test, line 5: "10" -SELECT * FROM copy_test ORDER BY key, value; - key | value ---------------------------------------------------------------------- -(0 rows) - --- ==== clean up some more to prepare for tests with only one replica ==== -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -TRUNCATE copy_test; -UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port; -SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -) ORDER BY nodeport, placementid; - shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - 100400 | 1 | 0 | localhost | 9060 | 100 - 100400 | 3 | 0 | localhost | 57637 | 101 -(2 rows) - --- ==== okay, run some tests where there's only one active shard ==== -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT * FROM copy_test; - key | value ---------------------------------------------------------------------- - 0 | 0 - 1 | 1 - 2 | 4 - 3 | 9 -(4 rows) - --- the worker is unreachable -SELECT citus.mitmproxy('conn.killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated 
abnormally - before or while processing the request. -CONTEXT: COPY copy_test, line 1: "0, 0" -ERROR: could not connect to any active placements -CONTEXT: COPY copy_test, line 1: "0, 0" -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM copy_test; - key | value ---------------------------------------------------------------------- - 0 | 0 - 1 | 1 - 2 | 4 - 3 | 9 -(4 rows) - --- the first message fails -SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -CONTEXT: while executing command on localhost:xxxxx -COPY copy_test, line 1: "0, 0" -ERROR: failure on connection marked as essential: localhost:xxxxx -CONTEXT: COPY copy_test, line 1: "0, 0" -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM copy_test; - key | value ---------------------------------------------------------------------- - 0 | 0 - 1 | 1 - 2 | 4 - 3 | 9 -(4 rows) - --- the COPY message fails -SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-CONTEXT: while executing command on localhost:xxxxx -COPY copy_test, line 1: "0, 0" -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM copy_test; - key | value ---------------------------------------------------------------------- - 0 | 0 - 1 | 1 - 2 | 4 - 3 | 9 -(4 rows) - --- the COPY data fails -SELECT citus.mitmproxy('conn.onCopyData().killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: failed to COPY to shard xxxxx on localhost:xxxxx -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM copy_test; - key | value ---------------------------------------------------------------------- - 0 | 0 - 1 | 1 - 2 | 4 - 3 | 9 -(4 rows) - --- the COMMIT fails -SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: failed to commit transaction on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: could not commit transaction for shard xxxxx on any active node -ERROR: could not commit transaction on any active node -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM copy_test; - key | value ---------------------------------------------------------------------- - 0 | 0 - 1 | 1 - 2 | 4 - 3 | 9 -(4 rows) - --- the placement is not marked invalid -SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -) ORDER BY nodeport, placementid; - shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - 100400 | 1 | 0 | localhost | 9060 | 100 - 100400 | 3 | 0 | localhost | 57637 | 101 -(2 rows) - --- the COMMIT makes it through but the connection dies before we get a response -SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: failed to commit transaction on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: could not commit transaction for shard xxxxx on any active node -ERROR: could not commit transaction on any active node -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -) ORDER BY nodeport, placementid; - shardid | shardstate | shardlength | nodename | nodeport | placementid ---------------------------------------------------------------------- - 100400 | 1 | 0 | localhost | 9060 | 100 - 100400 | 3 | 0 | localhost | 57637 | 101 -(2 rows) - -SELECT * FROM copy_test; - key | value ---------------------------------------------------------------------- - 0 | 0 - 1 | 1 - 2 | 4 - 3 | 9 - 0 | 0 - 1 | 1 - 2 | 4 - 3 | 9 -(8 rows) - --- ==== Clean up, we're done here ==== -SELECT citus.mitmproxy('conn.allow()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -DROP TABLE copy_test; diff --git a/src/test/regress/expected/failure_create_distributed_table_non_empty.out b/src/test/regress/expected/failure_create_distributed_table_non_empty.out index cce74939a..3de2cd874 100644 --- a/src/test/regress/expected/failure_create_distributed_table_non_empty.out +++ b/src/test/regress/expected/failure_create_distributed_table_non_empty.out @@ -589,7 +589,6 @@ DROP SCHEMA create_distributed_table_non_empty_failure; CREATE SCHEMA create_distributed_table_non_empty_failure; CREATE TABLE test_table(id int, value_1 int); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); -SET citus.multi_shard_commit_protocol TO '1pc'; SELECT citus.mitmproxy('conn.kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_create_table.out 
b/src/test/regress/expected/failure_create_table.out index 9e042eb8d..e37060735 100644 --- a/src/test/regress/expected/failure_create_table.out +++ b/src/test/regress/expected/failure_create_table.out @@ -448,8 +448,6 @@ DROP TABLE test_table; DROP SCHEMA failure_create_table; CREATE SCHEMA failure_create_table; CREATE TABLE test_table(id int, value_1 int); --- Test inside transaction and with 1PC -SET citus.multi_shard_commit_protocol TO "1pc"; -- Kill connection before sending query to the worker with 1pc. SELECT citus.mitmproxy('conn.kill()'); mitmproxy @@ -609,7 +607,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.master_create_worker_shards(table_name tex AS 'citus', $$master_create_worker_shards$$ LANGUAGE C STRICT; -- Test master_create_worker_shards with 2pc -SET citus.multi_shard_commit_protocol TO "2pc"; CREATE TABLE test_table_2(id int, value_1 int); SELECT master_create_distributed_table('test_table_2', 'id', 'hash'); master_create_distributed_table diff --git a/src/test/regress/expected/failure_ddl.out b/src/test/regress/expected/failure_ddl.out index 49fc1eddf..20082792a 100644 --- a/src/test/regress/expected/failure_ddl.out +++ b/src/test/regress/expected/failure_ddl.out @@ -17,8 +17,7 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) SET citus.next_shard_id TO 100800; --- we'll start with replication factor 1, 1PC and parallel mode -SET citus.multi_shard_commit_protocol TO '1pc'; +-- we'll start with replication factor 1, 2PC and parallel mode SET citus.shard_count = 4; SET citus.shard_replication_factor = 1; CREATE TABLE test_table (key int, value int); @@ -137,7 +136,6 @@ SELECT citus.mitmproxy('conn.allow()'); DROP TABLE test_table; SET citus.next_shard_id TO 100800; -SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.shard_count = 4; SET citus.shard_replication_factor = 1; CREATE TABLE test_table (key int, value int); @@ -325,8 +323,6 @@ SELECT run_command_on_placements('test_table', $$SELECT array_agg(name::text ORD 
(localhost,57637,100803,t,"{key,new_column,value}") (4 rows) --- now, lets test with 2PC -SET citus.multi_shard_commit_protocol TO '2pc'; -- in the first test, kill just in the first -- response we get from the worker SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); @@ -686,7 +682,6 @@ SELECT run_command_on_placements('test_table', $$SELECT array_agg(name::text ORD (4 rows) -- another set of tests with 2PC and replication factor = 2 -SET citus.multi_shard_commit_protocol TO '2pc'; SET citus.shard_count = 4; SET citus.shard_replication_factor = 2; -- re-create the table with replication factor 2 diff --git a/src/test/regress/expected/failure_multi_shard_update_delete.out b/src/test/regress/expected/failure_multi_shard_update_delete.out index 15ff08a7d..5be0980f1 100644 --- a/src/test/regress/expected/failure_multi_shard_update_delete.out +++ b/src/test/regress/expected/failure_multi_shard_update_delete.out @@ -47,10 +47,156 @@ SELECT count(*) FROM t2; 7 (1 row) -SHOW citus.multi_shard_commit_protocol ; - citus.multi_shard_commit_protocol +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); + mitmproxy --------------------------------------------------------------------- - 2pc + +(1 row) + +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; +ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. 
+-- verify nothing is deleted +SELECT count(*) FROM t2; + count +--------------------------------------------------------------------- + 7 +(1 row) + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +--------------------------------------------------------------------- + 7 +(1 row) + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +--------------------------------------------------------------------- + 7 +(1 row) + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +DELETE FROM t2 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is deleted +SELECT count(*) FROM t2; + count +--------------------------------------------------------------------- + 7 +(1 row) + +-- UPDATE TESTS +-- update non-partition column based on a filter on another non-partition column +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 
+--------------------------------------------------------------------- + 3 | 1 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +--------------------------------------------------------------------- + 3 | 1 +(1 row) + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. 
+-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +--------------------------------------------------------------------- + 3 | 1 +(1 row) + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +--------------------------------------------------------------------- + 3 | 1 +(1 row) + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +UPDATE t2 SET c = 4 WHERE b = 2; +ERROR: canceling statement due to user request +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + b2 | c4 +--------------------------------------------------------------------- + 3 | 1 (1 row) -- DELETION TESTS @@ -205,161 +351,6 @@ SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 3 | 1 (1 row) --- switch to 1PC -SET citus.multi_shard_commit_protocol TO '1PC'; --- DELETION TESTS --- delete using a filter on non-partition column filter --- test both kill and cancellation -SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - --- issue a multi shard delete -DELETE FROM t2 WHERE b = 2; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - 
before or while processing the request. --- verify nothing is deleted -SELECT count(*) FROM t2; - count ---------------------------------------------------------------------- - 7 -(1 row) - --- kill just one connection -SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -DELETE FROM t2 WHERE b = 2; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. --- verify nothing is deleted -SELECT count(*) FROM t2; - count ---------------------------------------------------------------------- - 7 -(1 row) - --- cancellation -SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - --- issue a multi shard delete -DELETE FROM t2 WHERE b = 2; -ERROR: canceling statement due to user request --- verify nothing is deleted -SELECT count(*) FROM t2; - count ---------------------------------------------------------------------- - 7 -(1 row) - --- cancel just one connection -SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -DELETE FROM t2 WHERE b = 2; -ERROR: canceling statement due to user request --- verify nothing is deleted -SELECT count(*) FROM t2; - count ---------------------------------------------------------------------- - 7 -(1 row) - --- UPDATE TESTS --- update non-partition column based on a filter on another non-partition column --- DELETION TESTS --- delete using a filter on non-partition column filter --- test both kill and cancellation -SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER 
(WHERE c = 4) AS c4 FROM t2; - b2 | c4 ---------------------------------------------------------------------- - 3 | 1 -(1 row) - -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - --- issue a multi shard update -UPDATE t2 SET c = 4 WHERE b = 2; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. --- verify nothing is updated -SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; - b2 | c4 ---------------------------------------------------------------------- - 3 | 1 -(1 row) - --- kill just one connection -SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -UPDATE t2 SET c = 4 WHERE b = 2; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
--- verify nothing is updated -SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; - b2 | c4 ---------------------------------------------------------------------- - 3 | 1 -(1 row) - --- cancellation -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - --- issue a multi shard update -UPDATE t2 SET c = 4 WHERE b = 2; -ERROR: canceling statement due to user request --- verify nothing is updated -SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; - b2 | c4 ---------------------------------------------------------------------- - 3 | 1 -(1 row) - --- cancel just one connection -SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); - mitmproxy ---------------------------------------------------------------------- - -(1 row) - -UPDATE t2 SET c = 4 WHERE b = 2; -ERROR: canceling statement due to user request --- verify nothing is updated -SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; - b2 | c4 ---------------------------------------------------------------------- - 3 | 1 -(1 row) - -RESET citus.multi_shard_commit_protocol; -- -- fail when cascading deletes from foreign key -- unfortunately cascading deletes from foreign keys @@ -610,8 +601,6 @@ SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FRO 3 | 3 (1 row) --- switch to 1PC -SET citus.multi_shard_commit_protocol TO '1PC'; SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; b1 | b2 --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_truncate.out b/src/test/regress/expected/failure_truncate.out index 6ecb7bd5b..645ec0392 100644 --- a/src/test/regress/expected/failure_truncate.out +++ 
b/src/test/regress/expected/failure_truncate.out @@ -16,8 +16,7 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) --- we'll start with replication factor 1, 1PC and parallel mode -SET citus.multi_shard_commit_protocol TO '1pc'; +-- we'll start with replication factor 1, 2PC and parallel mode SET citus.shard_count = 4; SET citus.shard_replication_factor = 1; CREATE TABLE test_table (key int, value int); @@ -205,11 +204,9 @@ SELECT count(*) FROM test_table; 20 (1 row) --- kill as soon as the coordinator sends COMMIT --- One shard should not get truncated but the other should --- since it is sent from another connection. --- Thus, we should see a partially successful truncate --- Note: This is the result of using 1pc and there is no way to recover from it +-- kill as soon as the coordinator sends COMMIT PREPARED +-- the transaction succeeds on one placement, and we need to +-- recover prepared statements to see the other placement as well SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()'); mitmproxy --------------------------------------------------------------------- @@ -223,6 +220,12 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 2 +(1 row) + SELECT * FROM unhealthy_shard_count; count --------------------------------------------------------------------- @@ -232,7 +235,7 @@ SELECT * FROM unhealthy_shard_count; SELECT count(*) FROM test_table; count --------------------------------------------------------------------- - 8 + 0 (1 row) -- refill the table @@ -304,8 +307,6 @@ WARNING: server closed the connection unexpectedly connection not open connection not open CONTEXT: while executing command on localhost:xxxxx -WARNING: could not commit transaction for shard xxxxx on any active node -WARNING: could not commit transaction for shard xxxxx on any active node SELECT citus.mitmproxy('conn.allow()'); mitmproxy 
--------------------------------------------------------------------- @@ -582,8 +583,6 @@ SELECT count(*) FROM test_table; 20 (1 row) --- now, lets test with 2PC -SET citus.multi_shard_commit_protocol TO '2pc'; -- in the first test, kill just in the first -- response we get from the worker SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); @@ -943,7 +942,6 @@ SELECT count(*) FROM test_table; (1 row) -- final set of tests with 2PC and replication factor = 2 -SET citus.multi_shard_commit_protocol TO '2pc'; SET citus.shard_count = 4; SET citus.shard_replication_factor = 2; -- re-create the table with replication factor 2 diff --git a/src/test/regress/expected/failure_vacuum.out b/src/test/regress/expected/failure_vacuum.out index ebf167042..018249ad2 100644 --- a/src/test/regress/expected/failure_vacuum.out +++ b/src/test/regress/expected/failure_vacuum.out @@ -10,7 +10,6 @@ SELECT citus.mitmproxy('conn.allow()'); SET citus.shard_count = 1; SET citus.shard_replication_factor = 2; -- one shard per worker -SET citus.multi_shard_commit_protocol TO '1pc'; CREATE TABLE vacuum_test (key int, value int); SELECT create_distributed_table('vacuum_test', 'key'); create_distributed_table diff --git a/src/test/regress/expected/multi_follower_dml.out b/src/test/regress/expected/multi_follower_dml.out index 4538d82e2..08e47e5c6 100644 --- a/src/test/regress/expected/multi_follower_dml.out +++ b/src/test/regress/expected/multi_follower_dml.out @@ -168,26 +168,40 @@ DETAIL: the database is read-only HINT: All modifications to replicated tables happen via 2PC, and 2PC requires the database to be in a writable state. 
INSERT INTO citus_local_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); ERROR: cannot execute INSERT in a read-only transaction --- COPY is not possible in 2PC mode +-- COPY is not possible because Citus uses 2PC COPY the_table (a, b, z) FROM STDIN WITH CSV; -ERROR: cannot assign TransactionIds during recovery --- COPY is not possible in 2PC mode +ERROR: COPY command to Citus tables is not allowed in read-only mode +DETAIL: the database is read-only +HINT: All COPY commands to citus tables happen via 2PC, and 2PC requires the database to be in a writable state. +\. +invalid command \. COPY the_replicated_table (a, b, z) FROM STDIN WITH CSV; -ERROR: cannot assign TransactionIds during recovery +ERROR: writing to worker nodes is not currently allowed for replicated tables such as reference tables or hash distributed tables with replication factor greater than 1. +DETAIL: the database is read-only +HINT: All modifications to replicated tables happen via 2PC, and 2PC requires the database to be in a writable state. +\. +invalid command \. COPY reference_table (a, b, z) FROM STDIN WITH CSV; -ERROR: cannot assign TransactionIds during recovery +ERROR: writing to worker nodes is not currently allowed for replicated tables such as reference tables or hash distributed tables with replication factor greater than 1. +DETAIL: the database is read-only +HINT: All modifications to replicated tables happen via 2PC, and 2PC requires the database to be in a writable state. +\. +invalid command \. COPY citus_local_table (a, b, z) FROM STDIN WITH CSV; -ERROR: cannot assign TransactionIds during recovery --- 1PC is possible -SET citus.multi_shard_commit_protocol TO '1pc'; +ERROR: COPY command to Citus tables is not allowed in read-only mode +DETAIL: the database is read-only +HINT: All COPY commands to citus tables happen via 2PC, and 2PC requires the database to be in a writable state. +\. +invalid command \. 
+-- all multi-shard modifications require 2PC hence not supported INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); +ERROR: cannot assign TransactionIds during recovery SELECT * FROM the_table ORDER BY a; a | b | z --------------------------------------------------------------------- - 2 | 3 | 4 - 5 | 6 | 7 -(2 rows) +(0 rows) +-- all modifications to reference tables use 2PC, hence not supported INSERT INTO reference_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); ERROR: writing to worker nodes is not currently allowed for replicated tables such as reference tables or hash distributed tables with replication factor greater than 1. DETAIL: the database is read-only @@ -197,6 +211,7 @@ SELECT * FROM reference_table ORDER BY a; --------------------------------------------------------------------- (0 rows) +-- citus local tables are on the coordinator, and coordinator is read-only INSERT INTO citus_local_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); ERROR: cannot execute INSERT in a read-only transaction SELECT * FROM citus_local_table ORDER BY a; @@ -207,12 +222,7 @@ SELECT * FROM citus_local_table ORDER BY a; -- modifying CTEs are possible WITH del AS (DELETE FROM the_table RETURNING *) SELECT * FROM del ORDER BY a; - a | b | z ---------------------------------------------------------------------- - 2 | 3 | 4 - 5 | 6 | 7 -(2 rows) - +ERROR: cannot assign TransactionIds during recovery WITH del AS (DELETE FROM reference_table RETURNING *) SELECT * FROM del ORDER BY a; ERROR: writing to worker nodes is not currently allowed for replicated tables such as reference tables or hash distributed tables with replication factor greater than 1. 
@@ -226,17 +236,29 @@ HINT: All modifications to replicated tables happen via 2PC, and 2PC requires t WITH del AS (DELETE FROM citus_local_table RETURNING *) SELECT * FROM del ORDER BY a; ERROR: cannot execute DELETE in a read-only transaction --- COPY is possible in 1PC mode +-- multi-shard COPY is not possible due to 2PC COPY the_table (a, b, z) FROM STDIN WITH CSV; +ERROR: COPY command to Citus tables is not allowed in read-only mode +DETAIL: the database is read-only +HINT: All COPY commands to citus tables happen via 2PC, and 2PC requires the database to be in a writable state. +\. +invalid command \. COPY reference_table (a, b, z) FROM STDIN WITH CSV; -ERROR: cannot assign TransactionIds during recovery +ERROR: writing to worker nodes is not currently allowed for replicated tables such as reference tables or hash distributed tables with replication factor greater than 1. +DETAIL: the database is read-only +HINT: All modifications to replicated tables happen via 2PC, and 2PC requires the database to be in a writable state. +\. +invalid command \. COPY citus_local_table (a, b, z) FROM STDIN WITH CSV; +ERROR: COPY command to Citus tables is not allowed in read-only mode +DETAIL: the database is read-only +HINT: All COPY commands to citus tables happen via 2PC, and 2PC requires the database to be in a writable state. +\. +invalid command \. 
SELECT * FROM the_table ORDER BY a; - a | b | z + a | b | z --------------------------------------------------------------------- - 10 | 10 | 10 - 11 | 11 | 11 -(2 rows) +(0 rows) SELECT * FROM reference_table ORDER BY a; a | b | z @@ -244,19 +266,19 @@ SELECT * FROM reference_table ORDER BY a; (0 rows) SELECT * FROM citus_local_table ORDER BY a; - a | b | z + a | b | z --------------------------------------------------------------------- - 10 | 10 | 10 - 11 | 11 | 11 -(2 rows) +(0 rows) -DELETE FROM the_table; DELETE FROM reference_table; ERROR: writing to worker nodes is not currently allowed for replicated tables such as reference tables or hash distributed tables with replication factor greater than 1. DETAIL: the database is read-only HINT: All modifications to replicated tables happen via 2PC, and 2PC requires the database to be in a writable state. DELETE FROM citus_local_table; ERROR: cannot execute DELETE in a read-only transaction +-- multi-shard modification always uses 2PC, so not supported +DELETE FROM the_table; +ERROR: cannot assign TransactionIds during recovery -- DDL is not possible TRUNCATE the_table; ERROR: cannot execute TRUNCATE TABLE in a read-only transaction @@ -298,11 +320,9 @@ SELECT * FROM reference_table ORDER BY a; (0 rows) SELECT * FROM citus_local_table ORDER BY a; - a | b | z + a | b | z --------------------------------------------------------------------- - 10 | 10 | 10 - 11 | 11 | 11 -(2 rows) +(0 rows) -- we should still disallow writes to local tables INSERT INTO local VALUES (1, 1); diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index ad0369790..7a5b75e11 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -575,7 +575,6 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port); (1 row) SET citus.shard_count = 5; -SET citus.multi_shard_commit_protocol TO '2pc'; CREATE SCHEMA 
mx_test_schema_1; CREATE SCHEMA mx_test_schema_2; -- Create MX tables @@ -748,7 +747,6 @@ SELECT * FROM pg_dist_shard_placement ORDER BY shardid, nodename, nodeport; -- Check that CREATE INDEX statement is propagated \c - - - :master_port -SET citus.multi_shard_commit_protocol TO '2pc'; SET client_min_messages TO 'ERROR'; CREATE INDEX mx_index_3 ON mx_test_schema_2.mx_table_2 USING hash (col1); ALTER TABLE mx_test_schema_2.mx_table_2 ADD CONSTRAINT mx_table_2_col1_key UNIQUE (col1); @@ -769,7 +767,6 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE -- Check that DROP INDEX statement is propagated \c - - - :master_port -SET citus.multi_shard_commit_protocol TO '2pc'; DROP INDEX mx_test_schema_2.mx_index_3; \c - - - :worker_1_port SELECT "Column", "Type", "Definition" FROM index_attrs WHERE @@ -777,7 +774,6 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE ERROR: relation "mx_test_schema_2.mx_index_3" does not exist -- Check that ALTER TABLE statements are propagated \c - - - :master_port -SET citus.multi_shard_commit_protocol TO '2pc'; ALTER TABLE mx_test_schema_1.mx_table_1 ADD COLUMN col3 NUMERIC; ALTER TABLE mx_test_schema_1.mx_table_1 ALTER COLUMN col3 SET DATA TYPE INT; ALTER TABLE @@ -805,7 +801,6 @@ SELECT "Constraint", "Definition" FROM table_fkeys WHERE relid='mx_test_schema_1 -- Check that foreign key constraint with NOT VALID works as well \c - - - :master_port -SET citus.multi_shard_commit_protocol TO '2pc'; ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint; ALTER TABLE mx_test_schema_1.mx_table_1 @@ -1789,7 +1784,6 @@ DROP TABLE mx_ref; DROP TABLE dist_table_1, dist_table_2; RESET citus.shard_count; RESET citus.shard_replication_factor; -RESET citus.multi_shard_commit_protocol; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; diff 
--git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out index 01c8ab839..5509bd501 100644 --- a/src/test/regress/expected/multi_modifying_xacts.out +++ b/src/test/regress/expected/multi_modifying_xacts.out @@ -291,7 +291,6 @@ SELECT count(*) FROM pg_dist_transaction; (1 row) -- 2pc failure and success tests -SET citus.multi_shard_commit_protocol TO '2pc'; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- @@ -338,7 +337,6 @@ SELECT count(*) FROM pg_dist_transaction; 2 (1 row) -RESET citus.multi_shard_commit_protocol; -- create a check function SELECT * from run_command_on_workers('CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$ BEGIN @@ -372,7 +370,6 @@ ORDER BY nodeport, shardid; -- for replicated tables use 2PC even if multi-shard commit protocol -- is set to 2PC BEGIN; -SET LOCAL citus.multi_shard_commit_protocol TO '1pc'; DELETE FROM researchers WHERE lab_id = 6; \copy researchers FROM STDIN delimiter ',' \copy researchers FROM STDIN delimiter ',' diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index f253d5b5d..20cec7578 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -17,7 +17,6 @@ SELECT pg_reload_conf(); t (1 row) -SET citus.multi_shard_commit_protocol TO '2pc'; -- Ensure pg_dist_transaction is empty for test SELECT recover_prepared_transactions(); recover_prepared_transactions diff --git a/src/test/regress/expected/multi_name_lengths.out b/src/test/regress/expected/multi_name_lengths.out index 9821101fa..5ef151042 100644 --- a/src/test/regress/expected/multi_name_lengths.out +++ b/src/test/regress/expected/multi_name_lengths.out @@ -2,7 +2,6 @@ -- MULTI_NAME_LENGTHS -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 
225000; -SET citus.multi_shard_commit_protocol = '2pc'; SET citus.shard_count TO 2; -- this function is dropped in Citus10, added here for tests CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass, diff --git a/src/test/regress/expected/multi_prepare_plsql.out b/src/test/regress/expected/multi_prepare_plsql.out index 83fc11457..74c9835ff 100644 --- a/src/test/regress/expected/multi_prepare_plsql.out +++ b/src/test/regress/expected/multi_prepare_plsql.out @@ -1179,7 +1179,6 @@ SELECT key, seq FROM func_parameter_test ORDER BY seq; DROP FUNCTION insert_with_max(text); DROP TABLE func_parameter_test; -- test prepared DDL, mainly to verify we don't mess up the query tree -SET citus.multi_shard_commit_protocol TO '2pc'; CREATE TABLE prepare_ddl (x int, y int); SELECT create_distributed_table('prepare_ddl', 'x'); create_distributed_table diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index c8fd42454..c5b8347f8 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -98,7 +98,6 @@ SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; SET citus.force_max_query_parallelization TO ON; SET citus.shard_replication_factor TO 2; SET citus.shard_count TO 2; -SET citus.multi_shard_commit_protocol TO '2pc'; -- create_distributed_table may behave differently if shards -- created via the executor or not, so not checking its value -- may result multiple test outputs, so instead just make sure that diff --git a/src/test/regress/expected/pg14.out b/src/test/regress/expected/pg14.out index f3da8a5a1..e5257c5a0 100644 --- a/src/test/regress/expected/pg14.out +++ b/src/test/regress/expected/pg14.out @@ -52,7 +52,6 @@ select create_distributed_table('dist','a'); create index idx on dist(a); set citus.log_remote_commands to on; -- make sure that we send the tablespace option -SET 
citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_modify_mode TO 'sequential'; reindex(TABLESPACE test_tablespace) index idx; NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); diff --git a/src/test/regress/expected/sequential_modifications.out b/src/test/regress/expected/sequential_modifications.out index a3b1ad506..80bbba25a 100644 --- a/src/test/regress/expected/sequential_modifications.out +++ b/src/test/regress/expected/sequential_modifications.out @@ -117,7 +117,6 @@ SELECT distributed_2PCs_are_equal_to_placement_count(); -- even if 1PC used, we use 2PC as we modify replicated tables -- see distributed TXs in the pg_dist_transaction -SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions @@ -132,7 +131,6 @@ SELECT no_distributed_2PCs(); f (1 row) -SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions @@ -154,7 +152,6 @@ SELECT create_reference_table('ref_test'); (1 row) -SET citus.multi_shard_commit_protocol TO '1pc'; -- reference tables should always use 2PC SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); @@ -186,7 +183,7 @@ SELECT distributed_2PCs_are_equal_to_worker_count(); (1 row) -- tables with replication factor > 1 should also obey --- both multi_shard_commit_protocol and multi_shard_modify_mode +-- multi_shard_modify_mode SET citus.shard_replication_factor TO 2; CREATE TABLE test_table_rep_2 (a int); SELECT create_distributed_table('test_table_rep_2', 'a'); @@ -196,7 +193,6 @@ SELECT create_distributed_table('test_table_rep_2', 'a'); (1 row) -- even if 1PC used, we use 2PC with rep > 1 -SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_modify_mode TO 'sequential'; SELECT 
recover_prepared_transactions(); recover_prepared_transactions @@ -226,7 +222,6 @@ SELECT no_distributed_2PCs(); (1 row) -- 2PC should always use 2PC with rep > 1 -SET citus.multi_shard_commit_protocol TO '2pc'; SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions @@ -257,7 +252,6 @@ SELECT distributed_2PCs_are_equal_to_placement_count(); -- CREATE INDEX CONCURRENTLY should work fine with rep > 1 -- with both 2PC and different parallel modes -SET citus.multi_shard_commit_protocol TO '2pc'; SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); recover_prepared_transactions @@ -273,7 +267,6 @@ SELECT no_distributed_2PCs(); t (1 row) -SET citus.multi_shard_commit_protocol TO '2pc'; SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); recover_prepared_transactions diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index f016c239b..17b4d0c14 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -20,8 +20,6 @@ test: failure_create_reference_table test: failure_create_distributed_table_non_empty test: failure_create_table -test: failure_1pc_copy_hash -test: failure_1pc_copy_append test: failure_multi_shard_update_delete test: failure_cte_subquery test: failure_insert_select_via_coordinator diff --git a/src/test/regress/sql/failure_1pc_copy_append.sql b/src/test/regress/sql/failure_1pc_copy_append.sql deleted file mode 100644 index a97e21058..000000000 --- a/src/test/regress/sql/failure_1pc_copy_append.sql +++ /dev/null @@ -1,92 +0,0 @@ -SELECT citus.mitmproxy('conn.allow()'); - --- do not cache any connections -SET citus.max_cached_conns_per_worker TO 1; - -SET citus.shard_count = 1; -SET citus.shard_replication_factor = 2; -- one shard per worker -SET citus.multi_shard_commit_protocol TO '1pc'; -SET citus.next_shard_id TO 100400; -ALTER SEQUENCE 
pg_catalog.pg_dist_placement_placementid_seq RESTART 100; - -CREATE TABLE copy_test (key int, value int); -SELECT create_distributed_table('copy_test', 'key', 'append'); - -SELECT citus.clear_network_traffic(); - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT count(1) FROM copy_test; - ----- all of the following tests test behavior with 2 shard placements ---- -SHOW citus.shard_replication_factor; - ----- kill the connection when we try to create the shard ---- -SELECT citus.mitmproxy('conn.onQuery(query="worker_apply_shard_ddl_command").kill()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; -SELECT count(1) FROM copy_test; - ----- kill the connection when we try to start a transaction ---- -SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").kill()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; -SELECT count(1) FROM copy_test; - ----- kill the connection when we start the COPY ---- -SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").kill()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; -SELECT count(1) FROM copy_test; - ----- kill the connection when we send the data ---- -SELECT citus.mitmproxy('conn.onCopyData().kill()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE 
(s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; - -SELECT citus.mitmproxy('conn.onQuery(query="SELECT|COPY").kill()'); -SELECT count(1) FROM copy_test; - ----- cancel the connection when we send the data ---- -SELECT citus.mitmproxy('conn.onQuery(query="SELECT|COPY").cancel(' || pg_backend_pid() || ')'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; -SELECT count(1) FROM copy_test; - ----- kill the connection when we try to get the size of the table ---- -SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; -SELECT count(1) FROM copy_test; - ----- kill the connection when we try to get the min, max of the table ---- -SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; -SELECT count(1) FROM copy_test; - ----- kill the connection when we try to COMMIT ---- -SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p - WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass - ORDER BY placementid; -SELECT count(1) FROM copy_test; - --- ==== Clean up, we're done here ==== - -SELECT citus.mitmproxy('conn.allow()'); -DROP TABLE 
copy_test; diff --git a/src/test/regress/sql/failure_1pc_copy_hash.sql b/src/test/regress/sql/failure_1pc_copy_hash.sql deleted file mode 100644 index 34c3d15c4..000000000 --- a/src/test/regress/sql/failure_1pc_copy_hash.sql +++ /dev/null @@ -1,148 +0,0 @@ -SELECT citus.mitmproxy('conn.allow()'); - --- do not cache any connections -SET citus.max_cached_conns_per_worker TO 0; - -SET citus.shard_count = 1; -SET citus.shard_replication_factor = 2; -- one shard per worker -SET citus.multi_shard_commit_protocol TO '1pc'; -SET citus.next_shard_id TO 100400; -SET citus.max_cached_conns_per_worker TO 0; -ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; - -CREATE TABLE copy_test (key int, value int); -SELECT create_distributed_table('copy_test', 'key'); - -SELECT citus.clear_network_traffic(); - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT count(1) FROM copy_test; - --- ==== kill the connection when we try to start a transaction ==== --- the query should abort - -SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; - --- ==== kill the connection when we try to start the COPY ==== --- the query should abort - -SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; - --- ==== kill the connection when we first start sending data ==== --- the query should abort - -SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; - --- ==== kill the connection when the worker confirms it's received the data ==== --- the query should abort - -SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 
1 && echo 2, 4 && echo 3, 9' WITH CSV; - --- ==== kill the connection when we try to send COMMIT ==== --- the query should succeed, and the placement should be marked inactive - -SELECT citus.mitmproxy('conn.allow()'); -SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -) AND shardstate = 3; -SELECT count(1) FROM copy_test; - -SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; - --- the shard is marked invalid -SELECT citus.mitmproxy('conn.allow()'); -SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -) AND shardstate = 3; -SELECT count(1) FROM copy_test; - --- ==== clean up a little bit before running the next test ==== - -UPDATE pg_dist_shard_placement SET shardstate = 1 -WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -); -TRUNCATE copy_test; - --- ==== try to COPY invalid data ==== - --- here the coordinator actually sends the data, but then unexpectedly closes the --- connection when it notices the data stream is broken. Crucially, it closes the --- connection before sending COMMIT, so no data makes it into the worker. -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV; - --- kill the connection if the coordinator sends COMMIT. 
It doesn't, so nothing changes -SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV; - -SELECT * FROM copy_test ORDER BY key, value; - --- ==== clean up some more to prepare for tests with only one replica ==== - -SELECT citus.mitmproxy('conn.allow()'); - -TRUNCATE copy_test; -UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port; -SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -) ORDER BY nodeport, placementid; - --- ==== okay, run some tests where there's only one active shard ==== - -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT * FROM copy_test; - --- the worker is unreachable -SELECT citus.mitmproxy('conn.killall()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT citus.mitmproxy('conn.allow()'); -SELECT * FROM copy_test; - --- the first message fails -SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT citus.mitmproxy('conn.allow()'); -SELECT * FROM copy_test; - --- the COPY message fails -SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT citus.mitmproxy('conn.allow()'); -SELECT * FROM copy_test; - --- the COPY data fails -SELECT citus.mitmproxy('conn.onCopyData().killall()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT citus.mitmproxy('conn.allow()'); -SELECT * FROM copy_test; - --- the COMMIT fails -SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && 
echo 2, 4 && echo 3, 9' WITH CSV; -SELECT citus.mitmproxy('conn.allow()'); -SELECT * FROM copy_test; - --- the placement is not marked invalid -SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -) ORDER BY nodeport, placementid; - --- the COMMIT makes it through but the connection dies before we get a response -SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()'); -COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -SELECT citus.mitmproxy('conn.allow()'); - -SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( - SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass -) ORDER BY nodeport, placementid; -SELECT * FROM copy_test; - --- ==== Clean up, we're done here ==== - -SELECT citus.mitmproxy('conn.allow()'); -DROP TABLE copy_test; diff --git a/src/test/regress/sql/failure_create_distributed_table_non_empty.sql b/src/test/regress/sql/failure_create_distributed_table_non_empty.sql index 1db9783a3..21350aee5 100644 --- a/src/test/regress/sql/failure_create_distributed_table_non_empty.sql +++ b/src/test/regress/sql/failure_create_distributed_table_non_empty.sql @@ -215,7 +215,6 @@ DROP SCHEMA create_distributed_table_non_empty_failure; CREATE SCHEMA create_distributed_table_non_empty_failure; CREATE TABLE test_table(id int, value_1 int); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); -SET citus.multi_shard_commit_protocol TO '1pc'; SELECT citus.mitmproxy('conn.kill()'); SELECT create_distributed_table('test_table', 'id'); diff --git a/src/test/regress/sql/failure_create_table.sql b/src/test/regress/sql/failure_create_table.sql index 2710c4e01..c10c16c30 100644 --- a/src/test/regress/sql/failure_create_table.sql +++ b/src/test/regress/sql/failure_create_table.sql @@ -164,9 +164,6 @@ DROP SCHEMA failure_create_table; CREATE SCHEMA failure_create_table; CREATE TABLE test_table(id int, value_1 
int); --- Test inside transaction and with 1PC -SET citus.multi_shard_commit_protocol TO "1pc"; - -- Kill connection before sending query to the worker with 1pc. SELECT citus.mitmproxy('conn.kill()'); @@ -242,7 +239,6 @@ CREATE OR REPLACE FUNCTION pg_catalog.master_create_worker_shards(table_name tex LANGUAGE C STRICT; -- Test master_create_worker_shards with 2pc -SET citus.multi_shard_commit_protocol TO "2pc"; CREATE TABLE test_table_2(id int, value_1 int); SELECT master_create_distributed_table('test_table_2', 'id', 'hash'); diff --git a/src/test/regress/sql/failure_ddl.sql b/src/test/regress/sql/failure_ddl.sql index 47448f758..2618771ab 100644 --- a/src/test/regress/sql/failure_ddl.sql +++ b/src/test/regress/sql/failure_ddl.sql @@ -20,8 +20,7 @@ SELECT citus.mitmproxy('conn.allow()'); SET citus.next_shard_id TO 100800; --- we'll start with replication factor 1, 1PC and parallel mode -SET citus.multi_shard_commit_protocol TO '1pc'; +-- we'll start with replication factor 1, 2PC and parallel mode SET citus.shard_count = 4; SET citus.shard_replication_factor = 1; @@ -68,7 +67,6 @@ SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where r SELECT citus.mitmproxy('conn.allow()'); DROP TABLE test_table; SET citus.next_shard_id TO 100800; -SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.shard_count = 4; SET citus.shard_replication_factor = 1; @@ -143,9 +141,6 @@ SELECT citus.mitmproxy('conn.allow()'); SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass; SELECT run_command_on_placements('test_table', $$SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = '%s'::regclass;$$) ORDER BY 1; --- now, lets test with 2PC -SET citus.multi_shard_commit_protocol TO '2pc'; - -- in the first test, kill just in the first -- response we get from the worker SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); @@ -261,7 +256,6 @@ SELECT 
run_command_on_placements('test_table', $$SELECT array_agg(name::text ORD -- another set of tests with 2PC and replication factor = 2 -SET citus.multi_shard_commit_protocol TO '2pc'; SET citus.shard_count = 4; SET citus.shard_replication_factor = 2; diff --git a/src/test/regress/sql/failure_multi_shard_update_delete.sql b/src/test/regress/sql/failure_multi_shard_update_delete.sql index d9d0c2250..124af66a0 100644 --- a/src/test/regress/sql/failure_multi_shard_update_delete.sql +++ b/src/test/regress/sql/failure_multi_shard_update_delete.sql @@ -30,7 +30,76 @@ INSERT INTO t2 VALUES (1, 1, 1), (1, 2, 1), (2, 1, 2), (2, 2, 4), (3, 1, 3), (3, SELECT pg_backend_pid() as pid \gset SELECT count(*) FROM t2; -SHOW citus.multi_shard_commit_protocol ; +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation + +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); +-- issue a multi shard delete +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); +DELETE FROM t2 WHERE b = 2; + +-- verify nothing is deleted +SELECT count(*) FROM t2; + +-- UPDATE TESTS +-- update non-partition column based on a filter on another non-partition column +-- DELETION TESTS +-- delete using a filter on non-partition column filter +-- test both kill and cancellation + + +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 
4) AS c4 FROM t2; + +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- kill just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- cancellation +SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); +-- issue a multi shard update +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; + +-- cancel just one connection +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); +UPDATE t2 SET c = 4 WHERE b = 2; + +-- verify nothing is updated +SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; -- DELETION TESTS -- delete using a filter on non-partition column filter @@ -103,81 +172,6 @@ UPDATE t2 SET c = 4 WHERE b = 2; -- verify nothing is updated SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; --- switch to 1PC -SET citus.multi_shard_commit_protocol TO '1PC'; - --- DELETION TESTS --- delete using a filter on non-partition column filter --- test both kill and cancellation - -SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM").kill()'); --- issue a multi shard delete -DELETE FROM t2 WHERE b = 2; - --- verify nothing is deleted -SELECT count(*) FROM t2; - --- kill just one connection -SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").kill()'); -DELETE FROM t2 WHERE b = 2; - --- verify nothing is deleted -SELECT count(*) FROM t2; - --- cancellation -SELECT 
citus.mitmproxy('conn.onQuery(query="DELETE FROM").cancel(' || :pid || ')'); --- issue a multi shard delete -DELETE FROM t2 WHERE b = 2; - --- verify nothing is deleted -SELECT count(*) FROM t2; - --- cancel just one connection -SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM multi_shard.t2_201005").cancel(' || :pid || ')'); -DELETE FROM t2 WHERE b = 2; - --- verify nothing is deleted -SELECT count(*) FROM t2; - --- UPDATE TESTS --- update non-partition column based on a filter on another non-partition column --- DELETION TESTS --- delete using a filter on non-partition column filter --- test both kill and cancellation - - -SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; - -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); --- issue a multi shard update -UPDATE t2 SET c = 4 WHERE b = 2; - --- verify nothing is updated -SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; - --- kill just one connection -SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").kill()'); -UPDATE t2 SET c = 4 WHERE b = 2; - --- verify nothing is updated -SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; - --- cancellation -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || :pid || ')'); --- issue a multi shard update -UPDATE t2 SET c = 4 WHERE b = 2; - --- verify nothing is updated -SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; - --- cancel just one connection -SELECT citus.mitmproxy('conn.onQuery(query="UPDATE multi_shard.t2_201005").cancel(' || :pid || ')'); -UPDATE t2 SET c = 4 WHERE b = 2; - --- verify nothing is updated -SELECT count(*) FILTER (WHERE b = 2) AS b2, count(*) FILTER (WHERE c = 4) AS c4 FROM t2; - -RESET citus.multi_shard_commit_protocol; -- -- fail when cascading deletes from foreign key @@ -272,8 +266,6 @@ UPDATE t3 SET b = 1 WHERE b = 2 RETURNING *; -- 
verify nothing is updated SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; --- switch to 1PC -SET citus.multi_shard_commit_protocol TO '1PC'; SELECT count(*) FILTER (WHERE b = 1) b1, count(*) FILTER (WHERE b = 2) AS b2 FROM t3; diff --git a/src/test/regress/sql/failure_truncate.sql b/src/test/regress/sql/failure_truncate.sql index cf7428bbd..059a62cd9 100644 --- a/src/test/regress/sql/failure_truncate.sql +++ b/src/test/regress/sql/failure_truncate.sql @@ -15,8 +15,7 @@ SET citus.force_max_query_parallelization TO on; SELECT citus.mitmproxy('conn.allow()'); --- we'll start with replication factor 1, 1PC and parallel mode -SET citus.multi_shard_commit_protocol TO '1pc'; +-- we'll start with replication factor 1, 2PC and parallel mode SET citus.shard_count = 4; SET citus.shard_replication_factor = 1; @@ -77,14 +76,13 @@ SELECT citus.mitmproxy('conn.allow()'); SELECT * FROM unhealthy_shard_count; SELECT count(*) FROM test_table; --- kill as soon as the coordinator sends COMMIT --- One shard should not get truncated but the other should --- since it is sent from another connection. 
--- Thus, we should see a partially successful truncate --- Note: This is the result of using 1pc and there is no way to recover from it +-- kill as soon as the coordinator sends COMMIT PREPARED +-- the transaction succeeds on one placement, and we need to +-- recover prepared statements to see the other placement as well SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()'); TRUNCATE test_table; SELECT citus.mitmproxy('conn.allow()'); +SELECT recover_prepared_transactions(); SELECT * FROM unhealthy_shard_count; SELECT count(*) FROM test_table; @@ -192,8 +190,6 @@ SELECT recover_prepared_transactions(); SELECT * FROM unhealthy_shard_count; SELECT count(*) FROM test_table; --- now, lets test with 2PC -SET citus.multi_shard_commit_protocol TO '2pc'; -- in the first test, kill just in the first -- response we get from the worker @@ -308,7 +304,6 @@ SELECT recover_prepared_transactions(); SELECT count(*) FROM test_table; -- final set of tests with 2PC and replication factor = 2 -SET citus.multi_shard_commit_protocol TO '2pc'; SET citus.shard_count = 4; SET citus.shard_replication_factor = 2; diff --git a/src/test/regress/sql/failure_vacuum.sql b/src/test/regress/sql/failure_vacuum.sql index 044ef4a79..b87b78195 100644 --- a/src/test/regress/sql/failure_vacuum.sql +++ b/src/test/regress/sql/failure_vacuum.sql @@ -8,7 +8,6 @@ SELECT citus.mitmproxy('conn.allow()'); SET citus.shard_count = 1; SET citus.shard_replication_factor = 2; -- one shard per worker -SET citus.multi_shard_commit_protocol TO '1pc'; CREATE TABLE vacuum_test (key int, value int); SELECT create_distributed_table('vacuum_test', 'key'); diff --git a/src/test/regress/sql/multi_follower_dml.sql b/src/test/regress/sql/multi_follower_dml.sql index 35a4e254a..dc03f258c 100644 --- a/src/test/regress/sql/multi_follower_dml.sql +++ b/src/test/regress/sql/multi_follower_dml.sql @@ -74,31 +74,25 @@ INSERT INTO the_replicated_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); INSERT INTO reference_table (a, b, z) 
VALUES (2, 3, 4), (5, 6, 7); INSERT INTO citus_local_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); --- COPY is not possible in 2PC mode +-- COPY is not possible because Citus user 2PC COPY the_table (a, b, z) FROM STDIN WITH CSV; -10,10,10 -11,11,11 \. --- COPY is not possible in 2PC mode COPY the_replicated_table (a, b, z) FROM STDIN WITH CSV; -10,10,10 -11,11,11 \. COPY reference_table (a, b, z) FROM STDIN WITH CSV; -10,10,10 -11,11,11 \. COPY citus_local_table (a, b, z) FROM STDIN WITH CSV; -10,10,10 -11,11,11 \. --- 1PC is possible -SET citus.multi_shard_commit_protocol TO '1pc'; +-- all multi-shard modifications require 2PC hence not supported INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); SELECT * FROM the_table ORDER BY a; + +-- all modifications to reference tables use 2PC, hence not supported INSERT INTO reference_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); SELECT * FROM reference_table ORDER BY a; + +-- citus local tables are on the coordinator, and coordinator is read-only INSERT INTO citus_local_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7); SELECT * FROM citus_local_table ORDER BY a; @@ -112,26 +106,22 @@ SELECT * FROM del ORDER BY a; WITH del AS (DELETE FROM citus_local_table RETURNING *) SELECT * FROM del ORDER BY a; --- COPY is possible in 1PC mode +-- multi-shard COPY is not possible due to 2PC COPY the_table (a, b, z) FROM STDIN WITH CSV; -10,10,10 -11,11,11 \. COPY reference_table (a, b, z) FROM STDIN WITH CSV; -10,10,10 -11,11,11 \. COPY citus_local_table (a, b, z) FROM STDIN WITH CSV; -10,10,10 -11,11,11 \. 
SELECT * FROM the_table ORDER BY a; SELECT * FROM reference_table ORDER BY a; SELECT * FROM citus_local_table ORDER BY a; -DELETE FROM the_table; DELETE FROM reference_table; DELETE FROM citus_local_table; +-- multi-shard modification always uses 2PC, so not supported +DELETE FROM the_table; + -- DDL is not possible TRUNCATE the_table; TRUNCATE reference_table; diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index a1e69b2a3..ad9ae7168 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -214,7 +214,6 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; -- Test DDL propagation in MX tables SELECT start_metadata_sync_to_node('localhost', :worker_1_port); SET citus.shard_count = 5; -SET citus.multi_shard_commit_protocol TO '2pc'; CREATE SCHEMA mx_test_schema_1; CREATE SCHEMA mx_test_schema_2; @@ -301,7 +300,6 @@ SELECT * FROM pg_dist_shard_placement ORDER BY shardid, nodename, nodeport; -- Check that CREATE INDEX statement is propagated \c - - - :master_port -SET citus.multi_shard_commit_protocol TO '2pc'; SET client_min_messages TO 'ERROR'; CREATE INDEX mx_index_3 ON mx_test_schema_2.mx_table_2 USING hash (col1); ALTER TABLE mx_test_schema_2.mx_table_2 ADD CONSTRAINT mx_table_2_col1_key UNIQUE (col1); @@ -313,7 +311,6 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE -- Check that DROP INDEX statement is propagated \c - - - :master_port -SET citus.multi_shard_commit_protocol TO '2pc'; DROP INDEX mx_test_schema_2.mx_index_3; \c - - - :worker_1_port SELECT "Column", "Type", "Definition" FROM index_attrs WHERE @@ -321,7 +318,6 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE -- Check that ALTER TABLE statements are propagated \c - - - :master_port -SET citus.multi_shard_commit_protocol TO '2pc'; ALTER TABLE mx_test_schema_1.mx_table_1 ADD COLUMN col3 NUMERIC; ALTER TABLE mx_test_schema_1.mx_table_1 ALTER 
COLUMN col3 SET DATA TYPE INT; ALTER TABLE @@ -338,7 +334,6 @@ SELECT "Constraint", "Definition" FROM table_fkeys WHERE relid='mx_test_schema_1 -- Check that foreign key constraint with NOT VALID works as well \c - - - :master_port -SET citus.multi_shard_commit_protocol TO '2pc'; ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint; ALTER TABLE mx_test_schema_1.mx_table_1 @@ -822,7 +817,6 @@ DROP TABLE dist_table_1, dist_table_2; RESET citus.shard_count; RESET citus.shard_replication_factor; -RESET citus.multi_shard_commit_protocol; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql index 93cc34ff1..4c46011be 100644 --- a/src/test/regress/sql/multi_modifying_xacts.sql +++ b/src/test/regress/sql/multi_modifying_xacts.sql @@ -253,7 +253,6 @@ SELECT * FROM researchers WHERE lab_id = 6; SELECT count(*) FROM pg_dist_transaction; -- 2pc failure and success tests -SET citus.multi_shard_commit_protocol TO '2pc'; SELECT recover_prepared_transactions(); -- copy with unique index violation BEGIN; @@ -281,7 +280,6 @@ SELECT * FROM researchers WHERE lab_id = 6; -- verify 2pc SELECT count(*) FROM pg_dist_transaction; -RESET citus.multi_shard_commit_protocol; -- create a check function SELECT * from run_command_on_workers('CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$ @@ -304,7 +302,6 @@ ORDER BY nodeport, shardid; -- for replicated tables use 2PC even if multi-shard commit protocol -- is set to 2PC BEGIN; -SET LOCAL citus.multi_shard_commit_protocol TO '1pc'; DELETE FROM researchers WHERE lab_id = 6; \copy researchers FROM STDIN delimiter ',' 31, 6, 'Bjarne Stroustrup' diff --git a/src/test/regress/sql/multi_mx_transaction_recovery.sql b/src/test/regress/sql/multi_mx_transaction_recovery.sql index 241382e49..2a6b4991b 100644 --- 
a/src/test/regress/sql/multi_mx_transaction_recovery.sql +++ b/src/test/regress/sql/multi_mx_transaction_recovery.sql @@ -12,8 +12,6 @@ SELECT create_distributed_table('test_recovery', 'x'); ALTER SYSTEM SET citus.recover_2pc_interval TO -1; SELECT pg_reload_conf(); -SET citus.multi_shard_commit_protocol TO '2pc'; - -- Ensure pg_dist_transaction is empty for test SELECT recover_prepared_transactions(); SELECT count(*) FROM pg_dist_transaction; diff --git a/src/test/regress/sql/multi_name_lengths.sql b/src/test/regress/sql/multi_name_lengths.sql index 783e394d1..3328572d4 100644 --- a/src/test/regress/sql/multi_name_lengths.sql +++ b/src/test/regress/sql/multi_name_lengths.sql @@ -4,7 +4,6 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 225000; -SET citus.multi_shard_commit_protocol = '2pc'; SET citus.shard_count TO 2; -- this function is dropped in Citus10, added here for tests diff --git a/src/test/regress/sql/multi_prepare_plsql.sql b/src/test/regress/sql/multi_prepare_plsql.sql index a61be1880..8589e5b5a 100644 --- a/src/test/regress/sql/multi_prepare_plsql.sql +++ b/src/test/regress/sql/multi_prepare_plsql.sql @@ -533,7 +533,6 @@ DROP FUNCTION insert_with_max(text); DROP TABLE func_parameter_test; -- test prepared DDL, mainly to verify we don't mess up the query tree -SET citus.multi_shard_commit_protocol TO '2pc'; CREATE TABLE prepare_ddl (x int, y int); SELECT create_distributed_table('prepare_ddl', 'x'); diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql index bbebfd86c..c29273a82 100644 --- a/src/test/regress/sql/multi_transaction_recovery.sql +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -70,7 +70,6 @@ SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; SET citus.force_max_query_parallelization TO ON; SET citus.shard_replication_factor TO 2; SET citus.shard_count TO 2; -SET citus.multi_shard_commit_protocol TO '2pc'; -- create_distributed_table may 
behave differently if shards -- created via the executor or not, so not checking its value diff --git a/src/test/regress/sql/pg14.sql b/src/test/regress/sql/pg14.sql index 1646d7c23..7998d33f1 100644 --- a/src/test/regress/sql/pg14.sql +++ b/src/test/regress/sql/pg14.sql @@ -29,7 +29,6 @@ create index idx on dist(a); set citus.log_remote_commands to on; -- make sure that we send the tablespace option -SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_modify_mode TO 'sequential'; reindex(TABLESPACE test_tablespace) index idx; reindex(TABLESPACE test_tablespace, verbose) index idx; diff --git a/src/test/regress/sql/sequential_modifications.sql b/src/test/regress/sql/sequential_modifications.sql index 3a242c453..d3293c5af 100644 --- a/src/test/regress/sql/sequential_modifications.sql +++ b/src/test/regress/sql/sequential_modifications.sql @@ -96,13 +96,11 @@ SELECT distributed_2PCs_are_equal_to_placement_count(); -- even if 1PC used, we use 2PC as we modify replicated tables -- see distributed TXs in the pg_dist_transaction -SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); ALTER TABLE test_table ADD CONSTRAINT c_check CHECK(a > 0); SELECT no_distributed_2PCs(); -SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); ALTER TABLE test_table ADD CONSTRAINT d_check CHECK(a > 0); @@ -110,7 +108,6 @@ SELECT no_distributed_2PCs(); CREATE TABLE ref_test(a int); SELECT create_reference_table('ref_test'); -SET citus.multi_shard_commit_protocol TO '1pc'; -- reference tables should always use 2PC SET citus.multi_shard_modify_mode TO 'sequential'; @@ -125,13 +122,12 @@ CREATE INDEX ref_test_seq_index_2 ON ref_test(a); SELECT distributed_2PCs_are_equal_to_worker_count(); -- tables with replication factor > 1 should also obey --- both multi_shard_commit_protocol and multi_shard_modify_mode +-- 
multi_shard_modify_mode SET citus.shard_replication_factor TO 2; CREATE TABLE test_table_rep_2 (a int); SELECT create_distributed_table('test_table_rep_2', 'a'); -- even if 1PC used, we use 2PC with rep > 1 -SET citus.multi_shard_commit_protocol TO '1pc'; SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); @@ -144,7 +140,6 @@ CREATE INDEX test_table_rep_2_i_2 ON test_table_rep_2(a); SELECT no_distributed_2PCs(); -- 2PC should always use 2PC with rep > 1 -SET citus.multi_shard_commit_protocol TO '2pc'; SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); CREATE INDEX test_table_rep_2_i_3 ON test_table_rep_2(a); @@ -157,7 +152,6 @@ SELECT distributed_2PCs_are_equal_to_placement_count(); -- CREATE INDEX CONCURRENTLY should work fine with rep > 1 -- with both 2PC and different parallel modes -SET citus.multi_shard_commit_protocol TO '2pc'; SET citus.multi_shard_modify_mode TO 'sequential'; SELECT recover_prepared_transactions(); CREATE INDEX CONCURRENTLY test_table_rep_2_i_5 ON test_table_rep_2(a); @@ -165,7 +159,6 @@ CREATE INDEX CONCURRENTLY test_table_rep_2_i_5 ON test_table_rep_2(a); -- we shouldn't see any distributed transactions SELECT no_distributed_2PCs(); -SET citus.multi_shard_commit_protocol TO '2pc'; SET citus.multi_shard_modify_mode TO 'parallel'; SELECT recover_prepared_transactions(); CREATE INDEX CONCURRENTLY test_table_rep_2_i_6 ON test_table_rep_2(a);