From 738b41c0554780dc6097d2e0d39b1699457e384e Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 17 Aug 2023 09:46:01 +0200 Subject: [PATCH] Add citus.distributed_data_dump to only return results from local shards --- src/backend/distributed/cdc/cdc_decoder.c | 7 ++ src/backend/distributed/commands/multi_copy.c | 27 ++++- .../distributed/executor/adaptive_executor.c | 42 +++++++ src/backend/distributed/shared_library_init.c | 16 +++ .../distributed/utils/task_execution_utils.c | 104 ++++++++++++++++++ src/include/distributed/multi_executor.h | 3 + .../distributed/task_execution_utils.h | 2 + 7 files changed, 200 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/cdc/cdc_decoder.c b/src/backend/distributed/cdc/cdc_decoder.c index 2beb27772..f96752646 100644 --- a/src/backend/distributed/cdc/cdc_decoder.c +++ b/src/backend/distributed/cdc/cdc_decoder.c @@ -315,6 +315,13 @@ PublishDistributedTableChanges(LogicalDecodingContext *ctx, ReorderBufferTXN *tx return; } + /* + * TODO: consider replicated shards + * + * We should only emit from one of the replicas, and it should align with the + * IsDistributedDataDump logic. + */ + /* translate and publish from shard relation to distributed table relation for CDC. 
*/ TranslateAndPublishRelationForCDC(ctx, txn, relation, change, shardId, distRelationId); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index eeed0a025..3b7c4fb56 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -92,6 +92,7 @@ #include "distributed/resource_lock.h" #include "distributed/shard_pruning.h" #include "distributed/shared_connection_stats.h" +#include "distributed/task_execution_utils.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "distributed/local_multi_copy.h" @@ -3010,11 +3011,33 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletion *completionTag) ShardInterval *shardInterval = lfirst(shardIntervalCell); List *shardPlacementList = ActiveShardPlacementList(shardInterval->shardId); ListCell *shardPlacementCell = NULL; - int placementIndex = 0; + int placementIndex = -1; StringInfo copyCommand = ConstructCopyStatement(copyStatement, shardInterval->shardId); + /* + * When citus.distributed_data_dump is enabled, only emit from local shards. + * + * That way, users can run COPY table TO STDOUT on all nodes to get a full + * copy of the data with much higher bandwidth than running it via the + * coordinator. Moreover, each command can use a snapshot that aligns with + * a specific replication slot. 
+ */ + if (IsDistributedDataDump) + { + List *newPlacementList = + TaskPlacementListForDistributedDataDump(shardPlacementList); + + if (newPlacementList == NIL) + { + /* shard does not have local placements */ + continue; + } + + shardPlacementList = newPlacementList; + } + foreach(shardPlacementCell, shardPlacementList) { ShardPlacement *shardPlacement = lfirst(shardPlacementCell); @@ -3022,6 +3045,8 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletion *completionTag) char *userName = NULL; const bool raiseErrors = true; + placementIndex++; + MultiConnection *connection = GetPlacementConnection(connectionFlags, shardPlacement, userName); diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 18dbed49d..1900f3d36 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -161,6 +161,7 @@ #include "distributed/resource_lock.h" #include "distributed/shared_connection_stats.h" #include "distributed/subplan_execution.h" +#include "distributed/task_execution_utils.h" #include "distributed/transaction_management.h" #include "distributed/transaction_identifier.h" #include "distributed/tuple_destination.h" @@ -854,6 +855,47 @@ AdaptiveExecutor(CitusScanState *scanState) MarkUnreferencedExternParams((Node *) job->jobQuery, paramListInfo); } + /* + * When citus.distributed_data_dump is enabled, only emit from local shards. + * + * That way, users can run SELECT * FROM table on all nodes to get a full + * copy of the data with much higher bandwidth than running it via the + * coordinator. Moreover, each command can use a snapshot that aligns with + * a specific replication slot. + */ + if (IsDistributedDataDump) + { + /* + * Throw errors for writes and complex queries to not cause confusion / + * data integrity issues. 
+ */ + if (job->jobQuery->commandType != CMD_SELECT) + { + ereport(ERROR, (errmsg("can only use citus.distributed_data_dump for " + "SELECT queries"))); + } + + if (distributedPlan->modLevel != ROW_MODIFY_READONLY) + { + ereport(ERROR, (errmsg("can only use citus.distributed_data_dump for " + "read-only queries"))); + } + + if (list_length(distributedPlan->relationIdList) != 1) + { + ereport(ERROR, (errmsg("can only use citus.distributed_data_dump for " + "single-table queries"))); + } + + if (hasDependentJobs) + { + ereport(ERROR, (errmsg("cannot use citus.distributed_data_dump for " + "re-partition joins"))); + } + + taskList = TaskListForDistributedDataDump(taskList); + } + DistributedExecution *execution = CreateDistributedExecution( distributedPlan->modLevel, taskList, diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e84aa282a..9b560805c 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1166,6 +1166,22 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.distributed_data_dump", + gettext_noop("When enabled, queries only return data from local shards"), + gettext_noop("When you need a full copy of a set of Citus tables, it can be " + "useful to only return data from local shards, or from the first " + "replica of replicated shards (incl. reference tables). That way, " + "you can pull from all nodes concurrently and construct a complete " + "snapshot. 
This is also necessary for logical replications "
+					 "scenarios, since the snapshot of the data on each node needs to "
+					 "be aligned with the replication slot on that node."),
+		&IsDistributedDataDump,
+		false,
+		PGC_USERSET,
+		GUC_STANDARD,
+		NULL, NULL, NULL);
+
 	DefineCustomRealVariable(
 		"citus.distributed_deadlock_detection_factor",
 		gettext_noop("Sets the time to wait before checking for distributed "
diff --git a/src/backend/distributed/utils/task_execution_utils.c b/src/backend/distributed/utils/task_execution_utils.c
index 50652b6bd..f48ae57c8 100644
--- a/src/backend/distributed/utils/task_execution_utils.c
+++ b/src/backend/distributed/utils/task_execution_utils.c
@@ -57,6 +57,9 @@ static Task * TaskHashEnter(HTAB *taskHash, Task *task);
 static Task * TaskHashLookup(HTAB *trackerHash, TaskType taskType, uint64 jobId,
 							 uint32 taskId);
 
+/* citus.distributed_data_dump GUC */
+bool IsDistributedDataDump;
+
 /*
  * CreateTaskListForJobTree visits all tasks in the job tree (by following dependentTaskList),
  * starting with the given job's task list. The function then returns the list.
@@ -194,3 +197,109 @@ TaskHashLookup(HTAB *taskHash, TaskType taskType, uint64 jobId, uint32 taskId)
 
 	return task;
 }
+
+
+/*
+ * TaskListForDistributedDataDump returns the subset of taskList that this
+ * node should execute when citus.distributed_data_dump is enabled.
+ *
+ * Tasks for which TaskPlacementListForDistributedDataDump selects no
+ * placement are dropped. The remaining tasks are modified in place: their
+ * taskPlacementList is replaced by the selected placement list.
+ */
+List *
+TaskListForDistributedDataDump(List *taskList)
+{
+	List *newTaskList = NIL;
+
+	Task *task = NULL;
+	foreach_ptr(task, taskList)
+	{
+		List *newPlacementList =
+			TaskPlacementListForDistributedDataDump(task->taskPlacementList);
+
+		if (newPlacementList == NIL)
+		{
+			/* this node does not emit the shard, skip the task */
+			continue;
+		}
+
+		task->taskPlacementList = newPlacementList;
+		newTaskList = lappend(newTaskList, task);
+	}
+
+	return newTaskList;
+}
+
+
+/*
+ * TaskPlacementListForDistributedDataDump decides which placement (if any)
+ * this node reads from when citus.distributed_data_dump is enabled.
+ *
+ * It returns either NIL or a list with a single placement that is on the
+ * local node. Every node applies the same rule to the full placement list,
+ * such that a replicated shard (e.g. reference table) is emitted by one node:
+ *
+ * - if there is a placement on the coordinator, only the coordinator emits
+ * - otherwise, only the node that has the placement with the lowest
+ *   placement ID emits
+ *
+ * Non-local placements have to take part in that decision. If they were
+ * filtered out first, every node with a replica would pick its own placement
+ * and the shard would show up once per replica in the combined dump.
+ *
+ * This logic should align with how CDC handles replicated placements.
+ */
+List *
+TaskPlacementListForDistributedDataDump(List *taskPlacementList)
+{
+	List *newPlacementList = NIL;
+	ShardPlacement *minPlacement = NULL;
+	bool isCoordinator = IsCoordinator();
+	int localGroupId = GetLocalGroupId();
+
+	ShardPlacement *taskPlacement = NULL;
+	foreach_ptr(taskPlacement, taskPlacementList)
+	{
+		if (isCoordinator && taskPlacement->groupId == localGroupId)
+		{
+			/*
+			 * If the coordinator has a placement, we prefer to emit from
+			 * the coordinator. This is mainly to align with
+			 * PublishDistributedTableChanges, which only emits changes
+			 * to reference tables from the coordinator.
+			 */
+			return lappend(newPlacementList, taskPlacement);
+		}
+
+		if (!isCoordinator && taskPlacement->groupId == COORDINATOR_GROUP_ID)
+		{
+			/*
+			 * We are not the coordinator, but there is a coordinator placement.
+			 * The coordinator should emit instead.
+			 */
+			return NIL;
+		}
+
+		if (minPlacement == NULL ||
+			taskPlacement->placementId < minPlacement->placementId)
+		{
+			/*
+			 * For replicated shards that do not have a coordinator placement,
+			 * track the placement with the lowest ID across all nodes.
+			 */
+			minPlacement = taskPlacement;
+		}
+	}
+
+	if (minPlacement != NULL && minPlacement->groupId == localGroupId)
+	{
+		/*
+		 * Emit the local placement if it is the one with the lowest
+		 * placement ID.
+		 */
+		newPlacementList = lappend(newPlacementList, minPlacement);
+	}
+
+	return newPlacementList;
+}
diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h
index 5ae010d87..6f4212b5c 100644
--- a/src/include/distributed/multi_executor.h
+++ b/src/include/distributed/multi_executor.h
@@ -72,6 +72,9 @@ extern int ExecutorSlowStartInterval;
 extern bool SortReturning;
 extern int ExecutorLevel;
 
+/* citus.distributed_data_dump GUC */
+extern bool IsDistributedDataDump;
+
 
 extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);
 extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
diff --git a/src/include/distributed/task_execution_utils.h b/src/include/distributed/task_execution_utils.h
index e7b5a2ee4..250cc4ec5 100644
--- a/src/include/distributed/task_execution_utils.h
+++ b/src/include/distributed/task_execution_utils.h
@@ -2,5 +2,7 @@
 #define TASK_EXECUTION_UTILS_H
 
 extern List * CreateTaskListForJobTree(List *jobTaskList);
+extern List * TaskListForDistributedDataDump(List *taskList);
+extern List * TaskPlacementListForDistributedDataDump(List *taskPlacementList);
 
 #endif /* TASK_EXECUTION_UTILS_H */