diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 43c8c5397..885d8a10e 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -43,6 +43,7 @@ #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/shard_pruning.h" +#include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" #include "optimizer/clauses.h" @@ -60,6 +61,10 @@ #include "utils/varlena.h" #endif +#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');" +#define REMOTE_LOCK_MODE_FOR_TRUNCATE "ACCESS EXCLUSIVE" + + static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, TaskType taskType); static bool ShouldExecuteTruncateStmtSequential(TruncateStmt *command); @@ -208,27 +213,23 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType); /* - * We should execute "TRUNCATE table_name;" on the other worker nodes before + * We lock the relation we're TRUNCATING on the other worker nodes before * executing the truncate commands on the shards. This is necessary to prevent * distributed deadlocks where a concurrent operation on the same table (or a * cascading table) is executed on the other nodes. * - * Note that we should skip the current node to prevent a self-deadlock. + * Note that we should skip the current node to prevent a self-deadlock that's why + * we use OTHER_WORKERS tag. */ if (truncateOperation && ShouldSyncTableMetadata(relationId)) { - SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA, - DISABLE_DDL_PROPAGATION); + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + StringInfo lockRelation = makeStringInfo(); + appendStringInfo(lockRelation, LOCK_RELATION_IF_EXISTS, qualifiedRelationName, + REMOTE_LOCK_MODE_FOR_TRUNCATE); - /* - * Note that here we ignore the schema and send the queryString as is - * since citus_truncate_trigger already uses qualified table name. - * If that was not the case, we should also had to set the search path - * as we do for regular DDLs. - */ - SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA, - queryString); + SendCommandToWorkers(OTHER_WORKERS, lockRelation->data); } if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) @@ -342,7 +343,7 @@ lock_relation_if_exists(PG_FUNCTION_ARGS) LOCKMODE lockMode = NoLock; /* ensure that we're in a transaction block */ - RequireTransactionChain(true, "lock_relation_if_exists"); + RequireTransactionBlock(true, "lock_relation_if_exists"); relationId = ResolveRelationId(relationName, true); if (!OidIsValid(relationId)) @@ -353,7 +354,6 @@ lock_relation_if_exists(PG_FUNCTION_ARGS) /* get the lock mode */ lockMode = LockModeTextToLockMode(lockModeCString); - /* resolve relationId from passed in schema and relation name */ relationNameList = textToQualifiedNameList(relationName); relation = makeRangeVarFromNameList(relationNameList); diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 4e4db59d2..152499843 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -117,14 +117,13 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) int nodePort = workerNode->workerPort; int connectionFlags = FORCE_NEW_CONNECTION; - if ((targetWorkerSet == WORKERS_WITH_METADATA || - targetWorkerSet == OTHER_WORKERS_WITH_METADATA) && + if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata) { continue; } - if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA && + if (targetWorkerSet == OTHER_WORKERS && workerNode->groupId == GetLocalGroupId()) { continue; @@ -177,14 +176,13 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, MultiConnection *connection = NULL; int connectionFlags = 0; - if ((targetWorkerSet == WORKERS_WITH_METADATA || - targetWorkerSet == OTHER_WORKERS_WITH_METADATA) && + if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata) { continue; } - if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA && + if (targetWorkerSet == OTHER_WORKERS && workerNode->groupId == GetLocalGroupId()) { continue; diff --git a/src/include/distributed/errormessage.h b/src/include/distributed/errormessage.h index 4626da6b6..0eeb7fd04 100644 --- a/src/include/distributed/errormessage.h +++ b/src/include/distributed/errormessage.h @@ -56,7 +56,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons RaiseDeferredErrorInternal(error, elevel); \ if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \ pg_unreachable(); } \ - } while (0) + } \ + while (0) #else /* !HAVE_BUILTIN_CONSTANT_P */ #define RaiseDeferredError(error, elevel) \ do { \ @@ -64,7 +65,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons RaiseDeferredErrorInternal(error, elevel_); \ if (elevel_ >= ERROR) { \ pg_unreachable(); } \ - } while (0) + } \ + while (0) #endif /* HAVE_BUILTIN_CONSTANT_P */ void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel); diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 04f619d6a..f68051283 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -103,7 +103,7 @@ extern bool IsCoordinator(void); extern bool CStoreTable(Oid relationId); extern uint64 GetNextShardId(void); extern uint64 GetNextPlacementId(void); -extern Oid ResolveRelationId(text *relationName, bool failOK); +extern Oid ResolveRelationId(text *relationName, bool missingOk); extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation); extern List * GetTableCreationCommands(Oid relationId, bool forShardCreation); extern List * GetTableIndexAndConstraintCommands(Oid relationId); @@ -151,6 +151,7 @@ extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS); extern Datum master_apply_delete_command(PG_FUNCTION_ARGS); extern Datum master_drop_sequences(PG_FUNCTION_ARGS); extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS); +extern Datum lock_relation_if_exists(PG_FUNCTION_ARGS); extern Datum master_drop_all_shards(PG_FUNCTION_ARGS); /* function declarations for shard creation functionality */ diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index 9589825d3..19c8025b9 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -37,6 +37,7 @@ /* following functions are renamed in PG11 */ #define PreventInTransactionBlock PreventTransactionChain #define DatumGetJsonbP(d) DatumGetJsonb(d) +#define RequireTransactionBlock RequireTransactionChain /* following defines also exist for PG11 */ #define RELATION_OBJECT_TYPE ACL_OBJECT_RELATION diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 048e684cc..73c1eb3a2 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -22,7 +22,7 @@ typedef enum TargetWorkerSet { WORKERS_WITH_METADATA, - OTHER_WORKERS_WITH_METADATA, + OTHER_WORKERS, ALL_WORKERS } TargetWorkerSet;