mirror of https://github.com/citusdata/citus.git
Decouple reference table replication
With this change we add an option to add a node without replicating all reference tables to that node. If a node is added with this option, we mark the node as inactive and no queries will sent to that node. We also added two new UDFs; - master_activate_node(host, port): - marks node as active and replicates all reference tables to that node - master_add_inactive_node(host, port): - only adds node to pg_dist_nodepull/1283/head
parent
7097336972
commit
e9095e62ec
|
@ -10,7 +10,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
5.2-1 5.2-2 5.2-3 5.2-4 \
|
5.2-1 5.2-2 5.2-3 5.2-4 \
|
||||||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
||||||
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
|
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
|
||||||
6.2-1 6.2-2
|
6.2-1 6.2-2 6.2-3
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -134,6 +134,8 @@ $(EXTENSION)--6.2-1.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.2-1.sq
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--6.2-2.sql: $(EXTENSION)--6.2-1.sql $(EXTENSION)--6.2-1--6.2-2.sql
|
$(EXTENSION)--6.2-2.sql: $(EXTENSION)--6.2-1.sql $(EXTENSION)--6.2-1--6.2-2.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--6.2-3.sql: $(EXTENSION)--6.2-2.sql $(EXTENSION)--6.2-2--6.2-3.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
/* citus--6.2-2--6.2-3.sql */
|
||||||
|
|
||||||
|
SET search_path = 'pg_catalog';
|
||||||
|
|
||||||
|
ALTER TABLE pg_dist_node ADD isactive bool;
|
||||||
|
|
||||||
|
DROP FUNCTION IF EXISTS master_add_node(text, integer);
|
||||||
|
|
||||||
|
CREATE FUNCTION master_add_node(nodename text,
|
||||||
|
nodeport integer,
|
||||||
|
OUT nodeid integer,
|
||||||
|
OUT groupid integer,
|
||||||
|
OUT nodename text,
|
||||||
|
OUT nodeport integer,
|
||||||
|
OUT noderack text,
|
||||||
|
OUT hasmetadata boolean,
|
||||||
|
OUT isactive bool)
|
||||||
|
RETURNS record
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME',$$master_add_node$$;
|
||||||
|
COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer)
|
||||||
|
IS 'add node to the cluster';
|
||||||
|
|
||||||
|
CREATE FUNCTION master_add_inactive_node(nodename text,
|
||||||
|
nodeport integer,
|
||||||
|
OUT nodeid integer,
|
||||||
|
OUT groupid integer,
|
||||||
|
OUT nodename text,
|
||||||
|
OUT nodeport integer,
|
||||||
|
OUT noderack text,
|
||||||
|
OUT hasmetadata boolean,
|
||||||
|
OUT isactive bool)
|
||||||
|
RETURNS record
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME',$$master_add_inactive_node$$;
|
||||||
|
COMMENT ON FUNCTION master_add_inactive_node(nodename text,nodeport integer)
|
||||||
|
IS 'prepare node by adding it to pg_dist_node';
|
||||||
|
|
||||||
|
CREATE FUNCTION master_activate_node(nodename text,
|
||||||
|
nodeport integer,
|
||||||
|
OUT nodeid integer,
|
||||||
|
OUT groupid integer,
|
||||||
|
OUT nodename text,
|
||||||
|
OUT nodeport integer,
|
||||||
|
OUT noderack text,
|
||||||
|
OUT hasmetadata boolean,
|
||||||
|
OUT isactive bool)
|
||||||
|
RETURNS record
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME',$$master_activate_node$$;
|
||||||
|
COMMENT ON FUNCTION master_activate_node(nodename text, nodeport integer)
|
||||||
|
IS 'activate a node which is in the cluster';
|
||||||
|
|
||||||
|
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '6.2-2'
|
default_version = '6.2-3'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -242,7 +242,7 @@ static void
|
||||||
CreateReferenceTable(Oid relationId)
|
CreateReferenceTable(Oid relationId)
|
||||||
{
|
{
|
||||||
uint32 colocationId = INVALID_COLOCATION_ID;
|
uint32 colocationId = INVALID_COLOCATION_ID;
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
int replicationFactor = list_length(workerNodeList);
|
int replicationFactor = list_length(workerNodeList);
|
||||||
char *distributionColumnName = NULL;
|
char *distributionColumnName = NULL;
|
||||||
bool requireEmpty = true;
|
bool requireEmpty = true;
|
||||||
|
|
|
@ -82,7 +82,7 @@ MultiRealTimeExecute(Job *job)
|
||||||
const char *workerHashName = "Worker node hash";
|
const char *workerHashName = "Worker node hash";
|
||||||
WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList));
|
WaitInfo *waitInfo = MultiClientCreateWaitInfo(list_length(taskList));
|
||||||
|
|
||||||
workerNodeList = WorkerNodeList();
|
workerNodeList = ActiveWorkerNodeList();
|
||||||
workerHash = WorkerHash(workerHashName, workerNodeList);
|
workerHash = WorkerHash(workerHashName, workerNodeList);
|
||||||
|
|
||||||
/* initialize task execution structures for remote execution */
|
/* initialize task execution structures for remote execution */
|
||||||
|
|
|
@ -43,7 +43,7 @@ JobExecutorType(MultiPlan *multiPlan)
|
||||||
{
|
{
|
||||||
Job *job = multiPlan->workerJob;
|
Job *job = multiPlan->workerJob;
|
||||||
List *workerTaskList = job->taskList;
|
List *workerTaskList = job->taskList;
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
int taskCount = list_length(workerTaskList);
|
int taskCount = list_length(workerTaskList);
|
||||||
int workerNodeCount = list_length(workerNodeList);
|
int workerNodeCount = list_length(workerNodeList);
|
||||||
double tasksPerNode = taskCount / ((double) workerNodeCount);
|
double tasksPerNode = taskCount / ((double) workerNodeCount);
|
||||||
|
|
|
@ -191,7 +191,7 @@ MultiTaskTrackerExecute(Job *job)
|
||||||
* assigning and checking the status of tasks. The second (temporary) hash
|
* assigning and checking the status of tasks. The second (temporary) hash
|
||||||
* helps us in fetching results data from worker nodes to the master node.
|
* helps us in fetching results data from worker nodes to the master node.
|
||||||
*/
|
*/
|
||||||
workerNodeList = WorkerNodeList();
|
workerNodeList = ActiveWorkerNodeList();
|
||||||
taskTrackerCount = (uint32) list_length(workerNodeList);
|
taskTrackerCount = (uint32) list_length(workerNodeList);
|
||||||
|
|
||||||
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList);
|
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList);
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
#include "distributed/pg_dist_partition.h"
|
#include "distributed/pg_dist_partition.h"
|
||||||
#include "distributed/pg_dist_shard.h"
|
#include "distributed/pg_dist_shard.h"
|
||||||
|
#include "distributed/reference_table_utils.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
|
@ -159,7 +160,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
||||||
hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
|
hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
|
||||||
|
|
||||||
/* load and sort the worker node list for deterministic placement */
|
/* load and sort the worker node list for deterministic placement */
|
||||||
workerNodeList = WorkerNodeList();
|
workerNodeList = ActiveWorkerNodeList();
|
||||||
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
||||||
|
|
||||||
/* make sure we don't process cancel signals until all shards are created */
|
/* make sure we don't process cancel signals until all shards are created */
|
||||||
|
@ -386,7 +387,7 @@ CreateReferenceTableShard(Oid distributedTableId)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* load and sort the worker node list for deterministic placement */
|
/* load and sort the worker node list for deterministic placement */
|
||||||
workerNodeList = WorkerNodeList();
|
workerNodeList = ActiveWorkerNodeList();
|
||||||
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
||||||
|
|
||||||
/* get the next shard id */
|
/* get the next shard id */
|
||||||
|
|
|
@ -47,7 +47,7 @@ master_expire_table_cache(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
ListCell *workerNodeCell = NULL;
|
ListCell *workerNodeCell = NULL;
|
||||||
int shardCount = cacheEntry->shardIntervalArrayLength;
|
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||||
ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray;
|
ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray;
|
||||||
|
|
|
@ -150,7 +150,7 @@ DistributedTableSize(Oid relationId, char *sizeQuery)
|
||||||
|
|
||||||
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
||||||
|
|
||||||
workerNodeList = WorkerNodeList();
|
workerNodeList = ActiveWorkerNodeList();
|
||||||
|
|
||||||
foreach(workerNodeCell, workerNodeList)
|
foreach(workerNodeCell, workerNodeList)
|
||||||
{
|
{
|
||||||
|
|
|
@ -390,7 +390,7 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
|
||||||
/* switch to memory context appropriate for multiple function calls */
|
/* switch to memory context appropriate for multiple function calls */
|
||||||
oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
|
oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
|
||||||
|
|
||||||
workerNodeList = WorkerNodeList();
|
workerNodeList = ActiveWorkerNodeList();
|
||||||
workerNodeCount = (uint32) list_length(workerNodeList);
|
workerNodeCount = (uint32) list_length(workerNodeList);
|
||||||
|
|
||||||
functionContext->user_fctx = workerNodeList;
|
functionContext->user_fctx = workerNodeList;
|
||||||
|
|
|
@ -71,7 +71,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
text *relationNameText = PG_GETARG_TEXT_P(0);
|
text *relationNameText = PG_GETARG_TEXT_P(0);
|
||||||
char *relationName = text_to_cstring(relationNameText);
|
char *relationName = text_to_cstring(relationNameText);
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
uint64 shardId = INVALID_SHARD_ID;
|
uint64 shardId = INVALID_SHARD_ID;
|
||||||
List *ddlEventList = NULL;
|
List *ddlEventList = NULL;
|
||||||
uint32 attemptableNodeCount = 0;
|
uint32 attemptableNodeCount = 0;
|
||||||
|
|
|
@ -302,19 +302,19 @@ WorkerGetNodeWithName(const char *hostname)
|
||||||
uint32
|
uint32
|
||||||
WorkerGetLiveNodeCount(void)
|
WorkerGetLiveNodeCount(void)
|
||||||
{
|
{
|
||||||
HTAB *workerNodeHash = GetWorkerNodeHash();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
uint32 liveWorkerCount = hash_get_num_entries(workerNodeHash);
|
uint32 liveWorkerCount = workerNodeList->length;
|
||||||
|
|
||||||
return liveWorkerCount;
|
return liveWorkerCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WorkerNodeList iterates over the hash table that includes the worker nodes, and adds
|
* ActiveWorkerNodeList iterates over the hash table that includes the worker
|
||||||
* them to a list which is returned.
|
* nodes and adds active nodes to a list, which is returned.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
WorkerNodeList(void)
|
ActiveWorkerNodeList(void)
|
||||||
{
|
{
|
||||||
List *workerNodeList = NIL;
|
List *workerNodeList = NIL;
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
|
@ -324,11 +324,14 @@ WorkerNodeList(void)
|
||||||
hash_seq_init(&status, workerNodeHash);
|
hash_seq_init(&status, workerNodeHash);
|
||||||
|
|
||||||
while ((workerNode = hash_seq_search(&status)) != NULL)
|
while ((workerNode = hash_seq_search(&status)) != NULL)
|
||||||
|
{
|
||||||
|
if (workerNode->isActive)
|
||||||
{
|
{
|
||||||
WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode));
|
WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode));
|
||||||
memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
|
memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode));
|
||||||
workerNodeList = lappend(workerNodeList, workerNodeCopy);
|
workerNodeList = lappend(workerNodeList, workerNodeCopy);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return workerNodeList;
|
return workerNodeList;
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,6 +99,15 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
||||||
"(%s,%d)", escapedNodeName, nodePort)));
|
"(%s,%d)", escapedNodeName, nodePort)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!workerNode->isActive)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
|
errmsg("you cannot sync metadata to an inactive node"),
|
||||||
|
errhint("First, activate the node with "
|
||||||
|
"SELECT master_activate_node(%s,%d)",
|
||||||
|
escapedNodeName, nodePort)));
|
||||||
|
}
|
||||||
|
|
||||||
MarkNodeHasMetadata(nodeNameString, nodePort, true);
|
MarkNodeHasMetadata(nodeNameString, nodePort, true);
|
||||||
|
|
||||||
/* generate and add the local group id's update query */
|
/* generate and add the local group id's update query */
|
||||||
|
@ -206,7 +215,7 @@ MetadataCreateCommands(void)
|
||||||
List *metadataSnapshotCommandList = NIL;
|
List *metadataSnapshotCommandList = NIL;
|
||||||
List *distributedTableList = DistributedTableList();
|
List *distributedTableList = DistributedTableList();
|
||||||
List *propagatedTableList = NIL;
|
List *propagatedTableList = NIL;
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
ListCell *distributedTableCell = NULL;
|
ListCell *distributedTableCell = NULL;
|
||||||
char *nodeListInsertCommand = NULL;
|
char *nodeListInsertCommand = NULL;
|
||||||
bool includeSequenceDefaults = true;
|
bool includeSequenceDefaults = true;
|
||||||
|
@ -401,24 +410,25 @@ NodeListInsertCommand(List *workerNodeList)
|
||||||
|
|
||||||
/* generate the query without any values yet */
|
/* generate the query without any values yet */
|
||||||
appendStringInfo(nodeListInsertCommand,
|
appendStringInfo(nodeListInsertCommand,
|
||||||
"INSERT INTO pg_dist_node "
|
"INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, "
|
||||||
"(nodeid, groupid, nodename, nodeport, noderack, hasmetadata) "
|
"noderack, hasmetadata, isactive) VALUES ");
|
||||||
"VALUES ");
|
|
||||||
|
|
||||||
/* iterate over the worker nodes, add the values */
|
/* iterate over the worker nodes, add the values */
|
||||||
foreach(workerNodeCell, workerNodeList)
|
foreach(workerNodeCell, workerNodeList)
|
||||||
{
|
{
|
||||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||||
char *hasMetadaString = workerNode->hasMetadata ? "TRUE" : "FALSE";
|
char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE";
|
||||||
|
char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE";
|
||||||
|
|
||||||
appendStringInfo(nodeListInsertCommand,
|
appendStringInfo(nodeListInsertCommand,
|
||||||
"(%d, %d, %s, %d, %s, %s)",
|
"(%d, %d, %s, %d, %s, %s, %s)",
|
||||||
workerNode->nodeId,
|
workerNode->nodeId,
|
||||||
workerNode->groupId,
|
workerNode->groupId,
|
||||||
quote_literal_cstr(workerNode->workerName),
|
quote_literal_cstr(workerNode->workerName),
|
||||||
workerNode->workerPort,
|
workerNode->workerPort,
|
||||||
quote_literal_cstr(workerNode->workerRack),
|
quote_literal_cstr(workerNode->workerRack),
|
||||||
hasMetadaString);
|
hasMetadataString,
|
||||||
|
isActiveString);
|
||||||
|
|
||||||
processedWorkerNodeCount++;
|
processedWorkerNodeCount++;
|
||||||
if (processedWorkerNodeCount != workerCount)
|
if (processedWorkerNodeCount != workerCount)
|
||||||
|
@ -685,6 +695,24 @@ NodeDeleteCommand(uint32 nodeId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* NodeStateUpdateCommand generates a command that can be executed to update
|
||||||
|
* isactive column of a node in pg_dist_node table.
|
||||||
|
*/
|
||||||
|
char *
|
||||||
|
NodeStateUpdateCommand(uint32 nodeId, bool isActive)
|
||||||
|
{
|
||||||
|
StringInfo nodeStateUpdateCommand = makeStringInfo();
|
||||||
|
char *isActiveString = isActive ? "TRUE" : "FALSE";
|
||||||
|
|
||||||
|
appendStringInfo(nodeStateUpdateCommand,
|
||||||
|
"UPDATE pg_dist_node SET isactive = %s "
|
||||||
|
"WHERE nodeid = %u", isActiveString, nodeId);
|
||||||
|
|
||||||
|
return nodeStateUpdateCommand->data;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ColocationIdUpdateCommand creates the SQL command to change the colocationId
|
* ColocationIdUpdateCommand creates the SQL command to change the colocationId
|
||||||
* of the table with the given name to the given colocationId in pg_dist_partition
|
* of the table with the given name to the given colocationId in pg_dist_partition
|
||||||
|
@ -973,7 +1001,7 @@ OwnerName(Oid objectId)
|
||||||
static bool
|
static bool
|
||||||
HasMetadataWorkers(void)
|
HasMetadataWorkers(void)
|
||||||
{
|
{
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
ListCell *workerNodeCell = NULL;
|
ListCell *workerNodeCell = NULL;
|
||||||
|
|
||||||
foreach(workerNodeCell, workerNodeList)
|
foreach(workerNodeCell, workerNodeList)
|
||||||
|
|
|
@ -4912,7 +4912,7 @@ GreedyAssignTaskList(List *taskList)
|
||||||
uint32 taskCount = list_length(taskList);
|
uint32 taskCount = list_length(taskList);
|
||||||
|
|
||||||
/* get the worker node list and sort the list */
|
/* get the worker node list and sort the list */
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -5344,7 +5344,7 @@ AssignDualHashTaskList(List *taskList)
|
||||||
* if subsequent jobs have a small number of tasks, we won't allocate the
|
* if subsequent jobs have a small number of tasks, we won't allocate the
|
||||||
* tasks to the same worker repeatedly.
|
* tasks to the same worker repeatedly.
|
||||||
*/
|
*/
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
uint32 workerNodeCount = (uint32) list_length(workerNodeList);
|
uint32 workerNodeCount = (uint32) list_length(workerNodeList);
|
||||||
uint32 beginningNodeIndex = jobId % workerNodeCount;
|
uint32 beginningNodeIndex = jobId % workerNodeCount;
|
||||||
|
|
||||||
|
|
|
@ -2295,7 +2295,7 @@ RouterSelectQuery(Query *originalQuery, RelationRestrictionContext *restrictionC
|
||||||
}
|
}
|
||||||
else if (replacePrunedQueryWithDummy)
|
else if (replacePrunedQueryWithDummy)
|
||||||
{
|
{
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
if (workerNodeList != NIL)
|
if (workerNodeList != NIL)
|
||||||
{
|
{
|
||||||
WorkerNode *workerNode = (WorkerNode *) linitial(workerNodeList);
|
WorkerNode *workerNode = (WorkerNode *) linitial(workerNodeList);
|
||||||
|
|
|
@ -125,7 +125,7 @@ RecoverPreparedTransactions(void)
|
||||||
*/
|
*/
|
||||||
LockRelationOid(DistTransactionRelationId(), ExclusiveLock);
|
LockRelationOid(DistTransactionRelationId(), ExclusiveLock);
|
||||||
|
|
||||||
workerList = WorkerNodeList();
|
workerList = ActiveWorkerNodeList();
|
||||||
|
|
||||||
foreach(workerNodeCell, workerList)
|
foreach(workerNodeCell, workerList)
|
||||||
{
|
{
|
||||||
|
|
|
@ -78,7 +78,7 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command)
|
||||||
void
|
void
|
||||||
SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
|
SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
|
||||||
{
|
{
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
ListCell *workerNodeCell = NULL;
|
ListCell *workerNodeCell = NULL;
|
||||||
char *nodeUser = CitusExtensionOwnerName();
|
char *nodeUser = CitusExtensionOwnerName();
|
||||||
ListCell *commandCell = NULL;
|
ListCell *commandCell = NULL;
|
||||||
|
@ -128,7 +128,7 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
|
||||||
{
|
{
|
||||||
List *connectionList = NIL;
|
List *connectionList = NIL;
|
||||||
ListCell *connectionCell = NULL;
|
ListCell *connectionCell = NULL;
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
ListCell *workerNodeCell = NULL;
|
ListCell *workerNodeCell = NULL;
|
||||||
char *nodeUser = CitusExtensionOwnerName();
|
char *nodeUser = CitusExtensionOwnerName();
|
||||||
|
|
||||||
|
|
|
@ -1909,6 +1909,7 @@ InitializeWorkerNodeCache(void)
|
||||||
workerNode->nodeId = currentNode->nodeId;
|
workerNode->nodeId = currentNode->nodeId;
|
||||||
strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH);
|
strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH);
|
||||||
workerNode->hasMetadata = currentNode->hasMetadata;
|
workerNode->hasMetadata = currentNode->hasMetadata;
|
||||||
|
workerNode->isActive = currentNode->isActive;
|
||||||
|
|
||||||
if (handleFound)
|
if (handleFound)
|
||||||
{
|
{
|
||||||
|
|
|
@ -33,10 +33,12 @@
|
||||||
#include "distributed/multi_router_planner.h"
|
#include "distributed/multi_router_planner.h"
|
||||||
#include "distributed/pg_dist_node.h"
|
#include "distributed/pg_dist_node.h"
|
||||||
#include "distributed/reference_table_utils.h"
|
#include "distributed/reference_table_utils.h"
|
||||||
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
#include "lib/stringinfo.h"
|
#include "lib/stringinfo.h"
|
||||||
|
#include "storage/bufmgr.h"
|
||||||
#include "storage/lock.h"
|
#include "storage/lock.h"
|
||||||
#include "storage/fd.h"
|
#include "storage/fd.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
|
@ -51,23 +53,29 @@ int GroupSize = 1;
|
||||||
|
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove);
|
static Datum ActivateNode(char *nodeName, int nodePort);
|
||||||
|
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
|
||||||
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
|
static Datum AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId,
|
||||||
char *nodeRack, bool hasMetadata, bool *nodeAlreadyExists);
|
char *nodeRack, bool hasMetadata, bool isActive,
|
||||||
|
bool *nodeAlreadyExists);
|
||||||
|
static void SetNodeState(char *nodeName, int32 nodePort, bool isActive);
|
||||||
|
static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort);
|
||||||
static Datum GenerateNodeTuple(WorkerNode *workerNode);
|
static Datum GenerateNodeTuple(WorkerNode *workerNode);
|
||||||
static int32 GetNextGroupId(void);
|
static int32 GetNextGroupId(void);
|
||||||
static uint32 GetMaxGroupId(void);
|
static uint32 GetMaxGroupId(void);
|
||||||
static int GetNextNodeId(void);
|
static int GetNextNodeId(void);
|
||||||
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId,
|
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, uint32 groupId,
|
||||||
char *nodeRack, bool hasMetadata);
|
char *nodeRack, bool hasMetadata, bool isActive);
|
||||||
static void DeleteNodeRow(char *nodename, int32 nodeport);
|
static void DeleteNodeRow(char *nodename, int32 nodeport);
|
||||||
static List * ParseWorkerNodeFileAndRename(void);
|
static List * ParseWorkerNodeFileAndRename(void);
|
||||||
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
|
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
|
||||||
|
|
||||||
/* declarations for dynamic loading */
|
/* declarations for dynamic loading */
|
||||||
PG_FUNCTION_INFO_V1(master_add_node);
|
PG_FUNCTION_INFO_V1(master_add_node);
|
||||||
|
PG_FUNCTION_INFO_V1(master_add_inactive_node);
|
||||||
PG_FUNCTION_INFO_V1(master_remove_node);
|
PG_FUNCTION_INFO_V1(master_remove_node);
|
||||||
PG_FUNCTION_INFO_V1(master_disable_node);
|
PG_FUNCTION_INFO_V1(master_disable_node);
|
||||||
|
PG_FUNCTION_INFO_V1(master_activate_node);
|
||||||
PG_FUNCTION_INFO_V1(master_initialize_node_metadata);
|
PG_FUNCTION_INFO_V1(master_initialize_node_metadata);
|
||||||
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
|
PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
|
||||||
|
|
||||||
|
@ -85,23 +93,47 @@ master_add_node(PG_FUNCTION_ARGS)
|
||||||
int32 groupId = 0;
|
int32 groupId = 0;
|
||||||
char *nodeRack = WORKER_DEFAULT_RACK;
|
char *nodeRack = WORKER_DEFAULT_RACK;
|
||||||
bool hasMetadata = false;
|
bool hasMetadata = false;
|
||||||
|
bool isActive = false;
|
||||||
bool nodeAlreadyExists = false;
|
bool nodeAlreadyExists = false;
|
||||||
|
|
||||||
Datum returnData = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
|
Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
|
||||||
hasMetadata, &nodeAlreadyExists);
|
hasMetadata, isActive, &nodeAlreadyExists);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* After adding new node, if the node is not already exist, we replicate all existing
|
* After adding new node, if the node did not already exist, we will activate
|
||||||
* reference tables to the new node. ReplicateAllReferenceTablesToAllNodes replicates
|
* the node. This means we will replicate all reference tables to the new
|
||||||
* reference tables to all nodes however, it skips nodes which already has healthy
|
* node.
|
||||||
* placement of particular reference table.
|
|
||||||
*/
|
*/
|
||||||
if (!nodeAlreadyExists)
|
if (!nodeAlreadyExists)
|
||||||
{
|
{
|
||||||
ReplicateAllReferenceTablesToAllNodes();
|
nodeRecord = ActivateNode(nodeNameString, nodePort);
|
||||||
}
|
}
|
||||||
|
|
||||||
PG_RETURN_CSTRING(returnData);
|
PG_RETURN_CSTRING(nodeRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* master_add_inactive_node function adds a new node to the cluster as inactive node
|
||||||
|
* and returns information about newly added node. It does not replicate reference
|
||||||
|
* tables to the new node, it only adds new node to the pg_dist_node table.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
master_add_inactive_node(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||||
|
int32 nodePort = PG_GETARG_INT32(1);
|
||||||
|
char *nodeNameString = text_to_cstring(nodeName);
|
||||||
|
int32 groupId = 0;
|
||||||
|
char *nodeRack = WORKER_DEFAULT_RACK;
|
||||||
|
bool hasMetadata = false;
|
||||||
|
bool isActive = false;
|
||||||
|
bool nodeAlreadyExists = false;
|
||||||
|
|
||||||
|
Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
|
||||||
|
hasMetadata, isActive, &nodeAlreadyExists);
|
||||||
|
|
||||||
|
PG_RETURN_CSTRING(nodeRecord);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -120,35 +152,108 @@ master_remove_node(PG_FUNCTION_ARGS)
|
||||||
text *nodeName = PG_GETARG_TEXT_P(0);
|
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||||
int32 nodePort = PG_GETARG_INT32(1);
|
int32 nodePort = PG_GETARG_INT32(1);
|
||||||
char *nodeNameString = text_to_cstring(nodeName);
|
char *nodeNameString = text_to_cstring(nodeName);
|
||||||
bool forceRemove = false;
|
|
||||||
RemoveNodeFromCluster(nodeNameString, nodePort, forceRemove);
|
RemoveNodeFromCluster(nodeNameString, nodePort);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* master_disable_node function removes the provided node from the pg_dist_node table of
|
* master_disable_node function sets isactive value of the provided node as inactive
|
||||||
* the master node and all nodes with metadata regardless of the node having an active
|
* at master node and all nodes with metadata regardless of the node having an
|
||||||
* shard placement.
|
* active shard placement.
|
||||||
* The call to the master_remove_node should be done by the super user.
|
* The call to the master_disable_node must be done by the super user.
|
||||||
* This function also deletes all reference table placements belong to the given node from
|
* This function also deletes all reference table placements belong to the given
|
||||||
* pg_dist_shard_placement, but it does not drop actual placement at the node. In the case
|
* node from pg_dist_shard_placement, but it does not drop actual placement at
|
||||||
* of re-adding the node, master_add_node first drops and re-creates the reference tables.
|
* the node. In the case of re-activating the node, master_add_node first drops
|
||||||
|
* and re-creates the reference tables.
|
||||||
*/
|
*/
|
||||||
Datum
|
Datum
|
||||||
master_disable_node(PG_FUNCTION_ARGS)
|
master_disable_node(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
text *nodeName = PG_GETARG_TEXT_P(0);
|
text *nodeNameText = PG_GETARG_TEXT_P(0);
|
||||||
int32 nodePort = PG_GETARG_INT32(1);
|
int32 nodePort = PG_GETARG_INT32(1);
|
||||||
char *nodeNameString = text_to_cstring(nodeName);
|
|
||||||
bool forceRemove = true;
|
char *nodeName = text_to_cstring(nodeNameText);
|
||||||
RemoveNodeFromCluster(nodeNameString, nodePort, forceRemove);
|
bool hasShardPlacements = false;
|
||||||
|
bool isActive = false;
|
||||||
|
|
||||||
|
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
|
||||||
|
|
||||||
|
hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort);
|
||||||
|
if (hasShardPlacements)
|
||||||
|
{
|
||||||
|
ereport(NOTICE, (errmsg("Node %s:%d has active shard placements. Some queries "
|
||||||
|
"may fail after this operation. Use "
|
||||||
|
"SELECT master_activate_node('%s', %d) to activate this "
|
||||||
|
"node back.",
|
||||||
|
nodeName, nodePort, nodeName, nodePort)));
|
||||||
|
}
|
||||||
|
|
||||||
|
SetNodeState(nodeName, nodePort, isActive);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* master_activate_node UDF activates the given node. It sets the node's isactive
|
||||||
|
* value to active and replicates all reference tables to that node.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
master_activate_node(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||||
|
int32 nodePort = PG_GETARG_INT32(1);
|
||||||
|
|
||||||
|
char *nodeNameString = text_to_cstring(nodeName);
|
||||||
|
Datum nodeRecord = 0;
|
||||||
|
|
||||||
|
nodeRecord = ActivateNode(nodeNameString, nodePort);
|
||||||
|
|
||||||
|
PG_RETURN_CSTRING(nodeRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ActivateNode activates the node with nodeName and nodePort. Currently, activation
|
||||||
|
* includes only replicating the reference tables and setting isactive column of the
|
||||||
|
* given node.
|
||||||
|
*/
|
||||||
|
static Datum
|
||||||
|
ActivateNode(char *nodeName, int nodePort)
|
||||||
|
{
|
||||||
|
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
||||||
|
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
|
||||||
|
CommandId commandId = GetCurrentCommandId(true);
|
||||||
|
LockTupleMode lockTupleMode = LockTupleExclusive;
|
||||||
|
LockWaitPolicy lockWaitPolicy = LockWaitError;
|
||||||
|
bool followUpdates = false;
|
||||||
|
Buffer buffer = 0;
|
||||||
|
HeapUpdateFailureData heapUpdateFailureData;
|
||||||
|
|
||||||
|
WorkerNode *workerNode = NULL;
|
||||||
|
bool isActive = true;
|
||||||
|
Datum nodeRecord = 0;
|
||||||
|
|
||||||
|
heap_lock_tuple(pgDistNode, heapTuple, commandId, lockTupleMode, lockWaitPolicy,
|
||||||
|
followUpdates, &buffer, &heapUpdateFailureData);
|
||||||
|
ReleaseBuffer(buffer);
|
||||||
|
|
||||||
|
SetNodeState(nodeName, nodePort, isActive);
|
||||||
|
|
||||||
|
ReplicateAllReferenceTablesToNode(nodeName, nodePort);
|
||||||
|
|
||||||
|
workerNode = FindWorkerNode(nodeName, nodePort);
|
||||||
|
nodeRecord = GenerateNodeTuple(workerNode);
|
||||||
|
|
||||||
|
heap_close(pgDistNode, AccessShareLock);
|
||||||
|
|
||||||
|
return nodeRecord;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* master_initialize_node_metadata is run once, when upgrading citus. It ingests the
|
* master_initialize_node_metadata is run once, when upgrading citus. It ingests the
|
||||||
* existing pg_worker_list.conf into pg_dist_node, then adds a header to the file stating
|
* existing pg_worker_list.conf into pg_dist_node, then adds a header to the file stating
|
||||||
|
@ -166,7 +271,10 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
|
||||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||||
|
|
||||||
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
|
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
|
||||||
workerNode->workerRack, false, &nodeAlreadyExists);
|
workerNode->workerRack, false, workerNode->isActive,
|
||||||
|
&nodeAlreadyExists);
|
||||||
|
|
||||||
|
ActivateNode(workerNode->workerName, workerNode->workerPort);
|
||||||
}
|
}
|
||||||
|
|
||||||
PG_RETURN_BOOL(true);
|
PG_RETURN_BOOL(true);
|
||||||
|
@ -329,15 +437,14 @@ ReadWorkerNodes()
|
||||||
* RemoveNodeFromCluster removes the provided node from the pg_dist_node table of
|
* RemoveNodeFromCluster removes the provided node from the pg_dist_node table of
|
||||||
* the master node and all nodes with metadata.
|
* the master node and all nodes with metadata.
|
||||||
* The call to the master_remove_node should be done by the super user. If there are
|
* The call to the master_remove_node should be done by the super user. If there are
|
||||||
* active shard placements on the node; the function removes the node when forceRemove
|
* active shard placements on the node; the function errors out.
|
||||||
* flag is set, it errors out otherwise.
|
|
||||||
* This function also deletes all reference table placements belong to the given node from
|
* This function also deletes all reference table placements belong to the given node from
|
||||||
* pg_dist_shard_placement, but it does not drop actual placement at the node. It also
|
* pg_dist_shard_placement, but it does not drop actual placement at the node. It also
|
||||||
* modifies replication factor of the colocation group of reference tables, so that
|
* modifies replication factor of the colocation group of reference tables, so that
|
||||||
* replication factor will be equal to worker count.
|
* replication factor will be equal to worker count.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
|
RemoveNodeFromCluster(char *nodeName, int32 nodePort)
|
||||||
{
|
{
|
||||||
char *nodeDeleteCommand = NULL;
|
char *nodeDeleteCommand = NULL;
|
||||||
bool hasShardPlacements = false;
|
bool hasShardPlacements = false;
|
||||||
|
@ -370,7 +477,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
|
||||||
Oid firstReferenceTableId = linitial_oid(referenceTableList);
|
Oid firstReferenceTableId = linitial_oid(referenceTableList);
|
||||||
uint32 referenceTableColocationId = TableColocationId(firstReferenceTableId);
|
uint32 referenceTableColocationId = TableColocationId(firstReferenceTableId);
|
||||||
|
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
int workerCount = list_length(workerNodeList);
|
int workerCount = list_length(workerNodeList);
|
||||||
|
|
||||||
UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);
|
UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);
|
||||||
|
@ -378,21 +485,10 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
|
||||||
|
|
||||||
hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort);
|
hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort);
|
||||||
if (hasShardPlacements)
|
if (hasShardPlacements)
|
||||||
{
|
|
||||||
if (forceRemove)
|
|
||||||
{
|
|
||||||
ereport(NOTICE, (errmsg("Node %s:%d has active shard placements. Some "
|
|
||||||
"queries may fail after this operation. Use "
|
|
||||||
"select master_add_node('%s', %d) to add this "
|
|
||||||
"node back.",
|
|
||||||
nodeName, nodePort, nodeName, nodePort)));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("you cannot remove a node which has active "
|
ereport(ERROR, (errmsg("you cannot remove a node which has active "
|
||||||
"shard placements")));
|
"shard placements")));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
nodeDeleteCommand = NodeDeleteCommand(deletedNodeId);
|
nodeDeleteCommand = NodeDeleteCommand(deletedNodeId);
|
||||||
|
|
||||||
|
@ -414,7 +510,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
|
||||||
*/
|
*/
|
||||||
static Datum
|
static Datum
|
||||||
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
||||||
bool hasMetadata, bool *nodeAlreadyExists)
|
bool hasMetadata, bool isActive, bool *nodeAlreadyExists)
|
||||||
{
|
{
|
||||||
Relation pgDistNode = NULL;
|
Relation pgDistNode = NULL;
|
||||||
int nextNodeIdInt = 0;
|
int nextNodeIdInt = 0;
|
||||||
|
@ -465,7 +561,8 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
||||||
/* generate the new node id from the sequence */
|
/* generate the new node id from the sequence */
|
||||||
nextNodeIdInt = GetNextNodeId();
|
nextNodeIdInt = GetNextNodeId();
|
||||||
|
|
||||||
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata);
|
InsertNodeRow(nextNodeIdInt, nodeName, nodePort, groupId, nodeRack, hasMetadata,
|
||||||
|
isActive);
|
||||||
|
|
||||||
workerNode = FindWorkerNode(nodeName, nodePort);
|
workerNode = FindWorkerNode(nodeName, nodePort);
|
||||||
|
|
||||||
|
@ -488,6 +585,82 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SetNodeState function sets the isactive column of the specified worker in
|
||||||
|
* pg_dist_node to true.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
SetNodeState(char *nodeName, int32 nodePort, bool isActive)
|
||||||
|
{
|
||||||
|
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
||||||
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
||||||
|
HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort);
|
||||||
|
|
||||||
|
Datum values[Natts_pg_dist_node];
|
||||||
|
bool isnull[Natts_pg_dist_node];
|
||||||
|
bool replace[Natts_pg_dist_node];
|
||||||
|
|
||||||
|
char *nodeStateUpdateCommand = NULL;
|
||||||
|
WorkerNode *workerNode = NULL;
|
||||||
|
|
||||||
|
memset(replace, 0, sizeof(replace));
|
||||||
|
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive);
|
||||||
|
isnull[Anum_pg_dist_node_isactive - 1] = false;
|
||||||
|
replace[Anum_pg_dist_node_isactive - 1] = true;
|
||||||
|
|
||||||
|
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
|
||||||
|
simple_heap_update(pgDistNode, &heapTuple->t_self, heapTuple);
|
||||||
|
|
||||||
|
CatalogUpdateIndexes(pgDistNode, heapTuple);
|
||||||
|
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
|
||||||
|
CommandCounterIncrement();
|
||||||
|
|
||||||
|
heap_close(pgDistNode, AccessShareLock);
|
||||||
|
|
||||||
|
/* we also update isactive column at worker nodes */
|
||||||
|
workerNode = FindWorkerNode(nodeName, nodePort);
|
||||||
|
nodeStateUpdateCommand = NodeStateUpdateCommand(workerNode->nodeId, isActive);
|
||||||
|
SendCommandToWorkers(WORKERS_WITH_METADATA, nodeStateUpdateCommand);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetNodeTuple function returns heap tuple of given nodeName and nodePort. If
|
||||||
|
* there are no node tuple with specified nodeName and nodePort, this function
|
||||||
|
* errors out.
|
||||||
|
*/
|
||||||
|
static HeapTuple
|
||||||
|
GetNodeTuple(char *nodeName, int32 nodePort)
|
||||||
|
{
|
||||||
|
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
||||||
|
const int scanKeyCount = 2;
|
||||||
|
const bool indexOK = false;
|
||||||
|
|
||||||
|
ScanKeyData scanKey[scanKeyCount];
|
||||||
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
|
||||||
|
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
|
||||||
|
ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
|
||||||
|
BTEqualStrategyNumber, F_INT8EQ, Int32GetDatum(nodePort));
|
||||||
|
scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK,
|
||||||
|
NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
if (!HeapTupleIsValid(heapTuple))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
|
||||||
|
nodeName, nodePort)));
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
heap_close(pgDistNode, AccessShareLock);
|
||||||
|
|
||||||
|
return heapTuple;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* GenerateNodeTuple gets a worker node and return a heap tuple of
|
* GenerateNodeTuple gets a worker node and return a heap tuple of
|
||||||
* given worker node.
|
* given worker node.
|
||||||
|
@ -512,6 +685,7 @@ GenerateNodeTuple(WorkerNode *workerNode)
|
||||||
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort);
|
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(workerNode->workerPort);
|
||||||
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack);
|
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(workerNode->workerRack);
|
||||||
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(workerNode->hasMetadata);
|
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(workerNode->hasMetadata);
|
||||||
|
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(workerNode->isActive);
|
||||||
|
|
||||||
/* open shard relation and insert new tuple */
|
/* open shard relation and insert new tuple */
|
||||||
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
||||||
|
@ -519,7 +693,6 @@ GenerateNodeTuple(WorkerNode *workerNode)
|
||||||
/* generate the tuple */
|
/* generate the tuple */
|
||||||
tupleDescriptor = RelationGetDescr(pgDistNode);
|
tupleDescriptor = RelationGetDescr(pgDistNode);
|
||||||
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
|
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
|
||||||
|
|
||||||
nodeDatum = HeapTupleGetDatum(heapTuple);
|
nodeDatum = HeapTupleGetDatum(heapTuple);
|
||||||
|
|
||||||
/* close the relation */
|
/* close the relation */
|
||||||
|
@ -650,7 +823,7 @@ EnsureCoordinator(void)
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack,
|
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *nodeRack,
|
||||||
bool hasMetadata)
|
bool hasMetadata, bool isActive)
|
||||||
{
|
{
|
||||||
Relation pgDistNode = NULL;
|
Relation pgDistNode = NULL;
|
||||||
TupleDesc tupleDescriptor = NULL;
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
@ -668,6 +841,7 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, uint32 groupId, char *
|
||||||
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
|
values[Anum_pg_dist_node_nodeport - 1] = UInt32GetDatum(nodePort);
|
||||||
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack);
|
values[Anum_pg_dist_node_noderack - 1] = CStringGetTextDatum(nodeRack);
|
||||||
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata);
|
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata);
|
||||||
|
values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive);
|
||||||
|
|
||||||
/* open shard relation and insert new tuple */
|
/* open shard relation and insert new tuple */
|
||||||
pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
|
pgDistNode = heap_open(DistNodeRelationId(), AccessExclusiveLock);
|
||||||
|
@ -867,6 +1041,7 @@ ParseWorkerNodeFileAndRename()
|
||||||
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
|
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
|
||||||
workerNode->workerPort = nodePort;
|
workerNode->workerPort = nodePort;
|
||||||
workerNode->hasMetadata = false;
|
workerNode->hasMetadata = false;
|
||||||
|
workerNode->isActive = false;
|
||||||
|
|
||||||
workerNodeList = lappend(workerNodeList, workerNode);
|
workerNodeList = lappend(workerNodeList, workerNode);
|
||||||
}
|
}
|
||||||
|
@ -906,6 +1081,8 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
|
||||||
tupleDescriptor, &isNull);
|
tupleDescriptor, &isNull);
|
||||||
Datum hasMetadata = heap_getattr(heapTuple, Anum_pg_dist_node_hasmetadata,
|
Datum hasMetadata = heap_getattr(heapTuple, Anum_pg_dist_node_hasmetadata,
|
||||||
tupleDescriptor, &isNull);
|
tupleDescriptor, &isNull);
|
||||||
|
Datum isActive = heap_getattr(heapTuple, Anum_pg_dist_node_isactive,
|
||||||
|
tupleDescriptor, &isNull);
|
||||||
|
|
||||||
Assert(!HeapTupleHasNulls(heapTuple));
|
Assert(!HeapTupleHasNulls(heapTuple));
|
||||||
|
|
||||||
|
@ -916,6 +1093,7 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
|
||||||
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
|
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
|
||||||
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
|
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
|
||||||
workerNode->hasMetadata = DatumGetBool(hasMetadata);
|
workerNode->hasMetadata = DatumGetBool(hasMetadata);
|
||||||
|
workerNode->isActive = DatumGetBool(isActive);
|
||||||
|
|
||||||
return workerNode;
|
return workerNode;
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,8 @@
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static void ReplicateSingleShardTableToAllWorkers(Oid relationId);
|
static void ReplicateSingleShardTableToAllWorkers(Oid relationId);
|
||||||
static void ReplicateShardToAllWorkers(ShardInterval *shardInterval);
|
static void ReplicateShardToAllWorkers(ShardInterval *shardInterval);
|
||||||
|
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
|
||||||
|
int nodePort);
|
||||||
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
|
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
|
||||||
static int CompareOids(const void *leftElement, const void *rightElement);
|
static int CompareOids(const void *leftElement, const void *rightElement);
|
||||||
|
|
||||||
|
@ -114,22 +116,19 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReplicateAllReferenceTablesToAllNodes function finds all reference tables and
|
* ReplicateAllReferenceTablesToNode function finds all reference tables and
|
||||||
* replicates them to all worker nodes. It also modifies pg_dist_colocation table to
|
* replicates them to the given worker node. It also modifies pg_dist_colocation
|
||||||
* update the replication factor column. This function skips a worker node if that node
|
* table to update the replication factor column when necessary. This function
|
||||||
* already has healthy placement of a particular reference table to prevent unnecessary
|
* skips reference tables if that node already has healthy placement of that
|
||||||
* data transfer.
|
* reference table to prevent unnecessary data transfer.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
ReplicateAllReferenceTablesToAllNodes()
|
ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
|
||||||
{
|
{
|
||||||
List *referenceTableList = ReferenceTableOidList();
|
List *referenceTableList = ReferenceTableOidList();
|
||||||
ListCell *referenceTableCell = NULL;
|
ListCell *referenceTableCell = NULL;
|
||||||
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
Relation pgDistNode = NULL;
|
uint32 workerCount = 0;
|
||||||
List *workerNodeList = NIL;
|
|
||||||
int workerCount = 0;
|
|
||||||
|
|
||||||
Oid firstReferenceTableId = InvalidOid;
|
Oid firstReferenceTableId = InvalidOid;
|
||||||
uint32 referenceTableColocationId = INVALID_COLOCATION_ID;
|
uint32 referenceTableColocationId = INVALID_COLOCATION_ID;
|
||||||
|
|
||||||
|
@ -139,12 +138,6 @@ ReplicateAllReferenceTablesToAllNodes()
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */
|
|
||||||
pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
|
||||||
workerNodeList = WorkerNodeList();
|
|
||||||
workerCount = list_length(workerNodeList);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We sort the reference table list to prevent deadlocks in concurrent
|
* We sort the reference table list to prevent deadlocks in concurrent
|
||||||
* ReplicateAllReferenceTablesToAllNodes calls.
|
* ReplicateAllReferenceTablesToAllNodes calls.
|
||||||
|
@ -156,14 +149,10 @@ ReplicateAllReferenceTablesToAllNodes()
|
||||||
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
|
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
|
||||||
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
|
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
|
||||||
uint64 shardId = shardInterval->shardId;
|
uint64 shardId = shardInterval->shardId;
|
||||||
char *relationName = get_rel_name(referenceTableId);
|
|
||||||
|
|
||||||
LockShardDistributionMetadata(shardId, ExclusiveLock);
|
LockShardDistributionMetadata(shardId, ExclusiveLock);
|
||||||
|
|
||||||
ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to all workers",
|
ReplicateShardToNode(shardInterval, nodeName, nodePort);
|
||||||
relationName)));
|
|
||||||
|
|
||||||
ReplicateShardToAllWorkers(shardInterval);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -171,10 +160,10 @@ ReplicateAllReferenceTablesToAllNodes()
|
||||||
* colocation group of reference tables so that worker count will be equal to
|
* colocation group of reference tables so that worker count will be equal to
|
||||||
* replication factor again.
|
* replication factor again.
|
||||||
*/
|
*/
|
||||||
|
workerCount = list_length(workerNodeList);
|
||||||
firstReferenceTableId = linitial_oid(referenceTableList);
|
firstReferenceTableId = linitial_oid(referenceTableList);
|
||||||
referenceTableColocationId = TableColocationId(firstReferenceTableId);
|
referenceTableColocationId = TableColocationId(firstReferenceTableId);
|
||||||
UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);
|
UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount);
|
||||||
heap_close(pgDistNode, NoLock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -228,28 +217,18 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReplicateShardToAllWorkers function replicates given shard to the given worker nodes
|
* ReplicateShardToAllWorkers function replicates given shard to the all worker nodes
|
||||||
* in a separate transactions. While replicating, it only replicates the shard to the
|
* in separate transactions. While replicating, it only replicates the shard to the
|
||||||
* workers which does not have a healthy replica of the shard. This function also modifies
|
* workers which does not have a healthy replica of the shard. However, this function
|
||||||
* metadata by inserting/updating related rows in pg_dist_shard_placement. However, this
|
* does not obtain any lock on shard resource and shard metadata. It is caller's
|
||||||
* function does not obtain any lock on shard resource and shard metadata. It is caller's
|
|
||||||
* responsibility to take those locks.
|
* responsibility to take those locks.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ReplicateShardToAllWorkers(ShardInterval *shardInterval)
|
ReplicateShardToAllWorkers(ShardInterval *shardInterval)
|
||||||
{
|
{
|
||||||
uint64 shardId = shardInterval->shardId;
|
|
||||||
List *shardPlacementList = ShardPlacementList(shardId);
|
|
||||||
bool missingOk = false;
|
|
||||||
ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk);
|
|
||||||
char *srcNodeName = sourceShardPlacement->nodeName;
|
|
||||||
uint32 srcNodePort = sourceShardPlacement->nodePort;
|
|
||||||
char *tableOwner = TableOwner(shardInterval->relationId);
|
|
||||||
List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort);
|
|
||||||
|
|
||||||
/* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */
|
/* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */
|
||||||
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock);
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
ListCell *workerNodeCell = NULL;
|
ListCell *workerNodeCell = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -263,11 +242,37 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
|
||||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||||
char *nodeName = workerNode->workerName;
|
char *nodeName = workerNode->workerName;
|
||||||
uint32 nodePort = workerNode->workerPort;
|
uint32 nodePort = workerNode->workerPort;
|
||||||
bool missingWorkerOk = true;
|
|
||||||
|
|
||||||
|
ReplicateShardToNode(shardInterval, nodeName, nodePort);
|
||||||
|
}
|
||||||
|
|
||||||
|
heap_close(pgDistNode, NoLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReplicateShardToNode function replicates given shard to the given worker node
|
||||||
|
* in a separate transaction. While replicating, it only replicates the shard to the
|
||||||
|
* workers which does not have a healthy replica of the shard. This function also modifies
|
||||||
|
* metadata by inserting/updating related rows in pg_dist_shard_placement.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
|
||||||
|
{
|
||||||
|
uint64 shardId = shardInterval->shardId;
|
||||||
|
|
||||||
|
bool missingOk = false;
|
||||||
|
ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk);
|
||||||
|
char *srcNodeName = sourceShardPlacement->nodeName;
|
||||||
|
uint32 srcNodePort = sourceShardPlacement->nodePort;
|
||||||
|
List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort);
|
||||||
|
|
||||||
|
List *shardPlacementList = ShardPlacementList(shardId);
|
||||||
|
bool missingWorkerOk = true;
|
||||||
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
|
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
|
||||||
nodeName, nodePort,
|
nodeName, nodePort,
|
||||||
missingWorkerOk);
|
missingWorkerOk);
|
||||||
|
char *tableOwner = TableOwner(shardInterval->relationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Although this function is used for reference tables and reference table shard
|
* Although this function is used for reference tables and reference table shard
|
||||||
|
@ -280,6 +285,10 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
|
||||||
{
|
{
|
||||||
uint64 placementId = 0;
|
uint64 placementId = 0;
|
||||||
|
|
||||||
|
ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d",
|
||||||
|
get_rel_name(shardInterval->relationId), nodeName,
|
||||||
|
nodePort)));
|
||||||
|
|
||||||
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
|
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
|
||||||
ddlCommandList);
|
ddlCommandList);
|
||||||
if (targetPlacement == NULL)
|
if (targetPlacement == NULL)
|
||||||
|
@ -312,9 +321,6 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
heap_close(pgDistNode, NoLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ConvertToReferenceTableMetadata accepts a broadcast table and modifies its metadata to
|
* ConvertToReferenceTableMetadata accepts a broadcast table and modifies its metadata to
|
||||||
|
@ -355,7 +361,7 @@ uint32
|
||||||
CreateReferenceTableColocationId()
|
CreateReferenceTableColocationId()
|
||||||
{
|
{
|
||||||
uint32 colocationId = INVALID_COLOCATION_ID;
|
uint32 colocationId = INVALID_COLOCATION_ID;
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = ActiveWorkerNodeList();
|
||||||
int shardCount = 1;
|
int shardCount = 1;
|
||||||
int replicationFactor = list_length(workerNodeList);
|
int replicationFactor = list_length(workerNodeList);
|
||||||
Oid distributionColumnType = InvalidOid;
|
Oid distributionColumnType = InvalidOid;
|
||||||
|
|
|
@ -30,6 +30,7 @@ extern char * NodeListInsertCommand(List *workerNodeList);
|
||||||
extern List * ShardListInsertCommand(List *shardIntervalList);
|
extern List * ShardListInsertCommand(List *shardIntervalList);
|
||||||
extern List * ShardDeleteCommandList(ShardInterval *shardInterval);
|
extern List * ShardDeleteCommandList(ShardInterval *shardInterval);
|
||||||
extern char * NodeDeleteCommand(uint32 nodeId);
|
extern char * NodeDeleteCommand(uint32 nodeId);
|
||||||
|
extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive);
|
||||||
extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
|
extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
|
||||||
extern char * CreateSchemaDDLCommand(Oid schemaId);
|
extern char * CreateSchemaDDLCommand(Oid schemaId);
|
||||||
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
|
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
|
||||||
|
|
|
@ -23,6 +23,7 @@ typedef struct FormData_pg_dist_node
|
||||||
text nodename;
|
text nodename;
|
||||||
int nodeport;
|
int nodeport;
|
||||||
bool hasmetadata;
|
bool hasmetadata;
|
||||||
|
bool isactive
|
||||||
#endif
|
#endif
|
||||||
} FormData_pg_dist_node;
|
} FormData_pg_dist_node;
|
||||||
|
|
||||||
|
@ -37,13 +38,14 @@ typedef FormData_pg_dist_node *Form_pg_dist_node;
|
||||||
* compiler constants for pg_dist_node
|
* compiler constants for pg_dist_node
|
||||||
* ----------------
|
* ----------------
|
||||||
*/
|
*/
|
||||||
#define Natts_pg_dist_node 6
|
#define Natts_pg_dist_node 7
|
||||||
#define Anum_pg_dist_node_nodeid 1
|
#define Anum_pg_dist_node_nodeid 1
|
||||||
#define Anum_pg_dist_node_groupid 2
|
#define Anum_pg_dist_node_groupid 2
|
||||||
#define Anum_pg_dist_node_nodename 3
|
#define Anum_pg_dist_node_nodename 3
|
||||||
#define Anum_pg_dist_node_nodeport 4
|
#define Anum_pg_dist_node_nodeport 4
|
||||||
#define Anum_pg_dist_node_noderack 5
|
#define Anum_pg_dist_node_noderack 5
|
||||||
#define Anum_pg_dist_node_hasmetadata 6
|
#define Anum_pg_dist_node_hasmetadata 6
|
||||||
|
#define Anum_pg_dist_node_isactive 7
|
||||||
|
|
||||||
#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
|
#define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq"
|
||||||
#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"
|
#define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq"
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
#define REFERENCE_TABLE_UTILS_H_
|
#define REFERENCE_TABLE_UTILS_H_
|
||||||
|
|
||||||
extern uint32 CreateReferenceTableColocationId(void);
|
extern uint32 CreateReferenceTableColocationId(void);
|
||||||
extern void ReplicateAllReferenceTablesToAllNodes(void);
|
extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);
|
||||||
extern void DeleteAllReferenceTablePlacementsFromNode(char *workerName,
|
extern void DeleteAllReferenceTablePlacementsFromNode(char *workerName,
|
||||||
uint32 workerPort);
|
uint32 workerPort);
|
||||||
extern List * ReferenceTableOidList(void);
|
extern List * ReferenceTableOidList(void);
|
||||||
|
|
|
@ -30,7 +30,6 @@
|
||||||
#define WORKER_RACK_TRIES 5
|
#define WORKER_RACK_TRIES 5
|
||||||
#define WORKER_DEFAULT_RACK "default"
|
#define WORKER_DEFAULT_RACK "default"
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* In memory representation of pg_dist_node table elements. The elements are hold in
|
* In memory representation of pg_dist_node table elements. The elements are hold in
|
||||||
* WorkerNodeHash table.
|
* WorkerNodeHash table.
|
||||||
|
@ -43,6 +42,7 @@ typedef struct WorkerNode
|
||||||
uint32 groupId; /* node's groupId; same for the nodes that are in the same group */
|
uint32 groupId; /* node's groupId; same for the nodes that are in the same group */
|
||||||
char workerRack[WORKER_LENGTH]; /* node's network location */
|
char workerRack[WORKER_LENGTH]; /* node's network location */
|
||||||
bool hasMetadata; /* node gets metadata changes */
|
bool hasMetadata; /* node gets metadata changes */
|
||||||
|
bool isActive; /* node's state */
|
||||||
} WorkerNode;
|
} WorkerNode;
|
||||||
|
|
||||||
|
|
||||||
|
@ -59,7 +59,7 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
|
||||||
extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
|
extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
|
||||||
extern WorkerNode * WorkerGetNodeWithName(const char *hostname);
|
extern WorkerNode * WorkerGetNodeWithName(const char *hostname);
|
||||||
extern uint32 WorkerGetLiveNodeCount(void);
|
extern uint32 WorkerGetLiveNodeCount(void);
|
||||||
extern List * WorkerNodeList(void);
|
extern List * ActiveWorkerNodeList(void);
|
||||||
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
|
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
|
||||||
extern List * ReadWorkerNodes(void);
|
extern List * ReadWorkerNodes(void);
|
||||||
extern void EnsureCoordinator(void);
|
extern void EnsureCoordinator(void);
|
||||||
|
|
|
@ -10,14 +10,14 @@ DETAIL: There are no active worker nodes.
|
||||||
-- add the nodes to the cluster
|
-- add the nodes to the cluster
|
||||||
SELECT master_add_node('localhost', :worker_1_port);
|
SELECT master_add_node('localhost', :worker_1_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(1,1,localhost,57637,default,f)
|
(1,1,localhost,57637,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(2,2,localhost,57638,default,f)
|
(2,2,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- get the active nodes
|
-- get the active nodes
|
||||||
|
@ -30,9 +30,9 @@ SELECT master_get_active_worker_nodes();
|
||||||
|
|
||||||
-- try to add a node that is already in the cluster
|
-- try to add a node that is already in the cluster
|
||||||
SELECT * FROM master_add_node('localhost', :worker_1_port);
|
SELECT * FROM master_add_node('localhost', :worker_1_port);
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
|
||||||
--------+---------+-----------+----------+----------+-------------
|
--------+---------+-----------+----------+----------+-------------+----------
|
||||||
1 | 1 | localhost | 57637 | default | f
|
1 | 1 | localhost | 57637 | default | f | t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- get the active nodes
|
-- get the active nodes
|
||||||
|
@ -60,8 +60,8 @@ SELECT master_get_active_worker_nodes();
|
||||||
-- try to disable a node with no placements see that node is removed
|
-- try to disable a node with no placements see that node is removed
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(3,3,localhost,57638,default,f)
|
(3,3,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_disable_node('localhost', :worker_2_port);
|
SELECT master_disable_node('localhost', :worker_2_port);
|
||||||
|
@ -77,10 +77,10 @@ SELECT master_get_active_worker_nodes();
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- add some shard placements to the cluster
|
-- add some shard placements to the cluster
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_activate_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_activate_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(4,4,localhost,57638,default,f)
|
(3,3,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
CREATE TABLE cluster_management_test (col_1 text, col_2 int);
|
CREATE TABLE cluster_management_test (col_1 text, col_2 int);
|
||||||
|
@ -125,7 +125,7 @@ INSERT INTO test_reference_table VALUES (1, '1');
|
||||||
-- try to disable a node with active placements see that node is removed
|
-- try to disable a node with active placements see that node is removed
|
||||||
-- observe that a notification is displayed
|
-- observe that a notification is displayed
|
||||||
SELECT master_disable_node('localhost', :worker_2_port);
|
SELECT master_disable_node('localhost', :worker_2_port);
|
||||||
NOTICE: Node localhost:57638 has active shard placements. Some queries may fail after this operation. Use select master_add_node('localhost', 57638) to add this node back.
|
NOTICE: Node localhost:57638 has active shard placements. Some queries may fail after this operation. Use SELECT master_activate_node('localhost', 57638) to activate this node back.
|
||||||
master_disable_node
|
master_disable_node
|
||||||
---------------------
|
---------------------
|
||||||
|
|
||||||
|
@ -140,8 +140,8 @@ SELECT master_get_active_worker_nodes();
|
||||||
-- restore the node for next tests
|
-- restore the node for next tests
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(5,5,localhost,57638,default,f)
|
(3,3,localhost,57638,default,f,f)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- try to remove a node with active placements and see that node removal is failed
|
-- try to remove a node with active placements and see that node removal is failed
|
||||||
|
@ -178,8 +178,8 @@ SELECT master_get_active_worker_nodes();
|
||||||
-- clean-up
|
-- clean-up
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(6,6,localhost,57638,default,f)
|
(4,4,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
|
UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
|
||||||
|
@ -194,8 +194,8 @@ SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
|
UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(7,7,localhost,57638,default,f)
|
(5,5,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
@ -223,8 +223,8 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
|
||||||
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
|
UPDATE pg_dist_node SET hasmetadata=false WHERE nodeport=:worker_1_port;
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(8,8,localhost,57638,default,f)
|
(6,6,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
@ -244,8 +244,8 @@ SELECT
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
|
||||||
--------+---------+----------+----------+----------+-------------
|
--------+---------+----------+----------+----------+-------------+----------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
-- check that adding two nodes in the same transaction works
|
-- check that adding two nodes in the same transaction works
|
||||||
|
@ -253,15 +253,15 @@ SELECT
|
||||||
master_add_node('localhost', :worker_1_port),
|
master_add_node('localhost', :worker_1_port),
|
||||||
master_add_node('localhost', :worker_2_port);
|
master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node | master_add_node
|
master_add_node | master_add_node
|
||||||
---------------------------------+-----------------------------------
|
-----------------------------------+-----------------------------------
|
||||||
(9,9,localhost,57637,default,f) | (10,10,localhost,57638,default,f)
|
(7,7,localhost,57637,default,f,t) | (8,8,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
|
||||||
--------+---------+-----------+----------+----------+-------------
|
--------+---------+-----------+----------+----------+-------------+----------
|
||||||
9 | 9 | localhost | 57637 | default | f
|
7 | 7 | localhost | 57637 | default | f | t
|
||||||
10 | 10 | localhost | 57638 | default | f
|
8 | 8 | localhost | 57638 | default | f | t
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- check that mixed add/remove node commands work fine inside transaction
|
-- check that mixed add/remove node commands work fine inside transaction
|
||||||
|
@ -275,7 +275,7 @@ SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
-----------------------------------
|
-----------------------------------
|
||||||
(11,11,localhost,57638,default,f)
|
(9,9,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_remove_node('localhost', :worker_2_port);
|
SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
|
@ -294,8 +294,8 @@ UPDATE pg_dist_node SET hasmetadata=true WHERE nodeport=:worker_1_port;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
-----------------------------------
|
-------------------------------------
|
||||||
(12,12,localhost,57638,default,f)
|
(10,10,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_remove_node('localhost', :worker_2_port);
|
SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
|
@ -306,8 +306,8 @@ SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
-----------------------------------
|
-------------------------------------
|
||||||
(13,13,localhost,57638,default,f)
|
(11,11,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -334,14 +334,14 @@ SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
|
||||||
|
|
||||||
SELECT master_add_node('localhost', :worker_1_port);
|
SELECT master_add_node('localhost', :worker_1_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
-----------------------------------
|
-------------------------------------
|
||||||
(14,14,localhost,57637,default,f)
|
(12,12,localhost,57637,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
-----------------------------------
|
-------------------------------------
|
||||||
(15,15,localhost,57638,default,f)
|
(13,13,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- check that a distributed table can be created after adding a node in a transaction
|
-- check that a distributed table can be created after adding a node in a transaction
|
||||||
|
@ -354,8 +354,8 @@ SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
-----------------------------------
|
-------------------------------------
|
||||||
(16,16,localhost,57638,default,f)
|
(14,14,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
CREATE TABLE temp(col1 text, col2 int);
|
CREATE TABLE temp(col1 text, col2 int);
|
||||||
|
|
|
@ -22,14 +22,14 @@ CREATE EXTENSION citus;
|
||||||
-- re-add the nodes to the cluster
|
-- re-add the nodes to the cluster
|
||||||
SELECT master_add_node('localhost', :worker_1_port);
|
SELECT master_add_node('localhost', :worker_1_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(1,1,localhost,57637,default,f)
|
(1,1,localhost,57637,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(2,2,localhost,57638,default,f)
|
(2,2,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- verify that a table can be created after the extension has been dropped and recreated
|
-- verify that a table can be created after the extension has been dropped and recreated
|
||||||
|
|
|
@ -78,6 +78,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-16';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-17';
|
ALTER EXTENSION citus UPDATE TO '6.1-17';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.2-1';
|
ALTER EXTENSION citus UPDATE TO '6.2-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.2-2';
|
ALTER EXTENSION citus UPDATE TO '6.2-2';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.2-3';
|
||||||
-- show running version
|
-- show running version
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
citus.version
|
citus.version
|
||||||
|
|
|
@ -29,10 +29,10 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
|
||||||
-- pg_dist_node entries and reference tables
|
-- pg_dist_node entries and reference tables
|
||||||
SELECT unnest(master_metadata_snapshot());
|
SELECT unnest(master_metadata_snapshot());
|
||||||
unnest
|
unnest
|
||||||
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||||
TRUNCATE pg_dist_node
|
TRUNCATE pg_dist_node
|
||||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
|
||||||
(3 rows)
|
(3 rows)
|
||||||
|
|
||||||
-- Create a test table with constraints and SERIAL
|
-- Create a test table with constraints and SERIAL
|
||||||
|
@ -58,7 +58,7 @@ SELECT unnest(master_metadata_snapshot());
|
||||||

|

|
||||||
TRUNCATE pg_dist_node
|
TRUNCATE pg_dist_node
|
||||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
|
||||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
||||||
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
|
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
|
||||||
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
||||||
|
@ -80,7 +80,7 @@ SELECT unnest(master_metadata_snapshot());
|
||||||

|

|
||||||
TRUNCATE pg_dist_node
|
TRUNCATE pg_dist_node
|
||||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
|
||||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
||||||
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
|
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
|
||||||
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
||||||
|
@ -104,7 +104,7 @@ SELECT unnest(master_metadata_snapshot());
|
||||||

|

|
||||||
TRUNCATE pg_dist_node
|
TRUNCATE pg_dist_node
|
||||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
|
||||||
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
|
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
|
||||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
||||||
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
|
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
|
||||||
|
@ -134,7 +134,7 @@ SELECT unnest(master_metadata_snapshot());
|
||||||

|

|
||||||
TRUNCATE pg_dist_node
|
TRUNCATE pg_dist_node
|
||||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
|
||||||
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
|
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
|
||||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
||||||
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
|
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
|
||||||
|
@ -157,7 +157,7 @@ SELECT unnest(master_metadata_snapshot());
|
||||||

|

|
||||||
TRUNCATE pg_dist_node
|
TRUNCATE pg_dist_node
|
||||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
|
||||||
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
|
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
|
||||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
||||||
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
|
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
|
||||||
|
@ -203,10 +203,10 @@ SELECT * FROM pg_dist_local_group;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
|
||||||
--------+---------+-----------+----------+----------+-------------
|
--------+---------+-----------+----------+----------+-------------+----------
|
||||||
1 | 1 | localhost | 57637 | default | t
|
1 | 1 | localhost | 57637 | default | t | t
|
||||||
2 | 2 | localhost | 57638 | default | f
|
2 | 2 | localhost | 57638 | default | f | t
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
|
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
|
||||||
|
@ -333,10 +333,10 @@ SELECT * FROM pg_dist_local_group;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
|
||||||
--------+---------+-----------+----------+----------+-------------
|
--------+---------+-----------+----------+----------+-------------+----------
|
||||||
1 | 1 | localhost | 57637 | default | t
|
1 | 1 | localhost | 57637 | default | t | t
|
||||||
2 | 2 | localhost | 57638 | default | f
|
2 | 2 | localhost | 57638 | default | f | t
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
|
SELECT * FROM pg_dist_partition ORDER BY logicalrelid;
|
||||||
|
@ -1134,8 +1134,8 @@ SELECT create_distributed_table('mx_table', 'a');
|
||||||
\c - postgres - :master_port
|
\c - postgres - :master_port
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(4,4,localhost,57638,default,f)
|
(4,4,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
|
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
|
||||||
|
@ -1316,10 +1316,10 @@ WHERE logicalrelid='mx_ref'::regclass;
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "mx_ref" to all workers
|
NOTICE: Replicating reference table "mx_ref" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(5,5,localhost,57638,default,f)
|
(5,5,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT shardid, nodename, nodeport
|
SELECT shardid, nodename, nodeport
|
||||||
|
|
|
@ -202,6 +202,19 @@ SELECT create_distributed_table('mx_sequence', 'key');
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
SELECT groupid FROM pg_dist_local_group;
|
||||||
|
groupid
|
||||||
|
---------
|
||||||
|
12
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM mx_sequence_value_seq;
|
||||||
|
sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called
|
||||||
|
-----------------------+------------------+------------------+--------------+------------------+------------------+-------------+---------+-----------+-----------
|
||||||
|
mx_sequence_value_seq | 3377699720527873 | 3377699720527873 | 1 | 3659174697238529 | 3377699720527873 | 1 | 0 | f | f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
SELECT groupid FROM pg_dist_local_group;
|
SELECT groupid FROM pg_dist_local_group;
|
||||||
groupid
|
groupid
|
||||||
---------
|
---------
|
||||||
|
@ -214,19 +227,6 @@ SELECT * FROM mx_sequence_value_seq;
|
||||||
mx_sequence_value_seq | 3940649673949185 | 3940649673949185 | 1 | 4222124650659841 | 3940649673949185 | 1 | 0 | f | f
|
mx_sequence_value_seq | 3940649673949185 | 3940649673949185 | 1 | 4222124650659841 | 3940649673949185 | 1 | 0 | f | f
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :worker_2_port
|
|
||||||
SELECT groupid FROM pg_dist_local_group;
|
|
||||||
groupid
|
|
||||||
---------
|
|
||||||
16
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT * FROM mx_sequence_value_seq;
|
|
||||||
sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called
|
|
||||||
-----------------------+------------------+------------------+--------------+------------------+------------------+-------------+---------+-----------+-----------
|
|
||||||
mx_sequence_value_seq | 4503599627370497 | 4503599627370497 | 1 | 4785074604081153 | 4503599627370497 | 1 | 0 | f | f
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
-- the type of sequences can't be changed
|
-- the type of sequences can't be changed
|
||||||
ALTER TABLE mx_sequence ALTER value TYPE BIGINT;
|
ALTER TABLE mx_sequence ALTER value TYPE BIGINT;
|
||||||
|
|
|
@ -233,8 +233,8 @@ CREATE TABLE should_be_sorted_into_middle (value int);
|
||||||
PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
|
PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
-- Add "fake" pg_dist_transaction records and run recovery
|
-- Add "fake" pg_dist_transaction records and run recovery
|
||||||
INSERT INTO pg_dist_transaction VALUES (14, 'citus_0_should_commit');
|
INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_commit');
|
||||||
INSERT INTO pg_dist_transaction VALUES (14, 'citus_0_should_be_forgotten');
|
INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_be_forgotten');
|
||||||
SELECT recover_prepared_transactions();
|
SELECT recover_prepared_transactions();
|
||||||
NOTICE: recovered a prepared transaction on localhost:57637
|
NOTICE: recovered a prepared transaction on localhost:57637
|
||||||
CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort'
|
CONTEXT: ROLLBACK PREPARED 'citus_0_should_abort'
|
||||||
|
|
|
@ -440,18 +440,18 @@ SELECT * FROM multiple_hash_mx WHERE category = '2' ORDER BY category, data;
|
||||||
INSERT INTO app_analytics_events_mx VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
|
INSERT INTO app_analytics_events_mx VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
|
||||||
id
|
id
|
||||||
------------------
|
------------------
|
||||||
4503599627370497
|
3940649673949185
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
INSERT INTO app_analytics_events_mx (app_id, name) VALUES (102, 'Wayz') RETURNING id;
|
INSERT INTO app_analytics_events_mx (app_id, name) VALUES (102, 'Wayz') RETURNING id;
|
||||||
id
|
id
|
||||||
------------------
|
------------------
|
||||||
4503599627370498
|
3940649673949186
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
INSERT INTO app_analytics_events_mx (app_id, name) VALUES (103, 'Mynt') RETURNING *;
|
INSERT INTO app_analytics_events_mx (app_id, name) VALUES (103, 'Mynt') RETURNING *;
|
||||||
id | app_id | name
|
id | app_id | name
|
||||||
------------------+--------+------
|
------------------+--------+------
|
||||||
4503599627370499 | 103 | Mynt
|
3940649673949187 | 103 | Mynt
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
|
|
@ -44,8 +44,8 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||||
-- re-add the node for next tests
|
-- re-add the node for next tests
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1380000,1380000,localhost,57638,default,f)
|
(1380000,1380000,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- remove a node with reference table
|
-- remove a node with reference table
|
||||||
|
@ -164,10 +164,10 @@ SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
ERROR: could not find valid entry for node "localhost:57638"
|
ERROR: could not find valid entry for node "localhost:57638"
|
||||||
-- re-add the node for next tests
|
-- re-add the node for next tests
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
|
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1380001,1380001,localhost,57638,default,f)
|
(1380001,1380001,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- remove node in a transaction and ROLLBACK
|
-- remove node in a transaction and ROLLBACK
|
||||||
|
@ -387,10 +387,10 @@ WHERE
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
-- re-add the node for next tests
|
-- re-add the node for next tests
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
|
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1380002,1380002,localhost,57638,default,f)
|
(1380002,1380002,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test inserting a value then removing a node in a transaction
|
-- test inserting a value then removing a node in a transaction
|
||||||
|
@ -516,10 +516,10 @@ SELECT * FROM remove_node_reference_table;
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
-- re-add the node for next tests
|
-- re-add the node for next tests
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
|
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1380003,1380003,localhost,57638,default,f)
|
(1380003,1380003,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test executing DDL command then removing a node in a transaction
|
-- test executing DDL command then removing a node in a transaction
|
||||||
|
@ -641,10 +641,10 @@ Table "public.remove_node_reference_table"
|
||||||
|
|
||||||
-- re-add the node for next tests
|
-- re-add the node for next tests
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
|
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1380004,1380004,localhost,57638,default,f)
|
(1380004,1380004,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test DROP table after removing a node in a transaction
|
-- test DROP table after removing a node in a transaction
|
||||||
|
@ -711,8 +711,8 @@ SELECT * FROM pg_dist_colocation WHERE colocationid = 1380000;
|
||||||
-- re-add the node for next tests
|
-- re-add the node for next tests
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1380005,1380005,localhost,57638,default,f)
|
(1380005,1380005,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- re-create remove_node_reference_table
|
-- re-create remove_node_reference_table
|
||||||
|
@ -843,11 +843,11 @@ WHERE
|
||||||
|
|
||||||
-- re-add the node for next tests
|
-- re-add the node for next tests
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
|
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
|
||||||
NOTICE: Replicating reference table "table1" to all workers
|
NOTICE: Replicating reference table "table1" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1380006,1380006,localhost,57638,default,f)
|
(1380006,1380006,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test with master_disable_node
|
-- test with master_disable_node
|
||||||
|
@ -915,7 +915,7 @@ SELECT master_disable_node('localhost', :worker_2_port);
|
||||||
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||||
count
|
count
|
||||||
-------
|
-------
|
||||||
0
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -936,14 +936,14 @@ WHERE colocationid IN
|
||||||
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
|
WHERE logicalrelid = 'remove_node_reference_table'::regclass);
|
||||||
colocationid | shardcount | replicationfactor | distributioncolumntype
|
colocationid | shardcount | replicationfactor | distributioncolumntype
|
||||||
--------------+------------+-------------------+------------------------
|
--------------+------------+-------------------+------------------------
|
||||||
1380001 | 1 | 1 | 0
|
1380001 | 1 | 2 | 0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||||
count
|
count
|
||||||
-------
|
-------
|
||||||
0
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -959,12 +959,12 @@ WHERE
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
-- re-add the node for next tests
|
-- re-add the node for next tests
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_activate_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "remove_node_reference_table" to all workers
|
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:57638
|
||||||
NOTICE: Replicating reference table "table1" to all workers
|
NOTICE: Replicating reference table "table1" to the node localhost:57638
|
||||||
master_add_node
|
master_activate_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1380007,1380007,localhost,57638,default,f)
|
(1380006,1380006,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- DROP tables to clean workspace
|
-- DROP tables to clean workspace
|
||||||
|
|
|
@ -26,8 +26,8 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||||
|
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1370000,1370000,localhost,57638,default,f)
|
(1370000,1370000,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- verify node is added
|
-- verify node is added
|
||||||
|
@ -71,7 +71,6 @@ SELECT create_reference_table('replicate_reference_table_unhealthy');
|
||||||
|
|
||||||
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000;
|
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000;
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "replicate_reference_table_unhealthy" to all workers
|
|
||||||
ERROR: could not find any healthy placement for shard 1370000
|
ERROR: could not find any healthy placement for shard 1370000
|
||||||
-- verify node is not added
|
-- verify node is not added
|
||||||
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
|
||||||
|
@ -123,10 +122,10 @@ WHERE colocationid IN
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers
|
NOTICE: Replicating reference table "replicate_reference_table_valid" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1370002,1370002,localhost,57638,default,f)
|
(1370002,1370002,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- status after master_add_node
|
-- status after master_add_node
|
||||||
|
@ -178,8 +177,8 @@ WHERE colocationid IN
|
||||||
|
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1370002,1370002,localhost,57638,default,f)
|
(1370002,1370002,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- status after master_add_node
|
-- status after master_add_node
|
||||||
|
@ -244,10 +243,10 @@ WHERE colocationid IN
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers
|
NOTICE: Replicating reference table "replicate_reference_table_rollback" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1370003,1370003,localhost,57638,default,f)
|
(1370003,1370003,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
@ -306,10 +305,10 @@ WHERE colocationid IN
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "replicate_reference_table_commit" to all workers
|
NOTICE: Replicating reference table "replicate_reference_table_commit" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1370004,1370004,localhost,57638,default,f)
|
(1370004,1370004,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -401,13 +400,14 @@ ORDER BY logicalrelid;
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers
|
NOTICE: Replicating reference table "replicate_reference_table_reference_one" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1370005,1370005,localhost,57638,default,f)
|
(1370005,1370005,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
|
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
|
||||||
|
NOTICE: Replicating reference table "replicate_reference_table_hash" to the node localhost:57638
|
||||||
upgrade_to_reference_table
|
upgrade_to_reference_table
|
||||||
----------------------------
|
----------------------------
|
||||||
|
|
||||||
|
@ -482,7 +482,7 @@ SELECT create_reference_table('replicate_reference_table_insert');
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO replicate_reference_table_insert VALUES(1);
|
INSERT INTO replicate_reference_table_insert VALUES(1);
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "replicate_reference_table_insert" to all workers
|
NOTICE: Replicating reference table "replicate_reference_table_insert" to the node localhost:57638
|
||||||
ERROR: cannot open new connections after the first modification command within a transaction
|
ERROR: cannot open new connections after the first modification command within a transaction
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
DROP TABLE replicate_reference_table_insert;
|
DROP TABLE replicate_reference_table_insert;
|
||||||
|
@ -497,7 +497,7 @@ SELECT create_reference_table('replicate_reference_table_copy');
|
||||||
BEGIN;
|
BEGIN;
|
||||||
COPY replicate_reference_table_copy FROM STDIN;
|
COPY replicate_reference_table_copy FROM STDIN;
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "replicate_reference_table_copy" to all workers
|
NOTICE: Replicating reference table "replicate_reference_table_copy" to the node localhost:57638
|
||||||
ERROR: cannot open new connections after the first modification command within a transaction
|
ERROR: cannot open new connections after the first modification command within a transaction
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
DROP TABLE replicate_reference_table_copy;
|
DROP TABLE replicate_reference_table_copy;
|
||||||
|
@ -514,7 +514,7 @@ ALTER TABLE replicate_reference_table_ddl ADD column2 int;
|
||||||
NOTICE: using one-phase commit for distributed DDL commands
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "replicate_reference_table_ddl" to all workers
|
NOTICE: Replicating reference table "replicate_reference_table_ddl" to the node localhost:57638
|
||||||
ERROR: cannot open new connections after the first modification command within a transaction
|
ERROR: cannot open new connections after the first modification command within a transaction
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
DROP TABLE replicate_reference_table_ddl;
|
DROP TABLE replicate_reference_table_ddl;
|
||||||
|
@ -550,10 +550,10 @@ WHERE colocationid IN
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers
|
NOTICE: Replicating reference table "replicate_reference_table_drop" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1370009,1370009,localhost,57638,default,f)
|
(1370009,1370009,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE replicate_reference_table_drop;
|
DROP TABLE replicate_reference_table_drop;
|
||||||
|
@ -612,10 +612,10 @@ WHERE colocationid IN
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
NOTICE: Replicating reference table "table1" to all workers
|
NOTICE: Replicating reference table "table1" to the node localhost:57638
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------------------
|
-----------------------------------------------
|
||||||
(1370010,1370010,localhost,57638,default,f)
|
(1370010,1370010,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- status after master_add_node
|
-- status after master_add_node
|
||||||
|
@ -643,6 +643,79 @@ WHERE colocationid IN
|
||||||
|
|
||||||
DROP TABLE replicate_reference_table_schema.table1;
|
DROP TABLE replicate_reference_table_schema.table1;
|
||||||
DROP SCHEMA replicate_reference_table_schema CASCADE;
|
DROP SCHEMA replicate_reference_table_schema CASCADE;
|
||||||
|
-- do some tests with inactive node
|
||||||
|
SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
|
master_remove_node
|
||||||
|
--------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE initially_not_replicated_reference_table (key int);
|
||||||
|
SELECT create_reference_table('initially_not_replicated_reference_table');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_add_inactive_node('localhost', :worker_2_port);
|
||||||
|
master_add_inactive_node
|
||||||
|
-----------------------------------------------
|
||||||
|
(1370011,1370011,localhost,57638,default,f,f)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- we should see only one shard placements
|
||||||
|
SELECT
|
||||||
|
shardid, shardstate, shardlength, nodename, nodeport
|
||||||
|
FROM
|
||||||
|
pg_dist_shard_placement
|
||||||
|
WHERE
|
||||||
|
shardid IN (SELECT
|
||||||
|
shardid
|
||||||
|
FROM
|
||||||
|
pg_dist_shard
|
||||||
|
WHERE
|
||||||
|
logicalrelid = 'initially_not_replicated_reference_table'::regclass)
|
||||||
|
ORDER BY 1,4,5;
|
||||||
|
shardid | shardstate | shardlength | nodename | nodeport
|
||||||
|
---------+------------+-------------+-----------+----------
|
||||||
|
1370012 | 1 | 0 | localhost | 57637
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- we should see the two shard placements after activation
|
||||||
|
SELECT master_activate_node('localhost', :worker_2_port);
|
||||||
|
NOTICE: Replicating reference table "initially_not_replicated_reference_table" to the node localhost:57638
|
||||||
|
master_activate_node
|
||||||
|
-----------------------------------------------
|
||||||
|
(1370011,1370011,localhost,57638,default,f,t)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
shardid, shardstate, shardlength, nodename, nodeport
|
||||||
|
FROM
|
||||||
|
pg_dist_shard_placement
|
||||||
|
WHERE
|
||||||
|
shardid IN (SELECT
|
||||||
|
shardid
|
||||||
|
FROM
|
||||||
|
pg_dist_shard
|
||||||
|
WHERE
|
||||||
|
logicalrelid = 'initially_not_replicated_reference_table'::regclass)
|
||||||
|
ORDER BY 1,4,5;
|
||||||
|
shardid | shardstate | shardlength | nodename | nodeport
|
||||||
|
---------+------------+-------------+-----------+----------
|
||||||
|
1370012 | 1 | 0 | localhost | 57637
|
||||||
|
1370012 | 1 | 0 | localhost | 57638
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- this should have no effect
|
||||||
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
|
master_add_node
|
||||||
|
-----------------------------------------------
|
||||||
|
(1370011,1370011,localhost,57638,default,f,t)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- drop unnecassary tables
|
||||||
|
DROP TABLE initially_not_replicated_reference_table;
|
||||||
-- reload pg_dist_shard_placement table
|
-- reload pg_dist_shard_placement table
|
||||||
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
|
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
|
||||||
DROP TABLE tmp_shard_placement;
|
DROP TABLE tmp_shard_placement;
|
||||||
|
|
|
@ -79,14 +79,14 @@ CREATE EXTENSION citus;
|
||||||
-- re-add the nodes to the cluster
|
-- re-add the nodes to the cluster
|
||||||
SELECT master_add_node('localhost', :worker_1_port);
|
SELECT master_add_node('localhost', :worker_1_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(1,1,localhost,57637,default,f)
|
(1,1,localhost,57637,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
master_add_node
|
master_add_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(2,2,localhost,57638,default,f)
|
(2,2,localhost,57638,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- create a table with a SERIAL column
|
-- create a table with a SERIAL column
|
||||||
|
|
|
@ -224,8 +224,8 @@ SELECT master_add_node('localhost', 5432);
|
||||||
ERROR: operation is not allowed on this node
|
ERROR: operation is not allowed on this node
|
||||||
HINT: Connect to the coordinator and run it again.
|
HINT: Connect to the coordinator and run it again.
|
||||||
SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
|
SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
|
||||||
--------+---------+----------+----------+----------+-------------
|
--------+---------+----------+----------+----------+-------------+----------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
-- master_remove_node
|
-- master_remove_node
|
||||||
|
@ -235,8 +235,8 @@ NOTICE: using one-phase commit for distributed DDL commands
|
||||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
SELECT master_add_node('localhost', 5432);
|
SELECT master_add_node('localhost', 5432);
|
||||||
master_add_node
|
master_add_node
|
||||||
--------------------------------------------
|
----------------------------------------------
|
||||||
(1370000,1370000,localhost,5432,default,f)
|
(1370000,1370000,localhost,5432,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
@ -244,9 +244,9 @@ SELECT master_remove_node('localhost', 5432);
|
||||||
ERROR: operation is not allowed on this node
|
ERROR: operation is not allowed on this node
|
||||||
HINT: Connect to the coordinator and run it again.
|
HINT: Connect to the coordinator and run it again.
|
||||||
SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
|
SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive
|
||||||
---------+---------+-----------+----------+----------+-------------
|
---------+---------+-----------+----------+----------+-------------+----------
|
||||||
1370000 | 1370000 | localhost | 5432 | default | f
|
1370000 | 1370000 | localhost | 5432 | default | f | t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
|
@ -95,6 +95,7 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1');
|
||||||
|
|
||||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
|
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
|
||||||
SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
|
SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
|
||||||
|
NOTICE: Replicating reference table "upgrade_reference_table_composite" to the node localhost:57638
|
||||||
ERROR: type "public.upgrade_test_composite_type" does not exist
|
ERROR: type "public.upgrade_test_composite_type" does not exist
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57638
|
||||||
DROP TABLE upgrade_reference_table_composite;
|
DROP TABLE upgrade_reference_table_composite;
|
||||||
|
@ -165,6 +166,7 @@ WHERE shardid IN
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT upgrade_to_reference_table('upgrade_reference_table_append');
|
SELECT upgrade_to_reference_table('upgrade_reference_table_append');
|
||||||
|
NOTICE: Replicating reference table "upgrade_reference_table_append" to the node localhost:57638
|
||||||
upgrade_to_reference_table
|
upgrade_to_reference_table
|
||||||
----------------------------
|
----------------------------
|
||||||
|
|
||||||
|
@ -277,6 +279,7 @@ WHERE shardid IN
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker');
|
SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker');
|
||||||
|
NOTICE: Replicating reference table "upgrade_reference_table_one_worker" to the node localhost:57638
|
||||||
upgrade_to_reference_table
|
upgrade_to_reference_table
|
||||||
----------------------------
|
----------------------------
|
||||||
|
|
||||||
|
@ -621,6 +624,7 @@ WHERE shardid IN
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback');
|
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback');
|
||||||
|
NOTICE: Replicating reference table "upgrade_reference_table_transaction_rollback" to the node localhost:57638
|
||||||
upgrade_to_reference_table
|
upgrade_to_reference_table
|
||||||
----------------------------
|
----------------------------
|
||||||
|
|
||||||
|
@ -733,6 +737,7 @@ WHERE shardid IN
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit');
|
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit');
|
||||||
|
NOTICE: Replicating reference table "upgrade_reference_table_transaction_commit" to the node localhost:57638
|
||||||
upgrade_to_reference_table
|
upgrade_to_reference_table
|
||||||
----------------------------
|
----------------------------
|
||||||
|
|
||||||
|
@ -980,6 +985,7 @@ ORDER BY nodeport;
|
||||||
|
|
||||||
|
|
||||||
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
|
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
|
||||||
|
NOTICE: Replicating reference table "upgrade_reference_table_mx" to the node localhost:57638
|
||||||
upgrade_to_reference_table
|
upgrade_to_reference_table
|
||||||
----------------------------
|
----------------------------
|
||||||
|
|
||||||
|
|
|
@ -586,7 +586,7 @@ SELECT shardid, nodename, nodeport
|
||||||
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
|
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
|
||||||
|
|
||||||
-- add the node back
|
-- add the node back
|
||||||
SELECT master_add_node('localhost', :worker_1_port);
|
SELECT master_activate_node('localhost', :worker_1_port);
|
||||||
RESET citus.shard_replication_factor;
|
RESET citus.shard_replication_factor;
|
||||||
-- add two new shards and verify they are created at both workers
|
-- add two new shards and verify they are created at both workers
|
||||||
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
|
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
|
||||||
|
|
|
@ -740,7 +740,7 @@ SELECT shardid, nodename, nodeport
|
||||||
|
|
||||||
-- disable the first node
|
-- disable the first node
|
||||||
SELECT master_disable_node('localhost', :worker_1_port);
|
SELECT master_disable_node('localhost', :worker_1_port);
|
||||||
NOTICE: Node localhost:57637 has active shard placements. Some queries may fail after this operation. Use select master_add_node('localhost', 57637) to add this node back.
|
NOTICE: Node localhost:57637 has active shard placements. Some queries may fail after this operation. Use SELECT master_activate_node('localhost', 57637) to activate this node back.
|
||||||
master_disable_node
|
master_disable_node
|
||||||
---------------------
|
---------------------
|
||||||
|
|
||||||
|
@ -766,12 +766,12 @@ SELECT shardid, nodename, nodeport
|
||||||
(6 rows)
|
(6 rows)
|
||||||
|
|
||||||
-- add the node back
|
-- add the node back
|
||||||
SELECT master_add_node('localhost', :worker_1_port);
|
SELECT master_activate_node('localhost', :worker_1_port);
|
||||||
NOTICE: Replicating reference table "nation" to all workers
|
NOTICE: Replicating reference table "nation" to the node localhost:57637
|
||||||
NOTICE: Replicating reference table "supplier" to all workers
|
NOTICE: Replicating reference table "supplier" to the node localhost:57637
|
||||||
master_add_node
|
master_activate_node
|
||||||
---------------------------------
|
-----------------------------------
|
||||||
(3,3,localhost,57637,default,f)
|
(1,1,localhost,57637,default,f,t)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
RESET citus.shard_replication_factor;
|
RESET citus.shard_replication_factor;
|
||||||
|
|
|
@ -33,7 +33,7 @@ SELECT master_disable_node('localhost', :worker_2_port);
|
||||||
SELECT master_get_active_worker_nodes();
|
SELECT master_get_active_worker_nodes();
|
||||||
|
|
||||||
-- add some shard placements to the cluster
|
-- add some shard placements to the cluster
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_activate_node('localhost', :worker_2_port);
|
||||||
CREATE TABLE cluster_management_test (col_1 text, col_2 int);
|
CREATE TABLE cluster_management_test (col_1 text, col_2 int);
|
||||||
SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash');
|
SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash');
|
||||||
SELECT master_create_worker_shards('cluster_management_test', 16, 1);
|
SELECT master_create_worker_shards('cluster_management_test', 16, 1);
|
||||||
|
|
|
@ -79,6 +79,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-16';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-17';
|
ALTER EXTENSION citus UPDATE TO '6.1-17';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.2-1';
|
ALTER EXTENSION citus UPDATE TO '6.2-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.2-2';
|
ALTER EXTENSION citus UPDATE TO '6.2-2';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.2-3';
|
||||||
|
|
||||||
-- show running version
|
-- show running version
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
|
|
|
@ -150,8 +150,8 @@ PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
-- Add "fake" pg_dist_transaction records and run recovery
|
-- Add "fake" pg_dist_transaction records and run recovery
|
||||||
INSERT INTO pg_dist_transaction VALUES (14, 'citus_0_should_commit');
|
INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_commit');
|
||||||
INSERT INTO pg_dist_transaction VALUES (14, 'citus_0_should_be_forgotten');
|
INSERT INTO pg_dist_transaction VALUES (12, 'citus_0_should_be_forgotten');
|
||||||
|
|
||||||
SELECT recover_prepared_transactions();
|
SELECT recover_prepared_transactions();
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
|
@ -575,7 +575,7 @@ WHERE
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
||||||
-- re-add the node for next tests
|
-- re-add the node for next tests
|
||||||
SELECT master_add_node('localhost', :worker_2_port);
|
SELECT master_activate_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
|
||||||
-- DROP tables to clean workspace
|
-- DROP tables to clean workspace
|
||||||
|
|
|
@ -417,6 +417,49 @@ WHERE colocationid IN
|
||||||
DROP TABLE replicate_reference_table_schema.table1;
|
DROP TABLE replicate_reference_table_schema.table1;
|
||||||
DROP SCHEMA replicate_reference_table_schema CASCADE;
|
DROP SCHEMA replicate_reference_table_schema CASCADE;
|
||||||
|
|
||||||
|
-- do some tests with inactive node
|
||||||
|
SELECT master_remove_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
CREATE TABLE initially_not_replicated_reference_table (key int);
|
||||||
|
SELECT create_reference_table('initially_not_replicated_reference_table');
|
||||||
|
|
||||||
|
SELECT master_add_inactive_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
-- we should see only one shard placements
|
||||||
|
SELECT
|
||||||
|
shardid, shardstate, shardlength, nodename, nodeport
|
||||||
|
FROM
|
||||||
|
pg_dist_shard_placement
|
||||||
|
WHERE
|
||||||
|
shardid IN (SELECT
|
||||||
|
shardid
|
||||||
|
FROM
|
||||||
|
pg_dist_shard
|
||||||
|
WHERE
|
||||||
|
logicalrelid = 'initially_not_replicated_reference_table'::regclass)
|
||||||
|
ORDER BY 1,4,5;
|
||||||
|
|
||||||
|
-- we should see the two shard placements after activation
|
||||||
|
SELECT master_activate_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
shardid, shardstate, shardlength, nodename, nodeport
|
||||||
|
FROM
|
||||||
|
pg_dist_shard_placement
|
||||||
|
WHERE
|
||||||
|
shardid IN (SELECT
|
||||||
|
shardid
|
||||||
|
FROM
|
||||||
|
pg_dist_shard
|
||||||
|
WHERE
|
||||||
|
logicalrelid = 'initially_not_replicated_reference_table'::regclass)
|
||||||
|
ORDER BY 1,4,5;
|
||||||
|
|
||||||
|
-- this should have no effect
|
||||||
|
SELECT master_add_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
-- drop unnecassary tables
|
||||||
|
DROP TABLE initially_not_replicated_reference_table;
|
||||||
|
|
||||||
-- reload pg_dist_shard_placement table
|
-- reload pg_dist_shard_placement table
|
||||||
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
|
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
|
||||||
|
|
Loading…
Reference in New Issue