diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 71c3f6e29..b8e706226 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -37,9 +37,9 @@
  * same connection since it may hold relevant locks or have uncommitted
  * writes. In that case we "assign" the task to a connection by adding
  * it to the task queue of specific connection (in
- * AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task unassigned
- * and add it to the task queue of a worker pool, which means that it
- * can be executed over any connection in the pool.
+ * AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task
+ * unassigned and add it to the task queue of a worker pool, which means
+ * that it can be executed over any connection in the pool.
  *
  * A task may be executed on multiple placements in case of a reference
  * table or a replicated distributed table. Depending on the type of
@@ -772,7 +772,7 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
     uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList);
 
     /*
-     * We're deliberately not setting execution->rowsProceessed here. The main reason
+     * We're deliberately not setting execution->rowsProcessed here. The main reason
     * is that modifications to reference tables would end-up setting it both here
     * and in AdaptiveExecutor. Instead, we set executorState here and skip updating it
     * for reference table modifications in AdaptiveExecutor.
@@ -884,7 +884,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
     ParamListInfo paramListInfo = NULL;
 
     /*
-     * The code-paths that rely on this function do not know how execute
+     * The code-paths that rely on this function do not know how to execute
     * commands locally.
     */
     ErrorIfTransactionAccessedPlacementsLocally();
diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index 7c60049a4..2006a6261 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -134,7 +134,8 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
 
     if (paramListInfo != NULL)
     {
-        const char **parameterValues = NULL; /* not used anywhere, so decleare here */
+        /* not used anywhere, so declare here */
+        const char **parameterValues = NULL;
 
         ExtractParametersForLocalExecution(paramListInfo, &parameterTypes,
                                            &parameterValues);
@@ -228,6 +229,25 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
 }
 
 
+/*
+ * LogLocalCommand logs commands executed locally on this node. Although we're
+ * talking about local execution, the function relies on citus.log_remote_commands
+ * GUC. This makes sense because the local execution is still on a shard of a
+ * distributed table, meaning it is part of distributed execution.
+ */
+static void
+LogLocalCommand(Task *task)
+{
+    if (!(LogRemoteCommands || LogLocalCommands))
+    {
+        return;
+    }
+
+    ereport(NOTICE, (errmsg("executing the command locally: %s",
+                            ApplyLogRedaction(TaskQueryString(task)))));
+}
+
+
 /*
  * ExtractLocalAndRemoteTasks gets a taskList and generates two
  * task lists namely localTaskList and remoteTaskList. The function goes
@@ -236,9 +256,9 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
  * the lists could be NIL depending on the input taskList.
  *
  * One slightly different case is modifications to replicated tables
- * (e.g., reference tables) where a single task ends in two seperate tasks
- * and the local task is added to localTaskList and the remanings to the
- * remoteTaskList.
+ * (e.g., reference tables) where a single task ends in two separate tasks
+ * and the local task is added to localTaskList and the remaining ones to
+ * the remoteTaskList.
  */
 void
 ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
@@ -280,8 +300,6 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
         }
         else
         {
-            Task *remoteTask = NULL;
-
             /*
             * At this point, we're dealing with reference tables or intermediate results
             * where the task has placements on both local and remote nodes. We always
@@ -301,7 +319,7 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
             }
             else
             {
-                remoteTask = copyObject(task);
+                Task *remoteTask = copyObject(task);
                 remoteTask->taskPlacementList = remoteTaskPlacementList;
 
                 *remoteTaskList = lappend(*remoteTaskList, remoteTask);
@@ -353,7 +371,7 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que
 {
     EState *executorState = ScanStateGetExecutorState(scanState);
     ParamListInfo paramListInfo = executorState->es_param_list_info;
-    DestReceiver *tupleStoreDestReceiever = CreateDestReceiver(DestTuplestore);
+    DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore);
     ScanDirection scanDirection = ForwardScanDirection;
     QueryEnvironment *queryEnv = create_queryEnv();
     int eflags = 0;
@@ -363,14 +381,14 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que
     * Use the tupleStore provided by the scanState because it is shared accross
     * the other task executions and the adaptive executor.
     */
-    SetTuplestoreDestReceiverParams(tupleStoreDestReceiever,
+    SetTuplestoreDestReceiverParams(tupleStoreDestReceiver,
                                     scanState->tuplestorestate,
                                     CurrentMemoryContext, false);
 
     /* Create a QueryDesc for the query */
     QueryDesc *queryDesc = CreateQueryDesc(taskPlan, queryString,
                                            GetActiveSnapshot(), InvalidSnapshot,
-                                           tupleStoreDestReceiever, paramListInfo,
+                                           tupleStoreDestReceiver, paramListInfo,
                                            queryEnv, 0);
 
     ExecutorStart(queryDesc, eflags);
@@ -430,6 +448,7 @@ ShouldExecuteTasksLocally(List *taskList)
     }
 
     bool singleTask = (list_length(taskList) == 1);
+
     if (singleTask && TaskAccessesLocalNode((Task *) linitial(taskList)))
     {
         /*
@@ -455,7 +474,7 @@ ShouldExecuteTasksLocally(List *taskList)
     {
         /*
         * For multi-task executions, switching to local execution would likely to
-         * perform poorly, because we'd lose the parallelizm. Note that the local
+         * perform poorly, because we'd lose the parallelism. Note that the local
         * execution is happening one task at a time (e.g., similar to sequential
         * distributed execution).
         */
@@ -515,25 +534,6 @@ ErrorIfTransactionAccessedPlacementsLocally(void)
 }
 
 
-/*
- * LogLocalCommand logs commands executed locally on this node. Although we're
- * talking about local execution, the function relies on citus.log_remote_commands GUC.
- * This makes sense because the local execution is still on a shard of a distributed table,
- * meaning it is part of distributed execution.
- */
-static void
-LogLocalCommand(Task *task)
-{
-    if (!(LogRemoteCommands || LogLocalCommands))
-    {
-        return;
-    }
-
-    ereport(NOTICE, (errmsg("executing the command locally: %s",
-                            ApplyLogRedaction(TaskQueryString(task)))));
-}
-
-
 /*
  * DisableLocalExecution simply a C interface for
  * setting the following:
diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c
index 8cd02dfc2..8303254c8 100644
--- a/src/backend/distributed/master/master_delete_protocol.c
+++ b/src/backend/distributed/master/master_delete_protocol.c
@@ -27,6 +27,7 @@
 #include "commands/dbcommands.h"
 #include "distributed/commands/utility_hook.h"
 #include "distributed/connection_management.h"
+#include "distributed/listutils.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_client_executor.h"
@@ -77,6 +78,12 @@ static List * ShardsMatchingDeleteCriteria(Oid relationId, List *shardList,
                                            Node *deleteCriteria);
 static int DropShards(Oid relationId, char *schemaName, char *relationName,
                       List *deletableShardIntervalList);
+static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
+                                                     const char *shardRelationName,
+                                                     const char *dropShardPlacementCommand);
+static char * CreateDropShardPlacementCommand(const char *schemaName,
+                                              const char *shardRelationName, char
+                                              storageType);
 
 
 /* exports for SQL callable functions */
@@ -326,7 +333,7 @@ master_drop_sequences(PG_FUNCTION_ARGS)
 
 /*
  * CheckTableSchemaNameForDrop errors out if the current user does not
- * have permission to undistribute the given relation, taking into
+ * have permission to un-distribute the given relation, taking into
  * account that it may be called from the drop trigger. If the table exists,
  * the function rewrites the given table and schema name.
  */
@@ -359,93 +366,76 @@ static int
 DropShards(Oid relationId, char *schemaName, char *relationName,
            List *deletableShardIntervalList)
 {
-    ListCell *shardIntervalCell = NULL;
+    Assert(OidIsValid(relationId));
+    Assert(schemaName != NULL);
+    Assert(relationName != NULL);
 
     UseCoordinatedTransaction();
 
-    /* At this point we intentionally decided to not use 2PC for reference tables */
+    /*
+     * At this point we intentionally decided to not use 2PC for reference
+     * tables
+     */
     if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
     {
         CoordinatedTransactionUse2PC();
     }
 
-    foreach(shardIntervalCell, deletableShardIntervalList)
+    ShardInterval *shardInterval = NULL;
+
+    foreach_ptr(shardInterval, deletableShardIntervalList)
     {
-        ListCell *shardPlacementCell = NULL;
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
         uint64 shardId = shardInterval->shardId;
         char *shardRelationName = pstrdup(relationName);
 
         Assert(shardInterval->relationId == relationId);
 
-        /* Build shard relation name. */
+        /* build shard relation name */
         AppendShardIdToName(&shardRelationName, shardId);
 
-        char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);
         List *shardPlacementList = ShardPlacementList(shardId);
-        foreach(shardPlacementCell, shardPlacementList)
+
+        ShardPlacement *shardPlacement = NULL;
+
+        foreach_ptr(shardPlacement, shardPlacementList)
         {
-            ShardPlacement *shardPlacement =
-                (ShardPlacement *) lfirst(shardPlacementCell);
-            char *workerName = shardPlacement->nodeName;
-            uint32 workerPort = shardPlacement->nodePort;
-            StringInfo workerDropQuery = makeStringInfo();
-            uint32 connectionFlags = FOR_DDL;
+            uint64 shardPlacementId = shardPlacement->placementId;
 
-            char storageType = shardInterval->storageType;
-            if (storageType == SHARD_STORAGE_TABLE)
-            {
-                appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND,
-                                 quotedShardName);
-            }
-            else if (storageType == SHARD_STORAGE_COLUMNAR ||
-                     storageType == SHARD_STORAGE_FOREIGN)
-            {
-                appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND,
-                                 quotedShardName);
-            }
-
-            /*
-             * The active DROP SCHEMA/DATABASE ... CASCADE will drop the shard, if we
-             * try to drop it over another connection, we will get into a distributed
-             * deadlock.
-             */
             if (shardPlacement->groupId == COORDINATOR_GROUP_ID &&
                 IsCoordinator() &&
                 DropSchemaOrDBInProgress())
             {
-                DeleteShardPlacementRow(shardPlacement->placementId);
-                continue;
+                /*
+                 * The active DROP SCHEMA/DATABASE ... CASCADE will drop the
+                 * shard; if we try to drop it over another connection, we will
+                 * get into a distributed deadlock.
+                 */
             }
-
-            MultiConnection *connection = GetPlacementConnection(connectionFlags,
-                                                                 shardPlacement,
-                                                                 NULL);
-
-            RemoteTransactionBeginIfNecessary(connection);
-
-            if (PQstatus(connection->pgConn) != CONNECTION_OK)
+            else
             {
-                uint64 placementId = shardPlacement->placementId;
+                char storageType = shardInterval->storageType;
 
-                ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node "
-                                         "\"%s:%u\"", shardRelationName, workerName,
-                                         workerPort),
-                                  errdetail("Marking this shard placement for "
-                                            "deletion")));
+                const char *dropShardPlacementCommand =
+                    CreateDropShardPlacementCommand(schemaName, shardRelationName,
+                                                    storageType);
 
-                UpdateShardPlacementState(placementId, SHARD_STATE_TO_DELETE);
-
-                continue;
+                /*
+                 * Try to open a new connection (or use an existing one) to
+                 * connect to the target node to drop the shard placement over
+                 * that remote connection.
+                 */
+                ExecuteDropShardPlacementCommandRemotely(shardPlacement,
+                                                         shardRelationName,
+                                                         dropShardPlacementCommand);
             }
 
-            MarkRemoteTransactionCritical(connection);
-
-            ExecuteCriticalRemoteCommand(connection, workerDropQuery->data);
-
-            DeleteShardPlacementRow(shardPlacement->placementId);
+            DeleteShardPlacementRow(shardPlacementId);
         }
 
+        /*
+         * Now that we deleted all placements of the shard (or their metadata),
+         * delete the shard metadata as well.
+         */
         DeleteShardRow(shardId);
     }
 
@@ -455,6 +445,89 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
 }
 
 
+/*
+ * ExecuteDropShardPlacementCommandRemotely executes the given DROP shard command
+ * via a remote critical connection.
+ */
+static void
+ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
+                                         const char *shardRelationName,
+                                         const char *dropShardPlacementCommand)
+{
+    Assert(shardPlacement != NULL);
+    Assert(shardRelationName != NULL);
+    Assert(dropShardPlacementCommand != NULL);
+
+    uint32 connectionFlags = FOR_DDL;
+    MultiConnection *connection = GetPlacementConnection(connectionFlags,
+                                                         shardPlacement,
+                                                         NULL);
+
+    RemoteTransactionBeginIfNecessary(connection);
+
+    if (PQstatus(connection->pgConn) != CONNECTION_OK)
+    {
+        uint64 placementId = shardPlacement->placementId;
+
+        char *workerName = shardPlacement->nodeName;
+        uint32 workerPort = shardPlacement->nodePort;
+
+        ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node "
+                                 "\"%s:%u\"", shardRelationName, workerName,
+                                 workerPort),
+                          errdetail("Marking this shard placement for "
+                                    "deletion")));
+
+        UpdateShardPlacementState(placementId, SHARD_STATE_TO_DELETE);
+
+        return;
+    }
+
+    MarkRemoteTransactionCritical(connection);
+
+    ExecuteCriticalRemoteCommand(connection, dropShardPlacementCommand);
+}
+
+
+/*
+ * CreateDropShardPlacementCommand builds the DROP command to drop the given
+ * shard relation by qualifying it with the schema name according to the
+ * shard relation's storage type.
+ */
+static char *
+CreateDropShardPlacementCommand(const char *schemaName, const char *shardRelationName,
+                                char storageType)
+{
+    Assert(schemaName != NULL);
+    Assert(shardRelationName != NULL);
+
+    StringInfo workerDropQuery = makeStringInfo();
+
+    const char *quotedShardName = quote_qualified_identifier(schemaName,
+                                                             shardRelationName);
+
+    /* build workerDropQuery according to shard storage type */
+    if (storageType == SHARD_STORAGE_TABLE)
+    {
+        appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND,
+                         quotedShardName);
+    }
+    else if (storageType == SHARD_STORAGE_COLUMNAR ||
+             storageType == SHARD_STORAGE_FOREIGN)
+    {
+        appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND,
+                         quotedShardName);
+    }
+    else
+    {
+        /* no other storage type is expected here */
+        Assert(false);
+    }
+
+    return workerDropQuery->data;
+}
+
+
 /* Checks that delete is only on one table. */
 static void
 CheckTableCount(Query *deleteQuery)
diff --git a/src/backend/distributed/master/master_truncate.c b/src/backend/distributed/master/master_truncate.c
index 94de213cf..8647eaf5b 100644
--- a/src/backend/distributed/master/master_truncate.c
+++ b/src/backend/distributed/master/master_truncate.c
@@ -27,7 +27,6 @@
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 
-
 static List * TruncateTaskList(Oid relationId);
 
 
@@ -89,29 +88,36 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
 static List *
 TruncateTaskList(Oid relationId)
 {
-    List *shardIntervalList = LoadShardIntervalList(relationId);
-    ListCell *shardIntervalCell = NULL;
+    /* resulting task list */
     List *taskList = NIL;
+
+    /* enumerate the tasks when putting them to the taskList */
     int taskId = 1;
 
     Oid schemaId = get_rel_namespace(relationId);
     char *schemaName = get_namespace_name(schemaId);
     char *relationName = get_rel_name(relationId);
 
+    List *shardIntervalList = LoadShardIntervalList(relationId);
+    /* lock metadata before getting placement lists */
     LockShardListMetadata(shardIntervalList, ShareLock);
 
+    ListCell *shardIntervalCell = NULL;
+
     foreach(shardIntervalCell, shardIntervalList)
     {
         ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
         uint64 shardId = shardInterval->shardId;
+        char *shardRelationName = pstrdup(relationName);
+
+        /* build shard relation name */
+        AppendShardIdToName(&shardRelationName, shardId);
+
+        char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);
+
         StringInfo shardQueryString = makeStringInfo();
-        char *shardName = pstrdup(relationName);
-
-        AppendShardIdToName(&shardName, shardId);
-
-        appendStringInfo(shardQueryString, "TRUNCATE TABLE %s CASCADE",
-                         quote_qualified_identifier(schemaName, shardName));
+        appendStringInfo(shardQueryString, "TRUNCATE TABLE %s CASCADE", quotedShardName);
 
         Task *task = CitusMakeNode(Task);
         task->jobId = INVALID_JOB_ID;
diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h
index 1a0d8ff24..98e69bf97 100644
--- a/src/include/distributed/local_executor.h
+++ b/src/include/distributed/local_executor.h
@@ -20,16 +20,14 @@ extern bool LogLocalCommands;
 extern bool TransactionAccessedLocalPlacement;
 extern bool TransactionConnectedToLocalGroup;
 
+/* extern function declarations */
 extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
 extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
                                        List **localTaskList, List **remoteTaskList);
 extern bool ShouldExecuteTasksLocally(List *taskList);
+extern bool AnyTaskAccessesLocalNode(List *taskList);
+extern bool TaskAccessesLocalNode(Task *task);
 extern void ErrorIfTransactionAccessedPlacementsLocally(void);
-extern void SetTaskQueryAndPlacementList(Task *task, Query *query, List *placementList);
-extern char * TaskQueryString(Task *task);
-extern bool TaskAccessesLocalNode(Task *task);
 extern void DisableLocalExecution(void);
-extern bool AnyTaskAccessesRemoteNode(List *taskList);
-extern bool TaskAccessesLocalNode(Task *task);
 
 #endif /* LOCAL_EXECUTION_H */
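
For readers unfamiliar with the list-iteration pattern this patch adopts (the new #include "distributed/listutils.h" and the foreach()/ListCell-to-foreach_ptr() conversions in DropShards), a minimal usage sketch follows. It assumes only what the call sites above show: the element pointer is declared up front, initialized to NULL, and foreach_ptr() assigns each list element to it in turn. The PrintPlacementNodes helper is hypothetical and only illustrative; the ShardPlacement fields used (placementId, nodeName, nodePort) are the ones referenced in the diff, and the header providing ShardPlacement is assumed to be available from the Citus metadata headers.

/*
 * Illustrative sketch (not part of the patch): mirrors how DropShards above
 * walks a List of ShardPlacement pointers with foreach_ptr() instead of the
 * ListCell/lfirst() boilerplate.
 */
#include "postgres.h"

#include "nodes/pg_list.h"

#include "distributed/listutils.h"      /* foreach_ptr() */
/* plus the Citus header that defines ShardPlacement (path varies by version) */

/* hypothetical helper, for illustration only */
static void
PrintPlacementNodes(List *shardPlacementList)
{
    ShardPlacement *shardPlacement = NULL;

    foreach_ptr(shardPlacement, shardPlacementList)
    {
        /* log each placement's id and the node it lives on */
        elog(DEBUG1, "placement " UINT64_FORMAT " is on %s:%u",
             shardPlacement->placementId,
             shardPlacement->nodeName,
             shardPlacement->nodePort);
    }
}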