Create citus.use_secondary_nodes GUC

This GUC has two settings, 'always' and 'never'. When it's set to
'never' all behavior stays exactly as it was prior to this commit. When
it's set to 'always' only SELECT queries are allowed to run, and only
secondary nodes are used when processing those queries.

Add some helper functions:
- WorkerNodeIsSecondary(), checks the noderole of the worker node
- WorkerNodeIsReadable(), returns whether we're currently allowed to
  read from this node
- ActiveReadableNodeList(), some functions (namely, the ones on the
  SELECT path) don't require working with Primary Nodes. They should call
  this function instead of ActivePrimaryNodeList(), because the latter
  will error out in contexts where we're not allowed to write to nodes.
- ActiveReadableNodeCount(), like the above, replaces
  ActivePrimaryNodeCount().
- EnsureModificationsCanRun(), errors out if we're not currently allowed
  to run queries which modify data (either the node is in recovery mode or
  citus.use_secondary_nodes is set to 'always')

Some parts of the code were switched over to use readable nodes instead
of primary nodes:
- Deadlock detection
- DistributedTableSize
- the router, real-time, and task tracker executors
- ShardPlacement resolution
pull/1549/head
Brian Cloutier 2017-08-08 16:59:18 +03:00 committed by Brian Cloutier
parent c854d51cd8
commit 9d93fb5551
18 changed files with 329 additions and 22 deletions

View File

@ -82,7 +82,7 @@ MultiRealTimeExecute(Job *job)
const char *workerHashName = "Worker node hash";
WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList));
workerNodeList = ActivePrimaryNodeList();
workerNodeList = ActiveReadableNodeList();
workerHash = WorkerHash(workerHashName, workerNodeList);
/* initialize task execution structures for remote execution */

View File

@ -71,7 +71,9 @@ JobExecutorType(MultiPlan *multiPlan)
" queries on the workers.")));
}
workerNodeList = ActivePrimaryNodeList();
Assert(multiPlan->operation == CMD_SELECT);
workerNodeList = ActiveReadableNodeList();
workerNodeCount = list_length(workerNodeList);
taskCount = list_length(job->taskList);
tasksPerNode = taskCount / ((double) workerNodeCount);

View File

@ -190,7 +190,7 @@ MultiTaskTrackerExecute(Job *job)
* assigning and checking the status of tasks. The second (temporary) hash
* helps us in fetching results data from worker nodes to the master node.
*/
workerNodeList = ActivePrimaryNodeList();
workerNodeList = ActiveReadableNodeList();
taskTrackerCount = (uint32) list_length(workerNodeList);
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList);

View File

@ -174,7 +174,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery)
ErrorIfNotSuitableToGetSize(relationId);
workerNodeList = ActivePrimaryNodeList();
workerNodeList = ActiveReadableNodeList();
foreach(workerNodeCell, workerNodeList)
{

View File

@ -308,6 +308,19 @@ ActivePrimaryNodeCount(void)
}
/*
* ActiveReadableNodeCount returns the number of groups with a node we can read from.
*/
uint32
ActiveReadableNodeCount(void)
{
List *workerNodeList = ActiveReadableNodeList();
uint32 liveWorkerCount = list_length(workerNodeList);
return liveWorkerCount;
}
/*
* ActivePrimaryNodeList returns a list of all the active primary nodes in workerNodeHash
*/
@ -319,6 +332,8 @@ ActivePrimaryNodeList(void)
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
EnsureModificationsCanRun();
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
@ -335,6 +350,42 @@ ActivePrimaryNodeList(void)
}
/*
* ActiveReadableNodeList returns a list of all nodes in workerNodeHash we can read from.
*/
List *
ActiveReadableNodeList(void)
{
List *workerNodeList = NIL;
WorkerNode *workerNode = NULL;
HTAB *workerNodeHash = GetWorkerNodeHash();
HASH_SEQ_STATUS status;
hash_seq_init(&status, workerNodeHash);
while ((workerNode = hash_seq_search(&status)) != NULL)
{
WorkerNode *workerNodeCopy;
if (!workerNode->isActive)
{
continue;
}
if (!WorkerNodeIsReadable(workerNode))
{
continue;
}
workerNodeCopy = palloc0(sizeof(WorkerNode));
memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
workerNodeList = lappend(workerNodeList, workerNodeCopy);
}
return workerNodeList;
}
/*
* PrimaryNodesNotInList scans through the worker node hash and returns a list of all
* primary nodes which are not in currentList. It runs in O(n*m) but currentList is

View File

@ -87,6 +87,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
EnsureCoordinator();
EnsureSuperUser();
EnsureModificationsCanRun();
CheckCitusVersion(ERROR);
PreventTransactionChain(true, "start_metadata_sync_to_node");

View File

@ -1863,7 +1863,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey,
static uint32
HashPartitionCount(void)
{
uint32 groupCount = ActivePrimaryNodeCount();
uint32 groupCount = ActiveReadableNodeCount();
double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0;
uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode);
@ -4791,7 +4791,7 @@ GreedyAssignTaskList(List *taskList)
uint32 taskCount = list_length(taskList);
/* get the worker node list and sort the list */
List *workerNodeList = ActivePrimaryNodeList();
List *workerNodeList = ActiveReadableNodeList();
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
/*
@ -5223,7 +5223,7 @@ AssignDualHashTaskList(List *taskList)
* if subsequent jobs have a small number of tasks, we won't allocate the
* tasks to the same worker repeatedly.
*/
List *workerNodeList = ActivePrimaryNodeList();
List *workerNodeList = ActiveReadableNodeList();
uint32 workerNodeCount = (uint32) list_length(workerNodeList);
uint32 beginningNodeIndex = jobId % workerNodeCount;

View File

@ -322,6 +322,8 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
if (IsModifyCommand(query))
{
EnsureModificationsCanRun();
if (InsertSelectIntoDistributedTable(originalQuery))
{
distributedPlan =

View File

@ -1617,7 +1617,7 @@ PlanRouterQuery(Query *originalQuery, RelationRestrictionContext *restrictionCon
}
else if (replacePrunedQueryWithDummy)
{
List *workerNodeList = ActivePrimaryNodeList();
List *workerNodeList = ActiveReadableNodeList();
if (workerNodeList != NIL)
{
WorkerNode *workerNode = (WorkerNode *) linitial(workerNodeList);

View File

@ -26,6 +26,7 @@
#include "distributed/maintenanced.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_join_order.h"
@ -89,6 +90,12 @@ static const struct config_enum_entry shard_placement_policy_options[] = {
{ NULL, 0, false }
};
/*
 * Accepted values for the citus.use_secondary_nodes GUC; this table is passed to
 * DefineCustomEnumVariable() in RegisterCitusConfigVariables(). The list is
 * terminated by an entry whose name is NULL.
 */
static const struct config_enum_entry use_secondary_nodes_options[] = {
{ "never", USE_SECONDARY_NODES_NEVER, false },
{ "always", USE_SECONDARY_NODES_ALWAYS, false },
{ NULL, 0, false }
};
static const struct config_enum_entry multi_shard_commit_protocol_options[] = {
{ "1pc", COMMIT_PROTOCOL_1PC, false },
{ "2pc", COMMIT_PROTOCOL_2PC, false },
@ -650,6 +657,16 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.use_secondary_nodes",
gettext_noop("Sets the policy to use when choosing nodes for SELECT queries."),
NULL,
&ReadFromSecondaries,
USE_SECONDARY_NODES_NEVER, use_secondary_nodes_options,
PGC_SU_BACKEND,
0,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.multi_task_query_log_level",
gettext_noop("Sets the level of multi task query execution log messages"),

View File

@ -91,7 +91,7 @@ dump_global_wait_edges(PG_FUNCTION_ARGS)
WaitGraph *
BuildGlobalWaitGraph(void)
{
List *workerNodeList = ActivePrimaryNodeList();
List *workerNodeList = ActiveReadableNodeList();
ListCell *workerNodeCell = NULL;
char *nodeUser = CitusExtensionOwnerName();
List *connectionList = NIL;

View File

@ -58,6 +58,9 @@
#include "utils/typcache.h"
/*
 * user configuration: backing variable of the citus.use_secondary_nodes GUC,
 * holds one of the USE_SECONDARY_NODES_* values and defaults to 'never'
 */
int ReadFromSecondaries = USE_SECONDARY_NODES_NEVER;
/*
* ShardCacheEntry represents an entry in the shardId -> ShardInterval cache.
* To avoid duplicating data and invalidation logic between this cache and the
@ -186,6 +189,7 @@ static ShardInterval * TupleToShardInterval(HeapTuple heapTuple,
static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
static ShardPlacement * ResolveGroupShardPlacement(
GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry);
static WorkerNode * LookupNodeForGroup(uint32 groupid);
/* exports for SQL callable functions */
@ -196,6 +200,27 @@ PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate);
/*
 * EnsureModificationsCanRun errors out when writing to worker nodes is not
 * allowed, which is the case when the current node is in recovery mode or when
 * citus.use_secondary_nodes is set to 'always'. It returns normally otherwise.
 */
void
EnsureModificationsCanRun(void)
{
	if (RecoveryInProgress())
	{
		ereport(ERROR, (errmsg("writing to worker nodes is not currently allowed"),
						errdetail("the database is in recovery mode")));
	}
	else if (ReadFromSecondaries == USE_SECONDARY_NODES_ALWAYS)
	{
		ereport(ERROR, (errmsg("writing to worker nodes is not currently allowed"),
						errdetail("citus.use_secondary_nodes is set to 'always'")));
	}
}
/*
* IsDistributedTable returns whether relationId is a distributed relation or
* not.
@ -428,22 +453,10 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
DistTableCacheEntry *tableEntry = shardEntry->tableEntry;
int shardIndex = shardEntry->shardIndex;
ShardInterval *shardInterval = tableEntry->sortedShardIntervalArray[shardIndex];
bool groupContainsNodes = false;
ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
uint32 groupId = groupShardPlacement->groupId;
WorkerNode *workerNode = PrimaryNodeForGroup(groupId, &groupContainsNodes);
if (workerNode == NULL && !groupContainsNodes)
{
ereport(ERROR, (errmsg("there is a shard placement in node group %u but "
"there are no nodes in that group", groupId)));
}
if (workerNode == NULL && groupContainsNodes)
{
ereport(ERROR, (errmsg("node group %u does not have a primary node", groupId)));
}
WorkerNode *workerNode = LookupNodeForGroup(groupId);
/* copy everything into shardPlacement but preserve the header */
memcpy((((CitusNode *) shardPlacement) + 1),
@ -478,6 +491,67 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
}
/*
 * LookupNodeForGroup scans the WorkerNodeHash for a node which belongs to the
 * given group and which WorkerNodeIsReadable() accepts (a primary when reading
 * from primaries, a secondary when reading from secondaries). The first such
 * node found is returned.
 *
 * When no node qualifies the function does not return: it raises an error
 * saying either that the group has no nodes at all, or that it lacks a node of
 * the role selected by citus.use_secondary_nodes.
 */
static WorkerNode *
LookupNodeForGroup(uint32 groupId)
{
	HTAB *nodeHash = GetWorkerNodeHash();
	HASH_SEQ_STATUS scanStatus;
	WorkerNode *candidate = NULL;
	bool groupHasNodes = false;

	hash_seq_init(&scanStatus, nodeHash);

	while ((candidate = hash_seq_search(&scanStatus)) != NULL)
	{
		if (candidate->groupId == groupId)
		{
			groupHasNodes = true;

			if (WorkerNodeIsReadable(candidate))
			{
				/* end the sequential scan before returning early */
				hash_seq_term(&scanStatus);
				return candidate;
			}
		}
	}

	if (!groupHasNodes)
	{
		ereport(ERROR, (errmsg("there is a shard placement in node group %u but "
							   "there are no nodes in that group", groupId)));
	}

	/* the group has nodes, just none with the role we are allowed to read from */
	if (ReadFromSecondaries == USE_SECONDARY_NODES_NEVER)
	{
		ereport(ERROR, (errmsg("node group %u does not have a primary node",
							   groupId)));
	}
	else if (ReadFromSecondaries == USE_SECONDARY_NODES_ALWAYS)
	{
		ereport(ERROR, (errmsg("node group %u does not have a secondary node",
							   groupId)));
	}
	else
	{
		ereport(FATAL, (errmsg("unrecognized value for use_secondary_nodes")));
	}
}
/*
* ShardPlacementList returns the list of placements for the given shard from
* the cache.

View File

@ -343,6 +343,47 @@ WorkerNodeIsPrimary(WorkerNode *worker)
}
/*
 * WorkerNodeIsSecondary tells whether the given worker's nodeRole is the
 * secondary role.
 */
bool
WorkerNodeIsSecondary(WorkerNode *worker)
{
	Oid secondaryRoleId = SecondaryNodeRoleId();

	/*
	 * An invalid role oid means nodeRole does not exist yet; in that case every
	 * node counts as a primary, so none can be a secondary.
	 */
	return secondaryRoleId != InvalidOid && worker->nodeRole == secondaryRoleId;
}
/*
 * WorkerNodeIsReadable decides whether SELECT queries may be sent to the given
 * node: primaries qualify when citus.use_secondary_nodes is 'never', secondaries
 * qualify when it is 'always'. Any other setting makes no node readable.
 */
bool
WorkerNodeIsReadable(WorkerNode *workerNode)
{
	switch (ReadFromSecondaries)
	{
		case USE_SECONDARY_NODES_NEVER:
		{
			return WorkerNodeIsPrimary(workerNode);
		}

		case USE_SECONDARY_NODES_ALWAYS:
		{
			return WorkerNodeIsSecondary(workerNode);
		}

		default:
		{
			return false;
		}
	}
}
/*
* PrimaryNodeForGroup returns the (unique) primary in the specified group.
*

View File

@ -19,6 +19,14 @@
extern bool EnableVersionChecks;
/* managed via guc.c */
/*
 * Possible values of the citus.use_secondary_nodes setting, stored in
 * ReadFromSecondaries.
 */
typedef enum
{
/* 'never': only primary nodes are considered readable */
USE_SECONDARY_NODES_NEVER = 0,
/* 'always': only secondary nodes are considered readable, modifications error out */
USE_SECONDARY_NODES_ALWAYS = 1
} ReadFromSecondariesType;
/* current value of citus.use_secondary_nodes (a ReadFromSecondariesType value) */
extern int ReadFromSecondaries;
/*
* Representation of a table's metadata that is frequently used for
* distributed execution. Cached.
@ -84,6 +92,8 @@ extern bool CheckCitusVersion(int elevel);
extern bool CheckAvailableVersion(int elevel);
bool MajorVersionsCompatible(char *leftVersion, char *rightVersion);
extern void EnsureModificationsCanRun(void);
/* access WorkerNodeHash */
extern HTAB * GetWorkerNodeHash(void);

View File

@ -64,6 +64,8 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
extern uint32 ActivePrimaryNodeCount(void);
extern List * ActivePrimaryNodeList(void);
extern uint32 ActiveReadableNodeCount(void);
extern List * ActiveReadableNodeList(void);
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort);
extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters);
@ -71,6 +73,8 @@ extern void EnsureCoordinator(void);
extern uint32 GroupForNode(char *nodeName, int32 nodePorT);
extern WorkerNode * PrimaryNodeForGroup(uint32 groupId, bool *groupContainsNodes);
extern bool WorkerNodeIsPrimary(WorkerNode *worker);
extern bool WorkerNodeIsSecondary(WorkerNode *worker);
extern bool WorkerNodeIsReadable(WorkerNode *worker);
/* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);

View File

@ -0,0 +1,61 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1600000;
\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'"
CREATE TABLE dest_table (a int, b int);
CREATE TABLE source_table (a int, b int);
-- attempts to change metadata should fail while reading from secondaries
SELECT create_distributed_table('dest_table', 'a');
ERROR: writing to worker nodes is not currently allowed
DETAIL: citus.use_secondary_nodes is set to 'always'
\c "dbname=regression options='-c\ citus.use_secondary_nodes=never'"
SELECT create_distributed_table('dest_table', 'a');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('source_table', 'a');
create_distributed_table
--------------------------
(1 row)
INSERT INTO dest_table (a, b) VALUES (1, 1);
INSERT INTO dest_table (a, b) VALUES (2, 1);
INSERT INTO source_table (a, b) VALUES (10, 10);
-- simluate actually having secondary nodes
SELECT * FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------
1 | 1 | localhost | 57637 | default | f | t | primary | default
2 | 2 | localhost | 57638 | default | f | t | primary | default
(2 rows)
UPDATE pg_dist_node SET noderole = 'secondary';
\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'"
-- inserts are disallowed
INSERT INTO dest_table (a, b) VALUES (1, 2);
ERROR: writing to worker nodes is not currently allowed
DETAIL: citus.use_secondary_nodes is set to 'always'
-- router selects are allowed
SELECT a FROM dest_table WHERE a = 1;
a
---
1
(1 row)
-- real-time selects are also allowed
SELECT a FROM dest_table;
a
---
1
2
(2 rows)
-- insert into is definitely not allowed
INSERT INTO dest_table (a, b)
SELECT a, b FROM source_table;
ERROR: writing to worker nodes is not currently allowed
DETAIL: citus.use_secondary_nodes is set to 'always'
\c "dbname=regression options='-c\ citus.use_secondary_nodes=never'"
UPDATE pg_dist_node SET noderole = 'primary';
DROP TABLE dest_table;

View File

@ -23,6 +23,8 @@ test: multi_table_ddl
test: multi_name_lengths
test: multi_metadata_access
test: multi_read_from_secondaries
# ----------
# The following distributed tests depend on creating a partitioned table and
# uploading data to it.

View File

@ -0,0 +1,42 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1600000;
\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'"
CREATE TABLE dest_table (a int, b int);
CREATE TABLE source_table (a int, b int);
-- attempts to change metadata should fail while reading from secondaries
SELECT create_distributed_table('dest_table', 'a');
\c "dbname=regression options='-c\ citus.use_secondary_nodes=never'"
SELECT create_distributed_table('dest_table', 'a');
SELECT create_distributed_table('source_table', 'a');
INSERT INTO dest_table (a, b) VALUES (1, 1);
INSERT INTO dest_table (a, b) VALUES (2, 1);
INSERT INTO source_table (a, b) VALUES (10, 10);
-- simluate actually having secondary nodes
SELECT * FROM pg_dist_node;
UPDATE pg_dist_node SET noderole = 'secondary';
\c "dbname=regression options='-c\ citus.use_secondary_nodes=always'"
-- inserts are disallowed
INSERT INTO dest_table (a, b) VALUES (1, 2);
-- router selects are allowed
SELECT a FROM dest_table WHERE a = 1;
-- real-time selects are also allowed
SELECT a FROM dest_table;
-- insert into is definitely not allowed
INSERT INTO dest_table (a, b)
SELECT a, b FROM source_table;
\c "dbname=regression options='-c\ citus.use_secondary_nodes=never'"
UPDATE pg_dist_node SET noderole = 'primary';
DROP TABLE dest_table;