diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 75c099e3b..512284efc 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -10,7 +10,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ - 6.2-1 6.2-2 + 6.2-1 6.2-2 6.2-3 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -134,6 +134,8 @@ $(EXTENSION)--6.2-1.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.2-1.sq cat $^ > $@ $(EXTENSION)--6.2-2.sql: $(EXTENSION)--6.2-1.sql $(EXTENSION)--6.2-1--6.2-2.sql cat $^ > $@ +$(EXTENSION)--6.2-3.sql: $(EXTENSION)--6.2-2.sql $(EXTENSION)--6.2-2--6.2-3.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.2-2--6.2-3.sql b/src/backend/distributed/citus--6.2-2--6.2-3.sql new file mode 100644 index 000000000..ef52e3cac --- /dev/null +++ b/src/backend/distributed/citus--6.2-2--6.2-3.sql @@ -0,0 +1,54 @@ +/* citus--6.2-2--6.2-3.sql */ + +SET search_path = 'pg_catalog'; + +ALTER TABLE pg_dist_node ADD isactive bool; + +DROP FUNCTION IF EXISTS master_add_node(text, integer); + +CREATE FUNCTION master_add_node(nodename text, + nodeport integer, + OUT nodeid integer, + OUT groupid integer, + OUT nodename text, + OUT nodeport integer, + OUT noderack text, + OUT hasmetadata boolean, + OUT isactive bool) + RETURNS record + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$master_add_node$$; +COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer) + IS 'add node to the cluster'; + +CREATE FUNCTION master_add_inactive_node(nodename text, + nodeport integer, + OUT nodeid integer, + OUT groupid integer, + OUT nodename text, + OUT nodeport integer, + OUT noderack text, + OUT hasmetadata boolean, + OUT isactive bool) + RETURNS record + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$master_add_inactive_node$$; +COMMENT ON FUNCTION master_add_inactive_node(nodename text,nodeport integer) + IS 'prepare node by adding it to pg_dist_node'; + +CREATE FUNCTION master_activate_node(nodename text, + nodeport integer, + OUT nodeid integer, + OUT groupid integer, + OUT nodename text, + OUT nodeport integer, + OUT noderack text, + OUT hasmetadata boolean, + OUT isactive bool) + RETURNS record + LANGUAGE C STRICT + AS 'MODULE_PATHNAME',$$master_activate_node$$; +COMMENT ON FUNCTION master_activate_node(nodename text, nodeport integer) + IS 'activate a node which is in the cluster'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 42fc2f856..b34908f53 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.2-2' +default_version = '6.2-3' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 4198837b2..1cced3549 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -242,7 +242,7 @@ static void CreateReferenceTable(Oid relationId) { uint32 
colocationId = INVALID_COLOCATION_ID; - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); int replicationFactor = list_length(workerNodeList); char *distributionColumnName = NULL; bool requireEmpty = true; diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 5e0f54db8..e6b1668c4 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -82,7 +82,7 @@ MultiRealTimeExecute(Job *job) const char *workerHashName = "Worker node hash"; WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList)); - workerNodeList = WorkerNodeList(); + workerNodeList = ActiveWorkerNodeList(); workerHash = WorkerHash(workerHashName, workerNodeList); /* initialize task execution structures for remote execution */ diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 81e12e91b..2fd75ab48 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -43,7 +43,7 @@ JobExecutorType(MultiPlan *multiPlan) { Job *job = multiPlan->workerJob; List *workerTaskList = job->taskList; - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); int taskCount = list_length(workerTaskList); int workerNodeCount = list_length(workerNodeList); double tasksPerNode = taskCount / ((double) workerNodeCount); diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 1d00452a4..44411ae16 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -191,7 +191,7 @@ MultiTaskTrackerExecute(Job *job) * assigning and checking the status of tasks. The second (temporary) hash * helps us in fetching results data from worker nodes to the master node. 
*/ - workerNodeList = WorkerNodeList(); + workerNodeList = ActiveWorkerNodeList(); taskTrackerCount = (uint32) list_length(workerNodeList); taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList); diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 9deeb9958..56ec7e788 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -33,6 +33,7 @@ #include "distributed/multi_join_order.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" +#include "distributed/reference_table_utils.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" @@ -159,7 +160,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, hashTokenIncrement = HASH_TOKEN_COUNT / shardCount; /* load and sort the worker node list for deterministic placement */ - workerNodeList = WorkerNodeList(); + workerNodeList = ActiveWorkerNodeList(); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* make sure we don't process cancel signals until all shards are created */ @@ -386,7 +387,7 @@ CreateReferenceTableShard(Oid distributedTableId) } /* load and sort the worker node list for deterministic placement */ - workerNodeList = WorkerNodeList(); + workerNodeList = ActiveWorkerNodeList(); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* get the next shard id */ diff --git a/src/backend/distributed/master/master_expire_table_cache.c b/src/backend/distributed/master/master_expire_table_cache.c index 72cb87091..70eef82aa 100644 --- a/src/backend/distributed/master/master_expire_table_cache.c +++ b/src/backend/distributed/master/master_expire_table_cache.c @@ -47,7 +47,7 @@ master_expire_table_cache(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId); - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); ListCell *workerNodeCell = NULL; int shardCount = cacheEntry->shardIntervalArrayLength; ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray; diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 289b59f21..10997e7ef 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -150,7 +150,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery) pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); - workerNodeList = WorkerNodeList(); + workerNodeList = ActiveWorkerNodeList(); foreach(workerNodeCell, workerNodeList) { diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 7784f1ead..a635cde62 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -390,7 +390,7 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS) /* switch to memory context appropriate for multiple function calls */ oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); - workerNodeList = WorkerNodeList(); + workerNodeList = ActiveWorkerNodeList(); workerNodeCount = (uint32) list_length(workerNodeList); functionContext->user_fctx = workerNodeList; diff --git 
a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 00561a943..f24bfc07c 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -71,7 +71,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) { text *relationNameText = PG_GETARG_TEXT_P(0); char *relationName = text_to_cstring(relationNameText); - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); uint64 shardId = INVALID_SHARD_ID; List *ddlEventList = NULL; uint32 attemptableNodeCount = 0; diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index a1daddb11..42354299c 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -302,19 +302,19 @@ WorkerGetNodeWithName(const char *hostname) uint32 WorkerGetLiveNodeCount(void) { - HTAB *workerNodeHash = GetWorkerNodeHash(); - uint32 liveWorkerCount = hash_get_num_entries(workerNodeHash); + List *workerNodeList = ActiveWorkerNodeList(); + uint32 liveWorkerCount = workerNodeList->length; return liveWorkerCount; } /* - * WorkerNodeList iterates over the hash table that includes the worker nodes, and adds - * them to a list which is returned. + * ActiveWorkerNodeList iterates over the hash table that includes the worker + * nodes and adds active nodes to a list, which is returned. */ List * -WorkerNodeList(void) +ActiveWorkerNodeList(void) { List *workerNodeList = NIL; WorkerNode *workerNode = NULL; @@ -325,9 +325,12 @@ WorkerNodeList(void) while ((workerNode = hash_seq_search(&status)) != NULL) { - WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode)); - memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode)); - workerNodeList = lappend(workerNodeList, workerNodeCopy); + if (workerNode->isActive) + { + WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode)); + memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode)); + workerNodeList = lappend(workerNodeList, workerNodeCopy); + } } return workerNodeList; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index e965f362e..b59bdcd37 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -99,6 +99,15 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) "(%s,%d)", escapedNodeName, nodePort))); } + if (!workerNode->isActive) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("you cannot sync metadata to an inactive node"), + errhint("First, activate the node with " + "SELECT master_activate_node(%s,%d)", + escapedNodeName, nodePort))); + } + MarkNodeHasMetadata(nodeNameString, nodePort, true); /* generate and add the local group id's update query */ @@ -206,7 +215,7 @@ MetadataCreateCommands(void) List *metadataSnapshotCommandList = NIL; List *distributedTableList = DistributedTableList(); List *propagatedTableList = NIL; - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); ListCell *distributedTableCell = NULL; char *nodeListInsertCommand = NULL; bool includeSequenceDefaults = true; @@ -401,24 +410,25 @@ NodeListInsertCommand(List *workerNodeList) /* generate the query without any values yet */ appendStringInfo(nodeListInsertCommand, - "INSERT INTO pg_dist_node " - "(nodeid, groupid, nodename, nodeport, noderack, hasmetadata) " - "VALUES "); + 
"INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, " + "noderack, hasmetadata, isactive) VALUES "); /* iterate over the worker nodes, add the values */ foreach(workerNodeCell, workerNodeList) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); - char *hasMetadaString = workerNode->hasMetadata ? "TRUE" : "FALSE"; + char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE"; + char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE"; appendStringInfo(nodeListInsertCommand, - "(%d, %d, %s, %d, %s, %s)", + "(%d, %d, %s, %d, %s, %s, %s)", workerNode->nodeId, workerNode->groupId, quote_literal_cstr(workerNode->workerName), workerNode->workerPort, quote_literal_cstr(workerNode->workerRack), - hasMetadaString); + hasMetadataString, + isActiveString); processedWorkerNodeCount++; if (processedWorkerNodeCount != workerCount) @@ -685,6 +695,24 @@ NodeDeleteCommand(uint32 nodeId) } +/* + * NodeStateUpdateCommand generates a command that can be executed to update + * isactive column of a node in pg_dist_node table. + */ +char * +NodeStateUpdateCommand(uint32 nodeId, bool isActive) +{ + StringInfo nodeStateUpdateCommand = makeStringInfo(); + char *isActiveString = isActive ? "TRUE" : "FALSE"; + + appendStringInfo(nodeStateUpdateCommand, + "UPDATE pg_dist_node SET isactive = %s " + "WHERE nodeid = %u", isActiveString, nodeId); + + return nodeStateUpdateCommand->data; +} + + /* * ColocationIdUpdateCommand creates the SQL command to change the colocationId * of the table with the given name to the given colocationId in pg_dist_partition @@ -973,7 +1001,7 @@ OwnerName(Oid objectId) static bool HasMetadataWorkers(void) { - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); ListCell *workerNodeCell = NULL; foreach(workerNodeCell, workerNodeList) diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a33808d5b..47aa1e375 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4912,7 +4912,7 @@ GreedyAssignTaskList(List *taskList) uint32 taskCount = list_length(taskList); /* get the worker node list and sort the list */ - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* @@ -5344,7 +5344,7 @@ AssignDualHashTaskList(List *taskList) * if subsequent jobs have a small number of tasks, we won't allocate the * tasks to the same worker repeatedly. 
*/ - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); uint32 workerNodeCount = (uint32) list_length(workerNodeList); uint32 beginningNodeIndex = jobId % workerNodeCount; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index bea3c517c..b92a25fb7 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2295,7 +2295,7 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC } else if (replacePrunedQueryWithDummy) { - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); if (workerNodeList != NIL) { WorkerNode *workerNode = (WorkerNode *) linitial(workerNodeList); diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 8a95eb8b2..70e16bd1d 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -125,7 +125,7 @@ RecoverPreparedTransactions(void) */ LockRelationOid(DistTransactionRelationId(), ExclusiveLock); - workerList = WorkerNodeList(); + workerList = ActiveWorkerNodeList(); foreach(workerNodeCell, workerList) { diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 6f54de4c5..6cf1333c2 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -78,7 +78,7 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command) void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) { - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); ListCell *workerNodeCell = NULL; char *nodeUser = CitusExtensionOwnerName(); ListCell *commandCell = NULL; @@ -128,7 +128,7 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command, { List *connectionList = NIL; ListCell *connectionCell = NULL; - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); ListCell *workerNodeCell = NULL; char *nodeUser = CitusExtensionOwnerName(); diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 275441320..5d5fb7f75 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -1909,6 +1909,7 @@ InitializeWorkerNodeCache(void) workerNode->nodeId = currentNode->nodeId; strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH); workerNode->hasMetadata = currentNode->hasMetadata; + workerNode->isActive = currentNode->isActive; if (handleFound) { diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 7e26f17a6..28c4c88f7 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -33,10 +33,12 @@ #include "distributed/multi_router_planner.h" #include "distributed/pg_dist_node.h" #include "distributed/reference_table_utils.h" +#include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "lib/stringinfo.h" +#include "storage/bufmgr.h" #include "storage/lock.h" #include "storage/fd.h" 
#include "utils/builtins.h" @@ -51,23 +53,29 @@ int GroupSize = 1; /* local function forward declarations */ -static void RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove); +static Datum ActivateNode(char *nodeName, int nodePort); +static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, - char *nodeRack, bool hasMetadata, bool *nodeAlreadyExists); + char *nodeRack, bool hasMetadata, bool isActive, + bool *nodeAlreadyExists); +static void SetNodeState(char *nodeName, int32 nodePort, bool isActive); +static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort); static Datum GenerateNodeTuple(WorkerNode *workerNode); static int32 GetNextGroupId(void); static uint32 GetMaxGroupId(void); static int GetNextNodeId(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId, - char *nodeRack, bool hasMetadata); + char *nodeRack, bool hasMetadata, bool isActive); static void DeleteNodeRow(char *nodename, int32 nodeport); static List * ParseWorkerNodeFileAndRename(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_add_node); +PG_FUNCTION_INFO_V1(master_add_inactive_node); PG_FUNCTION_INFO_V1(master_remove_node); PG_FUNCTION_INFO_V1(master_disable_node); +PG_FUNCTION_INFO_V1(master_activate_node); PG_FUNCTION_INFO_V1(master_initialize_node_metadata); PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column); @@ -85,23 +93,47 @@ master_add_node(PG_FUNCTION_ARGS) int32 groupId = 0; char *nodeRack = WORKER_DEFAULT_RACK; bool hasMetadata = false; + bool isActive = false; bool nodeAlreadyExists = false; - Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, - hasMetadata, &nodeAlreadyExists); + Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, + hasMetadata, isActive, &nodeAlreadyExists); /* - * After adding new node, if the node is not already exist, we replicate all existing - * reference tables to the new node. ReplicateAllReferenceTablesToAllNodes replicates - * reference tables to all nodes however, it skips nodes which already has healthy - * placement of particular reference table. + * After adding new node, if the node did not already exist, we will activate + * the node. This means we will replicate all reference tables to the new + * node. */ if (!nodeAlreadyExists) { - ReplicateAllReferenceTablesToAllNodes(); + nodeRecord = ActivateNode(nodeNameString, nodePort); } - PG_RETURN_CSTRING(returnData); + PG_RETURN_CSTRING(nodeRecord); +} + + +/* + * master_add_inactive_node function adds a new node to the cluster as inactive node + * and returns information about newly added node. It does not replicate reference + * tables to the new node, it only adds new node to the pg_dist_node table. 
+ */ +Datum +master_add_inactive_node(PG_FUNCTION_ARGS) +{ + text *nodeName = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + char *nodeNameString = text_to_cstring(nodeName); + int32 groupId = 0; + char *nodeRack = WORKER_DEFAULT_RACK; + bool hasMetadata = false; + bool isActive = false; + bool nodeAlreadyExists = false; + + Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack, + hasMetadata, isActive, &nodeAlreadyExists); + + PG_RETURN_CSTRING(nodeRecord); } @@ -120,35 +152,108 @@ master_remove_node(PG_FUNCTION_ARGS) text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); char *nodeNameString = text_to_cstring(nodeName); - bool forceRemove = false; - RemoveNodeFromCluster(nodeNameString, nodePort, forceRemove); + + RemoveNodeFromCluster(nodeNameString, nodePort); PG_RETURN_VOID(); } /* - * master_disable_node function removes the provided node from the pg_dist_node table of - * the master node and all nodes with metadata regardless of the node having an active - * shard placement. - * The call to the master_remove_node should be done by the super user. - * This function also deletes all reference table placements belong to the given node from - * pg_dist_shard_placement, but it does not drop actual placement at the node. In the case - * of re-adding the node, master_add_node first drops and re-creates the reference tables. + * master_disable_node function sets the isactive value of the provided node to + * inactive at the master node and all nodes with metadata, regardless of the node + * having an active shard placement. + * The call to master_disable_node must be made by a superuser. + * This function also deletes all reference table placements belonging to the given + * node from pg_dist_shard_placement, but it does not drop the actual placements at + * the node. In the case of re-activating the node, master_activate_node replicates + * the reference tables to the node again. */ Datum master_disable_node(PG_FUNCTION_ARGS) { - text *nodeName = PG_GETARG_TEXT_P(0); + text *nodeNameText = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); - char *nodeNameString = text_to_cstring(nodeName); - bool forceRemove = true; - RemoveNodeFromCluster(nodeNameString, nodePort, forceRemove); + + char *nodeName = text_to_cstring(nodeNameText); + bool hasShardPlacements = false; + bool isActive = false; + + DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort); + + hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort); + if (hasShardPlacements) + { + ereport(NOTICE, (errmsg("Node %s:%d has active shard placements. Some queries " + "may fail after this operation. Use " + "SELECT master_activate_node('%s', %d) to activate this " + "node back.", + nodeName, nodePort, nodeName, nodePort))); + } + + SetNodeState(nodeName, nodePort, isActive); PG_RETURN_VOID(); } +/* + * master_activate_node UDF activates the given node. It sets the node's isactive + * value to active and replicates all reference tables to that node. + */ +Datum +master_activate_node(PG_FUNCTION_ARGS) +{ + text *nodeName = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + + char *nodeNameString = text_to_cstring(nodeName); + Datum nodeRecord = 0; + + nodeRecord = ActivateNode(nodeNameString, nodePort); + + PG_RETURN_CSTRING(nodeRecord); +} + + +/* + * ActivateNode activates the node with nodeName and nodePort. Currently, activation + * includes only replicating the reference tables and setting the isactive column of + * the given node.
+ */ +static Datum +ActivateNode(char *nodeName, int nodePort) +{ + Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort); + CommandId commandId = GetCurrentCommandId(true); + LockTupleMode lockTupleMode = LockTupleExclusive; + LockWaitPolicy lockWaitPolicy = LockWaitError; + bool followUpdates = false; + Buffer buffer = 0; + HeapUpdateFailureData heapUpdateFailureData; + + WorkerNode *workerNode = NULL; + bool isActive = true; + Datum nodeRecord = 0; + + heap_lock_tuple(pgDistNode, heapTuple, commandId, lockTupleMode, lockWaitPolicy, + followUpdates, &buffer, &heapUpdateFailureData); + ReleaseBuffer(buffer); + + SetNodeState(nodeName, nodePort, isActive); + + ReplicateAllReferenceTablesToNode(nodeName, nodePort); + + workerNode = FindWorkerNode(nodeName, nodePort); + nodeRecord = GenerateNodeTuple(workerNode); + + heap_close(pgDistNode, AccessShareLock); + + return nodeRecord; +} + + /* * master_initialize_node_metadata is run once, when upgrading citus. It ingests the * existing pg_worker_list.conf into pg_dist_node, then adds a header to the file stating @@ -166,7 +271,10 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS) WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0, - workerNode->workerRack, false, &nodeAlreadyExists); + workerNode->workerRack, false, workerNode->isActive, + &nodeAlreadyExists); + + ActivateNode(workerNode->workerName, workerNode->workerPort); } PG_RETURN_BOOL(true); @@ -329,15 +437,14 @@ ReadWorkerNodes() * RemoveNodeFromCluster removes the provided node from the pg_dist_node table of * the master node and all nodes with metadata. * The call to the master_remove_node should be done by the super user. If there are - * active shard placements on the node; the function removes the node when forceRemove - * flag is set, it errors out otherwise. + * active shard placements on the node; the function errors out. * This function also deletes all reference table placements belong to the given node from * pg_dist_shard_placement, but it does not drop actual placement at the node. It also * modifies replication factor of the colocation group of reference tables, so that * replication factor will be equal to worker count. */ static void -RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) +RemoveNodeFromCluster(char *nodeName, int32 nodePort) { char *nodeDeleteCommand = NULL; bool hasShardPlacements = false; @@ -370,7 +477,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) Oid firstReferenceTableId = linitial_oid(referenceTableList); uint32 referenceTableColocationId = TableColocationId(firstReferenceTableId); - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); int workerCount = list_length(workerNodeList); UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); @@ -379,19 +486,8 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort); if (hasShardPlacements) { - if (forceRemove) - { - ereport(NOTICE, (errmsg("Node %s:%d has active shard placements. Some " - "queries may fail after this operation. 
Use " - "select master_add_node('%s', %d) to add this " - "node back.", - nodeName, nodePort, nodeName, nodePort))); - } - else - { - ereport(ERROR, (errmsg("you cannot remove a node which has active " - "shard placements"))); - } + ereport(ERROR, (errmsg("you cannot remove a node which has active " + "shard placements"))); } nodeDeleteCommand = NodeDeleteCommand(deletedNodeId); @@ -414,7 +510,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove) */ static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, - bool hasMetadata, bool *nodeAlreadyExists) + bool hasMetadata, bool isActive, bool *nodeAlreadyExists) { Relation pgDistNode = NULL; int nextNodeIdInt = 0; @@ -465,7 +561,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, /* generate the new node id from the sequence */ nextNodeIdInt = GetNextNodeId(); - InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata); + InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata, + isActive); workerNode = FindWorkerNode(nodeName, nodePort); @@ -488,6 +585,82 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack, } +/* + * SetNodeState function sets the isactive column of the specified worker in + * pg_dist_node to true. + */ +static void +SetNodeState(char *nodeName, int32 nodePort, bool isActive) +{ + Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); + HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort); + + Datum values[Natts_pg_dist_node]; + bool isnull[Natts_pg_dist_node]; + bool replace[Natts_pg_dist_node]; + + char *nodeStateUpdateCommand = NULL; + WorkerNode *workerNode = NULL; + + memset(replace, 0, sizeof(replace)); + values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive); + isnull[Anum_pg_dist_node_isactive - 1] = false; + replace[Anum_pg_dist_node_isactive - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + simple_heap_update(pgDistNode, &heapTuple->t_self, heapTuple); + + CatalogUpdateIndexes(pgDistNode, heapTuple); + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); + CommandCounterIncrement(); + + heap_close(pgDistNode, AccessShareLock); + + /* we also update isactive column at worker nodes */ + workerNode = FindWorkerNode(nodeName, nodePort); + nodeStateUpdateCommand = NodeStateUpdateCommand(workerNode->nodeId, isActive); + SendCommandToWorkers(WORKERS_WITH_METADATA, nodeStateUpdateCommand); +} + + +/* + * GetNodeTuple function returns heap tuple of given nodeName and nodePort. If + * there are no node tuple with specified nodeName and nodePort, this function + * errors out. 
+ */ +static HeapTuple +GetNodeTuple(char *nodeName, int32 nodePort) +{ + Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); + const int scanKeyCount = 2; + const bool indexOK = false; + + ScanKeyData scanKey[scanKeyCount]; + SysScanDesc scanDescriptor = NULL; + HeapTuple heapTuple = NULL; + + ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename, + BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport, + BTEqualStrategyNumber, F_INT8EQ, Int32GetDatum(nodePort)); + scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", + nodeName, nodePort))); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistNode, AccessShareLock); + + return heapTuple; +} + + /* * GenerateNodeTuple gets a worker node and return a heap tuple of * given worker node. @@ -512,6 +685,7 @@ GenerateNodeTuple(WorkerNode *workerNode) values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack); values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(workerNode->hasMetadata); + values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(workerNode->isActive); /* open shard relation and insert new tuple */ pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); @@ -519,7 +693,6 @@ GenerateNodeTuple(WorkerNode *workerNode) /* generate the tuple */ tupleDescriptor = RelationGetDescr(pgDistNode); heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - nodeDatum = HeapTupleGetDatum(heapTuple); /* close the relation */ @@ -650,7 +823,7 @@ EnsureCoordinator(void) */ static void InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack, - bool hasMetadata) + bool hasMetadata, bool isActive) { Relation pgDistNode = NULL; TupleDesc tupleDescriptor = NULL; @@ -668,6 +841,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char * values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort); values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack); values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata); + values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive); /* open shard relation and insert new tuple */ pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock); @@ -867,6 +1041,7 @@ ParseWorkerNodeFileAndRename() strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH); workerNode->workerPort = nodePort; workerNode->hasMetadata = false; + workerNode->isActive = false; workerNodeList = lappend(workerNodeList, workerNode); } @@ -906,6 +1081,8 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) tupleDescriptor, &isNull); Datum hasMetadata = heap_getattr(heapTuple, Anum_pg_dist_node_hasmetadata, tupleDescriptor, &isNull); + Datum isActive = heap_getattr(heapTuple, Anum_pg_dist_node_isactive, + tupleDescriptor, &isNull); Assert(!HeapTupleHasNulls(heapTuple)); @@ -916,6 +1093,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH); strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH); workerNode->hasMetadata = DatumGetBool(hasMetadata); + workerNode->isActive = DatumGetBool(isActive); return 
workerNode; } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 873d064c6..2b09f7878 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -36,6 +36,8 @@ /* local function forward declarations */ static void ReplicateSingleShardTableToAllWorkers(Oid relationId); static void ReplicateShardToAllWorkers(ShardInterval *shardInterval); +static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, + int nodePort); static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId); static int CompareOids(const void *leftElement, const void *rightElement); @@ -114,22 +116,19 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) /* - * ReplicateAllReferenceTablesToAllNodes function finds all reference tables and - * replicates them to all worker nodes. It also modifies pg_dist_colocation table to - * update the replication factor column. This function skips a worker node if that node - * already has healthy placement of a particular reference table to prevent unnecessary - * data transfer. + * ReplicateAllReferenceTablesToNode function finds all reference tables and + * replicates them to the given worker node. It also modifies pg_dist_colocation + * table to update the replication factor column when necessary. This function + * skips reference tables if that node already has healthy placement of that + * reference table to prevent unnecessary data transfer. */ void -ReplicateAllReferenceTablesToAllNodes() +ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) { List *referenceTableList = ReferenceTableOidList(); ListCell *referenceTableCell = NULL; - - Relation pgDistNode = NULL; - List *workerNodeList = NIL; - int workerCount = 0; - + List *workerNodeList = ActiveWorkerNodeList(); + uint32 workerCount = 0; Oid firstReferenceTableId = InvalidOid; uint32 referenceTableColocationId = INVALID_COLOCATION_ID; @@ -139,12 +138,6 @@ ReplicateAllReferenceTablesToAllNodes() return; } - /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ - pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); - workerNodeList = WorkerNodeList(); - workerCount = list_length(workerNodeList); - - /* * We sort the reference table list to prevent deadlocks in concurrent * ReplicateAllReferenceTablesToAllNodes calls. @@ -156,14 +149,10 @@ ReplicateAllReferenceTablesToAllNodes() List *shardIntervalList = LoadShardIntervalList(referenceTableId); ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); uint64 shardId = shardInterval->shardId; - char *relationName = get_rel_name(referenceTableId); LockShardDistributionMetadata(shardId, ExclusiveLock); - ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to all workers", - relationName))); - - ReplicateShardToAllWorkers(shardInterval); + ReplicateShardToNode(shardInterval, nodeName, nodePort); } /* @@ -171,10 +160,10 @@ ReplicateAllReferenceTablesToAllNodes() * colocation group of reference tables so that worker count will be equal to * replication factor again. 
*/ + workerCount = list_length(workerNodeList); firstReferenceTableId = linitial_oid(referenceTableList); referenceTableColocationId = TableColocationId(firstReferenceTableId); UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); - heap_close(pgDistNode, NoLock); } @@ -228,28 +217,18 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId) /* - * ReplicateShardToAllWorkers function replicates given shard to the given worker nodes - * in a separate transactions. While replicating, it only replicates the shard to the - * workers which does not have a healthy replica of the shard. This function also modifies - * metadata by inserting/updating related rows in pg_dist_shard_placement. However, this - * function does not obtain any lock on shard resource and shard metadata. It is caller's + * ReplicateShardToAllWorkers function replicates the given shard to all worker nodes + * in separate transactions. While replicating, it only replicates the shard to the + * workers that do not have a healthy replica of the shard. However, this function + * does not obtain any lock on shard resource and shard metadata. It is the caller's * responsibility to take those locks. */ static void ReplicateShardToAllWorkers(ShardInterval *shardInterval) { - uint64 shardId = shardInterval->shardId; - List *shardPlacementList = ShardPlacementList(shardId); - bool missingOk = false; - ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); - char *srcNodeName = sourceShardPlacement->nodeName; - uint32 srcNodePort = sourceShardPlacement->nodePort; - char *tableOwner = TableOwner(shardInterval->relationId); - List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort); - /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); ListCell *workerNodeCell = NULL; /* @@ -263,59 +242,86 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval) WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); char *nodeName = workerNode->workerName; uint32 nodePort = workerNode->workerPort; - bool missingWorkerOk = true; - ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, - nodeName, nodePort, - missingWorkerOk); - - /* - * Although this function is used for reference tables and reference table shard - * placements always have shardState = FILE_FINALIZED, in case of an upgrade of - * a non-reference table to reference table, unhealty placements may exist. In - * this case, we repair the shard placement and update its state in - * pg_dist_shard_placement table. - */ - if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) - { - uint64 placementId = 0; - - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, - ddlCommandList); - if (targetPlacement == NULL) - { - placementId = GetNextPlacementId(); - InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0, - nodeName, nodePort); - } - else - { - placementId = targetPlacement->placementId; - UpdateShardPlacementState(placementId, FILE_FINALIZED); - } - - /* - * Although ReplicateShardToAllWorkers is used only for reference tables, - * during the upgrade phase, the placements are created before the table is - * marked as a reference table.
All metadata (including the placement - metadata) will be copied to workers after all reference table changed - are finished. - */ - if (ShouldSyncTableMetadata(shardInterval->relationId)) - { - char *placementCommand = PlacementUpsertCommand(shardId, placementId, - FILE_FINALIZED, 0, - nodeName, nodePort); - - SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand); - } - } + ReplicateShardToNode(shardInterval, nodeName, nodePort); } heap_close(pgDistNode, NoLock); } +/* + * ReplicateShardToNode function replicates the given shard to the given worker node + * in a separate transaction. While replicating, it only copies the shard if the + * node does not already have a healthy replica of the shard. This function also modifies + * metadata by inserting/updating related rows in pg_dist_shard_placement. + */ +static void +ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) +{ + uint64 shardId = shardInterval->shardId; + + bool missingOk = false; + ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); + char *srcNodeName = sourceShardPlacement->nodeName; + uint32 srcNodePort = sourceShardPlacement->nodePort; + List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort); + + List *shardPlacementList = ShardPlacementList(shardId); + bool missingWorkerOk = true; + ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, + nodeName, nodePort, + missingWorkerOk); + char *tableOwner = TableOwner(shardInterval->relationId); + + /* + * Although this function is used for reference tables and reference table shard + * placements always have shardState = FILE_FINALIZED, in case of an upgrade of + * a non-reference table to a reference table, unhealthy placements may exist. In + * this case, we repair the shard placement and update its state in + * pg_dist_shard_placement table. + */ + if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) + { + uint64 placementId = 0; + + ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d", + get_rel_name(shardInterval->relationId), nodeName, + nodePort))); + + SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, + ddlCommandList); + if (targetPlacement == NULL) + { + placementId = GetNextPlacementId(); + InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0, + nodeName, nodePort); + } + else + { + placementId = targetPlacement->placementId; + UpdateShardPlacementState(placementId, FILE_FINALIZED); + } + + /* + * Although ReplicateShardToNode is used only for reference tables, + * during the upgrade phase, the placements are created before the table is + * marked as a reference table. All metadata (including the placement + * metadata) will be copied to workers after all reference table changes + * are finished. + */ + if (ShouldSyncTableMetadata(shardInterval->relationId)) + { + char *placementCommand = PlacementUpsertCommand(shardId, placementId, + FILE_FINALIZED, 0, + nodeName, nodePort); + + SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand); + } + } +} + + /* * ConvertToReferenceTableMetadata accepts a broadcast table and modifies its metadata to * reference table metadata.
To do this, this function updates pg_dist_partition, @@ -355,7 +361,7 @@ uint32 CreateReferenceTableColocationId() { uint32 colocationId = INVALID_COLOCATION_ID; - List *workerNodeList = WorkerNodeList(); + List *workerNodeList = ActiveWorkerNodeList(); int shardCount = 1; int replicationFactor = list_length(workerNodeList); Oid distributionColumnType = InvalidOid; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index f14a9dcd3..53747a7e1 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -30,6 +30,7 @@ extern char * NodeListInsertCommand(List *workerNodeList); extern List * ShardListInsertCommand(List *shardIntervalList); extern List * ShardDeleteCommandList(ShardInterval *shardInterval); extern char * NodeDeleteCommand(uint32 nodeId); +extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive); extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); extern char * CreateSchemaDDLCommand(Oid schemaId); extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h index c5db5129c..391f7a909 100644 --- a/src/include/distributed/pg_dist_node.h +++ b/src/include/distributed/pg_dist_node.h @@ -23,6 +23,7 @@ typedef struct FormData_pg_dist_node text nodename; int nodeport; bool hasmetadata; + bool isactive; #endif } FormData_pg_dist_node; @@ -37,13 +38,14 @@ typedef FormData_pg_dist_node *Form_pg_dist_node; * compiler constants for pg_dist_node * ---------------- */ -#define Natts_pg_dist_node 6 +#define Natts_pg_dist_node 7 #define Anum_pg_dist_node_nodeid 1 #define Anum_pg_dist_node_groupid 2 #define Anum_pg_dist_node_nodename 3 #define Anum_pg_dist_node_nodeport 4 #define Anum_pg_dist_node_noderack 5 #define Anum_pg_dist_node_hasmetadata 6 +#define Anum_pg_dist_node_isactive 7 #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq" #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq" diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 5a84c6e36..f2b083bbf 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -13,7 +13,7 @@ #define REFERENCE_TABLE_UTILS_H_ extern uint32 CreateReferenceTableColocationId(void); -extern void ReplicateAllReferenceTablesToAllNodes(void); +extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort); extern void DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort); extern List * ReferenceTableOidList(void); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 01ff53c4f..b3ec93dd9 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -30,7 +30,6 @@ #define WORKER_RACK_TRIES 5 #define WORKER_DEFAULT_RACK "default" - /* * In memory representation of pg_dist_node table elements. The elements are hold in * WorkerNodeHash table. 
@@ -43,6 +42,7 @@ typedef struct WorkerNode uint32 groupId; /* node's groupId; same for the nodes that are in the same group */ char workerRack[WORKER_LENGTH]; /* node's network location */ bool hasMetadata; /* node gets metadata changes */ + bool isActive; /* node's state */ } WorkerNode; @@ -59,7 +59,7 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern WorkerNode * WorkerGetNodeWithName(const char *hostname); extern uint32 WorkerGetLiveNodeCount(void); -extern List * WorkerNodeList(void); +extern List * ActiveWorkerNodeList(void); extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); extern List * ReadWorkerNodes(void); extern void EnsureCoordinator(void); diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 12495eb82..d85f4890f 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -9,15 +9,15 @@ ERROR: cannot create reference table "test_reference_table" DETAIL: There are no active worker nodes. -- add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node ---------------------------------- - (1,1,localhost,57637,default,f) + master_add_node +----------------------------------- + (1,1,localhost,57637,default,f,t) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (2,2,localhost,57638,default,f) + master_add_node +----------------------------------- + (2,2,localhost,57638,default,f,t) (1 row) -- get the active nodes @@ -30,9 +30,9 @@ SELECT master_get_active_worker_nodes(); -- try to add a node that is already in the cluster SELECT * FROM master_add_node('localhost', :worker_1_port); - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+-----------+----------+----------+------------- - 1 | 1 | localhost | 57637 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive +--------+---------+-----------+----------+----------+-------------+---------- + 1 | 1 | localhost | 57637 | default | f | t (1 row) -- get the active nodes @@ -59,9 +59,9 @@ SELECT master_get_active_worker_nodes(); -- try to disable a node with no placements see that node is removed SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (3,3,localhost,57638,default,f) + master_add_node +----------------------------------- + (3,3,localhost,57638,default,f,t) (1 row) SELECT master_disable_node('localhost', :worker_2_port); @@ -77,10 +77,10 @@ SELECT master_get_active_worker_nodes(); (1 row) -- add some shard placements to the cluster -SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (4,4,localhost,57638,default,f) +SELECT master_activate_node('localhost', :worker_2_port); + master_activate_node +----------------------------------- + (3,3,localhost,57638,default,f,t) (1 row) CREATE TABLE cluster_management_test (col_1 text, col_2 int); @@ -125,7 +125,7 @@ INSERT INTO test_reference_table VALUES (1, '1'); -- try to disable a node with active placements see that node is removed -- observe that a notification is displayed SELECT master_disable_node('localhost', :worker_2_port); -NOTICE: Node localhost:57638 has active shard placements. 
Some queries may fail after this operation. Use select master_add_node('localhost', 57638) to add this node back. +NOTICE: Node localhost:57638 has active shard placements. Some queries may fail after this operation. Use SELECT master_activate_node('localhost', 57638) to activate this node back. master_disable_node --------------------- @@ -139,9 +139,9 @@ SELECT master_get_active_worker_nodes(); -- restore the node for next tests SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (5,5,localhost,57638,default,f) + master_add_node +----------------------------------- + (3,3,localhost,57638,default,f,f) (1 row) -- try to remove a node with active placements and see that node removal is failed @@ -177,9 +177,9 @@ SELECT master_get_active_worker_nodes(); -- clean-up SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (6,6,localhost,57638,default,f) + master_add_node +----------------------------------- + (4,4,localhost,57638,default,f,t) (1 row) UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; @@ -193,9 +193,9 @@ SELECT master_remove_node('localhost', :worker_2_port); UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port; SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (7,7,localhost,57638,default,f) + master_add_node +----------------------------------- + (5,5,localhost,57638,default,f,t) (1 row) \c - - - :worker_1_port @@ -222,9 +222,9 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep -- check that added nodes are not propagated to nodes with hasmetadata=false UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port; SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (8,8,localhost,57638,default,f) + master_add_node +----------------------------------- + (6,6,localhost,57638,default,f,t) (1 row) \c - - - :worker_1_port @@ -244,24 +244,24 @@ SELECT (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+----------+----------+----------+------------- + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive +--------+---------+----------+----------+----------+-------------+---------- (0 rows) -- check that adding two nodes in the same transaction works SELECT master_add_node('localhost', :worker_1_port), master_add_node('localhost', :worker_2_port); - master_add_node | master_add_node ----------------------------------+----------------------------------- - (9,9,localhost,57637,default,f) | (10,10,localhost,57638,default,f) + master_add_node | master_add_node +-----------------------------------+----------------------------------- + (7,7,localhost,57637,default,f,t) | (8,8,localhost,57638,default,f,t) (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+-----------+----------+----------+------------- - 9 | 9 | localhost | 57637 | default | f - 10 | 10 | localhost | 57638 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive +--------+---------+-----------+----------+----------+-------------+---------- + 7 | 7 | localhost | 57637 | default | f | t + 8 | 8 | localhost | 57638 | default | f | t (2 rows) -- check that mixed add/remove node commands work fine 
inside transaction @@ -275,7 +275,7 @@ SELECT master_remove_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port); master_add_node ----------------------------------- - (11,11,localhost,57638,default,f) + (9,9,localhost,57638,default,f,t) (1 row) SELECT master_remove_node('localhost', :worker_2_port); @@ -293,9 +293,9 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port; BEGIN; SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (12,12,localhost,57638,default,f) + master_add_node +------------------------------------- + (10,10,localhost,57638,default,f,t) (1 row) SELECT master_remove_node('localhost', :worker_2_port); @@ -305,9 +305,9 @@ SELECT master_remove_node('localhost', :worker_2_port); (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (13,13,localhost,57638,default,f) + master_add_node +------------------------------------- + (11,11,localhost,57638,default,f,t) (1 row) COMMIT; @@ -333,15 +333,15 @@ SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; (2 rows) SELECT master_add_node('localhost', :worker_1_port); - master_add_node ------------------------------------ - (14,14,localhost,57637,default,f) + master_add_node +------------------------------------- + (12,12,localhost,57637,default,f,t) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (15,15,localhost,57638,default,f) + master_add_node +------------------------------------- + (13,13,localhost,57638,default,f,t) (1 row) -- check that a distributed table can be created after adding a node in a transaction @@ -353,9 +353,9 @@ SELECT master_remove_node('localhost', :worker_2_port); BEGIN; SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (16,16,localhost,57638,default,f) + master_add_node +------------------------------------- + (14,14,localhost,57638,default,f,t) (1 row) CREATE TABLE temp(col1 text, col2 int); diff --git a/src/test/regress/expected/multi_drop_extension.out b/src/test/regress/expected/multi_drop_extension.out index 645a7a935..d733afde5 100644 --- a/src/test/regress/expected/multi_drop_extension.out +++ b/src/test/regress/expected/multi_drop_extension.out @@ -21,15 +21,15 @@ RESET client_min_messages; CREATE EXTENSION citus; -- re-add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node ---------------------------------- - (1,1,localhost,57637,default,f) + master_add_node +----------------------------------- + (1,1,localhost,57637,default,f,t) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (2,2,localhost,57638,default,f) + master_add_node +----------------------------------- + (2,2,localhost,57638,default,f,t) (1 row) -- verify that a table can be created after the extension has been dropped and recreated diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 3daa82f42..4a3968b2c 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -78,6 +78,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-16'; ALTER EXTENSION citus UPDATE TO '6.1-17'; ALTER EXTENSION citus UPDATE TO '6.2-1'; ALTER EXTENSION citus 
UPDATE TO '6.2-2'; +ALTER EXTENSION citus UPDATE TO '6.2-3'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index a297de3fa..924c2b9f4 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -28,11 +28,11 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; -- Show that, with no MX tables, metadata snapshot contains only the delete commands, -- pg_dist_node entries and reference tables SELECT unnest(master_metadata_snapshot()); - unnest ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + unnest +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) (3 rows) -- Create a test table with constraints and SERIAL @@ -58,7 +58,7 @@ SELECT unnest(master_metadata_snapshot()); -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE @@ -80,7 +80,7 @@ SELECT unnest(master_metadata_snapshot()); 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE @@ -104,7 +104,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres @@ -134,7 +134,7 @@ SELECT unnest(master_metadata_snapshot()); 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres @@ -157,7 +157,7 @@ SELECT unnest(master_metadata_snapshot()); ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition - INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres @@ -203,10 +203,10 @@ SELECT * FROM pg_dist_local_group; (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+-----------+----------+----------+------------- - 1 | 1 | 
localhost | 57637 | default | t - 2 | 2 | localhost | 57638 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive +--------+---------+-----------+----------+----------+-------------+---------- + 1 | 1 | localhost | 57637 | default | t | t + 2 | 2 | localhost | 57638 | default | f | t (2 rows) SELECT * FROM pg_dist_partition ORDER BY logicalrelid; @@ -333,10 +333,10 @@ SELECT * FROM pg_dist_local_group; (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+-----------+----------+----------+------------- - 1 | 1 | localhost | 57637 | default | t - 2 | 2 | localhost | 57638 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive +--------+---------+-----------+----------+----------+-------------+---------- + 1 | 1 | localhost | 57637 | default | t | t + 2 | 2 | localhost | 57638 | default | f | t (2 rows) SELECT * FROM pg_dist_partition ORDER BY logicalrelid; @@ -1133,9 +1133,9 @@ SELECT create_distributed_table('mx_table', 'a'); \c - postgres - :master_port SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (4,4,localhost,57638,default,f) + master_add_node +----------------------------------- + (4,4,localhost,57638,default,f,t) (1 row) SELECT start_metadata_sync_to_node('localhost', :worker_2_port); @@ -1316,10 +1316,10 @@ WHERE logicalrelid='mx_ref'::regclass; \c - - - :master_port SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "mx_ref" to all workers - master_add_node ---------------------------------- - (5,5,localhost,57638,default,f) +NOTICE: Replicating reference table "mx_ref" to the node localhost:57638 + master_add_node +----------------------------------- + (5,5,localhost,57638,default,f,t) (1 row) SELECT shardid, nodename, nodeport diff --git a/src/test/regress/expected/multi_mx_ddl.out b/src/test/regress/expected/multi_mx_ddl.out index 2572e532f..7f87e5b57 100644 --- a/src/test/regress/expected/multi_mx_ddl.out +++ b/src/test/regress/expected/multi_mx_ddl.out @@ -202,6 +202,19 @@ SELECT create_distributed_table('mx_sequence', 'key'); (1 row) \c - - - :worker_1_port +SELECT groupid FROM pg_dist_local_group; + groupid +--------- + 12 +(1 row) + +SELECT * FROM mx_sequence_value_seq; + sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called +-----------------------+------------------+------------------+--------------+------------------+------------------+-------------+---------+-----------+----------- + mx_sequence_value_seq | 3377699720527873 | 3377699720527873 | 1 | 3659174697238529 | 3377699720527873 | 1 | 0 | f | f +(1 row) + +\c - - - :worker_2_port SELECT groupid FROM pg_dist_local_group; groupid --------- @@ -214,19 +227,6 @@ SELECT * FROM mx_sequence_value_seq; mx_sequence_value_seq | 3940649673949185 | 3940649673949185 | 1 | 4222124650659841 | 3940649673949185 | 1 | 0 | f | f (1 row) -\c - - - :worker_2_port -SELECT groupid FROM pg_dist_local_group; - groupid ---------- - 16 -(1 row) - -SELECT * FROM mx_sequence_value_seq; - sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called ------------------------+------------------+------------------+--------------+------------------+------------------+-------------+---------+-----------+----------- - mx_sequence_value_seq | 4503599627370497 
| 4503599627370497 | 1 | 4785074604081153 | 4503599627370497 | 1 | 0 | f | f -(1 row) - \c - - - :master_port -- the type of sequences can't be changed ALTER TABLE mx_sequence ALTER value TYPE BIGINT; diff --git a/src/test/regress/expected/multi_mx_metadata.out b/src/test/regress/expected/multi_mx_metadata.out index 310cbb703..5798cb557 100644 --- a/src/test/regress/expected/multi_mx_metadata.out +++ b/src/test/regress/expected/multi_mx_metadata.out @@ -233,8 +233,8 @@ CREATE TABLE should_be_sorted_into_middle (value int); PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle'; \c - - - :master_port -- Add "fake" pg_dist_transaction records and run recovery -INSERT INTO pg_dist_transaction VALUES (14, 'citus_0_should_commit'); -INSERT INTO pg_dist_transaction VALUES (14, 'citus_0_should_be_forgotten'); +INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_commit'); +INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_be_forgotten'); SELECT recover_prepared_transactions(); NOTICE: recovered a prepared transaction on localhost:57637 CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort' diff --git a/src/test/regress/expected/multi_mx_modifications.out b/src/test/regress/expected/multi_mx_modifications.out index 34a97a70e..e244c0578 100644 --- a/src/test/regress/expected/multi_mx_modifications.out +++ b/src/test/regress/expected/multi_mx_modifications.out @@ -440,18 +440,18 @@ SELECT * FROM multiple_hash_mx WHERE category = '2' ORDER BY category, data; INSERT INTO app_analytics_events_mx VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id; id ------------------ - 4503599627370497 + 3940649673949185 (1 row) INSERT INTO app_analytics_events_mx (app_id, name) VALUES (102, 'Wayz') RETURNING id; id ------------------ - 4503599627370498 + 3940649673949186 (1 row) INSERT INTO app_analytics_events_mx (app_id, name) VALUES (103, 'Mynt') RETURNING *; id | app_id | name ------------------+--------+------ - 4503599627370499 | 103 | Mynt + 3940649673949187 | 103 | Mynt (1 row) diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out index 0f6934fe4..127ae2492 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -43,9 +43,9 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------------------- - (1380000,1380000,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1380000,1380000,localhost,57638,default,f,t) (1 row) -- remove a node with reference table @@ -164,10 +164,10 @@ SELECT master_remove_node('localhost', :worker_2_port); ERROR: could not find valid entry for node "localhost:57638" -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ---------------------------------------------- - (1380001,1380001,localhost,57638,default,f) +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1380001,1380001,localhost,57638,default,f,t) (1 row) -- remove node in a transaction and ROLLBACK @@ -387,10 +387,10 @@ WHERE \c - - - :master_port -- re-add the node for next tests SELECT 
master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ---------------------------------------------- - (1380002,1380002,localhost,57638,default,f) +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1380002,1380002,localhost,57638,default,f,t) (1 row) -- test inserting a value then removing a node in a transaction @@ -516,10 +516,10 @@ SELECT * FROM remove_node_reference_table; \c - - - :master_port -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ---------------------------------------------- - (1380003,1380003,localhost,57638,default,f) +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1380003,1380003,localhost,57638,default,f,t) (1 row) -- test executing DDL command then removing a node in a transaction @@ -641,10 +641,10 @@ Table "public.remove_node_reference_table" -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ---------------------------------------------- - (1380004,1380004,localhost,57638,default,f) +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1380004,1380004,localhost,57638,default,f,t) (1 row) -- test DROP table after removing a node in a transaction @@ -710,9 +710,9 @@ SELECT * FROM pg_dist_colocation WHERE colocationid = 1380000; -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------------------- - (1380005,1380005,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1380005,1380005,localhost,57638,default,f,t) (1 row) -- re-create remove_node_reference_table @@ -843,11 +843,11 @@ WHERE -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers -NOTICE: Replicating reference table "table1" to all workers - master_add_node ---------------------------------------------- - (1380006,1380006,localhost,57638,default,f) +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 +NOTICE: Replicating reference table "table1" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1380006,1380006,localhost,57638,default,f,t) (1 row) -- test with master_disable_node @@ -915,7 +915,7 @@ SELECT master_disable_node('localhost', :worker_2_port); SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; count ------- - 0 + 1 (1 row) SELECT @@ -936,14 +936,14 @@ WHERE colocationid IN WHERE logicalrelid = 'remove_node_reference_table'::regclass); colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ - 1380001 | 1 | 1 | 0 + 1380001 | 1 | 2 | 0 (1 row) \c - - - :worker_1_port SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; count ------- - 0 + 1 (1 row) SELECT @@ 
-959,12 +959,12 @@ WHERE \c - - - :master_port -- re-add the node for next tests -SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "remove_node_reference_table" to all workers -NOTICE: Replicating reference table "table1" to all workers - master_add_node ---------------------------------------------- - (1380007,1380007,localhost,57638,default,f) +SELECT master_activate_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638 +NOTICE: Replicating reference table "table1" to the node localhost:57638 + master_activate_node +----------------------------------------------- + (1380006,1380006,localhost,57638,default,f,t) (1 row) -- DROP tables to clean workspace diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 615eb09ad..1bf79a6f6 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -25,9 +25,9 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------------------- - (1370000,1370000,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1370000,1370000,localhost,57638,default,f,t) (1 row) -- verify node is added @@ -71,7 +71,6 @@ SELECT create_reference_table('replicate_reference_table_unhealthy'); UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_unhealthy" to all workers ERROR: could not find any healthy placement for shard 1370000 -- verify node is not added SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; @@ -123,10 +122,10 @@ WHERE colocationid IN (1 row) SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers - master_add_node ---------------------------------------------- - (1370002,1370002,localhost,57638,default,f) +NOTICE: Replicating reference table "replicate_reference_table_valid" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1370002,1370002,localhost,57638,default,f,t) (1 row) -- status after master_add_node @@ -177,9 +176,9 @@ WHERE colocationid IN (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------------------- - (1370002,1370002,localhost,57638,default,f) + master_add_node +----------------------------------------------- + (1370002,1370002,localhost,57638,default,f,t) (1 row) -- status after master_add_node @@ -244,10 +243,10 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers - master_add_node ---------------------------------------------- - (1370003,1370003,localhost,57638,default,f) +NOTICE: Replicating reference table "replicate_reference_table_rollback" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1370003,1370003,localhost,57638,default,f,t) (1 row) ROLLBACK; @@ -306,10 +305,10 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference 
table "replicate_reference_table_commit" to all workers - master_add_node ---------------------------------------------- - (1370004,1370004,localhost,57638,default,f) +NOTICE: Replicating reference table "replicate_reference_table_commit" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1370004,1370004,localhost,57638,default,f,t) (1 row) COMMIT; @@ -401,13 +400,14 @@ ORDER BY logicalrelid; BEGIN; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers - master_add_node ---------------------------------------------- - (1370005,1370005,localhost,57638,default,f) +NOTICE: Replicating reference table "replicate_reference_table_reference_one" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1370005,1370005,localhost,57638,default,f,t) (1 row) SELECT upgrade_to_reference_table('replicate_reference_table_hash'); +NOTICE: Replicating reference table "replicate_reference_table_hash" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -482,7 +482,7 @@ SELECT create_reference_table('replicate_reference_table_insert'); BEGIN; INSERT INTO replicate_reference_table_insert VALUES(1); SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_insert" to all workers +NOTICE: Replicating reference table "replicate_reference_table_insert" to the node localhost:57638 ERROR: cannot open new connections after the first modification command within a transaction ROLLBACK; DROP TABLE replicate_reference_table_insert; @@ -497,7 +497,7 @@ SELECT create_reference_table('replicate_reference_table_copy'); BEGIN; COPY replicate_reference_table_copy FROM STDIN; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_copy" to all workers +NOTICE: Replicating reference table "replicate_reference_table_copy" to the node localhost:57638 ERROR: cannot open new connections after the first modification command within a transaction ROLLBACK; DROP TABLE replicate_reference_table_copy; @@ -514,7 +514,7 @@ ALTER TABLE replicate_reference_table_ddl ADD column2 int; NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_ddl" to all workers +NOTICE: Replicating reference table "replicate_reference_table_ddl" to the node localhost:57638 ERROR: cannot open new connections after the first modification command within a transaction ROLLBACK; DROP TABLE replicate_reference_table_ddl; @@ -550,10 +550,10 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers - master_add_node ---------------------------------------------- - (1370009,1370009,localhost,57638,default,f) +NOTICE: Replicating reference table "replicate_reference_table_drop" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1370009,1370009,localhost,57638,default,f,t) (1 row) DROP TABLE replicate_reference_table_drop; @@ -612,10 +612,10 @@ WHERE colocationid IN (1 row) SELECT master_add_node('localhost', :worker_2_port); -NOTICE: Replicating reference table 
"table1" to all workers - master_add_node ---------------------------------------------- - (1370010,1370010,localhost,57638,default,f) +NOTICE: Replicating reference table "table1" to the node localhost:57638 + master_add_node +----------------------------------------------- + (1370010,1370010,localhost,57638,default,f,t) (1 row) -- status after master_add_node @@ -643,6 +643,79 @@ WHERE colocationid IN DROP TABLE replicate_reference_table_schema.table1; DROP SCHEMA replicate_reference_table_schema CASCADE; +-- do some tests with inactive node +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE TABLE initially_not_replicated_reference_table (key int); +SELECT create_reference_table('initially_not_replicated_reference_table'); + create_reference_table +------------------------ + +(1 row) + +SELECT master_add_inactive_node('localhost', :worker_2_port); + master_add_inactive_node +----------------------------------------------- + (1370011,1370011,localhost,57638,default,f,f) +(1 row) + +-- we should see only one shard placements +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT + shardid + FROM + pg_dist_shard + WHERE + logicalrelid = 'initially_not_replicated_reference_table'::regclass) +ORDER BY 1,4,5; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370012 | 1 | 0 | localhost | 57637 +(1 row) + +-- we should see the two shard placements after activation +SELECT master_activate_node('localhost', :worker_2_port); +NOTICE: Replicating reference table "initially_not_replicated_reference_table" to the node localhost:57638 + master_activate_node +----------------------------------------------- + (1370011,1370011,localhost,57638,default,f,t) +(1 row) + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT + shardid + FROM + pg_dist_shard + WHERE + logicalrelid = 'initially_not_replicated_reference_table'::regclass) +ORDER BY 1,4,5; + shardid | shardstate | shardlength | nodename | nodeport +---------+------------+-------------+-----------+---------- + 1370012 | 1 | 0 | localhost | 57637 + 1370012 | 1 | 0 | localhost | 57638 +(2 rows) + +-- this should have no effect +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +----------------------------------------------- + (1370011,1370011,localhost,57638,default,f,t) +(1 row) + +-- drop unnecassary tables +DROP TABLE initially_not_replicated_reference_table; -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); DROP TABLE tmp_shard_placement; diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index a5e2f4330..4fc76fc9a 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -78,15 +78,15 @@ DROP EXTENSION citus; CREATE EXTENSION citus; -- re-add the nodes to the cluster SELECT master_add_node('localhost', :worker_1_port); - master_add_node ---------------------------------- - (1,1,localhost,57637,default,f) + master_add_node +----------------------------------- + (1,1,localhost,57637,default,f,t) (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (2,2,localhost,57638,default,f) + master_add_node 
+----------------------------------- + (2,2,localhost,57638,default,f,t) (1 row) -- create a table with a SERIAL column diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index 41a1474ee..09cefad64 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -224,8 +224,8 @@ SELECT master_add_node('localhost', 5432); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+----------+----------+----------+------------- + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive +--------+---------+----------+----------+----------+-------------+---------- (0 rows) -- master_remove_node @@ -234,9 +234,9 @@ DROP INDEX mx_test_uniq_index; NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' SELECT master_add_node('localhost', 5432); - master_add_node --------------------------------------------- - (1370000,1370000,localhost,5432,default,f) + master_add_node +---------------------------------------------- + (1370000,1370000,localhost,5432,default,f,t) (1 row) \c - - - :worker_1_port @@ -244,9 +244,9 @@ SELECT master_remove_node('localhost', 5432); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ----------+---------+-----------+----------+----------+------------- - 1370000 | 1370000 | localhost | 5432 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive +---------+---------+-----------+----------+----------+-------------+---------- + 1370000 | 1370000 | localhost | 5432 | default | f | t (1 row) \c - - - :master_port diff --git a/src/test/regress/expected/multi_upgrade_reference_table.out b/src/test/regress/expected/multi_upgrade_reference_table.out index 1d5853d49..3aaf0b276 100644 --- a/src/test/regress/expected/multi_upgrade_reference_table.out +++ b/src/test/regress/expected/multi_upgrade_reference_table.out @@ -95,6 +95,7 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1'); UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass; SELECT upgrade_to_reference_table('upgrade_reference_table_composite'); +NOTICE: Replicating reference table "upgrade_reference_table_composite" to the node localhost:57638 ERROR: type "public.upgrade_test_composite_type" does not exist CONTEXT: while executing command on localhost:57638 DROP TABLE upgrade_reference_table_composite; @@ -165,6 +166,7 @@ WHERE shardid IN (1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_append'); +NOTICE: Replicating reference table "upgrade_reference_table_append" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -277,6 +279,7 @@ WHERE shardid IN (1 row) SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker'); +NOTICE: Replicating reference table "upgrade_reference_table_one_worker" to the node localhost:57638 upgrade_to_reference_table 
---------------------------- @@ -621,6 +624,7 @@ WHERE shardid IN BEGIN; SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback'); +NOTICE: Replicating reference table "upgrade_reference_table_transaction_rollback" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -733,6 +737,7 @@ WHERE shardid IN BEGIN; SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit'); +NOTICE: Replicating reference table "upgrade_reference_table_transaction_commit" to the node localhost:57638 upgrade_to_reference_table ---------------------------- @@ -980,6 +985,7 @@ ORDER BY nodeport; SELECT upgrade_to_reference_table('upgrade_reference_table_mx'); +NOTICE: Replicating reference table "upgrade_reference_table_mx" to the node localhost:57638 upgrade_to_reference_table ---------------------------- diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 8a045b570..348d82b14 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -586,7 +586,7 @@ SELECT shardid, nodename, nodeport WHERE logicalrelid = 'numbers_append'::regclass order by placementid; -- add the node back -SELECT master_add_node('localhost', :worker_1_port); +SELECT master_activate_node('localhost', :worker_1_port); RESET citus.shard_replication_factor; -- add two new shards and verify they are created at both workers COPY numbers_append FROM STDIN WITH (FORMAT 'csv'); diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 7a61e81cb..f84f4a56d 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -740,7 +740,7 @@ SELECT shardid, nodename, nodeport -- disable the first node SELECT master_disable_node('localhost', :worker_1_port); -NOTICE: Node localhost:57637 has active shard placements. Some queries may fail after this operation. Use select master_add_node('localhost', 57637) to add this node back. +NOTICE: Node localhost:57637 has active shard placements. Some queries may fail after this operation. Use SELECT master_activate_node('localhost', 57637) to activate this node back. 
master_disable_node --------------------- @@ -766,12 +766,12 @@ SELECT shardid, nodename, nodeport (6 rows) -- add the node back -SELECT master_add_node('localhost', :worker_1_port); -NOTICE: Replicating reference table "nation" to all workers -NOTICE: Replicating reference table "supplier" to all workers - master_add_node ---------------------------------- - (3,3,localhost,57637,default,f) +SELECT master_activate_node('localhost', :worker_1_port); +NOTICE: Replicating reference table "nation" to the node localhost:57637 +NOTICE: Replicating reference table "supplier" to the node localhost:57637 + master_activate_node +----------------------------------- + (1,1,localhost,57637,default,f,t) (1 row) RESET citus.shard_replication_factor; diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index df020204f..5430933ff 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -33,7 +33,7 @@ SELECT master_disable_node('localhost', :worker_2_port); SELECT master_get_active_worker_nodes(); -- add some shard placements to the cluster -SELECT master_add_node('localhost', :worker_2_port); +SELECT master_activate_node('localhost', :worker_2_port); CREATE TABLE cluster_management_test (col_1 text, col_2 int); SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash'); SELECT master_create_worker_shards('cluster_management_test', 16, 1); diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 0be9c5096..dc52ac98e 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -79,6 +79,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-16'; ALTER EXTENSION citus UPDATE TO '6.1-17'; ALTER EXTENSION citus UPDATE TO '6.2-1'; ALTER EXTENSION citus UPDATE TO '6.2-2'; +ALTER EXTENSION citus UPDATE TO '6.2-3'; -- show running version SHOW citus.version; diff --git a/src/test/regress/sql/multi_mx_metadata.sql b/src/test/regress/sql/multi_mx_metadata.sql index 9a63bad5b..f86834beb 100644 --- a/src/test/regress/sql/multi_mx_metadata.sql +++ b/src/test/regress/sql/multi_mx_metadata.sql @@ -150,8 +150,8 @@ PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle'; \c - - - :master_port -- Add "fake" pg_dist_transaction records and run recovery -INSERT INTO pg_dist_transaction VALUES (14, 'citus_0_should_commit'); -INSERT INTO pg_dist_transaction VALUES (14, 'citus_0_should_be_forgotten'); +INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_commit'); +INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_be_forgotten'); SELECT recover_prepared_transactions(); SELECT count(*) FROM pg_dist_transaction; diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql index fe44a50da..b1ae4e861 100644 --- a/src/test/regress/sql/multi_remove_node_reference_table.sql +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -575,7 +575,7 @@ WHERE \c - - - :master_port -- re-add the node for next tests -SELECT master_add_node('localhost', :worker_2_port); +SELECT master_activate_node('localhost', :worker_2_port); -- DROP tables to clean workspace diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 7f9813b77..608299f4f 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ 
b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -417,6 +417,49 @@ WHERE colocationid IN DROP TABLE replicate_reference_table_schema.table1; DROP SCHEMA replicate_reference_table_schema CASCADE; +-- do some tests with inactive node +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE TABLE initially_not_replicated_reference_table (key int); +SELECT create_reference_table('initially_not_replicated_reference_table'); + +SELECT master_add_inactive_node('localhost', :worker_2_port); + +-- we should see only one shard placement +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT + shardid + FROM + pg_dist_shard + WHERE + logicalrelid = 'initially_not_replicated_reference_table'::regclass) +ORDER BY 1,4,5; + +-- we should see the two shard placements after activation +SELECT master_activate_node('localhost', :worker_2_port); + +SELECT + shardid, shardstate, shardlength, nodename, nodeport +FROM + pg_dist_shard_placement +WHERE + shardid IN (SELECT + shardid + FROM + pg_dist_shard + WHERE + logicalrelid = 'initially_not_replicated_reference_table'::regclass) +ORDER BY 1,4,5; + +-- this should have no effect +SELECT master_add_node('localhost', :worker_2_port); + +-- drop unnecessary tables +DROP TABLE initially_not_replicated_reference_table; -- reload pg_dist_shard_placement table INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
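-- A minimal usage sketch (not part of the regression suite above) of the node
-- lifecycle these tests exercise, assuming the standard two-worker test setup;
-- 'localhost' and the :worker_2_port psql variable come from the test harness.
-- Register a worker without activating it: the row is added to pg_dist_node
-- with isactive = false and no reference table placements are copied to it yet.
SELECT master_add_inactive_node('localhost', :worker_2_port);
-- Activate the node: reference tables are replicated to it and it becomes
-- available for shard placements and query execution.
SELECT master_activate_node('localhost', :worker_2_port);
-- Calling master_add_node on an already active node has no further effect.
SELECT master_add_node('localhost', :worker_2_port);
-- Disabling keeps the row in pg_dist_node, so the node can later be brought
-- back with master_activate_node instead of being re-added from scratch.
SELECT master_disable_node('localhost', :worker_2_port);
SELECT master_activate_node('localhost', :worker_2_port);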