Defer reference table replication to shard creation time

pull/3637/head
Marco Slot 2020-03-08 00:49:06 +01:00 committed by Hadi Moshayedi
parent 76a8a3c7c9
commit 924cd7343a
27 changed files with 528 additions and 557 deletions

View File

@ -346,6 +346,12 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod,
colocationId, replicationModel, viaDeprecatedAPI);
/*
* Make sure that existing reference tables have been replicated to all the nodes
* such that we can create foreign keys and joins work immediately after creation.
*/
EnsureReferenceTablesExistOnAllNodes();
/* we need to calculate these variables before creating distributed metadata */
bool localTableEmpty = LocalTableEmpty(relationId);
Oid colocatedTableId = ColocatedTableId(colocationId);

View File

@ -86,6 +86,8 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
ObjectAddressSet(tableAddress, RelationRelationId, distributedTableId);
EnsureDependenciesExistOnAllNodes(&tableAddress);
EnsureReferenceTablesExistOnAllNodes();
CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor,
useExclusiveConnections);

View File

@ -28,6 +28,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
@ -449,6 +450,19 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
targetNodeName, targetNodePort);
}
if (!IsReferenceTable(distributedTableId))
{
/*
* When copying a shard to a new node, we should first ensure that reference
* tables are present such that joins work immediately after copying the shard.
* When copying a reference table, we are probably trying to achieve just that.
*
* Since this a long-running operation we do this after the error checks, but
* before taking metadata locks.
*/
EnsureReferenceTablesExistOnAllNodes();
}
/*
* CopyColocatedShardPlacement function copies given shard with its co-located
* shards.

View File

@ -60,6 +60,9 @@ int GroupSize = 1;
/* config variable managed via guc.c */
char *CurrentCluster = "default";
/* did current transaction modify pg_dist_node? */
bool TransactionModifiedNodeMetadata = false;
typedef struct NodeMetadata
{
int32 groupId;
@ -158,6 +161,7 @@ master_add_node(PG_FUNCTION_ARGS)
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists);
TransactionModifiedNodeMetadata = true;
/*
* After adding new node, if the node did not already exist, we will activate
@ -196,6 +200,7 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists);
TransactionModifiedNodeMetadata = true;
PG_RETURN_INT32(nodeId);
}
@ -229,6 +234,7 @@ master_add_secondary_node(PG_FUNCTION_ARGS)
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists);
TransactionModifiedNodeMetadata = true;
PG_RETURN_INT32(nodeId);
}
@ -252,6 +258,7 @@ master_remove_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort);
TransactionModifiedNodeMetadata = true;
PG_RETURN_VOID();
}
@ -305,6 +312,7 @@ master_disable_node(PG_FUNCTION_ARGS)
}
SetNodeState(nodeName, nodePort, isActive);
TransactionModifiedNodeMetadata = true;
}
PG_CATCH();
{
@ -351,6 +359,7 @@ master_set_node_property(PG_FUNCTION_ARGS)
)));
}
TransactionModifiedNodeMetadata = true;
PG_RETURN_VOID();
}
@ -371,8 +380,6 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
EnsureNoModificationsHaveBeenDone();
ReplicateAllDependenciesToNode(newWorkerNode->workerName,
newWorkerNode->workerPort);
ReplicateAllReferenceTablesToNode(newWorkerNode->workerName,
newWorkerNode->workerPort);
/*
* Let the maintenance daemon do the hard work of syncing the metadata.
@ -452,6 +459,8 @@ master_activate_node(PG_FUNCTION_ARGS)
nodePort);
ActivateNode(workerNode->workerName, workerNode->workerPort);
TransactionModifiedNodeMetadata = true;
PG_RETURN_INT32(workerNode->nodeId);
}
@ -723,6 +732,8 @@ master_update_node(PG_FUNCTION_ARGS)
TerminateBackgroundWorker(handle);
}
TransactionModifiedNodeMetadata = true;
PG_RETURN_VOID();
}

View File

@ -4,3 +4,4 @@
#include "udfs/citus_extradata_container/9.3-2.sql"
#include "udfs/update_distributed_table_colocation/9.3-2.sql"
#include "udfs/replicate_reference_tables/9.3-1.sql"

View File

@ -0,0 +1,7 @@
CREATE FUNCTION pg_catalog.replicate_reference_tables()
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$replicate_reference_tables$$;
COMMENT ON FUNCTION pg_catalog.replicate_reference_tables()
IS 'replicate reference tables to all nodes';
REVOKE ALL ON FUNCTION pg_catalog.replicate_reference_tables() FROM PUBLIC;

View File

@ -0,0 +1,7 @@
CREATE FUNCTION pg_catalog.replicate_reference_tables()
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$replicate_reference_tables$$;
COMMENT ON FUNCTION pg_catalog.replicate_reference_tables()
IS 'replicate reference tables to all nodes';
REVOKE ALL ON FUNCTION pg_catalog.replicate_reference_tables() FROM PUBLIC;

View File

@ -449,6 +449,7 @@ ResetGlobalVariables()
dlist_init(&InProgressTransactions);
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;
TransactionModifiedNodeMetadata = false;
}

View File

@ -24,26 +24,243 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "postmaster/postmaster.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
/* local function forward declarations */
static List * WorkersWithoutReferenceTablePlacement(uint64 shardId);
static void CopyShardPlacementToNewWorkerNode(ShardPlacement *sourceShardPlacement,
WorkerNode *newWorkerNode);
static void ReplicateSingleShardTableToAllNodes(Oid relationId);
static void ReplicateShardToAllNodes(ShardInterval *shardInterval);
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
int nodePort);
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
static bool AnyRelationsModifiedInTransaction(List *relationIdList);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
PG_FUNCTION_INFO_V1(replicate_reference_tables);
/*
* IsReferenceTable returns whether the given relation ID identifies a reference
* table.
*/
bool
IsReferenceTable(Oid relationId)
{
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
if (!tableEntry->isCitusTable)
{
return false;
}
if (tableEntry->partitionMethod != DISTRIBUTE_BY_NONE)
{
return false;
}
return true;
}
/*
* replicate_reference_tables is a UDF to ensure that allreference tables are
* replicated to all nodes.
*/
Datum
replicate_reference_tables(PG_FUNCTION_ARGS)
{
EnsureReferenceTablesExistOnAllNodes();
PG_RETURN_VOID();
}
/*
* EnsureReferenceTablesExistOnAllNodes ensures that a shard placement for every
* reference table exists on all nodes. If a node does not have a set of shard
* placements, then master_copy_shard_placement is called in a subtransaction
* to pull the data to the new node.
*/
void
EnsureReferenceTablesExistOnAllNodes(void)
{
List *referenceTableIdList = ReferenceTableOidList();
if (list_length(referenceTableIdList) == 0)
{
/* no reference tables exist */
return;
}
Oid referenceTableId = linitial_oid(referenceTableIdList);
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
if (list_length(shardIntervalList) == 0)
{
/* check for corrupt metadata */
ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
get_rel_name(referenceTableId))));
}
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
/* prevent this function from running concurrently with itself */
int colocationId = TableColocationId(referenceTableId);
LockColocationId(colocationId, ExclusiveLock);
List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId);
if (list_length(newWorkersList) == 0)
{
/* nothing to do, no need for lock */
UnlockColocationId(colocationId, ExclusiveLock);
return;
}
/*
* master_copy_shard_placement triggers metadata sync-up, which tries to
* acquire a ShareLock on pg_dist_node. We do master_copy_shad_placement
* in a separate connection. If we have modified pg_dist_node in the
* current backend, this will cause a deadlock.
*/
if (TransactionModifiedNodeMetadata)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate reference tables in a transaction "
"that modified node metadata")));
}
/*
* Modifications to reference tables in current transaction are not visible
* to master_copy_shard_placement, since it is done in a separate backend.
*/
if (AnyRelationsModifiedInTransaction(referenceTableIdList))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot replicate reference tables in a transaction "
"that modified a reference table")));
}
bool missingOk = false;
ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
if (sourceShardPlacement == NULL)
{
/* check for corrupt metadata */
ereport(ERROR, (errmsg("reference table shard " UINT64_FORMAT " does not "
"have an active shard placement",
shardId)));
}
WorkerNode *newWorkerNode = NULL;
foreach_ptr(newWorkerNode, newWorkersList)
{
CopyShardPlacementToNewWorkerNode(sourceShardPlacement, newWorkerNode);
}
/*
* Unblock other backends, they will probably observe that there are no
* more worker nodes without placements, unless nodes were added concurrently
*/
UnlockColocationId(colocationId, ExclusiveLock);
}
/*
* AnyRelationsModifiedInTransaction returns true if any of the given relations
* were modified in the current transaction.
*/
static bool
AnyRelationsModifiedInTransaction(List *relationIdList)
{
Oid relationId = InvalidOid;
foreach_oid(relationId, relationIdList)
{
if (GetRelationDDLAccessMode(relationId) != RELATION_NOT_ACCESSED ||
GetRelationDMLAccessMode(relationId) != RELATION_NOT_ACCESSED)
{
return true;
}
}
return false;
}
/*
* WorkersWithoutReferenceTablePlacement returns a list of workers (WorkerNode) that
* do not yet have a placement for the given reference table shard ID, but are
* supposed to.
*/
static List *
WorkersWithoutReferenceTablePlacement(uint64 shardId)
{
List *workersWithoutPlacements = NIL;
List *shardPlacementList = ActiveShardPlacementList(shardId);
/* we only take an access share lock, otherwise we'll hold up master_add_node */
List *workerNodeList = ReferenceTablePlacementNodeList(AccessShareLock);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
nodeName, nodePort);
if (targetPlacement == NULL)
{
workersWithoutPlacements = lappend(workersWithoutPlacements, workerNode);
}
}
return workersWithoutPlacements;
}
/*
* CopyShardPlacementToNewWorkerNode runs master_copy_shard_placement in a
* subtransaction by connecting to localhost.
*/
static void
CopyShardPlacementToNewWorkerNode(ShardPlacement *sourceShardPlacement,
WorkerNode *newWorkerNode)
{
int connectionFlags = OUTSIDE_TRANSACTION;
StringInfo queryString = makeStringInfo();
const char *userName = CitusExtensionOwnerName();
MultiConnection *connection = GetNodeUserDatabaseConnection(
connectionFlags, "localhost", PostPortNumber,
userName, NULL);
appendStringInfo(queryString,
"SELECT master_copy_shard_placement("
UINT64_FORMAT ", %s, %d, %s, %d, do_repair := false)",
sourceShardPlacement->shardId,
quote_literal_cstr(sourceShardPlacement->nodeName),
sourceShardPlacement->nodePort,
quote_literal_cstr(newWorkerNode->workerName),
newWorkerNode->workerPort);
ExecuteCriticalRemoteCommand(connection, queryString->data);
}
/*
@ -104,72 +321,13 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
"reference tables.", relationName)));
}
EnsureReferenceTablesExistOnAllNodes();
ReplicateSingleShardTableToAllNodes(relationId);
PG_RETURN_VOID();
}
/*
* ReplicateAllReferenceTablesToNode function finds all reference tables and
* replicates them to the given worker node. It also modifies pg_dist_colocation
* table to update the replication factor column when necessary. This function
* skips reference tables if that node already has healthy placement of that
* reference table to prevent unnecessary data transfer.
*/
void
ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
{
List *referenceTableList = ReferenceTableOidList();
/* if there is no reference table, we do not need to replicate anything */
if (list_length(referenceTableList) > 0)
{
List *referenceShardIntervalList = NIL;
/*
* We sort the reference table list to prevent deadlocks in concurrent
* ReplicateAllReferenceTablesToAllNodes calls.
*/
referenceTableList = SortList(referenceTableList, CompareOids);
Oid referenceTableId = InvalidOid;
foreach_oid(referenceTableId, referenceTableList)
{
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
referenceShardIntervalList = lappend(referenceShardIntervalList,
shardInterval);
}
if (ClusterHasKnownMetadataWorkers())
{
BlockWritesToShardList(referenceShardIntervalList);
}
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, referenceShardIntervalList)
{
uint64 shardId = shardInterval->shardId;
LockShardDistributionMetadata(shardId, ExclusiveLock);
ReplicateShardToNode(shardInterval, nodeName, nodePort);
}
/* create foreign constraints between reference tables */
foreach_ptr(shardInterval, referenceShardIntervalList)
{
char *tableOwner = TableOwner(shardInterval->relationId);
List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
commandList);
}
}
}
/*
* ReplicateSingleShardTableToAllNodes accepts a broadcast table and replicates
* it to all worker nodes, and the coordinator if it has been added by the user

View File

@ -319,6 +319,36 @@ IntToLockMode(int mode)
}
/*
* LockColocationId returns after acquiring a co-location ID lock, typically used
* for rebalancing and replication.
*/
void
LockColocationId(int colocationId, LOCKMODE lockMode)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = false;
SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) colocationId);
(void) LockAcquire(&tag, lockMode, sessionLock, dontWait);
}
/*
* UnlockColocationId releases a co-location ID lock.
*/
void
UnlockColocationId(int colocationId, LOCKMODE lockMode)
{
LOCKTAG tag;
const bool sessionLock = false;
SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) colocationId);
LockRelease(&tag, lockMode, sessionLock);
}
/*
* LockShardDistributionMetadata returns after grabbing a lock for distribution
* metadata related to the specified shard, blocking if required. Any locks

View File

@ -16,8 +16,9 @@
#include "listutils.h"
extern bool IsReferenceTable(Oid relationId);
extern void EnsureReferenceTablesExistOnAllNodes(void);
extern uint32 CreateReferenceTableColocationId(void);
extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);
extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
extern List * ReferenceTableOidList(void);
extern int CompareOids(const void *leftElement, const void *rightElement);

View File

@ -102,6 +102,10 @@ extern void UnlockShardResource(uint64 shardId, LOCKMODE lockmode);
extern void LockJobResource(uint64 jobId, LOCKMODE lockmode);
extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode);
/* Lock a co-location group */
extern void LockColocationId(int colocationId, LOCKMODE lockMode);
extern void UnlockColocationId(int colocationId, LOCKMODE lockMode);
/* Lock multiple shards for safe modification */
extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
extern void LockShardsInPlacementListMetadata(List *shardPlacementList,

View File

@ -103,6 +103,9 @@ extern int DoBlockLevel;
/* SET LOCAL statements active in the current (sub-)transaction. */
extern StringInfo activeSetStmts;
/* did current transaction modify pg_dist_node? */
extern bool TransactionModifiedNodeMetadata;
/*
* Coordinated transaction management.
*/

View File

@ -75,25 +75,6 @@ ORDER BY placementid;
200000 | 1
(1 row)
-- fail activate node by failing reference table creation
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_activate_node('localhost', :worker_2_proxy_port);
NOTICE: Replicating reference table "user_table" to the node localhost:xxxxx
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:xxxxx
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
-- verify node is not activated
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
@ -140,39 +121,6 @@ ORDER BY placementid;
200000 | 1
(1 row)
-- fail activate node by failing reference table creation
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_activate_node('localhost', :worker_2_proxy_port);
NOTICE: Replicating reference table "user_table" to the node localhost:xxxxx
ERROR: canceling statement due to user request
-- verify node is not activated
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
node_name | node_port
---------------------------------------------------------------------
localhost | 57637
(1 row)
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------
200000 | 1
(1 row)
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
-- master_remove_node fails when there are shards on that worker
SELECT master_remove_node('localhost', :worker_2_proxy_port);
ERROR: you cannot remove the primary node of a node group which has shard placements
@ -225,63 +173,6 @@ ORDER BY placementid;
200000 | 1
(1 row)
-- test master_add_node replicated a reference table
-- to newly added node.
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_add_node('localhost', :worker_2_proxy_port);
NOTICE: Replicating reference table "user_table" to the node localhost:xxxxx
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:xxxxx
-- verify node is not added
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
node_name | node_port
---------------------------------------------------------------------
localhost | 57637
(1 row)
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------
200000 | 1
(1 row)
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_add_node('localhost', :worker_2_proxy_port);
NOTICE: Replicating reference table "user_table" to the node localhost:xxxxx
ERROR: canceling statement due to user request
-- verify node is not added
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
node_name | node_port
---------------------------------------------------------------------
localhost | 57637
(1 row)
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------
200000 | 1
(1 row)
-- reset cluster to original state
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
@ -290,10 +181,9 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
SELECT master_add_node('localhost', :worker_2_proxy_port);
NOTICE: Replicating reference table "user_table" to the node localhost:xxxxx
master_add_node
---------------------------------------------------------------------
6
4
(1 row)
-- verify node is added
@ -312,32 +202,6 @@ ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------
200000 | 1
200000 | 1
(2 rows)
-- fail master_add_node by failing copy out operation
SELECT master_remove_node('localhost', :worker_1_port);
master_remove_node
---------------------------------------------------------------------
(1 row)
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT master_add_node('localhost', :worker_1_port);
NOTICE: Replicating reference table "user_table" to the node localhost:xxxxx
ERROR: could not copy table "user_table_200000" from "localhost:xxxxx"
CONTEXT: while executing command on localhost:xxxxx
-- verify node is not added
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
node_name | node_port
---------------------------------------------------------------------
localhost | 9060
(1 row)
SELECT citus.mitmproxy('conn.allow()');
@ -347,10 +211,9 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
SELECT master_add_node('localhost', :worker_1_port);
NOTICE: Replicating reference table "user_table" to the node localhost:xxxxx
master_add_node
---------------------------------------------------------------------
8
1
(1 row)
-- verify node is added
@ -369,8 +232,7 @@ ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------
200000 | 1
200000 | 1
(2 rows)
(1 row)
RESET SEARCH_PATH;
DROP SCHEMA add_remove_node CASCADE;

View File

@ -1122,7 +1122,6 @@ ON CONFLICT(c1, c2, c3, c4, c5, c6)
DO UPDATE SET
cardinality = enriched.cardinality + excluded.cardinality,
sum = enriched.sum + excluded.sum;
DEBUG: rehashing catalog cache id 14 for pg_opclass; 17 tups, 8 buckets at character 224
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: performing repartitioned INSERT ... SELECT

View File

@ -5,36 +5,34 @@ create_distributed_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
?column?
1
step s2-copy-to-reference-table:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
<waiting ...>
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s1-commit:
COMMIT;
step s2-copy-to-reference-table: <... completed>
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 10
57638 t 10
master_remove_node
@ -45,36 +43,34 @@ create_distributed_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s2-begin:
BEGIN;
BEGIN;
step s2-copy-to-reference-table:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
SELECT 1 FROM master_add_node('localhost', 57638);
step s1-add-second-worker: <... completed>
?column?
1
step s2-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 10
57638 t 10
master_remove_node
@ -85,36 +81,34 @@ create_distributed_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
?column?
1
step s2-insert-to-reference-table:
INSERT INTO test_reference_table VALUES (6);
<waiting ...>
INSERT INTO test_reference_table VALUES (6);
step s1-commit:
COMMIT;
step s2-insert-to-reference-table: <... completed>
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 6
57638 t 6
master_remove_node
@ -125,36 +119,34 @@ create_distributed_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s2-begin:
BEGIN;
BEGIN;
step s2-insert-to-reference-table:
INSERT INTO test_reference_table VALUES (6);
INSERT INTO test_reference_table VALUES (6);
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
SELECT 1 FROM master_add_node('localhost', 57638);
step s1-add-second-worker: <... completed>
?column?
1
step s2-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 6
57638 t 6
master_remove_node
@ -165,36 +157,35 @@ create_distributed_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
?column?
1
step s2-ddl-on-reference-table:
CREATE INDEX reference_index ON test_reference_table(test_id);
CREATE INDEX reference_index ON test_reference_table(test_id);
<waiting ...>
step s1-commit:
COMMIT;
step s2-ddl-on-reference-table: <... completed>
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
@ -205,36 +196,35 @@ create_distributed_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s2-begin:
BEGIN;
BEGIN;
step s2-ddl-on-reference-table:
CREATE INDEX reference_index ON test_reference_table(test_id);
CREATE INDEX reference_index ON test_reference_table(test_id);
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
COMMIT;
step s1-add-second-worker: <... completed>
?column?
1
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
@ -245,19 +235,19 @@ create_distributed_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
?column?
1
step s2-create-reference-table-2:
SELECT create_reference_table('test_reference_table_2');
SELECT create_reference_table('test_reference_table_2');
<waiting ...>
step s1-commit:
COMMIT;
@ -267,12 +257,12 @@ create_reference_table
step s2-print-content-2:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table_2', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table_2', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
@ -288,39 +278,38 @@ create_distributed_table
step s2-load-metadata-cache:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s2-begin:
BEGIN;
BEGIN;
step s2-create-reference-table-2:
SELECT create_reference_table('test_reference_table_2');
SELECT create_reference_table('test_reference_table_2');
create_reference_table
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
COMMIT;
step s1-add-second-worker: <... completed>
?column?
1
step s2-print-content-2:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table_2', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table_2', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
@ -334,30 +323,28 @@ step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
?column?
1
step s2-copy-to-reference-table:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
<waiting ...>
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s1-commit:
COMMIT;
step s2-copy-to-reference-table: <... completed>
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 5
57638 t 5
master_remove_node
@ -368,33 +355,31 @@ create_distributed_table
step s2-begin:
BEGIN;
BEGIN;
step s2-copy-to-reference-table:
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
SELECT 1 FROM master_add_node('localhost', 57638);
step s1-add-second-worker: <... completed>
?column?
1
step s2-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 5
57638 t 5
master_remove_node
@ -408,30 +393,28 @@ step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
?column?
1
step s2-insert-to-reference-table:
INSERT INTO test_reference_table VALUES (6);
<waiting ...>
INSERT INTO test_reference_table VALUES (6);
step s1-commit:
COMMIT;
step s2-insert-to-reference-table: <... completed>
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
@ -442,33 +425,31 @@ create_distributed_table
step s2-begin:
BEGIN;
BEGIN;
step s2-insert-to-reference-table:
INSERT INTO test_reference_table VALUES (6);
INSERT INTO test_reference_table VALUES (6);
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
SELECT 1 FROM master_add_node('localhost', 57638);
step s1-add-second-worker: <... completed>
?column?
1
step s2-commit:
COMMIT;
step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
@ -482,30 +463,29 @@ step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
?column?
1
step s2-ddl-on-reference-table:
CREATE INDEX reference_index ON test_reference_table(test_id);
CREATE INDEX reference_index ON test_reference_table(test_id);
<waiting ...>
step s1-commit:
COMMIT;
step s2-ddl-on-reference-table: <... completed>
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
@ -516,33 +496,32 @@ create_distributed_table
step s2-begin:
BEGIN;
BEGIN;
step s2-ddl-on-reference-table:
CREATE INDEX reference_index ON test_reference_table(test_id);
CREATE INDEX reference_index ON test_reference_table(test_id);
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
COMMIT;
step s1-add-second-worker: <... completed>
?column?
1
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node
@ -556,13 +535,13 @@ step s1-begin:
BEGIN;
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
?column?
1
step s2-create-reference-table-2:
SELECT create_reference_table('test_reference_table_2');
SELECT create_reference_table('test_reference_table_2');
<waiting ...>
step s1-commit:
COMMIT;
@ -572,12 +551,12 @@ create_reference_table
step s2-print-content-2:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table_2', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table_2', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
@ -593,36 +572,35 @@ create_distributed_table
step s2-begin:
BEGIN;
BEGIN;
step s2-create-reference-table-2:
SELECT create_reference_table('test_reference_table_2');
SELECT create_reference_table('test_reference_table_2');
create_reference_table
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
SELECT 1 FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
COMMIT;
step s1-add-second-worker: <... completed>
?column?
1
step s2-print-content-2:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table_2', 'select count(*) from %s')
ORDER BY
nodeport;
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_reference_table_2', 'select count(*) from %s')
ORDER BY
nodeport;
nodeport success result
57637 t 1
57638 t 1
master_remove_node

View File

@ -15,10 +15,6 @@ DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx
DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx
NOTICE: Replicating reference table "customer_mx" to the node localhost:xxxxx
NOTICE: Replicating reference table "nation_mx" to the node localhost:xxxxx
NOTICE: Replicating reference table "part_mx" to the node localhost:xxxxx
NOTICE: Replicating reference table "supplier_mx" to the node localhost:xxxxx
master_add_node
---------------------------------------------------------------------
32

View File

@ -1394,7 +1394,6 @@ WHERE logicalrelid='mx_ref'::regclass;
\c - - - :master_port
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "mx_ref" to the node localhost:xxxxx
master_add_node
---------------------------------------------------------------------
7
@ -1407,8 +1406,7 @@ ORDER BY shardid, nodeport;
shardid | nodename | nodeport
---------------------------------------------------------------------
1310073 | localhost | 57637
1310073 | localhost | 57638
(2 rows)
(1 row)
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
@ -1418,8 +1416,7 @@ ORDER BY shardid, nodeport;
shardid | nodename | nodeport
---------------------------------------------------------------------
1310073 | localhost | 57637
1310073 | localhost | 57638
(2 rows)
(1 row)
-- Get the metadata back into a consistent state
\c - - - :master_port

View File

@ -173,7 +173,6 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
-- Test updating a node when another node is in readonly-mode
---------------------------------------------------------------------
SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
NOTICE: Replicating reference table "ref_table" to the node localhost:xxxxx
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
@ -383,7 +382,6 @@ SELECT verify_metadata('localhost', :worker_1_port);
(1 row)
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "ref_table" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -449,7 +447,6 @@ SELECT wait_until_metadata_sync();
(1 row)
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "ref_table" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -511,7 +508,6 @@ SELECT wait_until_metadata_sync();
(1 row)
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "ref_table" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1

View File

@ -221,7 +221,6 @@ SELECT master_remove_node('localhost', :worker_2_port);
ERROR: node at "localhost:xxxxx" does not exist
-- re-add the node for next tests
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -242,7 +241,6 @@ SELECT master_remove_node('localhost', :worker_2_port);
-- re-add the node for the next test
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -262,10 +260,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
SELECT *
FROM pg_dist_colocation
@ -291,10 +288,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
\c - - - :master_port
BEGIN;
@ -318,10 +314,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
SELECT *
FROM pg_dist_colocation
@ -347,10 +342,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
\c - - - :master_port
-- remove node in a transaction and COMMIT
@ -367,10 +361,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
SELECT *
FROM pg_dist_colocation
@ -396,10 +389,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
\c - - - :master_port
BEGIN;
@ -458,7 +450,6 @@ WHERE
\c - - - :master_port
-- re-add the node for next tests
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -478,10 +469,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
SELECT *
FROM pg_dist_colocation
@ -507,10 +497,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
\c - - - :master_port
BEGIN;
@ -583,7 +572,6 @@ SELECT * FROM remove_node_reference_table;
\c - - - :master_port
-- re-add the node for next tests
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -603,10 +591,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
SELECT *
FROM pg_dist_colocation
@ -632,10 +619,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
\c - - - :master_port
BEGIN;
@ -704,7 +690,6 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.remove_
-- re-add the node for next tests
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -724,10 +709,9 @@ FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380000 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
SELECT *
FROM pg_dist_colocation
@ -903,8 +887,6 @@ WHERE
\c - - - :master_port
-- re-add the node for next tests
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:xxxxx
NOTICE: Replicating reference table "table1" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -926,11 +908,9 @@ WHERE
nodeport = :worker_2_port
ORDER BY
shardid;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380001 | 1 | 0 | localhost | 57638
1380002 | 1 | 0 | localhost | 57638
(2 rows)
(0 rows)
SELECT *
FROM pg_dist_colocation
@ -957,11 +937,9 @@ FROM
WHERE
nodeport = :worker_2_port
ORDER BY shardid ASC;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1380001 | 1 | 0 | localhost | 57638
1380002 | 1 | 0 | localhost | 57638
(2 rows)
(0 rows)
\c - - - :master_port
SELECT master_disable_node('localhost', :worker_2_port);
@ -1018,8 +996,6 @@ WHERE
\c - - - :master_port
-- re-add the node for next tests
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to the node localhost:xxxxx
NOTICE: Replicating reference table "table1" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1

View File

@ -70,12 +70,16 @@ SELECT create_reference_table('replicate_reference_table_unhealthy');
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1370000;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ERROR: could not find any healthy placement for shard xxxxx
?column?
---------------------------------------------------------------------
1
(1 row)
-- verify node is not added
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
---------------------------------------------------------------------
0
1
(1 row)
-- verify nothing is replicated to the new node
@ -106,9 +110,10 @@ FROM
WHERE
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
(0 rows)
1370001 | 1 | 0 | localhost | 57638
(1 row)
SELECT *
FROM pg_dist_colocation
@ -122,7 +127,6 @@ WHERE colocationid IN
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_valid" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -247,7 +251,6 @@ WHERE colocationid IN
BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_rollback" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -311,7 +314,6 @@ WHERE colocationid IN
BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_commit" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -326,10 +328,9 @@ FROM
WHERE
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370003 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
SELECT *
FROM pg_dist_colocation
@ -406,8 +407,7 @@ ORDER BY logicalrelid;
replicate_reference_table_hash | h | f | c
(2 rows)
BEGIN;
SET LOCAL client_min_messages TO ERROR;
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
@ -426,7 +426,7 @@ SELECT create_reference_table('replicate_reference_table_reference_two');
(1 row)
COMMIT;
RESET client_min_messages;
-- status after master_add_node
SELECT
shardid, shardstate, shardlength, nodename, nodeport
@ -554,7 +554,6 @@ WHERE colocationid IN
BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "replicate_reference_table_drop" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -618,7 +617,6 @@ WHERE colocationid IN
(1 row)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "table1" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -632,10 +630,9 @@ FROM
WHERE
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370011 | 1 | 0 | localhost | 57638
(1 row)
(0 rows)
SELECT *
FROM pg_dist_colocation
@ -681,9 +678,6 @@ ORDER BY shardid, nodeport;
(0 rows)
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "ref_table_1" to the node localhost:xxxxx
NOTICE: Replicating reference table "ref_table_2" to the node localhost:xxxxx
NOTICE: Replicating reference table "ref_table_3" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -697,19 +691,16 @@ FROM
WHERE
nodeport = :worker_2_port
ORDER BY shardid, nodeport;
shardid | shardstate | shardlength | nodename | nodeport
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370012 | 1 | 0 | localhost | 57638
1370013 | 1 | 0 | localhost | 57638
1370014 | 1 | 0 | localhost | 57638
(3 rows)
(0 rows)
-- verify constraints have been created on the new node
SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname like ''ref_table%'';');
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,2)
(localhost,57638,t,2)
(localhost,57638,t,0)
(2 rows)
DROP TABLE ref_table_1, ref_table_2, ref_table_3;
@ -754,7 +745,6 @@ ORDER BY 1,4,5;
-- we should see the two shard placements after activation
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "initially_not_replicated_reference_table" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -776,8 +766,7 @@ ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370015 | 1 | 0 | localhost | 57637
1370015 | 1 | 0 | localhost | 57638
(2 rows)
(1 row)
-- this should have no effect
SELECT 1 FROM master_add_node('localhost', :worker_2_port);

View File

@ -655,7 +655,6 @@ ORDER BY
\c - - - :master_port
-- try using the coordinator as a worker and then dropping the table
SELECT 1 FROM master_add_node('localhost', :master_port);
NOTICE: Replicating reference table "transactional_drop_reference" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1

View File

@ -247,7 +247,6 @@ SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extnam
-- and add the other node
SELECT 1 from master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "ref_table_2" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1
@ -443,7 +442,6 @@ BEGIN;
COMMIT;
-- add the node back
SELECT 1 from master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "t3" to the node localhost:xxxxx
?column?
---------------------------------------------------------------------
1

View File

@ -502,7 +502,9 @@ SELECT shardid, nodename, nodeport
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
-- add the node back
SET client_min_messages TO ERROR;
SELECT 1 FROM master_activate_node('localhost', :worker_1_port);
RESET client_min_messages;
RESET citus.shard_replication_factor;
-- add two new shards and verify they are created at both workers
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');

View File

@ -674,19 +674,14 @@ SELECT shardid, nodename, nodeport
(4 rows)
-- add the node back
SET client_min_messages TO ERROR;
SELECT 1 FROM master_activate_node('localhost', :worker_1_port);
NOTICE: Replicating reference table "orders_reference" to the node localhost:57637
NOTICE: Replicating reference table "customer" to the node localhost:57637
NOTICE: Replicating reference table "nation" to the node localhost:57637
NOTICE: Replicating reference table "part" to the node localhost:57637
NOTICE: Replicating reference table "supplier" to the node localhost:57637
NOTICE: Replicating reference table "multi_outer_join_right_reference" to the node localhost:57637
NOTICE: Replicating reference table "multi_outer_join_third_reference" to the node localhost:57637
?column?
----------
1
(1 row)
RESET client_min_messages;
RESET citus.shard_replication_factor;
-- add two new shards and verify they are created at both workers
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');

View File

@ -39,13 +39,6 @@ FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
ORDER BY placementid;
-- fail activate node by failing reference table creation
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()');
SELECT master_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.allow()');
-- verify node is not activated
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
@ -69,22 +62,6 @@ FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
ORDER BY placementid;
-- fail activate node by failing reference table creation
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").cancel(' || pg_backend_pid() || ')');
SELECT master_activate_node('localhost', :worker_2_proxy_port);
-- verify node is not activated
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
ORDER BY placementid;
SELECT citus.mitmproxy('conn.allow()');
-- master_remove_node fails when there are shards on that worker
SELECT master_remove_node('localhost', :worker_2_proxy_port);
@ -113,34 +90,6 @@ FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
ORDER BY placementid;
-- test master_add_node replicated a reference table
-- to newly added node.
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").kill()');
SELECT master_add_node('localhost', :worker_2_proxy_port);
-- verify node is not added
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
ORDER BY placementid;
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE").cancel(' || pg_backend_pid() || ')');
SELECT master_add_node('localhost', :worker_2_proxy_port);
-- verify node is not added
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
ORDER BY placementid;
-- reset cluster to original state
SELECT citus.mitmproxy('conn.allow()');
SELECT master_add_node('localhost', :worker_2_proxy_port);
@ -154,15 +103,6 @@ FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
ORDER BY placementid;
-- fail master_add_node by failing copy out operation
SELECT master_remove_node('localhost', :worker_1_port);
SELECT citus.mitmproxy('conn.onQuery(query="COPY").kill()');
SELECT master_add_node('localhost', :worker_1_port);
-- verify node is not added
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
SELECT citus.mitmproxy('conn.allow()');
SELECT master_add_node('localhost', :worker_1_port);

View File

@ -268,12 +268,11 @@ WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY logicalrelid;
BEGIN;
SET LOCAL client_min_messages TO ERROR;
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
SELECT create_reference_table('replicate_reference_table_reference_two');
COMMIT;
RESET client_min_messages;
-- status after master_add_node
SELECT