diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 0a0adc02f..7089c4a4b 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -82,7 +82,7 @@ MultiRealTimeExecute(Job *job) const char *workerHashName = "Worker node hash"; WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList)); - workerNodeList = ActivePrimaryNodeList(); + workerNodeList = ActiveReadableNodeList(); workerHash = WorkerHash(workerHashName, workerNodeList); /* initialize task execution structures for remote execution */ diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index a7b2c42a0..5d248c8a4 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -71,7 +71,9 @@ JobExecutorType(MultiPlan *multiPlan) " queries on the workers."))); } - workerNodeList = ActivePrimaryNodeList(); + Assert(multiPlan->operation == CMD_SELECT); + + workerNodeList = ActiveReadableNodeList(); workerNodeCount = list_length(workerNodeList); taskCount = list_length(job->taskList); tasksPerNode = taskCount / ((double) workerNodeCount); diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 00fcf8f3e..ce3327807 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -190,7 +190,7 @@ MultiTaskTrackerExecute(Job *job) * assigning and checking the status of tasks. The second (temporary) hash * helps us in fetching results data from worker nodes to the master node. */ - workerNodeList = ActivePrimaryNodeList(); + workerNodeList = ActiveReadableNodeList(); taskTrackerCount = (uint32) list_length(workerNodeList); taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList); diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index f2bbab8b2..ccbb35bcc 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -174,7 +174,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery) ErrorIfNotSuitableToGetSize(relationId); - workerNodeList = ActivePrimaryNodeList(); + workerNodeList = ActiveReadableNodeList(); foreach(workerNodeCell, workerNodeList) { diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index aab1f7f4c..d4855f956 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -308,6 +308,19 @@ ActivePrimaryNodeCount(void) } +/* + * ActiveReadableNodeCount returns the number of groups with a node we can read from. + */ +uint32 +ActiveReadableNodeCount(void) +{ + List *workerNodeList = ActiveReadableNodeList(); + uint32 liveWorkerCount = list_length(workerNodeList); + + return liveWorkerCount; +} + + /* * ActivePrimaryNodeList returns a list of all the active primary nodes in workerNodeHash */ @@ -319,6 +332,8 @@ ActivePrimaryNodeList(void) HTAB *workerNodeHash = GetWorkerNodeHash(); HASH_SEQ_STATUS status; + EnsureModificationsCanRun(); + hash_seq_init(&status, workerNodeHash); while ((workerNode = hash_seq_search(&status)) != NULL) @@ -335,6 +350,42 @@ ActivePrimaryNodeList(void) } +/* + * ActiveReadableNodeList returns a list of all nodes in workerNodeHash we can read from. + */ +List * +ActiveReadableNodeList(void) +{ + List *workerNodeList = NIL; + WorkerNode *workerNode = NULL; + HTAB *workerNodeHash = GetWorkerNodeHash(); + HASH_SEQ_STATUS status; + + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) + { + WorkerNode *workerNodeCopy; + + if (!workerNode->isActive) + { + continue; + } + + if (!WorkerNodeIsReadable(workerNode)) + { + continue; + } + + workerNodeCopy = palloc0(sizeof(WorkerNode)); + memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode)); + workerNodeList = lappend(workerNodeList, workerNodeCopy); + } + + return workerNodeList; +} + + /* * PrimaryNodesNotInList scans through the worker node hash and returns a list of all * primary nodes which are not in currentList. It runs in O(n*m) but currentList is diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index d1e5cd987..55c96e596 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -87,6 +87,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) EnsureCoordinator(); EnsureSuperUser(); + EnsureModificationsCanRun(); CheckCitusVersion(ERROR); PreventTransactionChain(true, "start_metadata_sync_to_node"); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index c1f25d80f..fcee39f40 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -1863,7 +1863,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey, static uint32 HashPartitionCount(void) { - uint32 groupCount = ActivePrimaryNodeCount(); + uint32 groupCount = ActiveReadableNodeCount(); double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0; uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode); @@ -4791,7 +4791,7 @@ GreedyAssignTaskList(List *taskList) uint32 taskCount = list_length(taskList); /* get the worker node list and sort the list */ - List *workerNodeList = ActivePrimaryNodeList(); + List *workerNodeList = ActiveReadableNodeList(); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* @@ -5223,7 +5223,7 @@ AssignDualHashTaskList(List *taskList) * if subsequent jobs have a small number of tasks, we won't allocate the * tasks to the same worker repeatedly. */ - List *workerNodeList = ActivePrimaryNodeList(); + List *workerNodeList = ActiveReadableNodeList(); uint32 workerNodeCount = (uint32) list_length(workerNodeList); uint32 beginningNodeIndex = jobId % workerNodeCount; diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 347279798..74af951f2 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -322,6 +322,8 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query if (IsModifyCommand(query)) { + EnsureModificationsCanRun(); + if (InsertSelectIntoDistributedTable(originalQuery)) { distributedPlan = diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 1f4a2b7bf..ac71a5f29 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1617,7 +1617,7 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon } else if (replacePrunedQueryWithDummy) { - List *workerNodeList = ActivePrimaryNodeList(); + List *workerNodeList = ActiveReadableNodeList(); if (workerNodeList != NIL) { WorkerNode *workerNode = (WorkerNode *) linitial(workerNodeList); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 32ca840b0..fe842faca 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -26,6 +26,7 @@ #include "distributed/maintenanced.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_copy.h" #include "distributed/multi_explain.h" #include "distributed/multi_join_order.h" @@ -89,6 +90,12 @@ static const struct config_enum_entry shard_placement_policy_options[] = { { NULL, 0, false } }; +static const struct config_enum_entry use_secondary_nodes_options[] = { + { "never", USE_SECONDARY_NODES_NEVER, false }, + { "always", USE_SECONDARY_NODES_ALWAYS, false }, + { NULL, 0, false } +}; + static const struct config_enum_entry multi_shard_commit_protocol_options[] = { { "1pc", COMMIT_PROTOCOL_1PC, false }, { "2pc", COMMIT_PROTOCOL_2PC, false }, @@ -650,6 +657,16 @@ RegisterCitusConfigVariables(void) 0, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.use_secondary_nodes", + gettext_noop("Sets the policy to use when choosing nodes for SELECT queries."), + NULL, + &ReadFromSecondaries, + USE_SECONDARY_NODES_NEVER, use_secondary_nodes_options, + PGC_SU_BACKEND, + 0, + NULL, NULL, NULL); + DefineCustomEnumVariable( "citus.multi_task_query_log_level", gettext_noop("Sets the level of multi task query execution log messages"), diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index e2090061d..d3f2d51d8 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -91,7 +91,7 @@ dump_global_wait_edges(PG_FUNCTION_ARGS) WaitGraph * BuildGlobalWaitGraph(void) { - List *workerNodeList = ActivePrimaryNodeList(); + List *workerNodeList = ActiveReadableNodeList(); ListCell *workerNodeCell = NULL; char *nodeUser = CitusExtensionOwnerName(); List *connectionList = NIL; diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 76e718e8d..f02e4e4c8 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -58,6 +58,9 @@ #include "utils/typcache.h" +/* user configuration */ +int ReadFromSecondaries = USE_SECONDARY_NODES_NEVER; + /* * ShardCacheEntry represents an entry in the shardId -> ShardInterval cache. * To avoid duplicating data and invalidation logic between this cache and the @@ -186,6 +189,7 @@ static ShardInterval * TupleToShardInterval(HeapTuple heapTuple, static void CachedRelationLookup(const char *relationName, Oid *cachedOid); static ShardPlacement * ResolveGroupShardPlacement( GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry); +static WorkerNode * LookupNodeForGroup(uint32 groupid); /* exports for SQL callable functions */ @@ -196,6 +200,27 @@ PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate); +/* + * EnsureModificationsCanRun checks if the current node is in recovery mode or + * citus.use_secondary_nodes is 'alwaus'. If either is true the function errors out. + */ +void +EnsureModificationsCanRun(void) +{ + if (RecoveryInProgress()) + { + ereport(ERROR, (errmsg("writing to worker nodes is not currently allowed"), + errdetail("the database is in recovery mode"))); + } + + if (ReadFromSecondaries == USE_SECONDARY_NODES_ALWAYS) + { + ereport(ERROR, (errmsg("writing to worker nodes is not currently allowed"), + errdetail("citus.use_secondary_nodes is set to 'always'"))); + } +} + + /* * IsDistributedTable returns whether relationId is a distributed relation or * not. @@ -428,22 +453,10 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, DistTableCacheEntry *tableEntry = shardEntry->tableEntry; int shardIndex = shardEntry->shardIndex; ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex]; - bool groupContainsNodes = false; ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement); uint32 groupId = groupShardPlacement->groupId; - WorkerNode *workerNode = PrimaryNodeForGroup(groupId, &groupContainsNodes); - - if (workerNode == NULL && !groupContainsNodes) - { - ereport(ERROR, (errmsg("there is a shard placement in node group %u but " - "there are no nodes in that group", groupId))); - } - - if (workerNode == NULL && groupContainsNodes) - { - ereport(ERROR, (errmsg("node group %u does not have a primary node", groupId))); - } + WorkerNode *workerNode = LookupNodeForGroup(groupId); /* copy everything into shardPlacement but preserve the header */ memcpy((((CitusNode *) shardPlacement) + 1), @@ -478,6 +491,67 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement, } +/* + * LookupNodeForGroup searches the WorkerNodeHash for a worker which is a member of the + * given group and also readable (a primary if we're reading from primaries, a secondary + * if we're reading from secondaries). If such a node does not exist it emits an + * appropriate error message. + */ +static WorkerNode * +LookupNodeForGroup(uint32 groupId) +{ + WorkerNode *workerNode = NULL; + HASH_SEQ_STATUS status; + HTAB *workerNodeHash = GetWorkerNodeHash(); + bool foundAnyNodes = false; + + hash_seq_init(&status, workerNodeHash); + + while ((workerNode = hash_seq_search(&status)) != NULL) + { + uint32 workerNodeGroupId = workerNode->groupId; + if (workerNodeGroupId != groupId) + { + continue; + } + + foundAnyNodes = true; + + if (WorkerNodeIsReadable(workerNode)) + { + hash_seq_term(&status); + return workerNode; + } + } + + if (!foundAnyNodes) + { + ereport(ERROR, (errmsg("there is a shard placement in node group %u but " + "there are no nodes in that group", groupId))); + } + + switch (ReadFromSecondaries) + { + case USE_SECONDARY_NODES_NEVER: + { + ereport(ERROR, (errmsg("node group %u does not have a primary node", + groupId))); + } + + case USE_SECONDARY_NODES_ALWAYS: + { + ereport(ERROR, (errmsg("node group %u does not have a secondary node", + groupId))); + } + + default: + { + ereport(FATAL, (errmsg("unrecognized value for use_secondary_nodes"))); + } + } +} + + /* * ShardPlacementList returns the list of placements for the given shard from * the cache. diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index fb13067a0..0d68b96dc 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -343,6 +343,47 @@ WorkerNodeIsPrimary(WorkerNode *worker) } +/* + * WorkerNodeIsSecondary returns whether the argument represents a secondary node. + */ +bool +WorkerNodeIsSecondary(WorkerNode *worker) +{ + Oid secondaryRole = SecondaryNodeRoleId(); + + /* if nodeRole does not yet exist, all nodes are primary nodes */ + if (secondaryRole == InvalidOid) + { + return false; + } + + return worker->nodeRole == secondaryRole; +} + + +/* + * WorkerNodeIsReadable returns whether we're allowed to send SELECT queries to this + * node. + */ +bool +WorkerNodeIsReadable(WorkerNode *workerNode) +{ + if (ReadFromSecondaries == USE_SECONDARY_NODES_NEVER && + WorkerNodeIsPrimary(workerNode)) + { + return true; + } + + if (ReadFromSecondaries == USE_SECONDARY_NODES_ALWAYS && + WorkerNodeIsSecondary(workerNode)) + { + return true; + } + + return false; +} + + /* * PrimaryNodeForGroup returns the (unique) primary in the specified group. * diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 089915ae1..95f1d4fd7 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -19,6 +19,14 @@ extern bool EnableVersionChecks; +/* managed via guc.c */ +typedef enum +{ + USE_SECONDARY_NODES_NEVER = 0, + USE_SECONDARY_NODES_ALWAYS = 1 +} ReadFromSecondariesType; +extern int ReadFromSecondaries; + /* * Representation of a table's metadata that is frequently used for * distributed execution. Cached. @@ -84,6 +92,8 @@ extern bool CheckCitusVersion(int elevel); extern bool CheckAvailableVersion(int elevel); bool MajorVersionsCompatible(char *leftVersion, char *rightVersion); +extern void EnsureModificationsCanRun(void); + /* access WorkerNodeHash */ extern HTAB * GetWorkerNodeHash(void); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index cf770370d..4e6d017b5 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -64,6 +64,8 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern uint32 ActivePrimaryNodeCount(void); extern List * ActivePrimaryNodeList(void); +extern uint32 ActiveReadableNodeCount(void); +extern List * ActiveReadableNodeList(void); extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort); extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters); @@ -71,6 +73,8 @@ extern void EnsureCoordinator(void); extern uint32 GroupForNode(char *nodeName, int32 nodePorT); extern WorkerNode * PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes); extern bool WorkerNodeIsPrimary(WorkerNode *worker); +extern bool WorkerNodeIsSecondary(WorkerNode *worker); +extern bool WorkerNodeIsReadable(WorkerNode *worker); /* Function declarations for worker node utilities */ extern int CompareWorkerNodes(const void *leftElement, const void *rightElement); diff --git a/src/test/regress/expected/multi_read_from_secondaries.out b/src/test/regress/expected/multi_read_from_secondaries.out new file mode 100644 index 000000000..3a7c4aa2c --- /dev/null +++ b/src/test/regress/expected/multi_read_from_secondaries.out @@ -0,0 +1,61 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1600000; +\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'" +CREATE TABLE dest_table (a int, b int); +CREATE TABLE source_table (a int, b int); +-- attempts to change metadata should fail while reading from secondaries +SELECT create_distributed_table('dest_table', 'a'); +ERROR: writing to worker nodes is not currently allowed +DETAIL: citus.use_secondary_nodes is set to 'always' +\c "dbname=regression options='-c\ citus.use_secondary_nodes=never'" +SELECT create_distributed_table('dest_table', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('source_table', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO dest_table (a, b) VALUES (1, 1); +INSERT INTO dest_table (a, b) VALUES (2, 1); +INSERT INTO source_table (a, b) VALUES (10, 10); +-- simluate actually having secondary nodes +SELECT * FROM pg_dist_node; + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster +--------+---------+-----------+----------+----------+-------------+----------+----------+------------- + 1 | 1 | localhost | 57637 | default | f | t | primary | default + 2 | 2 | localhost | 57638 | default | f | t | primary | default +(2 rows) + +UPDATE pg_dist_node SET noderole = 'secondary'; +\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'" +-- inserts are disallowed +INSERT INTO dest_table (a, b) VALUES (1, 2); +ERROR: writing to worker nodes is not currently allowed +DETAIL: citus.use_secondary_nodes is set to 'always' +-- router selects are allowed +SELECT a FROM dest_table WHERE a = 1; + a +--- + 1 +(1 row) + +-- real-time selects are also allowed +SELECT a FROM dest_table; + a +--- + 1 + 2 +(2 rows) + +-- insert into is definitely not allowed +INSERT INTO dest_table (a, b) + SELECT a, b FROM source_table; +ERROR: writing to worker nodes is not currently allowed +DETAIL: citus.use_secondary_nodes is set to 'always' +\c "dbname=regression options='-c\ citus.use_secondary_nodes=never'" +UPDATE pg_dist_node SET noderole = 'primary'; +DROP TABLE dest_table; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index e0f63a6bd..9311b5d23 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -23,6 +23,8 @@ test: multi_table_ddl test: multi_name_lengths test: multi_metadata_access +test: multi_read_from_secondaries + # ---------- # The following distributed tests depend on creating a partitioned table and # uploading data to it. diff --git a/src/test/regress/sql/multi_read_from_secondaries.sql b/src/test/regress/sql/multi_read_from_secondaries.sql new file mode 100644 index 000000000..18c34496d --- /dev/null +++ b/src/test/regress/sql/multi_read_from_secondaries.sql @@ -0,0 +1,42 @@ +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1600000; + +\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'" + +CREATE TABLE dest_table (a int, b int); +CREATE TABLE source_table (a int, b int); + +-- attempts to change metadata should fail while reading from secondaries +SELECT create_distributed_table('dest_table', 'a'); + +\c "dbname=regression options='-c\ citus.use_secondary_nodes=never'" + +SELECT create_distributed_table('dest_table', 'a'); +SELECT create_distributed_table('source_table', 'a'); + +INSERT INTO dest_table (a, b) VALUES (1, 1); +INSERT INTO dest_table (a, b) VALUES (2, 1); + +INSERT INTO source_table (a, b) VALUES (10, 10); + +-- simluate actually having secondary nodes +SELECT * FROM pg_dist_node; +UPDATE pg_dist_node SET noderole = 'secondary'; + +\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'" + +-- inserts are disallowed +INSERT INTO dest_table (a, b) VALUES (1, 2); + +-- router selects are allowed +SELECT a FROM dest_table WHERE a = 1; + +-- real-time selects are also allowed +SELECT a FROM dest_table; + +-- insert into is definitely not allowed +INSERT INTO dest_table (a, b) + SELECT a, b FROM source_table; + +\c "dbname=regression options='-c\ citus.use_secondary_nodes=never'" +UPDATE pg_dist_node SET noderole = 'primary'; +DROP TABLE dest_table;