Merge pull request #3155 from citusdata/repref_base

Replicate reference tables to coordinator, except planner changes
Hadi Moshayedi 2019-11-15 05:59:19 -08:00 committed by GitHub
commit f8459f81a8
61 changed files with 1424 additions and 413 deletions

View File

@ -161,7 +161,7 @@ function.
Then copy the `latest.sql` file to `{version}.sql`, where `{version}` is the
version for which this sql change is, e.g. `{9.0-1.sql}`. Now that you've
created this stable snapshot of the function definition for your version you
should use it in your actual sql file, .e.g.
should use it in your actual sql file, e.g.
`src/backend/distributed/sql/citus--8.3-1--9.0-1.sql`. You do this by using C
style `#include` statements like this:
```
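-- A minimal sketch of the convention described above; the UDF name and
-- version below are hypothetical, not taken from this commit.
#include "udfs/my_new_function/9.0-1.sql"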

View File

@ -93,7 +93,7 @@ EnsureDependenciesExistsOnAllNodes(const ObjectAddress *target)
* either get it now, or get it in master_add_node after this transaction finishes and
* the pg_dist_object record becomes visible.
*/
workerNodeList = ActivePrimaryNodeList(RowShareLock);
workerNodeList = ActivePrimaryWorkerNodeList(RowShareLock);
/*
* right after we acquired the lock we mark our objects as distributed, these changes

View File

@ -1074,7 +1074,7 @@ EnsureSequentialModeForFunctionDDL(void)
static void
TriggerSyncMetadataToPrimaryNodes(void)
{
List *workerList = ActivePrimaryNodeList(ShareLock);
List *workerList = ActivePrimaryWorkerNodeList(ShareLock);
ListCell *workerCell = NULL;
bool triggerMetadataSync = false;

View File

@ -65,6 +65,7 @@ bool EnableDDLPropagation = true; /* ddl propagation is enabled */
PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */
static bool shouldInvalidateForeignKeyGraph = false;
static int activeAlterTables = 0;
static int activeDropSchemaOrDBs = 0;
/* Local functions forward declarations for helper functions */
@ -76,6 +77,7 @@ static List * PlanRenameAttributeStmt(RenameStmt *stmt, const char *queryString)
static List * PlanAlterOwnerStmt(AlterOwnerStmt *stmt, const char *queryString);
static List * PlanAlterObjectDependsStmt(AlterObjectDependsStmt *stmt,
const char *queryString);
static bool IsDropSchemaOrDB(Node *parsetree);
/*
@ -655,6 +657,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
activeAlterTables++;
}
if (IsDropSchemaOrDB(parsetree))
{
activeDropSchemaOrDBs++;
}
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
@ -678,6 +685,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
{
activeAlterTables--;
}
if (IsDropSchemaOrDB(parsetree))
{
activeDropSchemaOrDBs--;
}
}
PG_CATCH();
{
@ -686,6 +698,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
activeAlterTables--;
}
if (IsDropSchemaOrDB(parsetree))
{
activeDropSchemaOrDBs--;
}
PG_RE_THROW();
}
PG_END_TRY();
@ -796,6 +813,26 @@ multi_ProcessUtility(PlannedStmt *pstmt,
}
/*
* IsDropSchemaOrDB returns true if parsetree represents DROP SCHEMA ... or
* a DROP DATABASE.
*/
static bool
IsDropSchemaOrDB(Node *parsetree)
{
DropStmt *dropStatement = NULL;
if (!IsA(parsetree, DropStmt))
{
return false;
}
dropStatement = (DropStmt *) parsetree;
return (dropStatement->removeType == OBJECT_SCHEMA) ||
(dropStatement->removeType == OBJECT_DATABASE);
}
/*
* PlanRenameAttributeStmt called for RenameStmt's that are targetting an attribute eg.
* type attributes. Based on the relation type the attribute gets renamed it dispatches to
@ -1198,3 +1235,14 @@ AlterTableInProgress(void)
{
return activeAlterTables > 0;
}
/*
* DropSchemaOrDBInProgress returns true if we're processing a DROP SCHEMA
* or a DROP DATABASE command right now.
*/
bool
DropSchemaOrDBInProgress(void)
{
return activeDropSchemaOrDBs > 0;
}

View File

@ -127,7 +127,7 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS)
*/
BeginOrContinueCoordinatedTransaction();
nodeList = ActivePrimaryNodeList(NoLock);
nodeList = ActivePrimaryWorkerNodeList(NoLock);
estate = CreateExecutorState();
resultDest = (RemoteFileDestReceiver *) CreateRemoteFileDestReceiver(resultIdString,
estate, nodeList,

View File

@ -114,7 +114,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
}
else
{
List *workerNodeList = ActiveReadableNodeList();
List *workerNodeList = ActiveReadableWorkerNodeList();
int workerNodeCount = list_length(workerNodeList);
int taskCount = list_length(job->taskList);
double tasksPerNode = taskCount / ((double) workerNodeCount);

View File

@ -212,7 +212,7 @@ MultiTaskTrackerExecute(Job *job)
* assigning and checking the status of tasks. The second (temporary) hash
* helps us in fetching results data from worker nodes to the master node.
*/
workerNodeList = ActivePrimaryNodeList(NoLock);
workerNodeList = ActivePrimaryWorkerNodeList(NoLock);
taskTrackerCount = (uint32) list_length(workerNodeList);
/* connect as the current user for running queries */

View File

@ -56,7 +56,7 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
*/
BeginOrContinueCoordinatedTransaction();
nodeList = ActiveReadableNodeList();
nodeList = ActiveReadableWorkerNodeList();
foreach(subPlanCell, subPlanList)
{

View File

@ -121,7 +121,7 @@ OpenConnectionsToAllWorkerNodes(LOCKMODE lockMode)
ListCell *workerNodeCell = NULL;
int connectionFlags = FORCE_NEW_CONNECTION;
workerNodeList = ActivePrimaryNodeList(lockMode);
workerNodeList = ActivePrimaryWorkerNodeList(lockMode);
foreach(workerNodeCell, workerNodeList)
{

View File

@ -177,7 +177,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
LockRelationOid(DistNodeRelationId(), RowShareLock);
/* load and sort the worker node list for deterministic placement */
workerNodeList = ActivePrimaryShouldHaveShardsNodeList(NoLock);
workerNodeList = DistributedTablePlacementNodeList(NoLock);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
workerNodeCount = list_length(workerNodeList);
@ -348,8 +348,7 @@ void
CreateReferenceTableShard(Oid distributedTableId)
{
char shardStorageType = 0;
List *workerNodeList = NIL;
int32 workerNodeCount = 0;
List *nodeList = NIL;
List *existingShardList = NIL;
uint64 shardId = INVALID_SHARD_ID;
int workerStartIndex = 0;
@ -386,18 +385,16 @@ CreateReferenceTableShard(Oid distributedTableId)
/*
* load and sort the worker node list for deterministic placements
* create_reference_table has already acquired ActivePrimaryNodeList lock
* create_reference_table has already acquired pg_dist_node lock
*/
workerNodeList = ActivePrimaryNodeList(NoLock);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
nodeList = ReferenceTablePlacementNodeList(ShareLock);
nodeList = SortList(nodeList, CompareWorkerNodes);
replicationFactor = ReferenceTableReplicationFactor();
/* get the next shard id */
shardId = GetNextShardId();
/* set the replication factor equal to the number of worker nodes */
workerNodeCount = list_length(workerNodeList);
replicationFactor = workerNodeCount;
/*
* Grabbing the shard metadata lock isn't technically necessary since
* we already hold an exclusive lock on the partition table, but we'll
@ -410,7 +407,7 @@ CreateReferenceTableShard(Oid distributedTableId)
shardMaxValue);
insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
workerNodeList, workerStartIndex,
nodeList, workerStartIndex,
replicationFactor);
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,

View File

@ -422,7 +422,21 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
quotedShardName);
}
connection = GetPlacementConnection(connectionFlags, shardPlacement, NULL);
/*
* The active DROP SCHEMA/DATABASE ... CASCADE will drop the shard itself; if we
* tried to drop it over another connection, we would get into a distributed
* deadlock.
*/
if (shardPlacement->groupId == COORDINATOR_GROUP_ID &&
IsCoordinator() &&
DropSchemaOrDBInProgress())
{
DeleteShardPlacementRow(shardPlacement->placementId);
continue;
}
connection = GetPlacementConnection(connectionFlags, shardPlacement,
NULL);
RemoteTransactionBeginIfNecessary(connection);
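The comment above is easiest to see with a concrete sequence; the schema and table names below are made up, and this is only a sketch of the situation the new guard handles once the coordinator is registered in pg_dist_node:
```
-- the reference table gets a local shard placement on the coordinator
CREATE SCHEMA app;
CREATE TABLE app.settings (key text PRIMARY KEY, value text);
SELECT create_reference_table('app.settings');

-- the CASCADE already drops the coordinator-local shard in this backend;
-- issuing the shard DROP over a second connection back to the coordinator
-- would self-deadlock, so only the placement metadata row is deleted instead
DROP SCHEMA app CASCADE;
```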

View File

@ -41,6 +41,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_placement.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/remote_commands.h"

View File

@ -475,7 +475,7 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
/* switch to memory context appropriate for multiple function calls */
oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
workerNodeList = ActiveReadableNodeList();
workerNodeList = ActiveReadableWorkerNodeList();
workerNodeCount = (uint32) list_length(workerNodeList);
functionContext->user_fctx = workerNodeList;

View File

@ -160,7 +160,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
shardId = GetNextShardId();
/* if enough live groups, add an extra candidate node as backup */
workerNodeList = ActivePrimaryNodeList(NoLock);
workerNodeList = DistributedTablePlacementNodeList(NoLock);
if (list_length(workerNodeList) > ShardReplicationFactor)
{

View File

@ -44,6 +44,9 @@ static List * PrimaryNodesNotInList(List *currentList);
static WorkerNode * FindRandomNodeFromList(List *candidateWorkerNodeList);
static bool OddNumber(uint32 number);
static bool ListMember(List *currentList, WorkerNode *workerNode);
static bool NodeIsPrimaryWorker(WorkerNode *node);
static bool NodeCanHaveDistTablePlacements(WorkerNode *node);
static bool NodeIsReadableWorker(WorkerNode *node);
/* ------------------------------------------------------------
@ -294,12 +297,12 @@ WorkerGetNodeWithName(const char *hostname)
/*
* ActivePrimaryNodeCount returns the number of groups with a primary in the cluster.
* ActivePrimaryWorkerNodeCount returns the number of groups with a primary in the cluster.
*/
uint32
ActivePrimaryNodeCount(void)
ActivePrimaryWorkerNodeCount(void)
{
List *workerNodeList = ActivePrimaryNodeList(NoLock);
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock);
uint32 liveWorkerCount = list_length(workerNodeList);
return liveWorkerCount;
@ -307,18 +310,28 @@ ActivePrimaryNodeCount(void)
/*
* ActiveReadableNodeCount returns the number of groups with a node we can read from.
* ActiveReadableWorkerNodeCount returns the number of groups with a node we can read from.
*/
uint32
ActiveReadableNodeCount(void)
ActiveReadableWorkerNodeCount(void)
{
List *workerNodeList = ActiveReadableNodeList();
List *workerNodeList = ActiveReadableWorkerNodeList();
uint32 liveWorkerCount = list_length(workerNodeList);
return liveWorkerCount;
}
/*
* NodeIsCoordinator returns true if the given node represents the coordinator.
*/
bool
NodeIsCoordinator(WorkerNode *node)
{
return node->groupId == COORDINATOR_GROUP_ID;
}
/*
* ActiveNodeListFilterFunc returns a list of all active nodes that checkFunction
* returns true for.
@ -358,37 +371,111 @@ FilterActiveNodeListFunc(LOCKMODE lockMode, bool (*checkFunction)(WorkerNode *))
/*
* ActivePrimaryNodeList returns a list of all the active primary nodes in workerNodeHash
* lockMode specifies which lock to use on pg_dist_node, this is necessary when
* the caller wouldn't want nodes to be added concurrent to their use of this list
* ActivePrimaryWorkerNodeList returns a list of all active primary worker nodes
* in workerNodeHash. lockMode specifies which lock to use on pg_dist_node,
* this is necessary when the caller wouldn't want nodes to be added concurrent
* to their use of this list.
*/
List *
ActivePrimaryWorkerNodeList(LOCKMODE lockMode)
{
EnsureModificationsCanRun();
return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryWorker);
}
/*
* ActivePrimaryNodeList returns a list of all active primary nodes in
* workerNodeHash.
*/
List *
ActivePrimaryNodeList(LOCKMODE lockMode)
{
EnsureModificationsCanRun();
return FilterActiveNodeListFunc(lockMode, WorkerNodeIsPrimary);
return FilterActiveNodeListFunc(lockMode, NodeIsPrimary);
}
/*
* ActivePrimaryShouldHaveShardsNodeList returns a list of all active, primary
* NodeIsPrimaryWorker returns true if the node is a primary worker node.
*/
static bool
NodeIsPrimaryWorker(WorkerNode *node)
{
return !NodeIsCoordinator(node) && NodeIsPrimary(node);
}
/*
* ReferenceTablePlacementNodeList returns the set of nodes that should have
* reference table placements. This includes all primaries, including the
* coordinator if known.
*/
List *
ReferenceTablePlacementNodeList(LOCKMODE lockMode)
{
EnsureModificationsCanRun();
return FilterActiveNodeListFunc(lockMode, NodeIsPrimary);
}
/*
* DistributedTablePlacementNodeList returns a list of all active, primary
worker nodes that can store new data, i.e. shouldhaveshards is 'true'
*/
List *
ActivePrimaryShouldHaveShardsNodeList(LOCKMODE lockMode)
DistributedTablePlacementNodeList(LOCKMODE lockMode)
{
EnsureModificationsCanRun();
return FilterActiveNodeListFunc(lockMode, WorkerNodeIsPrimaryShouldHaveShardsNode);
return FilterActiveNodeListFunc(lockMode, NodeCanHaveDistTablePlacements);
}
/*
* ActiveReadableNodeList returns a list of all nodes in workerNodeHash we can read from.
* NodeCanHaveDistTablePlacements returns true if the given node can have
* shards of a distributed table.
*/
static bool
NodeCanHaveDistTablePlacements(WorkerNode *node)
{
if (!NodeIsPrimary(node))
{
return false;
}
return node->shouldHaveShards;
}
/*
* ActiveReadableWorkerNodeList returns a list of all nodes in workerNodeHash
* that are readable workers.
*/
List *
ActiveReadableWorkerNodeList(void)
{
return FilterActiveNodeListFunc(NoLock, NodeIsReadableWorker);
}
/*
* ActiveReadableNodeList returns a list of all nodes in workerNodeHash
* that are readable workers.
*/
List *
ActiveReadableNodeList(void)
{
return FilterActiveNodeListFunc(NoLock, WorkerNodeIsReadable);
return FilterActiveNodeListFunc(NoLock, NodeIsReadable);
}
/*
* NodeIsReadableWorker returns true if the given node is a readable worker node.
*/
static bool
NodeIsReadableWorker(WorkerNode *node)
{
return !NodeIsCoordinator(node) && NodeIsReadable(node);
}
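As a rough guide to the renamed helpers, the same distinction can be expressed against pg_dist_node; this is only an approximation of the in-memory filters above (which also consult the worker node hash and the current cluster):
```
-- roughly the nodes ReferenceTablePlacementNodeList() returns:
-- every active primary, coordinator included
SELECT nodename, nodeport FROM pg_dist_node
WHERE isactive AND noderole = 'primary';

-- roughly the nodes DistributedTablePlacementNodeList() returns:
-- active primaries that should have shards (the coordinator is added with
-- shouldhaveshards = false, so it is excluded)
SELECT nodename, nodeport FROM pg_dist_node
WHERE isactive AND noderole = 'primary' AND shouldhaveshards;
```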
@ -414,7 +501,7 @@ PrimaryNodesNotInList(List *currentList)
continue;
}
if (WorkerNodeIsPrimary(workerNode))
if (NodeIsPrimary(workerNode))
{
workerNodeList = lappend(workerNodeList, workerNode);
}
@ -524,7 +611,7 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize)
WorkerNode *
GetFirstPrimaryWorkerNode(void)
{
List *workerNodeList = ActivePrimaryNodeList(NoLock);
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock);
ListCell *workerNodeCell = NULL;
WorkerNode *firstWorkerNode = NULL;

View File

@ -124,9 +124,17 @@ StartMetadatSyncToNode(char *nodeNameString, int32 nodePort)
escapedNodeName, nodePort)));
}
if (NodeIsCoordinator(workerNode))
{
ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains "
"metadata, skipping syncing the metadata",
nodeNameString, nodePort)));
return;
}
MarkNodeHasMetadata(nodeNameString, nodePort, true);
if (!WorkerNodeIsPrimary(workerNode))
if (!NodeIsPrimary(workerNode))
{
/*
* If this is a secondary node we can't actually sync metadata to it; we assume
@ -346,7 +354,7 @@ MetadataCreateCommands(void)
List *distributedTableList = DistributedTableList();
List *propagatedTableList = NIL;
bool includeNodesFromOtherClusters = true;
List *workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters);
List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters);
ListCell *distributedTableCell = NULL;
char *nodeListInsertCommand = NULL;
bool includeSequenceDefaults = true;
@ -1177,7 +1185,7 @@ SchemaOwnerName(Oid objectId)
static bool
HasMetadataWorkers(void)
{
List *workerNodeList = ActivePrimaryNodeList(NoLock);
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock);
ListCell *workerNodeCell = NULL;
foreach(workerNodeCell, workerNodeList)
@ -1306,7 +1314,7 @@ SyncMetadataToNodes(void)
return METADATA_SYNC_FAILED_LOCK;
}
workerList = ActivePrimaryNodeList(NoLock);
workerList = ActivePrimaryWorkerNodeList(NoLock);
foreach(workerCell, workerList)
{

View File

@ -1991,7 +1991,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependedJobList, Var *partitionKey,
static uint32
HashPartitionCount(void)
{
uint32 groupCount = ActiveReadableNodeCount();
uint32 groupCount = ActiveReadableWorkerNodeCount();
double maxReduceTasksPerNode = MaxRunningTasksPerNode / 2.0;
uint32 partitionCount = (uint32) rint(groupCount * maxReduceTasksPerNode);
@ -5024,7 +5024,7 @@ GreedyAssignTaskList(List *taskList)
uint32 taskCount = list_length(taskList);
/* get the worker node list and sort the list */
List *workerNodeList = ActiveReadableNodeList();
List *workerNodeList = ActiveReadableWorkerNodeList();
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
/*
@ -5472,7 +5472,7 @@ AssignDualHashTaskList(List *taskList)
* if subsequent jobs have a small number of tasks, we won't allocate the
* tasks to the same worker repeatedly.
*/
List *workerNodeList = ActiveReadableNodeList();
List *workerNodeList = ActiveReadableWorkerNodeList();
uint32 workerNodeCount = (uint32) list_length(workerNodeList);
uint32 beginningNodeIndex = jobId % workerNodeCount;

View File

@ -2120,7 +2120,7 @@ PlanRouterQuery(Query *originalQuery,
}
else if (replacePrunedQueryWithDummy)
{
List *workerNodeList = ActiveReadableNodeList();
List *workerNodeList = ActiveReadableWorkerNodeList();
if (workerNodeList != NIL)
{
int workerNodeCount = list_length(workerNodeList);

View File

@ -6,6 +6,8 @@ COMMENT ON COLUMN pg_catalog.pg_dist_node.shouldhaveshards IS
#include "udfs/master_drain_node/9.1-1.sql"
#include "udfs/rebalance_table_shards/9.1-1.sql"
#include "udfs/get_rebalance_table_shards_plan/9.1-1.sql"
#include "udfs/master_add_node/9.1-1.sql"
#include "udfs/master_add_inactive_node/9.1-1.sql"
-- we don't maintain replication factor of reference tables anymore and just
-- use -1 instead.

View File

@ -0,0 +1,15 @@
-- Update the default groupId to -1
DROP FUNCTION master_add_inactive_node(text, integer, integer, noderole, name);
CREATE FUNCTION master_add_inactive_node(nodename text,
nodeport integer,
groupid integer default -1,
noderole noderole default 'primary',
nodecluster name default 'default')
RETURNS INTEGER
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$master_add_inactive_node$$;
COMMENT ON FUNCTION master_add_inactive_node(nodename text,nodeport integer,
groupid integer, noderole noderole,
nodecluster name)
IS 'prepare node by adding it to pg_dist_node';
REVOKE ALL ON FUNCTION master_add_inactive_node(text,int,int,noderole,name) FROM PUBLIC;

View File

@ -0,0 +1,15 @@
-- Update the default groupId to -1
DROP FUNCTION master_add_inactive_node(text, integer, integer, noderole, name);
CREATE FUNCTION master_add_inactive_node(nodename text,
nodeport integer,
groupid integer default -1,
noderole noderole default 'primary',
nodecluster name default 'default')
RETURNS INTEGER
LANGUAGE C STRICT
AS 'MODULE_PATHNAME',$$master_add_inactive_node$$;
COMMENT ON FUNCTION master_add_inactive_node(nodename text,nodeport integer,
groupid integer, noderole noderole,
nodecluster name)
IS 'prepare node by adding it to pg_dist_node';
REVOKE ALL ON FUNCTION master_add_inactive_node(text,int,int,noderole,name) FROM PUBLIC;

View File

@ -0,0 +1,14 @@
-- Update the default groupId to -1
DROP FUNCTION master_add_node(text, integer, integer, noderole, name);
CREATE FUNCTION master_add_node(nodename text,
nodeport integer,
groupid integer default -1,
noderole noderole default 'primary',
nodecluster name default 'default')
RETURNS INTEGER
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_add_node$$;
COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer,
groupid integer, noderole noderole, nodecluster name)
IS 'add node to the cluster';
REVOKE ALL ON FUNCTION master_add_node(text,int,int,noderole,name) FROM PUBLIC;

View File

@ -0,0 +1,14 @@
-- Update the default groupId to -1
DROP FUNCTION master_add_node(text, integer, integer, noderole, name);
CREATE FUNCTION master_add_node(nodename text,
nodeport integer,
groupid integer default -1,
noderole noderole default 'primary',
nodecluster name default 'default')
RETURNS INTEGER
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_add_node$$;
COMMENT ON FUNCTION master_add_node(nodename text, nodeport integer,
groupid integer, noderole noderole, nodecluster name)
IS 'add node to the cluster';
REVOKE ALL ON FUNCTION master_add_node(text,int,int,noderole,name) FROM PUBLIC;
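
With the default groupid changed to -1 (meaning "assign the next free group"), passing groupid => 0 explicitly is how the coordinator itself is registered, as the new regression tests below do. A usage sketch with illustrative host names and ports:
```
-- register the coordinator so it can hold reference table placements
SELECT master_add_node('localhost', 5432, groupid => 0);

-- workers keep getting freshly assigned group ids when groupid is omitted
SELECT master_add_node('worker-1.example.com', 5432);
```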

View File

@ -80,7 +80,7 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
uint32 timeout = PG_GETARG_UINT32(0);
int waitResult = 0;
List *workerList = ActivePrimaryNodeList(NoLock);
List *workerList = ActivePrimaryWorkerNodeList(NoLock);
ListCell *workerCell = NULL;
bool waitNotifications = false;
MultiConnection *connection = NULL;

View File

@ -216,7 +216,7 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
{
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL;
List *workerNodeList = ActivePrimaryNodeList(NoLock);
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock);
ListCell *workerNodeCell = NULL;
List *connectionList = NIL;
ListCell *connectionCell = NULL;

View File

@ -317,7 +317,7 @@ CitusStatActivity(const char *statQuery)
{
List *citusStatsList = NIL;
List *workerNodeList = ActivePrimaryNodeList(NoLock);
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock);
ListCell *workerNodeCell = NULL;
char *nodeUser = NULL;
List *connectionList = NIL;
@ -455,7 +455,7 @@ GetLocalNodeCitusDistStat(const char *statQuery)
localGroupId = GetLocalGroupId();
/* get the current worker's node stats */
workerNodeList = ActivePrimaryNodeList(NoLock);
workerNodeList = ActivePrimaryWorkerNodeList(NoLock);
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);

View File

@ -111,13 +111,14 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, const char *command)
List *
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{
List *workerNodeList = ActivePrimaryNodeList(lockMode);
List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode);
ListCell *workerNodeCell = NULL;
List *result = NIL;
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
if (targetWorkerSet == WORKERS_WITH_METADATA &&
(!workerNode->hasMetadata || !workerNode->metadataSynced))
{

View File

@ -627,7 +627,7 @@ LookupNodeForGroup(int32 groupId)
foundAnyNodes = true;
if (WorkerNodeIsReadable(workerNode))
if (NodeIsReadable(workerNode))
{
return workerNode;
}
@ -3026,7 +3026,7 @@ InitializeWorkerNodeCache(void)
newWorkerNodeHash = hash_create("Worker Node Hash", maxTableSize, &info, hashFlags);
/* read the list from pg_dist_node */
workerNodeList = ReadWorkerNodes(includeNodesFromOtherClusters);
workerNodeList = ReadDistNode(includeNodesFromOtherClusters);
newWorkerNodeCount = list_length(workerNodeList);
newWorkerNodeArray = MemoryContextAlloc(MetadataCacheMemoryContext,

View File

@ -49,6 +49,7 @@
#include "utils/rel.h"
#include "utils/relcache.h"
#define INVALID_GROUP_ID -1
/* default group size */
int GroupSize = 1;
@ -109,6 +110,7 @@ DefaultNodeMetadata()
NodeMetadata nodeMetadata = {
.nodeRack = WORKER_DEFAULT_RACK,
.shouldHaveShards = true,
.groupId = INVALID_GROUP_ID,
};
return nodeMetadata;
}
@ -277,7 +279,7 @@ master_disable_node(PG_FUNCTION_ARGS)
PG_TRY();
{
if (WorkerNodeIsPrimary(workerNode))
if (NodeIsPrimary(workerNode))
{
/*
* Delete reference table placements so they are not taken into account
@ -363,7 +365,7 @@ master_set_node_property(PG_FUNCTION_ARGS)
static void
SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
{
if (WorkerNodeIsPrimary(newWorkerNode))
if (NodeIsPrimary(newWorkerNode))
{
EnsureNoModificationsHaveBeenDone();
ReplicateAllDependenciesToNode(newWorkerNode->workerName,
@ -450,10 +452,10 @@ GroupForNode(char *nodeName, int nodePort)
/*
* WorkerNodeIsPrimary returns whether the argument represents a primary node.
* NodeIsPrimary returns whether the argument represents a primary node.
*/
bool
WorkerNodeIsPrimary(WorkerNode *worker)
NodeIsPrimary(WorkerNode *worker)
{
Oid primaryRole = PrimaryNodeRoleId();
@ -468,10 +470,10 @@ WorkerNodeIsPrimary(WorkerNode *worker)
/*
* WorkerNodeIsSecondary returns whether the argument represents a secondary node.
* NodeIsSecondary returns whether the argument represents a secondary node.
*/
bool
WorkerNodeIsSecondary(WorkerNode *worker)
NodeIsSecondary(WorkerNode *worker)
{
Oid secondaryRole = SecondaryNodeRoleId();
@ -486,36 +488,20 @@ WorkerNodeIsSecondary(WorkerNode *worker)
/*
* WorkerNodeIsPrimaryShouldHaveShardsNode returns whether the argument represents a
* primary node that is a eligible for new data.
*/
bool
WorkerNodeIsPrimaryShouldHaveShardsNode(WorkerNode *worker)
{
if (!WorkerNodeIsPrimary(worker))
{
return false;
}
return worker->shouldHaveShards;
}
/*
* WorkerNodeIsReadable returns whether we're allowed to send SELECT queries to this
* NodeIsReadable returns whether we're allowed to send SELECT queries to this
* node.
*/
bool
WorkerNodeIsReadable(WorkerNode *workerNode)
NodeIsReadable(WorkerNode *workerNode)
{
if (ReadFromSecondaries == USE_SECONDARY_NODES_NEVER &&
WorkerNodeIsPrimary(workerNode))
NodeIsPrimary(workerNode))
{
return true;
}
if (ReadFromSecondaries == USE_SECONDARY_NODES_ALWAYS &&
WorkerNodeIsSecondary(workerNode))
NodeIsSecondary(workerNode))
{
return true;
}
@ -552,7 +538,7 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes)
*groupContainsNodes = true;
}
if (WorkerNodeIsPrimary(workerNode))
if (NodeIsPrimary(workerNode))
{
hash_seq_term(&status);
return workerNode;
@ -668,7 +654,7 @@ master_update_node(PG_FUNCTION_ARGS)
* though we currently only query secondaries on follower clusters
* where these locks will have no effect.
*/
if (WorkerNodeIsPrimary(workerNode))
if (NodeIsPrimary(workerNode))
{
/*
* before acquiring the locks check if we want a background worker to help us to
@ -918,7 +904,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
/*
* ReadWorkerNodes iterates over pg_dist_node table, converts each row
* ReadDistNode iterates over pg_dist_node table, converts each row
* into it's memory representation (i.e., WorkerNode) and adds them into
* a list. Lastly, the list is returned to the caller.
*
@ -926,7 +912,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
* by includeNodesFromOtherClusters.
*/
List *
ReadWorkerNodes(bool includeNodesFromOtherClusters)
ReadDistNode(bool includeNodesFromOtherClusters)
{
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
@ -981,7 +967,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
char *nodeDeleteCommand = NULL;
WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
if (WorkerNodeIsPrimary(workerNode))
if (NodeIsPrimary(workerNode))
{
bool onlyConsiderActivePlacements = false;
@ -1024,7 +1010,7 @@ CountPrimariesWithMetadata(void)
while ((workerNode = hash_seq_search(&status)) != NULL)
{
if (workerNode->hasMetadata && WorkerNodeIsPrimary(workerNode))
if (workerNode->hasMetadata && NodeIsPrimary(workerNode))
{
primariesWithMetadata++;
}
@ -1074,11 +1060,18 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
}
/* user lets Citus to decide on the group that the newly added node should be in */
if (nodeMetadata->groupId == 0)
if (nodeMetadata->groupId == INVALID_GROUP_ID)
{
nodeMetadata->groupId = GetNextGroupId();
}
/* if this is a coordinator, we shouldn't place shards on it */
if (nodeMetadata->groupId == COORDINATOR_GROUP_ID)
{
nodeMetadata->shouldHaveShards = false;
nodeMetadata->hasMetadata = true;
}
/* if nodeRole hasn't been added yet there's a constraint for one-node-per-group */
if (nodeMetadata->nodeRole != InvalidOid && nodeMetadata->nodeRole ==
PrimaryNodeRoleId())

View File

@ -36,8 +36,8 @@
/* local function forward declarations */
static void ReplicateSingleShardTableToAllWorkers(Oid relationId);
static void ReplicateShardToAllWorkers(ShardInterval *shardInterval);
static void ReplicateSingleShardTableToAllNodes(Oid relationId);
static void ReplicateShardToAllNodes(ShardInterval *shardInterval);
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
int nodePort);
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
@ -56,8 +56,6 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
List *shardIntervalList = NIL;
ShardInterval *shardInterval = NULL;
uint64 shardId = INVALID_SHARD_ID;
DistTableCacheEntry *tableEntry = NULL;
CheckCitusVersion(ERROR);
@ -95,6 +93,8 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
relationName)));
}
LockRelationOid(relationId, AccessExclusiveLock);
shardIntervalList = LoadShardIntervalList(relationId);
if (list_length(shardIntervalList) != 1)
{
@ -106,13 +106,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
"reference tables.", relationName)));
}
shardInterval = (ShardInterval *) linitial(shardIntervalList);
shardId = shardInterval->shardId;
LockShardDistributionMetadata(shardId, ExclusiveLock);
LockShardResource(shardId, ExclusiveLock);
ReplicateSingleShardTableToAllWorkers(relationId);
ReplicateSingleShardTableToAllNodes(relationId);
PG_RETURN_VOID();
}
@ -184,12 +178,13 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
/*
* ReplicateSingleShardTableToAllWorkers accepts a broadcast table and replicates it to
* all worker nodes. It assumes that caller of this function ensures that given broadcast
* table has only one shard.
* ReplicateSingleShardTableToAllNodes accepts a broadcast table and replicates
* it to all worker nodes, and the coordinator if it has been added by the user
* to pg_dist_node. It assumes that caller of this function ensures that given
* broadcast table has only one shard.
*/
static void
ReplicateSingleShardTableToAllWorkers(Oid relationId)
ReplicateSingleShardTableToAllNodes(Oid relationId)
{
List *shardIntervalList = LoadShardIntervalList(relationId);
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
@ -209,12 +204,12 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId)
}
/*
* ReplicateShardToAllWorkers function opens separate transactions (i.e., not part
* ReplicateShardToAllNodes function opens separate transactions (i.e., not part
* of any coordinated transactions) to each worker and replicates given shard to all
* workers. If a worker already has a healthy replica of given shard, it skips that
* worker to prevent copying unnecessary data.
*/
ReplicateShardToAllWorkers(shardInterval);
ReplicateShardToAllNodes(shardInterval);
/*
* We need to update metadata tables to mark this table as reference table. We modify
@ -233,20 +228,20 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId)
/*
* ReplicateShardToAllWorkers function replicates given shard to the all worker nodes
* ReplicateShardToAllNodes function replicates given shard to all nodes
* in separate transactions. While replicating, it only replicates the shard to the
* workers which does not have a healthy replica of the shard. However, this function
nodes which do not have a healthy replica of the shard. However, this function
* does not obtain any lock on shard resource and shard metadata. It is caller's
* responsibility to take those locks.
*/
static void
ReplicateShardToAllWorkers(ShardInterval *shardInterval)
ReplicateShardToAllNodes(ShardInterval *shardInterval)
{
List *workerNodeList = NULL;
ListCell *workerNodeCell = NULL;
/* prevent concurrent pg_dist_node changes */
workerNodeList = ActivePrimaryNodeList(ShareLock);
workerNodeList = ReferenceTablePlacementNodeList(ShareLock);
/*
* We will iterate over all worker nodes and if a healthy placement does not exist
@ -325,7 +320,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
}
/*
* Although ReplicateShardToAllWorkers is used only for reference tables,
* Although ReplicateShardToAllNodes is used only for reference tables,
* during the upgrade phase, the placements are created before the table is
* marked as a reference table. All metadata (including the placement
* metadata) will be copied to workers after all reference table changed
@ -514,3 +509,16 @@ CompareOids(const void *leftElement, const void *rightElement)
return 0;
}
}
/*
* ReferenceTableReplicationFactor returns the replication factor for
* reference tables.
*/
int
ReferenceTableReplicationFactor(void)
{
List *nodeList = ReferenceTablePlacementNodeList(NoLock);
int replicationFactor = list_length(nodeList);
return replicationFactor;
}
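
The regression tests in this commit check the same property from SQL: every reference table shard should end up with one placement per active primary. A hedged version of that check, reusing the reference_table_test table from the tests below:
```
-- expected replication factor: number of active primaries
-- (coordinator included once it is registered)
SELECT count(*) AS active_primaries
FROM pg_dist_node
WHERE isactive AND noderole = 'primary';

-- actual number of nodes holding the shard of one reference table
SELECT count(DISTINCT nodeport) AS placement_nodes
FROM pg_dist_shard_placement
WHERE shardid IN (SELECT shardid FROM pg_dist_shard
                  WHERE logicalrelid = 'reference_table_test'::regclass);
```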

View File

@ -233,7 +233,7 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList)
static bool
IsFirstWorkerNode()
{
List *workerNodeList = ActivePrimaryNodeList(NoLock);
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock);
WorkerNode *firstWorkerNode = NULL;
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);

View File

@ -108,7 +108,7 @@ CollectBasicUsageStatistics(void)
distTableOids = DistTableOidList();
roundedDistTableCount = NextPow2(list_length(distTableOids));
roundedClusterSize = NextPow2(DistributedTablesSize(distTableOids));
workerNodeCount = ActivePrimaryNodeCount();
workerNodeCount = ActivePrimaryWorkerNodeCount();
metadataJsonbDatum = DistNodeMetadata();
metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out,
metadataJsonbDatum));

View File

@ -122,11 +122,18 @@ RelationIsAKnownShard(Oid shardRelationId)
localGroupId = GetLocalGroupId();
if (localGroupId == 0)
{
/*
* We're not interested in shards in the coordinator
* or non-mx worker nodes.
*/
return false;
bool coordinatorIsKnown = false;
PrimaryNodeForGroup(0, &coordinatorIsKnown);
if (!coordinatorIsKnown)
{
/*
* We're not interested in shards in the coordinator
* or non-mx worker nodes, unless the coordinator is
* in pg_dist_node.
*/
return false;
}
}
/* we're not interested in the relations that are not in the search path */

View File

@ -58,5 +58,6 @@ extern void InvalidateForeignKeyGraphForDDL(void);
extern List * DDLTaskList(Oid relationId, const char *commandString);
extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands);
extern bool AlterTableInProgress(void);
extern bool DropSchemaOrDBInProgress(void);
#endif /* MULTI_UTILITY_H */

View File

@ -21,6 +21,7 @@ extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);
extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
extern List * ReferenceTableOidList(void);
extern int CompareOids(const void *leftElement, const void *rightElement);
extern int ReferenceTableReplicationFactor(void);
#endif /* REFERENCE_TABLE_UTILS_H_ */

View File

@ -33,6 +33,8 @@
#define WORKER_DEFAULT_CLUSTER "default"
#define COORDINATOR_GROUP_ID 0
/*
* In memory representation of pg_dist_node table elements. The elements are hold in
* WorkerNodeHash table.
@ -67,22 +69,25 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList,
uint64 shardId,
uint32 placementIndex);
extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList);
extern uint32 ActivePrimaryNodeCount(void);
extern uint32 ActivePrimaryWorkerNodeCount(void);
extern List * ActivePrimaryWorkerNodeList(LOCKMODE lockMode);
extern List * ActivePrimaryNodeList(LOCKMODE lockMode);
extern List * ActivePrimaryShouldHaveShardsNodeList(LOCKMODE lockMode);
extern uint32 ActiveReadableNodeCount(void);
extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode);
extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode);
extern uint32 ActiveReadableWorkerNodeCount(void);
extern List * ActiveReadableWorkerNodeList(void);
extern List * ActiveReadableNodeList(void);
extern WorkerNode * GetWorkerNodeByNodeId(int nodeId);
extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters);
extern List * ReadDistNode(bool includeNodesFromOtherClusters);
extern void EnsureCoordinator(void);
extern uint32 GroupForNode(char *nodeName, int32 nodePorT);
extern uint32 GroupForNode(char *nodeName, int32 nodePort);
extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes);
extern bool WorkerNodeIsPrimary(WorkerNode *worker);
extern bool WorkerNodeIsSecondary(WorkerNode *worker);
extern bool WorkerNodeIsPrimaryShouldHaveShardsNode(WorkerNode *worker);
extern bool WorkerNodeIsReadable(WorkerNode *worker);
extern bool NodeIsPrimary(WorkerNode *worker);
extern bool NodeIsSecondary(WorkerNode *worker);
extern bool NodeIsReadable(WorkerNode *worker);
extern bool NodeIsCoordinator(WorkerNode *node);
extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void);

View File

@ -0,0 +1,22 @@
--
-- ADD_COORDINATOR
--
SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset
-- adding the same node again should return the existing nodeid
SELECT master_add_node('localhost', :master_port, groupid => 0) = :master_nodeid;
?column?
----------
t
(1 row)
-- adding another node with groupid=0 should error out
SELECT master_add_node('localhost', 12345, groupid => 0) = :master_nodeid;
ERROR: group 0 already has a primary node
-- start_metadata_sync_to_node() for coordinator should raise a notice
SELECT start_metadata_sync_to_node('localhost', :master_port);
NOTICE: localhost:57636 is the coordinator and already contains metadata, skipping syncing the metadata
start_metadata_sync_to_node
-----------------------------
(1 row)

View File

@ -527,12 +527,10 @@ SELECT count(*) FROM referencing_schema.referencing_table;
800
(1 row)
SET client_min_messages TO ERROR;
DROP SCHEMA referenced_schema CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table referenced_schema.referenced_table
drop cascades to constraint fkey_ref on table referencing_schema.referencing_table
DROP SCHEMA referencing_schema CASCADE;
NOTICE: drop cascades to table referencing_schema.referencing_table
RESET client_min_messages;
-- on delete set update cascades properly
CREATE TABLE referenced_table(test_column int, test_column2 int, PRIMARY KEY(test_column));
CREATE TABLE referencing_table(id int, ref_id int DEFAULT 1);
@ -819,8 +817,8 @@ INSERT INTO referenced_table SELECT x, x+1 FROM generate_series(0,1000) AS f(x);
INSERT INTO referenced_table2 SELECT x, x+1 FROM generate_series(500,1500) AS f(x);
-- should fail
INSERT INTO referencing_table SELECT x, x+1 FROM generate_series(0,1500) AS f(x);
ERROR: insert or update on table "referencing_table_7000230" violates foreign key constraint "foreign_key_2_7000230"
DETAIL: Key (id)=(28) is not present in table "referenced_table2_7000225".
ERROR: insert or update on table "referencing_table_7000226" violates foreign key constraint "foreign_key_2_7000226"
DETAIL: Key (id)=(1) is not present in table "referenced_table2_7000225".
-- should fail
INSERT INTO referencing_table SELECT x, x+1 FROM generate_series(0,400) AS f(x);
ERROR: insert or update on table "referencing_table_7000226" violates foreign key constraint "foreign_key_2_7000226"
@ -1188,24 +1186,24 @@ SELECT create_distributed_table('referencing_referencing_table', 'id');
ALTER TABLE referencing_table ADD CONSTRAINT fkey_ref FOREIGN KEY (ref_id, ref_id2) REFERENCES referenced_table(test_column, test_column2) ON DELETE CASCADE;
SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_reference_table.referencing%' ORDER BY 1,2,3;
name | relid | refd_relid
-----------------------------------------------+------------------------------------------------------------+------------------------------------------------
fkey_ref_7000299 | fkey_reference_table.referencing_table_7000299 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000300 | fkey_reference_table.referencing_table_7000300 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000301 | fkey_reference_table.referencing_table_7000301 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000302 | fkey_reference_table.referencing_table_7000302 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000303 | fkey_reference_table.referencing_table_7000303 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000304 | fkey_reference_table.referencing_table_7000304 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000305 | fkey_reference_table.referencing_table_7000305 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000306 | fkey_reference_table.referencing_table_7000306 | fkey_reference_table.referenced_table_7000298
referencing_referencing_table_id_fkey_7000307 | fkey_reference_table.referencing_referencing_table_7000307 | fkey_reference_table.referencing_table_7000299
referencing_referencing_table_id_fkey_7000308 | fkey_reference_table.referencing_referencing_table_7000308 | fkey_reference_table.referencing_table_7000300
referencing_referencing_table_id_fkey_7000309 | fkey_reference_table.referencing_referencing_table_7000309 | fkey_reference_table.referencing_table_7000301
referencing_referencing_table_id_fkey_7000310 | fkey_reference_table.referencing_referencing_table_7000310 | fkey_reference_table.referencing_table_7000302
referencing_referencing_table_id_fkey_7000311 | fkey_reference_table.referencing_referencing_table_7000311 | fkey_reference_table.referencing_table_7000303
referencing_referencing_table_id_fkey_7000312 | fkey_reference_table.referencing_referencing_table_7000312 | fkey_reference_table.referencing_table_7000304
referencing_referencing_table_id_fkey_7000313 | fkey_reference_table.referencing_referencing_table_7000313 | fkey_reference_table.referencing_table_7000305
referencing_referencing_table_id_fkey_7000314 | fkey_reference_table.referencing_referencing_table_7000314 | fkey_reference_table.referencing_table_7000306
name | relid | refd_relid
------------------------------------------------------+------------------------------------------------------------+------------------------------------------------
fkey_ref_7000299 | fkey_reference_table.referencing_table_7000299 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000300 | fkey_reference_table.referencing_table_7000300 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000301 | fkey_reference_table.referencing_table_7000301 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000302 | fkey_reference_table.referencing_table_7000302 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000303 | fkey_reference_table.referencing_table_7000303 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000304 | fkey_reference_table.referencing_table_7000304 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000305 | fkey_reference_table.referencing_table_7000305 | fkey_reference_table.referenced_table_7000298
fkey_ref_7000306 | fkey_reference_table.referencing_table_7000306 | fkey_reference_table.referenced_table_7000298
referencing_referencing_table_id_ref_id_fkey_7000307 | fkey_reference_table.referencing_referencing_table_7000307 | fkey_reference_table.referencing_table_7000299
referencing_referencing_table_id_ref_id_fkey_7000308 | fkey_reference_table.referencing_referencing_table_7000308 | fkey_reference_table.referencing_table_7000300
referencing_referencing_table_id_ref_id_fkey_7000309 | fkey_reference_table.referencing_referencing_table_7000309 | fkey_reference_table.referencing_table_7000301
referencing_referencing_table_id_ref_id_fkey_7000310 | fkey_reference_table.referencing_referencing_table_7000310 | fkey_reference_table.referencing_table_7000302
referencing_referencing_table_id_ref_id_fkey_7000311 | fkey_reference_table.referencing_referencing_table_7000311 | fkey_reference_table.referencing_table_7000303
referencing_referencing_table_id_ref_id_fkey_7000312 | fkey_reference_table.referencing_referencing_table_7000312 | fkey_reference_table.referencing_table_7000304
referencing_referencing_table_id_ref_id_fkey_7000313 | fkey_reference_table.referencing_referencing_table_7000313 | fkey_reference_table.referencing_table_7000305
referencing_referencing_table_id_ref_id_fkey_7000314 | fkey_reference_table.referencing_referencing_table_7000314 | fkey_reference_table.referencing_table_7000306
(16 rows)
INSERT INTO referenced_table SELECT x, x+1 FROM generate_series(1,1000) AS f(x);
@ -1221,7 +1219,7 @@ SELECT max(ref_id) FROM referencing_referencing_table;
DROP TABLE referenced_table CASCADE;
NOTICE: drop cascades to constraint fkey_ref on table referencing_table
DROP TABLE referencing_table CASCADE;
NOTICE: drop cascades to constraint referencing_referencing_table_id_fkey on table referencing_referencing_table
NOTICE: drop cascades to constraint referencing_referencing_table_id_ref_id_fkey on table referencing_referencing_table
DROP TABLE referencing_referencing_table;
-- test if create_distributed_table works in transactions with some edge cases
-- the following checks if create_distributed_table works on foreign keys when
@ -1859,9 +1857,7 @@ ROLLBACK;
DROP TABLE referenced_table CASCADE;
NOTICE: drop cascades to constraint fkey_to_ref on table referencing_table_4
DROP TABLE referencing_table;
SET client_min_messages TO ERROR;
DROP SCHEMA fkey_reference_table CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to type foreign_details
drop cascades to view table_fkeys_in_workers
drop cascades to type composite
SET search_path TO DEFAULT;
RESET client_min_messages;

View File

@ -0,0 +1,143 @@
Parsed test spec with 3 sessions
starting permutation: s1-begin s2-begin s1-update-dist-table s2-lock-ref-table-placement-on-coordinator s1-lock-ref-table-placement-on-coordinator s2-update-dist-table deadlock-checker-call s1-end s2-end
create_distributed_table
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-update-dist-table:
update dist_table set b = 2 where a = 1;
step s2-lock-ref-table-placement-on-coordinator:
DO $$
DECLARE refshardid int;
BEGIN
SELECT shardid INTO refshardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass;
EXECUTE format('SELECT * from ref_table_%s FOR UPDATE', refshardid::text);
END
$$;
step s1-lock-ref-table-placement-on-coordinator:
DO $$
DECLARE refshardid int;
BEGIN
SELECT shardid INTO refshardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass;
EXECUTE format('SELECT * from ref_table_%s FOR UPDATE', refshardid::text);
END
$$;
<waiting ...>
step s2-update-dist-table:
update dist_table set b = 2 where a = 1;
<waiting ...>
step deadlock-checker-call:
SELECT check_distributed_deadlocks();
check_distributed_deadlocks
t
step s1-lock-ref-table-placement-on-coordinator: <... completed>
step s2-update-dist-table: <... completed>
error in steps deadlock-checker-call s1-lock-ref-table-placement-on-coordinator s2-update-dist-table: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-end:
END;
step s2-end:
END;
master_remove_node
starting permutation: s1-begin s2-begin s1-update-ref-table s2-sleep s2-view-dist s2-view-worker s2-end s1-end
create_distributed_table
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-update-ref-table:
update ref_table set a = a + 1;
step s2-sleep:
SELECT pg_sleep(0.5);
pg_sleep
step s2-view-dist:
SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' ORDER BY query DESC;
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
update ref_table set a = a + 1;
coordinator_host57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression
update ref_table set a = a + 1;
localhost 57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression
step s2-view-worker:
SELECT query, query_hostname, query_hostport, master_query_host_name,
master_query_host_port, state, wait_event_type, wait_event, usename, datname
FROM citus_worker_stat_activity
WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
query NOT ILIKE '%COMMIT%' AND
query NOT ILIKE '%dump_local_wait_edges%'
ORDER BY query, query_hostport DESC;
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
UPDATE public.ref_table_1400163 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
UPDATE public.ref_table_1400163 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
UPDATE public.ref_table_1400163 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)coordinator_host57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression
UPDATE public.ref_table_1400163 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)localhost 57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression
step s2-end:
END;
step s1-end:
END;
master_remove_node
starting permutation: s1-begin s2-begin s1-update-ref-table s2-active-transactions s1-end s2-end
create_distributed_table
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-update-ref-table:
update ref_table set a = a + 1;
step s2-active-transactions:
-- Admin should be able to see all transactions
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
count
2
count
6
step s1-end:
END;
step s2-end:
END;
master_remove_node

View File

@ -0,0 +1,90 @@
CREATE SCHEMA mx_add_coordinator;
SET search_path TO mx_add_coordinator,public;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 8;
SET citus.next_shard_id TO 7000000;
SET citus.next_placement_id TO 7000000;
SET citus.replication_model TO streaming;
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
?column?
----------
1
(1 row)
-- test that coordinator pg_dist_node entry is synced to the workers
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
verify_metadata | verify_metadata
-----------------+-----------------
t | t
(1 row)
CREATE TABLE ref(a int);
SELECT create_reference_table('ref');
create_reference_table
------------------------
(1 row)
-- test that changes from a metadata node is reflected in the coordinator placement
\c - - - :worker_1_port
SET search_path TO mx_add_coordinator,public;
INSERT INTO ref VALUES (1), (2), (3);
UPDATE ref SET a = a + 1;
DELETE FROM ref WHERE a > 3;
\c - - - :master_port
SET search_path TO mx_add_coordinator,public;
SELECT * FROM ref ORDER BY a;
a
---
2
3
(2 rows)
-- Clear pg_dist_transaction before removing the node. This is to keep the output
-- of multi_mx_transaction_recovery consistent.
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
SELECT count(*) FROM run_command_on_workers('SELECT recover_prepared_transactions()');
count
-------
2
(1 row)
SELECT master_remove_node('localhost', :master_port);
master_remove_node
--------------------
(1 row)
-- test that coordinator pg_dist_node entry was removed from the workers
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
verify_metadata | verify_metadata
-----------------+-----------------
t | t
(1 row)
DROP SCHEMA mx_add_coordinator CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table ref
drop cascades to table ref_7000000
SET search_path TO DEFAULT;
RESET client_min_messages;

View File

@ -34,19 +34,19 @@ WHERE
1250000 | t | t
(1 row)
SELECT count(*) active_primaries FROM pg_dist_node WHERE isactive AND noderole='primary' \gset
SELECT
shardid, shardstate, nodename, nodeport
shardid, bool_and(shardstate = 1) all_placements_healthy, COUNT(distinct nodeport) = :active_primaries replicated_to_all
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass)
ORDER BY
placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1250000 | 1 | localhost | 57637
1250000 | 1 | localhost | 57638
(2 rows)
GROUP BY shardid
ORDER BY shardid;
shardid | all_placements_healthy | replicated_to_all
---------+------------------------+-------------------
1250000 | t | t
(1 row)
-- check whether data was copied into distributed table
SELECT * FROM reference_table_test;
@ -783,18 +783,16 @@ SELECT create_reference_table('reference_table_test_fourth');
(1 row)
\set VERBOSITY terse
-- insert a row
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01');
-- now get the unique key violation
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01');
ERROR: duplicate key value violates unique constraint "reference_table_test_fourth_pkey_1250003"
DETAIL: Key (value_2)=(1) already exists.
CONTEXT: while executing command on localhost:57637
-- now get null constraint violation due to primary key
INSERT INTO reference_table_test_fourth (value_1, value_3, value_4) VALUES (1, '1.0', '2016-12-01');
ERROR: null value in column "value_2" violates not-null constraint
DETAIL: Failing row contains (1, null, 1.0, 2016-12-01 00:00:00).
CONTEXT: while executing command on localhost:57637
\set VERBOSITY default
-- lets run some upserts
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01') ON CONFLICT DO NOTHING RETURNING *;
value_1 | value_2 | value_3 | value_4
@ -820,18 +818,17 @@ INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '10', '2016-12-01') ON C
-- finally see that shard healths are OK
SELECT
shardid, shardstate, nodename, nodeport
shardid, bool_and(shardstate = 1) all_placements_healthy, COUNT(distinct nodeport) = :active_primaries replicated_to_all
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test_fourth'::regclass)
ORDER BY
placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1250003 | 1 | localhost | 57637
1250003 | 1 | localhost | 57638
(2 rows)
GROUP BY shardid
ORDER BY shardid;
shardid | all_placements_healthy | replicated_to_all
---------+------------------------+-------------------
1250003 | t | t
(1 row)
-- let's not run some update/delete queries on arbitrary columns
DELETE FROM
@ -1439,13 +1436,13 @@ SELECT shardid, shardstate FROM pg_dist_shard_placement WHERE placementid = :a_p
(1 row)
-- some queries that are captured in functions
CREATE FUNCTION select_count_all() RETURNS bigint AS '
CREATE OR REPLACE FUNCTION select_count_all() RETURNS bigint AS '
SELECT
count(*)
FROM
reference_table_test;
' LANGUAGE SQL;
CREATE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp)
CREATE OR REPLACE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp)
RETURNS void AS '
INSERT INTO reference_table_test VALUES ($1, $2, $3, $4);
' LANGUAGE SQL;
@ -1587,10 +1584,12 @@ BEGIN;
ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
ROLLBACK;
-- clean up tables
-- clean up tables, ...
SET client_min_messages TO ERROR;
DROP SEQUENCE example_ref_value_seq;
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite;
reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite,
colocated_table_test, colocated_table_test_2, append_reference_tmp_table;
DROP TYPE reference_comp_key;
DROP SCHEMA reference_schema CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table reference_schema.reference_table_test_sixth
drop cascades to table reference_schema.reference_table_test_seventh
RESET client_min_messages;

View File

@ -104,7 +104,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
@ -133,7 +134,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
@ -157,7 +159,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
@ -186,7 +189,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370001 | 1 | 0 | localhost | 57638
@ -224,7 +228,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
@ -255,7 +260,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
@ -286,7 +292,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
@ -317,7 +324,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370003 | 1 | 0 | localhost | 57638
@ -384,29 +392,29 @@ WHERE colocationid IN
10004 | 1 | -1 | 0
(1 row)
SELECT colocationid AS reference_table_colocationid FROM pg_dist_colocation WHERE distributioncolumntype = 0 \gset
SELECT
logicalrelid, partmethod, colocationid, repmodel
logicalrelid, partmethod, colocationid = :reference_table_colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY logicalrelid;
logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 10004 | t
replicate_reference_table_hash | h | 1360005 | c
logicalrelid | partmethod | ?column? | repmodel
-----------------------------------------+------------+----------+----------
replicate_reference_table_reference_one | n | t | t
replicate_reference_table_hash | h | f | c
(2 rows)
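Colocation ids are assigned at run time, so the output above now reports a boolean against a value captured just before, rather than a hard-coded id. A short sketch of the capture-and-compare pattern (the same statements appear in the .sql hunk later in this diff):

-- remember the colocation group shared by all reference tables
SELECT colocationid AS reference_table_colocationid
FROM pg_dist_colocation WHERE distributioncolumntype = 0 \gset
-- later checks compare against the captured value
SELECT logicalrelid, partmethod,
       colocationid = :reference_table_colocationid, repmodel
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass;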
BEGIN;
SET LOCAL client_min_messages TO ERROR;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_reference_one" to the node localhost:57638
?column?
----------
1
(1 row)
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
NOTICE: Replicating reference table "replicate_reference_table_hash" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -426,8 +434,7 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port
ORDER BY
shardid;
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370004 | 1 | 0 | localhost | 57638
@ -447,18 +454,18 @@ WHERE colocationid IN
(1 row)
SELECT
logicalrelid, partmethod, colocationid, repmodel
logicalrelid, partmethod, colocationid = :reference_table_colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY
logicalrelid;
logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 10004 | t
replicate_reference_table_hash | n | 10004 | t
replicate_reference_table_reference_two | n | 10004 | t
logicalrelid | partmethod | ?column? | repmodel
-----------------------------------------+------------+----------+----------
replicate_reference_table_reference_one | n | t | t
replicate_reference_table_hash | n | t | t
replicate_reference_table_reference_two | n | t | t
(3 rows)
DROP TABLE replicate_reference_table_reference_one;
@ -526,7 +533,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
@ -558,7 +566,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
@ -589,7 +598,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
@ -618,7 +628,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370011 | 1 | 0 | localhost | 57638
@ -661,7 +672,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
@ -681,7 +693,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1370012 | 1 | 0 | localhost | 57638
@ -718,7 +731,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
1
(1 row)
-- we should see only one shard placement
-- we should see only one shard placement (other than coordinator)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
@ -730,6 +743,7 @@ WHERE
pg_dist_shard
WHERE
logicalrelid = 'initially_not_replicated_reference_table'::regclass)
AND nodeport != :master_port
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
@ -755,6 +769,7 @@ WHERE
pg_dist_shard
WHERE
logicalrelid = 'initially_not_replicated_reference_table'::regclass)
AND nodeport != :master_port
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
View File
@ -1,5 +1,16 @@
-- Tests for prepared transaction recovery
SET citus.next_shard_id TO 1220000;
-- reference tables can have placements on the coordinator. Add it so we can
-- verify that we recover transactions which do DML on coordinator placements
-- properly.
SET client_min_messages TO ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
?column?
----------
1
(1 row)
RESET client_min_messages;
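An illustrative sanity check, not part of the test: after the call above the coordinator should appear in pg_dist_node under group 0, the group reserved for it.

SELECT nodename, nodeport, groupid, isactive
FROM pg_dist_node
WHERE groupid = 0;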
-- enforce 1 connection per placement since
-- the tests are prepared for that
SET citus.force_max_query_parallelization TO ON;
@ -30,14 +41,25 @@ BEGIN;
CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
\c - - - :master_port
BEGIN;
CREATE TABLE should_abort (value int);
PREPARE TRANSACTION 'citus_0_should_abort';
BEGIN;
CREATE TABLE should_commit (value int);
PREPARE TRANSACTION 'citus_0_should_commit';
BEGIN;
CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
SET citus.force_max_query_parallelization TO ON;
-- Add "fake" pg_dist_transaction records and run recovery
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten');
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'),
(0, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'),
(0, 'citus_0_should_be_forgotten');
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
3
6
(1 row)
SELECT count(*) FROM pg_dist_transaction;
@ -46,6 +68,18 @@ SELECT count(*) FROM pg_dist_transaction;
0
(1 row)
SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort';
count
-------
0
(1 row)
SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';
count
-------
1
(1 row)
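recover_prepared_transactions() now reports 6 rather than 3 because the same three transactions are also prepared on the coordinator (group 0) and matching fake pg_dist_transaction rows were inserted for that group; the two counts above then confirm that, on the coordinator too, should_abort was rolled back and should_commit was committed. An illustrative query, not part of the test, for inspecting what is pending before recovery runs:

-- prepared transactions awaiting commit or abort on the current node
SELECT gid, prepared, owner, database
FROM pg_prepared_xacts
ORDER BY gid;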
-- Confirm that transactions were correctly rolled forward
\c - - - :worker_1_port
SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort';
@ -328,3 +362,9 @@ SELECT pg_reload_conf();
DROP TABLE test_recovery_ref;
DROP TABLE test_recovery;
DROP TABLE test_recovery_single;
SELECT 1 FROM master_remove_node('localhost', :master_port);
?column?
----------
1
(1 row)
View File
@ -5,6 +5,12 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000;
-- We run this twice, once with coordinator node in pg_dist_node and once without.
-- Set client_min_messages to WARNING to discard NOTICE messages from
-- upgrade_to_reference_table() to make the output consistent in both cases.
-- We check that reference table placements were actually replicated by checking
-- pg_dist_placement.
SET client_min_messages TO WARNING;
-- test with not distributed table
CREATE TABLE upgrade_reference_table_local(column1 int);
SELECT upgrade_to_reference_table('upgrade_reference_table_local');
@ -74,7 +80,8 @@ SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1');
(1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass;
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006;
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_unhealthy'::regclass::oid);
SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy');
ERROR: could not find any healthy placement for shard 1360006
DROP TABLE upgrade_reference_table_unhealthy;
@ -91,13 +98,13 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
NOTICE: Replicating reference table "upgrade_reference_table_composite" to the node localhost:57638
upgrade_to_reference_table
----------------------------
(1 row)
DROP TABLE upgrade_reference_table_composite;
DROP TYPE upgrade_test_composite_type;
-- test with reference table
CREATE TABLE upgrade_reference_table_reference(column1 int);
SELECT create_reference_table('upgrade_reference_table_reference');
@ -152,20 +159,22 @@ WHERE colocationid IN
--------------+------------+-------------------+------------------------
(0 rows)
SELECT count(*) active_primaries FROM pg_dist_node WHERE isactive AND noderole='primary' \gset
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360009 | 1 | 8192 | localhost | 57637
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360009 | f
(1 row)
SELECT upgrade_to_reference_table('upgrade_reference_table_append');
NOTICE: Replicating reference table "upgrade_reference_table_append" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -206,19 +215,18 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass)
ORDER BY
nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360009 | 1 | 8192 | localhost | 57637
1360009 | 1 | 0 | localhost | 57638
(2 rows)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360009 | t
(1 row)
DROP TABLE upgrade_reference_table_append;
@ -266,19 +274,20 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360010 | 1 | 0 | localhost | 57637
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360010 | f
(1 row)
SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker');
NOTICE: Replicating reference table "upgrade_reference_table_one_worker" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -319,19 +328,18 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass)
ORDER BY
nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360010 | 1 | 0 | localhost | 57637
1360010 | 1 | 0 | localhost | 57638
(2 rows)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360010 | t
(1 row)
DROP TABLE upgrade_reference_table_one_worker;
@ -344,7 +352,8 @@ SELECT create_distributed_table('upgrade_reference_table_one_unhealthy', 'column
(1 row)
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360010 AND nodeport = :worker_1_port;
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass::oid) AND nodeport = :worker_1_port;
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
@ -380,19 +389,19 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass)
ORDER BY
nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360011 | 1 | 0 | localhost | 57637
1360011 | 1 | 0 | localhost | 57638
(2 rows)
AND shardstate = 1
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360011 | f
(1 row)
SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy');
upgrade_to_reference_table
@ -435,19 +444,19 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass)
ORDER BY
nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360011 | 1 | 0 | localhost | 57637
1360011 | 1 | 0 | localhost | 57638
(2 rows)
AND shardstate = 1
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360011 | t
(1 row)
DROP TABLE upgrade_reference_table_one_unhealthy;
@ -494,19 +503,18 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass)
ORDER BY
nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360012 | 1 | 0 | localhost | 57637
1360012 | 1 | 0 | localhost | 57638
(2 rows)
GROUP BY shardid
ORDER BY shardid;
shardid
---------
1360012
(1 row)
SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy');
upgrade_to_reference_table
@ -549,19 +557,18 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass)
ORDER BY
nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360012 | 1 | 0 | localhost | 57637
1360012 | 1 | 0 | localhost | 57638
(2 rows)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360012 | t
(1 row)
DROP TABLE upgrade_reference_table_both_healthy;
@ -610,20 +617,21 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360013 | 1 | 0 | localhost | 57637
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360013 | f
(1 row)
BEGIN;
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback');
NOTICE: Replicating reference table "upgrade_reference_table_transaction_rollback" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -665,15 +673,17 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360013 | 1 | 0 | localhost | 57637
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360013 | f
(1 row)
@ -723,20 +733,21 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360014 | 1 | 0 | localhost | 57637
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360014 | f
(1 row)
BEGIN;
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit');
NOTICE: Replicating reference table "upgrade_reference_table_transaction_commit" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -778,19 +789,18 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass)
ORDER BY
nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360014 | 1 | 0 | localhost | 57637
1360014 | 1 | 0 | localhost | 57638
(2 rows)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360014 | t
(1 row)
-- verify that shard is replicated to other worker
\c - - - :worker_2_port
@ -849,16 +859,17 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360015 | 1 | 0 | localhost | 57637
GROUP BY shardid
ORDER BY shardid;
shardid
---------
1360015
(1 row)
@ -901,16 +912,17 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360015 | 1 | 0 | localhost | 57637
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360015 | f
(1 row)
DROP TABLE upgrade_reference_table_mx;
@ -970,22 +982,22 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360016 | 1 | 0 | localhost | 57637
1360016 | 3 | 0 | localhost | 57638
(2 rows)
GROUP BY shardid
ORDER BY shardid;
shardid
---------
1360016
(1 row)
SET client_min_messages TO WARNING;
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
NOTICE: Replicating reference table "upgrade_reference_table_mx" to the node localhost:57638
upgrade_to_reference_table
----------------------------
@ -1027,18 +1039,18 @@ WHERE colocationid IN
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360016 | 1 | 0 | localhost | 57637
1360016 | 1 | 0 | localhost | 57638
(2 rows)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360016 | t
(1 row)
-- situation on metadata worker
@ -1066,18 +1078,18 @@ WHERE
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360016 | 1 | 0 | localhost | 57637
1360016 | 1 | 0 | localhost | 57638
(2 rows)
GROUP BY shardid
ORDER BY shardid;
shardid | ?column?
---------+----------
1360016 | t
(1 row)
\c - - - :master_port
@ -1088,3 +1100,4 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
(1 row)
RESET client_min_messages;
View File
@ -0,0 +1,7 @@
-- removing coordinator from pg_dist_node should update pg_dist_colocation
SELECT master_remove_node('localhost', :master_port);
master_remove_node
--------------------
(1 row)
View File
@ -0,0 +1,103 @@
--
-- REPLICATE_REF_TABLES_ON_COORDINATOR
--
CREATE SCHEMA replicate_ref_to_coordinator;
SET search_path TO 'replicate_ref_to_coordinator';
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 8000000;
SET citus.next_placement_id TO 8000000;
--- enable logging to see which tasks are executed locally
SET client_min_messages TO LOG;
SET citus.log_local_commands TO ON;
CREATE TABLE squares(a int, b int);
SELECT create_reference_table('squares');
create_reference_table
------------------------
(1 row)
INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i;
-- should be executed locally
SELECT count(*) FROM squares;
LOG: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares
count
-------
10
(1 row)
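The LOG line shows the count running against the shard stored on the coordinator itself. An illustrative query, not part of the test, that makes the local placement visible:

-- the reference table should have a placement in the coordinator's group as well
SELECT nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN (SELECT shardid FROM pg_dist_shard
                  WHERE logicalrelid = 'replicate_ref_to_coordinator.squares'::regclass)
ORDER BY nodeport;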
-- create a second reference table
CREATE TABLE numbers(a int);
SELECT create_reference_table('numbers');
create_reference_table
------------------------
(1 row)
INSERT INTO numbers VALUES (20), (21);
LOG: executing the command locally: INSERT INTO replicate_ref_to_coordinator.numbers_8000001 AS citus_table_alias (a) VALUES (20), (21)
-- INSERT ... SELECT between reference tables
BEGIN;
EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers;
QUERY PLAN
-----------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57636 dbname=regression
-> Insert on squares_8000000 citus_table_alias (cost=0.00..41.88 rows=2550 width=8)
-> Seq Scan on numbers_8000001 numbers (cost=0.00..41.88 rows=2550 width=8)
(7 rows)
INSERT INTO squares SELECT a, a*a FROM numbers;
SELECT * FROM squares WHERE a >= 20 ORDER BY a;
a | b
----+-----
20 | 400
21 | 441
(2 rows)
ROLLBACK;
BEGIN;
EXPLAIN INSERT INTO numbers SELECT a FROM squares WHERE a < 3;
QUERY PLAN
----------------------------------------------------------------------------------------------
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57636 dbname=regression
-> Insert on numbers_8000001 citus_table_alias (cost=0.00..38.25 rows=753 width=4)
-> Seq Scan on squares_8000000 squares (cost=0.00..38.25 rows=753 width=4)
Filter: (a < 3)
(8 rows)
INSERT INTO numbers SELECT a FROM squares WHERE a < 3;
SELECT * FROM numbers ORDER BY a;
a
----
1
2
20
21
(4 rows)
ROLLBACK;
-- Make sure we hide shard tables ...
SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
citus_table_is_visible
------------------------
f
(1 row)
-- clean-up
SET client_min_messages TO ERROR;
DROP SCHEMA replicate_ref_to_coordinator CASCADE;
-- Make sure the shard was dropped
SELECT 'numbers_8000001'::regclass::oid;
ERROR: relation "numbers_8000001" does not exist
LINE 1: SELECT 'numbers_8000001'::regclass::oid;
^
SET search_path TO DEFAULT;
RESET client_min_messages;
View File
@ -30,6 +30,7 @@ test: isolation_dump_local_wait_edges
test: isolation_replace_wait_function
test: isolation_distributed_deadlock_detection
test: isolation_replicate_reference_tables_to_coordinator
# creating a restore point briefly blocks all
# writes, run this test serially.
View File
@ -21,6 +21,7 @@ test: multi_test_helpers
# the following test has to be run sequentially
test: multi_mx_create_table
test: multi_mx_hide_shard_names
test: multi_mx_add_coordinator
test: multi_mx_modifications_to_reference_tables
test: multi_mx_partitioning
test: multi_mx_copy_data multi_mx_router_planner
View File
@ -251,6 +251,18 @@ test: multi_upgrade_reference_table
test: multi_replicate_reference_table
test: multi_remove_node_reference_table
# --------
# Replicating reference tables to coordinator. Add coordinator to pg_dist_node
# and rerun some of the tests.
# --------
test: add_coordinator
test: multi_upgrade_reference_table
test: multi_replicate_reference_table
test: multi_reference_table
test: foreign_key_to_reference_table
test: replicate_reference_tables_to_coordinator
test: remove_coordinator
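For context, the add_coordinator and remove_coordinator entries bracket this rerun by registering and then deregistering the coordinator in pg_dist_node, roughly as follows (both statements appear in the test files elsewhere in this diff):

-- add_coordinator: put the coordinator into node group 0
SELECT master_add_node('localhost', :master_port, groupid => 0);
-- ... the reference table tests above run again, now with a coordinator placement ...
-- remove_coordinator: restore the original topology for the tests that follow
SELECT master_remove_node('localhost', :master_port);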
# ----------
# multi_transactional_drop_shards tests for dropping shards using connection API
# ----------
View File
@ -15,8 +15,8 @@
# ---
test: multi_extension
test: multi_cluster_management
test: multi_test_helpers
test: multi_table_ddl
test: multi_test_helpers
# ----------
# The following distributed tests depend on creating a partitioned table and
View File
@ -368,7 +368,7 @@ step "s6-commit"
COMMIT;
}
# we disable the deamon during the regression tests in order to get consistent results
# we disable the daemon during the regression tests in order to get consistent results
# thus we manually issue the deadlock detection
session "deadlock-checker"
View File
@ -0,0 +1,134 @@
setup
{
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SELECT master_add_node('localhost', 57636);
CREATE TABLE ref_table(a int primary key);
SELECT create_reference_table('ref_table');
INSERT INTO ref_table VALUES (1), (3), (5), (7);
CREATE TABLE dist_table(a int, b int);
SELECT create_distributed_table('dist_table', 'a');
}
teardown
{
SELECT citus_internal.restore_isolation_tester_func();
DROP TABLE ref_table, dist_table;
SELECT master_remove_node('localhost', 57636);
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-end"
{
END;
}
step "s1-update-dist-table"
{
update dist_table set b = 2 where a = 1;
}
step "s1-update-ref-table"
{
update ref_table set a = a + 1;
}
step "s1-lock-ref-table-placement-on-coordinator"
{
DO $$
DECLARE refshardid int;
BEGIN
SELECT shardid INTO refshardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass;
EXECUTE format('SELECT * from ref_table_%s FOR UPDATE', refshardid::text);
END
$$;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-end"
{
END;
}
step "s2-update-dist-table"
{
update dist_table set b = 2 where a = 1;
}
step "s2-lock-ref-table-placement-on-coordinator"
{
DO $$
DECLARE refshardid int;
BEGIN
SELECT shardid INTO refshardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass;
EXECUTE format('SELECT * from ref_table_%s FOR UPDATE', refshardid::text);
END
$$;
}
step "s2-view-dist"
{
SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' ORDER BY query DESC;
}
step "s2-view-worker"
{
SELECT query, query_hostname, query_hostport, master_query_host_name,
master_query_host_port, state, wait_event_type, wait_event, usename, datname
FROM citus_worker_stat_activity
WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
query NOT ILIKE '%COMMIT%' AND
query NOT ILIKE '%dump_local_wait_edges%'
ORDER BY query, query_hostport DESC;
}
step "s2-sleep"
{
SELECT pg_sleep(0.5);
}
step "s2-active-transactions"
{
-- Admin should be able to see all transactions
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
}
# we disable the daemon during the regression tests in order to get consistent results
# thus we manually issue the deadlock detection
session "deadlock-checker"
# we issue the checker not only when there are deadlocks, to ensure that we never cancel
# a backend inappropriately
step "deadlock-checker-call"
{
SELECT check_distributed_deadlocks();
}
# verify that locks on the placement of the reference table on the coordinator are
# taken into account when looking for distributed deadlocks
permutation "s1-begin" "s2-begin" "s1-update-dist-table" "s2-lock-ref-table-placement-on-coordinator" "s1-lock-ref-table-placement-on-coordinator" "s2-update-dist-table" "deadlock-checker-call" "s1-end" "s2-end"
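A rough timeline of that first permutation, written out as comments; this is a sketch of the interleaving, not something to execute as-is:

-- s1: BEGIN; UPDATE dist_table SET b = 2 WHERE a = 1;       -- row lock on a worker shard
-- s2: BEGIN; SELECT * FROM ref_table_<shardid> FOR UPDATE;  -- locks the coordinator placement
-- s1: SELECT * FROM ref_table_<shardid> FOR UPDATE;         -- blocks behind s2
-- s2: UPDATE dist_table SET b = 2 WHERE a = 1;              -- blocks behind s1: a distributed deadlock
-- deadlock-checker: SELECT check_distributed_deadlocks();   -- expected to detect and break the cycle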
# verify that *_dist_stat_activity() functions return the correct result when query
# has a task on the coordinator.
permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-sleep" "s2-view-dist" "s2-view-worker" "s2-end" "s1-end"
# verify that get_*_active_transactions() functions return the correct result when
# the query has a task on the coordinator.
permutation "s1-begin" "s2-begin" "s1-update-ref-table" "s2-active-transactions" "s1-end" "s2-end"
View File
@ -0,0 +1,14 @@
--
-- ADD_COORDINATOR
--
SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset
-- adding the same node again should return the existing nodeid
SELECT master_add_node('localhost', :master_port, groupid => 0) = :master_nodeid;
-- adding another node with groupid=0 should error out
SELECT master_add_node('localhost', 12345, groupid => 0) = :master_nodeid;
-- start_metadata_sync_to_node() for coordinator should raise a notice
SELECT start_metadata_sync_to_node('localhost', :master_port);
View File
@ -249,8 +249,10 @@ INSERT INTO referencing_schema.referencing_table SELECT x, x from generate_serie
DELETE FROM referenced_schema.referenced_table WHERE id > 800;
SELECT count(*) FROM referencing_schema.referencing_table;
SET client_min_messages TO ERROR;
DROP SCHEMA referenced_schema CASCADE;
DROP SCHEMA referencing_schema CASCADE;
RESET client_min_messages;
-- on delete set update cascades properly
CREATE TABLE referenced_table(test_column int, test_column2 int, PRIMARY KEY(test_column));
@ -952,5 +954,7 @@ ROLLBACK;
DROP TABLE referenced_table CASCADE;
DROP TABLE referencing_table;
SET client_min_messages TO ERROR;
DROP SCHEMA fkey_reference_table CASCADE;
SET search_path TO DEFAULT;
RESET client_min_messages;
View File
@ -0,0 +1,46 @@
CREATE SCHEMA mx_add_coordinator;
SET search_path TO mx_add_coordinator,public;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 8;
SET citus.next_shard_id TO 7000000;
SET citus.next_placement_id TO 7000000;
SET citus.replication_model TO streaming;
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
-- test that coordinator pg_dist_node entry is synced to the workers
SELECT wait_until_metadata_sync();
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
CREATE TABLE ref(a int);
SELECT create_reference_table('ref');
-- test that changes from a metadata node are reflected in the coordinator placement
\c - - - :worker_1_port
SET search_path TO mx_add_coordinator,public;
INSERT INTO ref VALUES (1), (2), (3);
UPDATE ref SET a = a + 1;
DELETE FROM ref WHERE a > 3;
\c - - - :master_port
SET search_path TO mx_add_coordinator,public;
SELECT * FROM ref ORDER BY a;
-- Clear pg_dist_transaction before removing the node. This is to keep the output
-- of multi_mx_transaction_recovery consistent.
SELECT recover_prepared_transactions();
SELECT count(*) FROM run_command_on_workers('SELECT recover_prepared_transactions()');
SELECT master_remove_node('localhost', :master_port);
-- test that coordinator pg_dist_node entry was removed from the workers
SELECT wait_until_metadata_sync();
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
DROP SCHEMA mx_add_coordinator CASCADE;
SET search_path TO DEFAULT;
RESET client_min_messages;
View File
@ -23,14 +23,17 @@ FROM
pg_dist_shard
WHERE
logicalrelid = 'reference_table_test'::regclass;
SELECT count(*) active_primaries FROM pg_dist_node WHERE isactive AND noderole='primary' \gset
SELECT
shardid, shardstate, nodename, nodeport
shardid, bool_and(shardstate = 1) all_placements_healthy, COUNT(distinct nodeport) = :active_primaries replicated_to_all
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass)
ORDER BY
placementid;
GROUP BY shardid
ORDER BY shardid;
-- check whether data was copied into distributed table
SELECT * FROM reference_table_test;
@ -478,6 +481,8 @@ ORDER BY
CREATE TABLE reference_table_test_fourth (value_1 int, value_2 float PRIMARY KEY, value_3 text, value_4 timestamp);
SELECT create_reference_table('reference_table_test_fourth');
\set VERBOSITY terse
-- insert a row
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01');
@ -487,6 +492,8 @@ INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01');
-- now get null constraint violation due to primary key
INSERT INTO reference_table_test_fourth (value_1, value_3, value_4) VALUES (1, '1.0', '2016-12-01');
\set VERBOSITY default
-- let's run some upserts
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01') ON CONFLICT DO NOTHING RETURNING *;
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '10', '2016-12-01') ON CONFLICT (value_2) DO
@ -499,13 +506,13 @@ INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '10', '2016-12-01') ON C
-- finally see that shard healths are OK
SELECT
shardid, shardstate, nodename, nodeport
shardid, bool_and(shardstate = 1) all_placements_healthy, COUNT(distinct nodeport) = :active_primaries replicated_to_all
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test_fourth'::regclass)
ORDER BY
placementid;
GROUP BY shardid
ORDER BY shardid;
-- let's now run some update/delete queries on arbitrary columns
DELETE FROM
@ -909,14 +916,14 @@ SELECT master_copy_shard_placement(:a_shard_id, 'localhost', :worker_2_port, 'lo
SELECT shardid, shardstate FROM pg_dist_shard_placement WHERE placementid = :a_placement_id;
-- some queries that are captured in functions
CREATE FUNCTION select_count_all() RETURNS bigint AS '
CREATE OR REPLACE FUNCTION select_count_all() RETURNS bigint AS '
SELECT
count(*)
FROM
reference_table_test;
' LANGUAGE SQL;
CREATE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp)
CREATE OR REPLACE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp)
RETURNS void AS '
INSERT INTO reference_table_test VALUES ($1, $2, $3, $4);
' LANGUAGE SQL;
@ -992,7 +999,12 @@ ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
ROLLBACK;
-- clean up tables
-- clean up tables, ...
SET client_min_messages TO ERROR;
DROP SEQUENCE example_ref_value_seq;
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite;
reference_table_test_fourth, reference_schema.reference_table_ddl, reference_table_composite,
colocated_table_test, colocated_table_test_2, append_reference_tmp_table;
DROP TYPE reference_comp_key;
DROP SCHEMA reference_schema CASCADE;
RESET client_min_messages;
View File
@ -3,7 +3,6 @@
--
-- Tests that check that reference tables are replicated when adding new nodes.
SET citus.next_shard_id TO 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1370000;
@ -71,7 +70,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -88,7 +88,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -106,7 +107,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -123,7 +125,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -147,7 +150,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -166,7 +170,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -188,7 +193,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -207,7 +213,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -251,8 +258,10 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
SELECT colocationid AS reference_table_colocationid FROM pg_dist_colocation WHERE distributioncolumntype = 0 \gset
SELECT
logicalrelid, partmethod, colocationid, repmodel
logicalrelid, partmethod, colocationid = :reference_table_colocationid, repmodel
FROM
pg_dist_partition
WHERE
@ -260,6 +269,7 @@ WHERE
ORDER BY logicalrelid;
BEGIN;
SET LOCAL client_min_messages TO ERROR;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
SELECT create_reference_table('replicate_reference_table_reference_two');
@ -272,8 +282,7 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port
ORDER BY
shardid;
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -283,7 +292,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'replicate_reference_table_reference_one'::regclass);
SELECT
logicalrelid, partmethod, colocationid, repmodel
logicalrelid, partmethod, colocationid = :reference_table_colocationid, repmodel
FROM
pg_dist_partition
WHERE
@ -350,7 +359,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -370,7 +380,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1370009;
@ -387,7 +398,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -404,7 +416,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT *
FROM pg_dist_colocation
@ -434,7 +447,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
@ -444,7 +458,8 @@ SELECT
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
-- verify constraints have been created on the new node
SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname like ''ref_table%'';');
@ -459,7 +474,7 @@ SELECT create_reference_table('initially_not_replicated_reference_table');
SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
-- we should see only one shard placement
-- we should see only one shard placement (other than coordinator)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
@ -471,6 +486,7 @@ WHERE
pg_dist_shard
WHERE
logicalrelid = 'initially_not_replicated_reference_table'::regclass)
AND nodeport != :master_port
ORDER BY 1,4,5;
-- we should see the two shard placements after activation
@ -487,6 +503,7 @@ WHERE
pg_dist_shard
WHERE
logicalrelid = 'initially_not_replicated_reference_table'::regclass)
AND nodeport != :master_port
ORDER BY 1,4,5;
-- this should have no effect
View File
@ -1,6 +1,13 @@
-- Tests for prepared transaction recovery
SET citus.next_shard_id TO 1220000;
-- reference tables can have placements on the coordinator. Add it so we can
-- verify that we recover transactions which do DML on coordinator placements
-- properly.
SET client_min_messages TO ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
RESET client_min_messages;
-- enforce 1 connection per placement since
-- the tests are prepared for that
SET citus.force_max_query_parallelization TO ON;
@ -28,14 +35,32 @@ CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
\c - - - :master_port
BEGIN;
CREATE TABLE should_abort (value int);
PREPARE TRANSACTION 'citus_0_should_abort';
BEGIN;
CREATE TABLE should_commit (value int);
PREPARE TRANSACTION 'citus_0_should_commit';
BEGIN;
CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_0_should_be_sorted_into_middle';
SET citus.force_max_query_parallelization TO ON;
-- Add "fake" pg_dist_transaction records and run recovery
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten');
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_commit'),
(0, 'citus_0_should_commit');
INSERT INTO pg_dist_transaction VALUES (1, 'citus_0_should_be_forgotten'),
(0, 'citus_0_should_be_forgotten');
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_transaction;
SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort';
SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';
-- Confirm that transactions were correctly rolled forward
\c - - - :worker_1_port
SELECT count(*) FROM pg_tables WHERE tablename = 'should_abort';
@ -175,3 +200,5 @@ SELECT pg_reload_conf();
DROP TABLE test_recovery_ref;
DROP TABLE test_recovery;
DROP TABLE test_recovery_single;
SELECT 1 FROM master_remove_node('localhost', :master_port);
View File
@ -7,6 +7,13 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000;
-- We run this twice, once with coordinator node in pg_dist_node and once without.
-- Set client_min_messages to WARNING to discard NOTICE messages from
-- upgrade_to_reference_table() to make the output consistent in both cases.
-- We check that reference table placements were actually replicated by checking
-- pg_dist_placement.
SET client_min_messages TO WARNING;
-- test with not distributed table
CREATE TABLE upgrade_reference_table_local(column1 int);
SELECT upgrade_to_reference_table('upgrade_reference_table_local');
@ -48,7 +55,8 @@ DROP TABLE upgrade_reference_table_referenced;
CREATE TABLE upgrade_reference_table_unhealthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass;
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006;
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_unhealthy'::regclass::oid);
SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy');
DROP TABLE upgrade_reference_table_unhealthy;
@ -62,6 +70,7 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
DROP TABLE upgrade_reference_table_composite;
DROP TYPE upgrade_test_composite_type;
-- test with reference table
CREATE TABLE upgrade_reference_table_reference(column1 int);
@ -102,13 +111,17 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
SELECT count(*) active_primaries FROM pg_dist_node WHERE isactive AND noderole='primary' \gset
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass)
GROUP BY shardid
ORDER BY shardid;
SELECT upgrade_to_reference_table('upgrade_reference_table_append');
@ -135,14 +148,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass)
ORDER BY
nodeport;
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_append;
@ -174,12 +187,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass)
GROUP BY shardid
ORDER BY shardid;
SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker');
@ -206,14 +221,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass)
ORDER BY
nodeport;
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_one_worker;
@ -221,7 +236,8 @@ DROP TABLE upgrade_reference_table_one_worker;
SET citus.shard_replication_factor TO 2;
CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_one_unhealthy', 'column1');
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360010 AND nodeport = :worker_1_port;
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass::oid) AND nodeport = :worker_1_port;
-- situation before upgrade_reference_table
SELECT
@ -246,14 +262,15 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass)
ORDER BY
nodeport;
AND shardstate = 1
GROUP BY shardid
ORDER BY shardid;
SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy');
@ -280,14 +297,15 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass)
ORDER BY
nodeport;
AND shardstate = 1
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_one_unhealthy;
@ -318,14 +336,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass)
ORDER BY
nodeport;
GROUP BY shardid
ORDER BY shardid;
SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy');
@ -352,14 +370,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass)
ORDER BY
nodeport;
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_both_healthy;
@ -392,12 +410,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass)
GROUP BY shardid
ORDER BY shardid;
BEGIN;
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback');
@ -425,13 +445,16 @@ WHERE colocationid IN
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass)
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_transaction_rollback;
@ -464,12 +487,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass)
GROUP BY shardid
ORDER BY shardid;
BEGIN;
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit');
@ -498,14 +523,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass)
ORDER BY
nodeport;
GROUP BY shardid
ORDER BY shardid;
-- verify that shard is replicated to other worker
\c - - - :worker_2_port
@ -544,13 +569,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
GROUP BY shardid
ORDER BY shardid;
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
@ -579,13 +605,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
GROUP BY shardid
ORDER BY shardid;
DROP TABLE upgrade_reference_table_mx;
@ -624,15 +651,16 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
GROUP BY shardid
ORDER BY shardid;
SET client_min_messages TO WARNING;
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
@ -659,13 +687,14 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
GROUP BY shardid
ORDER BY shardid;
-- situation on metadata worker
\c - - - :worker_1_port
@ -684,15 +713,16 @@ WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
shardid, count(distinct nodeport) = :active_primaries
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
GROUP BY shardid
ORDER BY shardid;
\c - - - :master_port
DROP TABLE upgrade_reference_table_mx;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
RESET client_min_messages;
View File
@ -0,0 +1,2 @@
-- removing coordinator from pg_dist_node should update pg_dist_colocation
SELECT master_remove_node('localhost', :master_port);
View File
@ -0,0 +1,52 @@
--
-- REPLICATE_REF_TABLES_ON_COORDINATOR
--
CREATE SCHEMA replicate_ref_to_coordinator;
SET search_path TO 'replicate_ref_to_coordinator';
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 8000000;
SET citus.next_placement_id TO 8000000;
--- enable logging to see which tasks are executed locally
SET client_min_messages TO LOG;
SET citus.log_local_commands TO ON;
CREATE TABLE squares(a int, b int);
SELECT create_reference_table('squares');
INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i;
-- should be executed locally
SELECT count(*) FROM squares;
-- create a second reference table
CREATE TABLE numbers(a int);
SELECT create_reference_table('numbers');
INSERT INTO numbers VALUES (20), (21);
-- INSERT ... SELECT between reference tables
BEGIN;
EXPLAIN INSERT INTO squares SELECT a, a*a FROM numbers;
INSERT INTO squares SELECT a, a*a FROM numbers;
SELECT * FROM squares WHERE a >= 20 ORDER BY a;
ROLLBACK;
BEGIN;
EXPLAIN INSERT INTO numbers SELECT a FROM squares WHERE a < 3;
INSERT INTO numbers SELECT a FROM squares WHERE a < 3;
SELECT * FROM numbers ORDER BY a;
ROLLBACK;
-- Make sure we hide shard tables ...
SELECT citus_table_is_visible('numbers_8000001'::regclass::oid);
-- clean-up
SET client_min_messages TO ERROR;
DROP SCHEMA replicate_ref_to_coordinator CASCADE;
-- Make sure the shard was dropped
SELECT 'numbers_8000001'::regclass::oid;
SET search_path TO DEFAULT;
RESET client_min_messages;