Automatically add placeholder record for coordinator

pull/4434/head
Marco Slot 2020-12-18 08:33:24 +01:00
parent 597533b1ff
commit d900a7336e
18 changed files with 382 additions and 93 deletions

View File

@ -110,6 +110,9 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys)
EnsureCoordinator(); EnsureCoordinator();
EnsureTableOwner(relationId); EnsureTableOwner(relationId);
/* enable create_citus_local_table on an empty node */
InsertCoordinatorIfClusterEmpty();
/* /*
* Lock target relation with an AccessExclusiveLock as we don't want * Lock target relation with an AccessExclusiveLock as we don't want
* multiple backends manipulating this relation. We could actually simply * multiple backends manipulating this relation. We could actually simply

View File

@ -207,6 +207,9 @@ create_distributed_table(PG_FUNCTION_ARGS)
EnsureCitusTableCanBeCreated(relationId); EnsureCitusTableCanBeCreated(relationId);
/* enable create_distributed_table on an empty node */
InsertCoordinatorIfClusterEmpty();
/* /*
* Lock target relation with an exclusive lock - there's no way to make * Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple * sense of this table until we've committed, and we don't want multiple
@ -256,6 +259,9 @@ create_reference_table(PG_FUNCTION_ARGS)
EnsureCitusTableCanBeCreated(relationId); EnsureCitusTableCanBeCreated(relationId);
/* enable create_reference_table on an empty node */
InsertCoordinatorIfClusterEmpty();
/* /*
* Lock target relation with an exclusive lock - there's no way to make * Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple * sense of this table until we've committed, and we don't want multiple

View File

@ -669,6 +669,18 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
} }
/*
* HasAnyNodes returns whether there are any nodes in pg_dist_node.
*/
bool
HasAnyNodes(void)
{
PrepareWorkerNodeCache();
return WorkerNodeCount > 0;
}
/* /*
* LookupNodeByNodeId returns a worker node by nodeId or NULL if the node * LookupNodeByNodeId returns a worker node by nodeId or NULL if the node
* cannot be found. * cannot be found.

View File

@ -97,6 +97,7 @@ static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive);
static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort); static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort);
static int32 GetNextGroupId(void); static int32 GetNextGroupId(void);
static int GetNextNodeId(void); static int GetNextNodeId(void);
static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata); *nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport); static void DeleteNodeRow(char *nodename, int32 nodeport);
@ -163,14 +164,26 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS)
Name nodeClusterName = PG_GETARG_NAME(3); Name nodeClusterName = PG_GETARG_NAME(3);
nodeMetadata.nodeCluster = NameStr(*nodeClusterName); nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
bool nodeAlreadyExists = false;
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
/* add the coordinator to pg_dist_node if it was not already added */ /* prevent concurrent modification */
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, LockRelationOid(DistNodeRelationId(), RowShareLock);
&nodeAlreadyExists);
if (nodeAlreadyExists) bool isCoordinatorInMetadata = false;
WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID,
&isCoordinatorInMetadata);
if (!isCoordinatorInMetadata)
{
bool nodeAlreadyExists = false;
/* add the coordinator to pg_dist_node if it was not already added */
AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists);
/* we just checked */
Assert(!nodeAlreadyExists);
}
else
{ {
/* /*
* since AddNodeMetadata takes an exclusive lock on pg_dist_node, we * since AddNodeMetadata takes an exclusive lock on pg_dist_node, we
@ -178,12 +191,14 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS)
* can proceed to update immediately. * can proceed to update immediately.
*/ */
UpdateNodeLocation(nodeId, nodeNameString, nodePort); UpdateNodeLocation(coordinatorNode->nodeId, nodeNameString, nodePort);
/* clear cached plans that have the old host/port */ /* clear cached plans that have the old host/port */
ResetPlanCache(); ResetPlanCache();
} }
TransactionModifiedNodeMetadata = true;
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -222,6 +237,12 @@ master_add_node(PG_FUNCTION_ARGS)
nodeMetadata.nodeRole = PG_GETARG_OID(3); nodeMetadata.nodeRole = PG_GETARG_OID(3);
} }
if (nodeMetadata.groupId == COORDINATOR_GROUP_ID)
{
/* by default, we add the coordinator without shards */
nodeMetadata.shouldHaveShards = false;
}
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata, int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists); &nodeAlreadyExists);
TransactionModifiedNodeMetadata = true; TransactionModifiedNodeMetadata = true;
@ -1268,17 +1289,57 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
return workerNode->nodeId; return workerNode->nodeId;
} }
if (nodeMetadata->groupId != COORDINATOR_GROUP_ID &&
strcmp(nodeName, "localhost") != 0)
{
/*
* User tries to add a worker with a non-localhost address. If the coordinator
* is added with "localhost" as well, the worker won't be able to connect.
*/
bool isCoordinatorInMetadata = false;
WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID,
&isCoordinatorInMetadata);
if (isCoordinatorInMetadata &&
strcmp(coordinatorNode->workerName, "localhost") == 0)
{
ereport(ERROR, (errmsg("cannot add a worker node when the coordinator "
"hostname is set to localhost"),
errdetail("Worker nodes need to be able to connect to the "
"coordinator to transfer data."),
errhint("Use SELECT citus_set_coordinator_host('<hostname>') "
"to configure the coordinator hostname")));
}
}
/*
* When adding the first worker when the coordinator has shard placements,
* print a notice on how to drain the coordinator.
*/
if (nodeMetadata->groupId != COORDINATOR_GROUP_ID && CoordinatorAddedAsWorkerNode() &&
ActivePrimaryNonCoordinatorNodeCount() == 0 &&
NodeGroupHasShardPlacements(COORDINATOR_GROUP_ID, true))
{
WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError();
ereport(NOTICE, (errmsg("shards are still on the coordinator after adding the "
"new node"),
errhint("Use SELECT rebalance_table_shards(); to balance "
"shards data between workers and coordinator or "
"SELECT citus_drain_node(%s,%d); to permanently "
"move shards away from the coordinator.",
quote_literal_cstr(coordinator->workerName),
coordinator->workerPort)));
}
/* user lets Citus to decide on the group that the newly added node should be in */ /* user lets Citus to decide on the group that the newly added node should be in */
if (nodeMetadata->groupId == INVALID_GROUP_ID) if (nodeMetadata->groupId == INVALID_GROUP_ID)
{ {
nodeMetadata->groupId = GetNextGroupId(); nodeMetadata->groupId = GetNextGroupId();
} }
/* if this is a coordinator, we shouldn't place shards on it */
if (nodeMetadata->groupId == COORDINATOR_GROUP_ID) if (nodeMetadata->groupId == COORDINATOR_GROUP_ID)
{ {
nodeMetadata->shouldHaveShards = false;
/* /*
* Coordinator has always the authoritative metadata, reflect this * Coordinator has always the authoritative metadata, reflect this
* fact in the pg_dist_node. * fact in the pg_dist_node.
@ -1586,6 +1647,53 @@ EnsureCoordinator(void)
} }
/*
* InsertCoordinatorIfClusterEmpty can be used to ensure Citus tables can be
* created even on a node that has just performed CREATE EXTENSION citus;
*/
void
InsertCoordinatorIfClusterEmpty(void)
{
/* prevent concurrent node additions */
Relation pgDistNode = table_open(DistNodeRelationId(), RowShareLock);
if (!HasAnyNodes())
{
/*
* create_distributed_table being called for the first time and there are
* no pg_dist_node records. Add a record for the coordinator.
*/
InsertPlaceholderCoordinatorRecord();
}
/*
* We release the lock, if InsertPlaceholderCoordinatorRecord was called
* we already have a strong (RowExclusive) lock.
*/
table_close(pgDistNode, RowShareLock);
}
/*
* InsertPlaceholderCoordinatorRecord inserts a placeholder record for the coordinator
* to be able to create distributed tables on a single node.
*/
static void
InsertPlaceholderCoordinatorRecord(void)
{
NodeMetadata nodeMetadata = DefaultNodeMetadata();
nodeMetadata.groupId = 0;
nodeMetadata.shouldHaveShards = true;
nodeMetadata.nodeRole = PrimaryNodeRoleId();
nodeMetadata.nodeCluster = "default";
bool nodeAlreadyExists = false;
/* as long as there is a single node, localhost should be ok */
AddNodeMetadata("localhost", PostPortNumber, &nodeMetadata, &nodeAlreadyExists);
}
/* /*
* InsertNodeRow opens the node system catalog, and inserts a new row with the * InsertNodeRow opens the node system catalog, and inserts a new row with the
* given values into that system catalog. * given values into that system catalog.

View File

@ -173,20 +173,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* load and sort the worker node list for deterministic placement */ /* load and sort the worker node list for deterministic placement */
List *workerNodeList = DistributedTablePlacementNodeList(NoLock); List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
if (list_length(workerNodeList) == 0)
{
/* fall back to using coordinator, if it is in the metadata */
WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID, NULL);
if (coordinatorNode != NULL)
{
workerNodeList = list_make1(coordinatorNode);
}
}
else
{
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
}
int32 workerNodeCount = list_length(workerNodeList); int32 workerNodeCount = list_length(workerNodeList);
if (replicationFactor > workerNodeCount) if (replicationFactor > workerNodeCount)

View File

@ -167,7 +167,6 @@ extern int ShardMaxSize;
extern int ShardPlacementPolicy; extern int ShardPlacementPolicy;
extern int NextShardId; extern int NextShardId;
extern int NextPlacementId; extern int NextPlacementId;
extern bool AddCoordinatorOnEmptyCluster;
extern bool IsCoordinator(void); extern bool IsCoordinator(void);

View File

@ -196,6 +196,7 @@ extern char LookupDistributionMethod(Oid distributionMethodOid);
extern bool RelationExists(Oid relationId); extern bool RelationExists(Oid relationId);
/* access WorkerNodeHash */ /* access WorkerNodeHash */
extern bool HasAnyNodes(void);
extern HTAB * GetWorkerNodeHash(void); extern HTAB * GetWorkerNodeHash(void);
extern WorkerNode * LookupNodeByNodeId(uint32 nodeId); extern WorkerNode * LookupNodeByNodeId(uint32 nodeId);
extern WorkerNode * LookupNodeByNodeIdOrError(uint32 nodeId); extern WorkerNode * LookupNodeByNodeIdOrError(uint32 nodeId);

View File

@ -88,6 +88,7 @@ extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort);
extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
extern List * ReadDistNode(bool includeNodesFromOtherClusters); extern List * ReadDistNode(bool includeNodesFromOtherClusters);
extern void EnsureCoordinator(void); extern void EnsureCoordinator(void);
extern void InsertCoordinatorIfClusterEmpty(void);
extern uint32 GroupForNode(char *nodeName, int32 nodePort); extern uint32 GroupForNode(char *nodeName, int32 nodePort);
extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes); extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes);
extern bool NodeIsPrimary(WorkerNode *worker); extern bool NodeIsPrimary(WorkerNode *worker);

View File

@ -1,13 +1,8 @@
SET citus.next_shard_id TO 1220000; SET citus.next_shard_id TO 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1;
SET citus.enable_object_propagation TO off; -- prevent object propagation on add node during setup SET citus.enable_object_propagation TO off; -- prevent object propagation on add node during setup
-- Tests functions related to cluster membership -- Tests functions related to cluster membership
-- before starting the test, lets try to create reference table and see a
-- meaningful error
CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');
ERROR: cannot create reference table "test_reference_table"
DETAIL: There are no active worker nodes.
-- add the nodes to the cluster -- add the nodes to the cluster
SELECT 1 FROM master_add_node('localhost', :worker_1_port); SELECT 1 FROM master_add_node('localhost', :worker_1_port);
WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker
@ -127,6 +122,13 @@ SELECT master_get_active_worker_nodes();
(2 rows) (2 rows)
-- insert a row so that master_disable_node() exercises closing connections -- insert a row so that master_disable_node() exercises closing connections
CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_reference_table VALUES (1, '1'); INSERT INTO test_reference_table VALUES (1, '1');
-- try to disable a node with active placements see that node is removed -- try to disable a node with active placements see that node is removed
-- observe that a notification is displayed -- observe that a notification is displayed
@ -146,6 +148,24 @@ SELECT master_get_active_worker_nodes();
-- try to disable a node which does not exist and see that an error is thrown -- try to disable a node which does not exist and see that an error is thrown
SELECT master_disable_node('localhost.noexist', 2345); SELECT master_disable_node('localhost.noexist', 2345);
ERROR: node at "localhost.noexist:2345" does not exist ERROR: node at "localhost.noexist:2345" does not exist
-- drop the table without leaving a shard placement behind (messes up other tests)
SELECT master_activate_node('localhost', :worker_2_port);
WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker
DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created
NOTICE: Replicating reference table "test_reference_table" to the node localhost:xxxxx
master_activate_node
---------------------------------------------------------------------
3
(1 row)
DROP TABLE test_reference_table;
SELECT master_disable_node('localhost', :worker_2_port);
NOTICE: Node localhost:xxxxx has active shard placements. Some queries may fail after this operation. Use SELECT master_activate_node('localhost', 57638) to activate this node back.
master_disable_node
---------------------------------------------------------------------
(1 row)
CREATE USER non_super_user; CREATE USER non_super_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to worker nodes directly to manually create all necessary users and roles.

View File

@ -434,8 +434,14 @@ SELECT * FROM print_extension_changes();
-- Test downgrade to 9.5-1 from 10.0-1 -- Test downgrade to 9.5-1 from 10.0-1
ALTER EXTENSION citus UPDATE TO '10.0-1'; ALTER EXTENSION citus UPDATE TO '10.0-1';
ALTER EXTENSION citus UPDATE TO '9.5-1'; ALTER EXTENSION citus UPDATE TO '9.5-1';
ERROR: syntax error at or near ">>>>>>>"
-- Should be empty result since upgrade+downgrade should be a no-op -- Should be empty result since upgrade+downgrade should be a no-op
SELECT * FROM print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
-- Snapshot of state at 10.0-1
ALTER EXTENSION citus UPDATE TO '10.0-1';
SELECT * FROM print_extension_changes(); SELECT * FROM print_extension_changes();
previous_object | current_object previous_object | current_object
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -462,14 +468,6 @@ SELECT * FROM print_extension_changes();
| view time_partitions | view time_partitions
(21 rows) (21 rows)
-- Snapshot of state at 10.0-1
ALTER EXTENSION citus UPDATE TO '10.0-1';
NOTICE: version "10.0-1" of extension "citus" is already installed
SELECT * FROM print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
DROP TABLE prev_objects, extension_diff; DROP TABLE prev_objects, extension_diff;
-- show running version -- show running version
SHOW citus.version; SHOW citus.version;

View File

@ -434,8 +434,14 @@ SELECT * FROM print_extension_changes();
-- Test downgrade to 9.5-1 from 10.0-1 -- Test downgrade to 9.5-1 from 10.0-1
ALTER EXTENSION citus UPDATE TO '10.0-1'; ALTER EXTENSION citus UPDATE TO '10.0-1';
ALTER EXTENSION citus UPDATE TO '9.5-1'; ALTER EXTENSION citus UPDATE TO '9.5-1';
ERROR: syntax error at or near ">>>>>>>"
-- Should be empty result since upgrade+downgrade should be a no-op -- Should be empty result since upgrade+downgrade should be a no-op
SELECT * FROM print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
-- Snapshot of state at 10.0-1
ALTER EXTENSION citus UPDATE TO '10.0-1';
SELECT * FROM print_extension_changes(); SELECT * FROM print_extension_changes();
previous_object | current_object previous_object | current_object
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -458,14 +464,6 @@ SELECT * FROM print_extension_changes();
| view time_partitions | view time_partitions
(17 rows) (17 rows)
-- Snapshot of state at 10.0-1
ALTER EXTENSION citus UPDATE TO '10.0-1';
NOTICE: version "10.0-1" of extension "citus" is already installed
SELECT * FROM print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
DROP TABLE prev_objects, extension_diff; DROP TABLE prev_objects, extension_diff;
-- show running version -- show running version
SHOW citus.version; SHOW citus.version;

View File

@ -764,10 +764,10 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
-- verify we cannot replicate reference tables in a transaction modifying pg_dist_node -- verify we cannot replicate reference tables in a transaction modifying pg_dist_node
BEGIN; BEGIN;
SELECT 1 FROM master_add_inactive_node('invalid-node-name', 9999); SELECT citus_set_coordinator_host('127.0.0.1');
?column? citus_set_coordinator_host
--------------------------------------------------------------------- ---------------------------------------------------------------------
1
(1 row) (1 row)
SELECT replicate_reference_tables(); SELECT replicate_reference_tables();

View File

@ -362,7 +362,6 @@ SELECT pg_reload_conf();
DROP TABLE test_recovery_ref; DROP TABLE test_recovery_ref;
DROP TABLE test_recovery; DROP TABLE test_recovery;
DROP TABLE test_recovery_single; DROP TABLE test_recovery_single;
DROP TABLE test_reference_table;
SELECT 1 FROM master_remove_node('localhost', :master_port); SELECT 1 FROM master_remove_node('localhost', :master_port);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -20,6 +20,61 @@ SELECT 1 FROM master_disable_node('localhost', :master_port);
ERROR: Disabling localhost:xxxxx failed ERROR: Disabling localhost:xxxxx failed
DETAIL: cannot change "isactive" field of the coordinator node DETAIL: cannot change "isactive" field of the coordinator node
RESET client_min_messages; RESET client_min_messages;
SELECT 1 FROM master_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM pg_dist_node;
count
---------------------------------------------------------------------
0
(1 row)
-- there are no workers now, but we should still be able to create Citus tables
CREATE TABLE ref(x int, y int);
SELECT create_reference_table('ref');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced
---------------------------------------------------------------------
0 | localhost | 57636 | t | t | t | t
(1 row)
DROP TABLE ref;
-- remove the coordinator to try again with create_reference_table
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
master_remove_node
---------------------------------------------------------------------
(1 row)
CREATE TABLE loc(x int, y int);
SELECT create_citus_local_table('loc');
create_citus_local_table
---------------------------------------------------------------------
(1 row)
SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced
---------------------------------------------------------------------
0 | localhost | 57636 | t | t | t | t
(1 row)
DROP TABLE loc;
-- remove the coordinator to try again with create_distributed_table
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
master_remove_node
---------------------------------------------------------------------
(1 row)
CREATE TABLE test(x int, y int); CREATE TABLE test(x int, y int);
SELECT create_distributed_table('test','x'); SELECT create_distributed_table('test','x');
create_distributed_table create_distributed_table
@ -27,7 +82,62 @@ SELECT create_distributed_table('test','x');
(1 row) (1 row)
-- should have shards setting should not matter for a single node SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced
---------------------------------------------------------------------
0 | localhost | 57636 | t | t | t | t
(1 row)
-- cannot add workers with specific IP as long as I have a placeholder coordinator record
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
ERROR: cannot add a worker node when the coordinator hostname is set to localhost
DETAIL: Worker nodes need to be able to connect to the coordinator to transfer data.
HINT: Use SELECT citus_set_coordinator_host('<hostname>') to configure the coordinator hostname
-- adding localhost workers is ok
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
NOTICE: shards are still on the coordinator after adding the new node
HINT: Use SELECT rebalance_table_shards(); to balance shards data between workers and coordinator or SELECT citus_drain_node('localhost',57636); to permanently move shards away from the coordinator.
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- set the coordinator host to something different than localhost
SELECT 1 FROM citus_set_coordinator_host('127.0.0.1');
?column?
---------------------------------------------------------------------
1
(1 row)
-- adding workers with specific IP is ok now
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
NOTICE: shards are still on the coordinator after adding the new node
HINT: Use SELECT rebalance_table_shards(); to balance shards data between workers and coordinator or SELECT citus_drain_node('127.0.0.1',57636); to permanently move shards away from the coordinator.
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM master_remove_node('127.0.0.1', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- set the coordinator host back to localhost for the remainder of tests
SELECT 1 FROM citus_set_coordinator_host('localhost');
?column?
---------------------------------------------------------------------
1
(1 row)
-- should have shards setting should not really matter for a single node
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -126,21 +236,21 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *;
SET citus.log_remote_commands to true; SET citus.log_remote_commands to true;
-- observe that there is a conflict and the following query does nothing -- observe that there is a conflict and the following query does nothing
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *; INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *;
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col
part_key | other_col | third_col part_key | other_col | third_col
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
-- same as the above with different syntax -- same as the above with different syntax
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *; INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *;
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col
part_key | other_col | third_col part_key | other_col | third_col
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
-- again the same query with another syntax -- again the same query with another syntax
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630513 DO NOTHING RETURNING part_key, other_col, third_col NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630515 DO NOTHING RETURNING part_key, other_col, third_col
part_key | other_col | third_col part_key | other_col | third_col
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
@ -148,7 +258,7 @@ NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_9063
BEGIN; BEGIN;
-- force local execution -- force local execution
SELECT count(*) FROM upsert_test WHERE part_key = 1; SELECT count(*) FROM upsert_test WHERE part_key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630513 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1) NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630515 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1)
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -234,10 +344,10 @@ SET search_path TO single_node;
DROP SCHEMA "Quoed.Schema" CASCADE; DROP SCHEMA "Quoed.Schema" CASCADE;
NOTICE: drop cascades to 5 other objects NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table "Quoed.Schema".simple_table_name DETAIL: drop cascades to table "Quoed.Schema".simple_table_name
drop cascades to table "Quoed.Schema".simple_table_name_90630518
drop cascades to table "Quoed.Schema".simple_table_name_90630519
drop cascades to table "Quoed.Schema".simple_table_name_90630520 drop cascades to table "Quoed.Schema".simple_table_name_90630520
drop cascades to table "Quoed.Schema".simple_table_name_90630521 drop cascades to table "Quoed.Schema".simple_table_name_90630521
drop cascades to table "Quoed.Schema".simple_table_name_90630522
drop cascades to table "Quoed.Schema".simple_table_name_90630523
-- test partitioned index creation with long name -- test partitioned index creation with long name
CREATE TABLE test_index_creation1 CREATE TABLE test_index_creation1
( (
@ -424,7 +534,7 @@ EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE)
-> Task -> Task
Tuple data received from node: 4 bytes Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on test_90630500 test (actual rows=2 loops=1) -> Seq Scan on test_90630502 test (actual rows=2 loops=1)
(8 rows) (8 rows)
-- common utility command -- common utility command
@ -1105,7 +1215,7 @@ END;$$;
SELECT * FROM pg_dist_node; SELECT * FROM pg_dist_node;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 0 | localhost | 57636 | default | t | t | primary | default | t | t 4 | 0 | localhost | 57636 | default | t | t | primary | default | t | t
(1 row) (1 row)
SELECT create_distributed_function('call_delegation(int)', '$1', 'test'); SELECT create_distributed_function('call_delegation(int)', '$1', 'test');
@ -1252,56 +1362,56 @@ SELECT pg_sleep(0.1);
SET citus.executor_slow_start_interval TO 10; SET citus.executor_slow_start_interval TO 10;
SELECT count(*) from another_schema_table; SELECT count(*) from another_schema_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630509 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630510 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630513 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630514 another_schema_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
UPDATE another_schema_table SET b = b; UPDATE another_schema_table SET b = b;
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630509 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630510 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = b NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = b
-- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning -- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning
-- not that we ignore INSERT .. SELECT via coordinator as it relies on -- not that we ignore INSERT .. SELECT via coordinator as it relies on
-- COPY command -- COPY command
INSERT INTO another_schema_table SELECT * FROM another_schema_table; INSERT INTO another_schema_table SELECT * FROM another_schema_table;
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630509 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630510 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a IS NOT NULL) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE (a IS NOT NULL) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE (a IS NOT NULL)
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630509_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630509_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630510_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630510_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630513_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630513_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630514_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630514_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
-- multi-row INSERTs -- multi-row INSERTs
INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) VALUES (1,1), (5,5) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) VALUES (1,1), (5,5)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) VALUES (3,3), (4,4), (7,7) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) VALUES (3,3), (4,4), (7,7)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) VALUES (6,6) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) VALUES (6,6)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) VALUES (2,2) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) VALUES (2,2)
-- INSERT..SELECT with re-partitioning when using local execution -- INSERT..SELECT with re-partitioning when using local execution
BEGIN; BEGIN;
INSERT INTO another_schema_table VALUES (1,100); INSERT INTO another_schema_table VALUES (1,100);
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 (a, b) VALUES (1, 100) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 (a, b) VALUES (1, 100)
INSERT INTO another_schema_table VALUES (2,100); INSERT INTO another_schema_table VALUES (2,100);
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 (a, b) VALUES (2, 100) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 (a, b) VALUES (2, 100)
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630509_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630509_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630510_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630510_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630509_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630513_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630513_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630510_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630514_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630514_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630509_to_2,repartitioned_results_xxxxx_from_90630511_to_2,repartitioned_results_xxxxx_from_90630512_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630511_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630512_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630512_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630511_to_2,repartitioned_results_xxxxx_from_90630513_to_2,repartitioned_results_xxxxx_from_90630514_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630514_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
SELECT * FROM another_schema_table WHERE a = 100 ORDER BY b; SELECT * FROM another_schema_table WHERE a = 100 ORDER BY b;
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 100) ORDER BY b NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 100) ORDER BY b
a | b a | b
--------------------------------------------------------------------- ---------------------------------------------------------------------
100 | 1 100 | 1
@ -1312,10 +1422,10 @@ ROLLBACK;
-- intermediate results -- intermediate results
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
SELECT count(*) FROM cte_1; SELECT count(*) FROM cte_1;
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -1,14 +1,10 @@
SET citus.next_shard_id TO 1220000; SET citus.next_shard_id TO 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1;
SET citus.enable_object_propagation TO off; -- prevent object propagation on add node during setup SET citus.enable_object_propagation TO off; -- prevent object propagation on add node during setup
-- Tests functions related to cluster membership -- Tests functions related to cluster membership
-- before starting the test, lets try to create reference table and see a
-- meaningful error
CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');
-- add the nodes to the cluster -- add the nodes to the cluster
SELECT 1 FROM master_add_node('localhost', :worker_1_port); SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port); SELECT 1 FROM master_add_node('localhost', :worker_2_port);
@ -49,6 +45,8 @@ SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_get_active_worker_nodes(); SELECT master_get_active_worker_nodes();
-- insert a row so that master_disable_node() exercises closing connections -- insert a row so that master_disable_node() exercises closing connections
CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');
INSERT INTO test_reference_table VALUES (1, '1'); INSERT INTO test_reference_table VALUES (1, '1');
-- try to disable a node with active placements see that node is removed -- try to disable a node with active placements see that node is removed
@ -59,6 +57,11 @@ SELECT master_get_active_worker_nodes();
-- try to disable a node which does not exist and see that an error is thrown -- try to disable a node which does not exist and see that an error is thrown
SELECT master_disable_node('localhost.noexist', 2345); SELECT master_disable_node('localhost.noexist', 2345);
-- drop the table without leaving a shard placement behind (messes up other tests)
SELECT master_activate_node('localhost', :worker_2_port);
DROP TABLE test_reference_table;
SELECT master_disable_node('localhost', :worker_2_port);
CREATE USER non_super_user; CREATE USER non_super_user;
CREATE USER node_metadata_user; CREATE USER node_metadata_user;
GRANT EXECUTE ON FUNCTION master_activate_node(text,int) TO node_metadata_user; GRANT EXECUTE ON FUNCTION master_activate_node(text,int) TO node_metadata_user;

View File

@ -506,7 +506,7 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
-- verify we cannot replicate reference tables in a transaction modifying pg_dist_node -- verify we cannot replicate reference tables in a transaction modifying pg_dist_node
BEGIN; BEGIN;
SELECT 1 FROM master_add_inactive_node('invalid-node-name', 9999); SELECT citus_set_coordinator_host('127.0.0.1');
SELECT replicate_reference_tables(); SELECT replicate_reference_tables();
ROLLBACK; ROLLBACK;

View File

@ -200,6 +200,5 @@ SELECT pg_reload_conf();
DROP TABLE test_recovery_ref; DROP TABLE test_recovery_ref;
DROP TABLE test_recovery; DROP TABLE test_recovery;
DROP TABLE test_recovery_single; DROP TABLE test_recovery_single;
DROP TABLE test_reference_table;
SELECT 1 FROM master_remove_node('localhost', :master_port); SELECT 1 FROM master_remove_node('localhost', :master_port);

View File

@ -17,10 +17,55 @@ SELECT 1 FROM master_disable_node('localhost', :master_port);
RESET client_min_messages; RESET client_min_messages;
SELECT 1 FROM master_remove_node('localhost', :master_port);
SELECT count(*) FROM pg_dist_node;
-- there are no workers now, but we should still be able to create Citus tables
CREATE TABLE ref(x int, y int);
SELECT create_reference_table('ref');
SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
DROP TABLE ref;
-- remove the coordinator to try again with create_reference_table
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
CREATE TABLE loc(x int, y int);
SELECT create_citus_local_table('loc');
SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
DROP TABLE loc;
-- remove the coordinator to try again with create_distributed_table
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
CREATE TABLE test(x int, y int); CREATE TABLE test(x int, y int);
SELECT create_distributed_table('test','x'); SELECT create_distributed_table('test','x');
-- should have shards setting should not matter for a single node SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
-- cannot add workers with specific IP as long as I have a placeholder coordinator record
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
-- adding localhost workers is ok
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
-- set the coordinator host to something different than localhost
SELECT 1 FROM citus_set_coordinator_host('127.0.0.1');
-- adding workers with specific IP is ok now
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
SELECT 1 FROM master_remove_node('127.0.0.1', :worker_1_port);
-- set the coordinator host back to localhost for the remainder of tests
SELECT 1 FROM citus_set_coordinator_host('localhost');
-- should have shards setting should not really matter for a single node
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
CREATE TYPE new_type AS (n int, m text); CREATE TYPE new_type AS (n int, m text);