mirror of https://github.com/citusdata/citus.git
Properly send commands to other nodes
We previously implemented the OTHER_WORKERS_WITH_METADATA tag. However, that was wrong; see the related discussion: https://github.com/citusdata/citus/issues/2320. Instead, we switched to the OTHER_WORKERS tag and made the command we run optional, so that even if a node is not a metadata node, we won't run into trouble.
pull/2370/head
parent 5cf8fbe7b6
commit 76aa6951c2
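
Note: the locking command this commit broadcasts relies on the lock_relation_if_exists() UDF that the diff below also declares. A minimal sketch of its intended semantics follows, assuming the citus-internal header locations named in the comments; the actual implementation in this commit differs in detail (for example, it also builds a RangeVar from the qualified name):

#include "postgres.h"
#include "fmgr.h"

#include "access/xact.h"                  /* RequireTransactionBlock */
#include "storage/lmgr.h"                 /* LockRelationOid */
#include "utils/builtins.h"               /* text_to_cstring */

#include "distributed/master_protocol.h"  /* ResolveRelationId (assumed header) */
#include "distributed/resource_lock.h"    /* LockModeTextToLockMode (assumed header) */

PG_FUNCTION_INFO_V1(lock_relation_if_exists);

/*
 * Simplified sketch, not the committed implementation: lock the relation
 * in the requested mode only when it exists, so workers that don't have
 * the table (e.g. nodes without metadata) return false instead of erroring.
 */
Datum
lock_relation_if_exists(PG_FUNCTION_ARGS)
{
	text *relationName = PG_GETARG_TEXT_P(0);
	char *lockModeCString = text_to_cstring(PG_GETARG_TEXT_P(1));
	Oid relationId = InvalidOid;
	LOCKMODE lockMode = NoLock;

	/* locks only make sense inside a transaction block */
	RequireTransactionBlock(true, "lock_relation_if_exists");

	relationId = ResolveRelationId(relationName, true);
	if (!OidIsValid(relationId))
	{
		/* relation is absent on this node: succeed without locking */
		PG_RETURN_BOOL(false);
	}

	lockMode = LockModeTextToLockMode(lockModeCString);
	LockRelationOid(relationId, lockMode);

	PG_RETURN_BOOL(true);
}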
@@ -43,6 +43,7 @@
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/shard_pruning.h"
+#include "distributed/version_compat.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
 #include "optimizer/clauses.h"
@@ -60,6 +61,10 @@
 #include "utils/varlena.h"
 #endif

+#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');"
+#define REMOTE_LOCK_MODE_FOR_TRUNCATE "ACCESS EXCLUSIVE"
+
+
 static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, TaskType
 											   taskType);
 static bool ShouldExecuteTruncateStmtSequential(TruncateStmt *command);
@@ -208,27 +213,23 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 		ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType);

 	/*
-	 * We should execute "TRUNCATE table_name;" on the other worker nodes before
+	 * We lock the relation we're TRUNCATING on the other worker nodes before
 	 * executing the truncate commands on the shards. This is necessary to prevent
 	 * distributed deadlocks where a concurrent operation on the same table (or a
 	 * cascading table) is executed on the other nodes.
 	 *
-	 * Note that we should skip the current node to prevent a self-deadlock.
+	 * Note that we should skip the current node to prevent a self-deadlock that's why
+	 * we use OTHER_WORKERS tag.
 	 */
 	if (truncateOperation && ShouldSyncTableMetadata(relationId))
 	{
-		SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA,
-							 DISABLE_DDL_PROPAGATION);
+		char *qualifiedRelationName = generate_qualified_relation_name(relationId);
+		StringInfo lockRelation = makeStringInfo();

+		appendStringInfo(lockRelation, LOCK_RELATION_IF_EXISTS, qualifiedRelationName,
+						 REMOTE_LOCK_MODE_FOR_TRUNCATE);

-		/*
-		 * Note that here we ignore the schema and send the queryString as is
-		 * since citus_truncate_trigger already uses qualified table name.
-		 * If that was not the case, we should also had to set the search path
-		 * as we do for regular DDLs.
-		 */
-		SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA,
-							 queryString);
+		SendCommandToWorkers(OTHER_WORKERS, lockRelation->data);
 	}

 	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
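
Note: for a hypothetical distributed table public.events, the broadcast command becomes SELECT lock_relation_if_exists('public.events', 'ACCESS EXCLUSIVE'); and since the function simply returns false where the relation does not exist, sending it to OTHER_WORKERS is safe even for workers without metadata.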
@@ -342,7 +343,7 @@ lock_relation_if_exists(PG_FUNCTION_ARGS)
 	LOCKMODE lockMode = NoLock;

 	/* ensure that we're in a transaction block */
-	RequireTransactionChain(true, "lock_relation_if_exists");
+	RequireTransactionBlock(true, "lock_relation_if_exists");

 	relationId = ResolveRelationId(relationName, true);
 	if (!OidIsValid(relationId))
@@ -353,7 +354,6 @@ lock_relation_if_exists(PG_FUNCTION_ARGS)
 	/* get the lock mode */
 	lockMode = LockModeTextToLockMode(lockModeCString);

-
 	/* resolve relationId from passed in schema and relation name */
 	relationNameList = textToQualifiedNameList(relationName);
 	relation = makeRangeVarFromNameList(relationNameList);
@@ -117,14 +117,13 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
 		int nodePort = workerNode->workerPort;
 		int connectionFlags = FORCE_NEW_CONNECTION;

-		if ((targetWorkerSet == WORKERS_WITH_METADATA ||
-			 targetWorkerSet == OTHER_WORKERS_WITH_METADATA) &&
+		if (targetWorkerSet == WORKERS_WITH_METADATA &&
 			!workerNode->hasMetadata)
 		{
 			continue;
 		}

-		if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA &&
+		if (targetWorkerSet == OTHER_WORKERS &&
 			workerNode->groupId == GetLocalGroupId())
 		{
 			continue;
@@ -177,14 +176,13 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
 		MultiConnection *connection = NULL;
 		int connectionFlags = 0;

-		if ((targetWorkerSet == WORKERS_WITH_METADATA ||
-			 targetWorkerSet == OTHER_WORKERS_WITH_METADATA) &&
+		if (targetWorkerSet == WORKERS_WITH_METADATA &&
 			!workerNode->hasMetadata)
 		{
 			continue;
 		}

-		if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA &&
+		if (targetWorkerSet == OTHER_WORKERS &&
 			workerNode->groupId == GetLocalGroupId())
 		{
 			continue;
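
Note: both SendBareCommandListToWorkers and SendCommandToWorkersParams now apply the same node filter. A hypothetical helper (not in the patch, which inlines the checks at both call sites) that captures the rule:

/*
 * Hypothetical helper, for illustration only: decide whether a worker
 * node should be skipped for a given target set.
 */
static bool
SkipWorkerNode(TargetWorkerSet targetWorkerSet, WorkerNode *workerNode)
{
	/* WORKERS_WITH_METADATA only targets workers that have metadata */
	if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata)
	{
		return true;
	}

	/* OTHER_WORKERS targets every worker except the current node */
	if (targetWorkerSet == OTHER_WORKERS &&
		workerNode->groupId == GetLocalGroupId())
	{
		return true;
	}

	/* ALL_WORKERS (and any node passing the checks above) is kept */
	return false;
}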
@@ -56,7 +56,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons
 		RaiseDeferredErrorInternal(error, elevel); \
 		if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \
 			pg_unreachable(); } \
-	} while (0)
+	} \
+	while (0)
 #else /* !HAVE_BUILTIN_CONSTANT_P */
 #define RaiseDeferredError(error, elevel) \
 	do { \
@@ -64,7 +65,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons
 		RaiseDeferredErrorInternal(error, elevel_); \
 		if (elevel_ >= ERROR) { \
 			pg_unreachable(); } \
-	} while (0)
+	} \
+	while (0)
 #endif /* HAVE_BUILTIN_CONSTANT_P */

 void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel);
@@ -103,7 +103,7 @@ extern bool IsCoordinator(void);
 extern bool CStoreTable(Oid relationId);
 extern uint64 GetNextShardId(void);
 extern uint64 GetNextPlacementId(void);
-extern Oid ResolveRelationId(text *relationName, bool failOK);
+extern Oid ResolveRelationId(text *relationName, bool missingOk);
 extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation);
 extern List * GetTableCreationCommands(Oid relationId, bool forShardCreation);
 extern List * GetTableIndexAndConstraintCommands(Oid relationId);
@@ -151,6 +151,7 @@ extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS);
 extern Datum master_apply_delete_command(PG_FUNCTION_ARGS);
 extern Datum master_drop_sequences(PG_FUNCTION_ARGS);
 extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS);
+extern Datum lock_relation_if_exists(PG_FUNCTION_ARGS);
 extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);

 /* function declarations for shard creation functionality */
@@ -37,6 +37,7 @@
 /* following functions are renamed in PG11 */
 #define PreventInTransactionBlock PreventTransactionChain
 #define DatumGetJsonbP(d) DatumGetJsonb(d)
+#define RequireTransactionBlock RequireTransactionChain

 /* following defines also exist for PG11 */
 #define RELATION_OBJECT_TYPE ACL_OBJECT_RELATION
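
Note: PostgreSQL 11 renamed RequireTransactionChain() to RequireTransactionBlock(), so this compatibility define lets the new RequireTransactionBlock() call in lock_relation_if_exists() compile against PG10 as well.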
@@ -22,7 +22,7 @@
 typedef enum TargetWorkerSet
 {
 	WORKERS_WITH_METADATA,
-	OTHER_WORKERS_WITH_METADATA,
+	OTHER_WORKERS,
 	ALL_WORKERS
 } TargetWorkerSet;

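Note: after the rename the three target sets read as follows: WORKERS_WITH_METADATA reaches only workers that have metadata, OTHER_WORKERS reaches every worker except the current node regardless of metadata, and ALL_WORKERS reaches all workers.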