mirror of https://github.com/citusdata/citus.git
Merge pull request #4434 from citusdata/marcocitus/single-node
commit 21fd2e2c92
@@ -110,6 +110,9 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys)
    EnsureCoordinator();
    EnsureTableOwner(relationId);

    /* enable create_citus_local_table on an empty node */
    InsertCoordinatorIfClusterEmpty();

    /*
     * Lock target relation with an AccessExclusiveLock as we don't want
     * multiple backends manipulating this relation. We could actually simply
@@ -207,6 +207,9 @@ create_distributed_table(PG_FUNCTION_ARGS)

    EnsureCitusTableCanBeCreated(relationId);

    /* enable create_distributed_table on an empty node */
    InsertCoordinatorIfClusterEmpty();

    /*
     * Lock target relation with an exclusive lock - there's no way to make
     * sense of this table until we've committed, and we don't want multiple

@@ -256,6 +259,9 @@ create_reference_table(PG_FUNCTION_ARGS)

    EnsureCitusTableCanBeCreated(relationId);

    /* enable create_reference_table on an empty node */
    InsertCoordinatorIfClusterEmpty();

    /*
     * Lock target relation with an exclusive lock - there's no way to make
     * sense of this table until we've committed, and we don't want multiple
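The three hunks above add the same hook: InsertCoordinatorIfClusterEmpty() runs before a table becomes a Citus table, so the create_* UDFs also work on a node whose pg_dist_node is still empty. A minimal SQL sketch of the behaviour this enables (the table and column names are illustrative, not from the diff):

-- fresh node, no workers added yet
CREATE EXTENSION citus;
CREATE TABLE events (event_id bigint, payload text);
-- previously this required adding at least one worker node first;
-- with this change the coordinator registers itself and the call succeeds
SELECT create_distributed_table('events', 'event_id');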
@@ -669,6 +669,18 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
}


/*
 * HasAnyNodes returns whether there are any nodes in pg_dist_node.
 */
bool
HasAnyNodes(void)
{
    PrepareWorkerNodeCache();

    return WorkerNodeCount > 0;
}


/*
 * LookupNodeByNodeId returns a worker node by nodeId or NULL if the node
 * cannot be found.

@@ -47,6 +47,7 @@
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "lib/stringinfo.h"
#include "postmaster/postmaster.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/lock.h"

@@ -96,6 +97,7 @@ static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive);
static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort);
static int32 GetNextGroupId(void);
static int GetNextNodeId(void);
static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
                          *nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);

@@ -110,7 +112,9 @@ static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum val
                                               char *field);
static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);


/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
PG_FUNCTION_INFO_V1(master_add_node);
PG_FUNCTION_INFO_V1(master_add_inactive_node);
PG_FUNCTION_INFO_V1(master_add_secondary_node);
@@ -141,6 +145,64 @@ DefaultNodeMetadata()
}


/*
 * citus_set_coordinator_host configures the hostname and port through which worker
 * nodes can connect to the coordinator.
 */
Datum
citus_set_coordinator_host(PG_FUNCTION_ARGS)
{
    text *nodeName = PG_GETARG_TEXT_P(0);
    int32 nodePort = PG_GETARG_INT32(1);
    char *nodeNameString = text_to_cstring(nodeName);

    NodeMetadata nodeMetadata = DefaultNodeMetadata();
    nodeMetadata.groupId = 0;
    nodeMetadata.shouldHaveShards = false;
    nodeMetadata.nodeRole = PG_GETARG_OID(2);

    Name nodeClusterName = PG_GETARG_NAME(3);
    nodeMetadata.nodeCluster = NameStr(*nodeClusterName);

    CheckCitusVersion(ERROR);

    /* prevent concurrent modification */
    LockRelationOid(DistNodeRelationId(), RowShareLock);

    bool isCoordinatorInMetadata = false;
    WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID,
                                                      &isCoordinatorInMetadata);
    if (!isCoordinatorInMetadata)
    {
        bool nodeAlreadyExists = false;

        /* add the coordinator to pg_dist_node if it was not already added */
        AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
                        &nodeAlreadyExists);

        /* we just checked */
        Assert(!nodeAlreadyExists);
    }
    else
    {
        /*
         * since AddNodeMetadata takes an exclusive lock on pg_dist_node, we
         * do not need to worry about concurrent changes (e.g. deletion) and
         * can proceed to update immediately.
         */

        UpdateNodeLocation(coordinatorNode->nodeId, nodeNameString, nodePort);

        /* clear cached plans that have the old host/port */
        ResetPlanCache();
    }

    TransactionModifiedNodeMetadata = true;

    PG_RETURN_VOID();
}


/*
 * master_add_node function adds a new node to the cluster and returns its id. It also
 * replicates all reference tables to the new node.
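For illustration (not part of the diff), a hedged example of calling the new UDF, assuming the coordinator is reachable by workers at 10.0.0.1:5432 (hypothetical address). If no coordinator row exists yet, AddNodeMetadata inserts one; otherwise the existing row is updated and cached plans that reference the old host/port are reset:

SELECT citus_set_coordinator_host('10.0.0.1', 5432);
-- the coordinator is the node in group 0
SELECT nodename, nodeport FROM pg_dist_node WHERE groupid = 0;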
@@ -175,6 +237,12 @@ master_add_node(PG_FUNCTION_ARGS)
        nodeMetadata.nodeRole = PG_GETARG_OID(3);
    }

    if (nodeMetadata.groupId == COORDINATOR_GROUP_ID)
    {
        /* by default, we add the coordinator without shards */
        nodeMetadata.shouldHaveShards = false;
    }

    int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
                                 &nodeAlreadyExists);
    TransactionModifiedNodeMetadata = true;
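In SQL terms, and following the add_coordinator-style tests later in this diff: adding the coordinator to group 0 now leaves it without shards by default, and master_set_node_property can opt it back in (the port is written out literally for illustration; the tests use :master_port):

SELECT 1 FROM master_add_node('localhost', 5432, groupid => 0);
-- opt the coordinator back in to holding shard placements
SELECT 1 FROM master_set_node_property('localhost', 5432, 'shouldhaveshards', true);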
@@ -1221,17 +1289,57 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
        return workerNode->nodeId;
    }

    if (nodeMetadata->groupId != COORDINATOR_GROUP_ID &&
        strcmp(nodeName, "localhost") != 0)
    {
        /*
         * User tries to add a worker with a non-localhost address. If the coordinator
         * is added with "localhost" as well, the worker won't be able to connect.
         */

        bool isCoordinatorInMetadata = false;
        WorkerNode *coordinatorNode = PrimaryNodeForGroup(COORDINATOR_GROUP_ID,
                                                          &isCoordinatorInMetadata);
        if (isCoordinatorInMetadata &&
            strcmp(coordinatorNode->workerName, "localhost") == 0)
        {
            ereport(ERROR, (errmsg("cannot add a worker node when the coordinator "
                                   "hostname is set to localhost"),
                            errdetail("Worker nodes need to be able to connect to the "
                                      "coordinator to transfer data."),
                            errhint("Use SELECT citus_set_coordinator_host('<hostname>') "
                                    "to configure the coordinator hostname")));
        }
    }

    /*
     * When adding the first worker when the coordinator has shard placements,
     * print a notice on how to drain the coordinator.
     */
    if (nodeMetadata->groupId != COORDINATOR_GROUP_ID && CoordinatorAddedAsWorkerNode() &&
        ActivePrimaryNonCoordinatorNodeCount() == 0 &&
        NodeGroupHasShardPlacements(COORDINATOR_GROUP_ID, true))
    {
        WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError();

        ereport(NOTICE, (errmsg("shards are still on the coordinator after adding the "
                                "new node"),
                         errhint("Use SELECT rebalance_table_shards(); to balance "
                                 "shards data between workers and coordinator or "
                                 "SELECT citus_drain_node(%s,%d); to permanently "
                                 "move shards away from the coordinator.",
                                 quote_literal_cstr(coordinator->workerName),
                                 coordinator->workerPort)));
    }

    /* user lets Citus to decide on the group that the newly added node should be in */
    if (nodeMetadata->groupId == INVALID_GROUP_ID)
    {
        nodeMetadata->groupId = GetNextGroupId();
    }

    /* if this is a coordinator, we shouldn't place shards on it */
    if (nodeMetadata->groupId == COORDINATOR_GROUP_ID)
    {
        nodeMetadata->shouldHaveShards = false;

        /*
         * Coordinator has always the authoritative metadata, reflect this
         * fact in the pg_dist_node.
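To make the new error and notice concrete, a sketch using hypothetical addresses 10.0.0.1 (coordinator) and 10.0.0.2 (worker); all of the calls below appear in the hint text or elsewhere in this diff:

-- fails while the coordinator is still registered as localhost
SELECT master_add_node('10.0.0.2', 5432);
-- ERROR:  cannot add a worker node when the coordinator hostname is set to localhost
-- register a routable coordinator address first, then retry
SELECT citus_set_coordinator_host('10.0.0.1', 5432);
SELECT master_add_node('10.0.0.2', 5432);
-- if shards were already created while running single-node, the NOTICE suggests either
SELECT rebalance_table_shards();
-- or
SELECT citus_drain_node('10.0.0.1', 5432);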
@@ -1539,6 +1647,53 @@ EnsureCoordinator(void)
}


/*
 * InsertCoordinatorIfClusterEmpty can be used to ensure Citus tables can be
 * created even on a node that has just performed CREATE EXTENSION citus;
 */
void
InsertCoordinatorIfClusterEmpty(void)
{
    /* prevent concurrent node additions */
    Relation pgDistNode = table_open(DistNodeRelationId(), RowShareLock);

    if (!HasAnyNodes())
    {
        /*
         * create_distributed_table being called for the first time and there are
         * no pg_dist_node records. Add a record for the coordinator.
         */
        InsertPlaceholderCoordinatorRecord();
    }

    /*
     * We release the lock, if InsertPlaceholderCoordinatorRecord was called
     * we already have a strong (RowExclusive) lock.
     */
    table_close(pgDistNode, RowShareLock);
}


/*
 * InsertPlaceholderCoordinatorRecord inserts a placeholder record for the coordinator
 * to be able to create distributed tables on a single node.
 */
static void
InsertPlaceholderCoordinatorRecord(void)
{
    NodeMetadata nodeMetadata = DefaultNodeMetadata();
    nodeMetadata.groupId = 0;
    nodeMetadata.shouldHaveShards = true;
    nodeMetadata.nodeRole = PrimaryNodeRoleId();
    nodeMetadata.nodeCluster = "default";

    bool nodeAlreadyExists = false;

    /* as long as there is a single node, localhost should be ok */
    AddNodeMetadata("localhost", PostPortNumber, &nodeMetadata, &nodeAlreadyExists);
}


/*
 * InsertNodeRow opens the node system catalog, and inserts a new row with the
 * given values into that system catalog.
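As the single_node tests further down show, the placeholder row becomes visible as soon as the first Citus table is created; a minimal sketch (the port is whatever the coordinator listens on):

CREATE TABLE ref(x int, y int);
SELECT create_reference_table('ref');
SELECT groupid, nodename, nodeport, shouldhaveshards FROM pg_dist_node;
-- expected shape: a single row like  0 | localhost | 5432 | t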
@@ -8,6 +8,7 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
#include "udfs/citus_finish_pg_upgrade/10.0-1.sql"
#include "udfs/undistribute_table/10.0-1.sql"
#include "udfs/create_citus_local_table/10.0-1.sql"
#include "udfs/citus_set_coordinator_host/10.0-1.sql"

#include "../../columnar/sql/columnar--9.5-1--10.0-1.sql"

@@ -13,6 +13,8 @@ DROP FUNCTION pg_catalog.create_citus_local_table(regclass,boolean);
DROP VIEW pg_catalog.time_partitions;
DROP FUNCTION pg_catalog.time_partition_range(regclass);

DROP FUNCTION pg_catalog.citus_set_coordinator_host(text,int,noderole,name);

#include "../udfs/citus_total_relation_size/7.0-1.sql"
#include "../udfs/upgrade_to_reference_table/8.0-1.sql"
#include "../udfs/undistribute_table/9.5-1.sql"
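These migration scripts run through the usual extension upgrade and downgrade path; a hedged sketch of exercising them (the downgrade target assumes the 10.0-1 to 9.5-1 script shown above is what gets applied):

ALTER EXTENSION citus UPDATE TO '10.0-1';  -- installs citus_set_coordinator_host
ALTER EXTENSION citus UPDATE TO '9.5-1';   -- downgrade drops it again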
@@ -0,0 +1,13 @@
CREATE FUNCTION pg_catalog.citus_set_coordinator_host(
    host text,
    port integer default current_setting('port')::int,
    node_role noderole default 'primary',
    node_cluster name default 'default')
  RETURNS VOID
  LANGUAGE C STRICT
  AS 'MODULE_PATHNAME', $$citus_set_coordinator_host$$;

COMMENT ON FUNCTION pg_catalog.citus_set_coordinator_host(text,integer,noderole,name)
  IS 'set the host and port of the coordinator';

REVOKE ALL ON FUNCTION pg_catalog.citus_set_coordinator_host(text,int,noderole,name) FROM PUBLIC;
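Given the defaults above, only the host argument is required: port falls back to current_setting('port'), and the role and cluster default to 'primary' and 'default'. A usage sketch with a hypothetical hostname:

SELECT citus_set_coordinator_host('coordinator.example.com');
-- or spell out every argument explicitly
SELECT citus_set_coordinator_host('coordinator.example.com', 5432, 'primary', 'default');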
@@ -0,0 +1,13 @@
CREATE FUNCTION pg_catalog.citus_set_coordinator_host(
    host text,
    port integer default current_setting('port')::int,
    node_role noderole default 'primary',
    node_cluster name default 'default')
  RETURNS VOID
  LANGUAGE C STRICT
  AS 'MODULE_PATHNAME', $$citus_set_coordinator_host$$;

COMMENT ON FUNCTION pg_catalog.citus_set_coordinator_host(text,integer,noderole,name)
  IS 'set the host and port of the coordinator';

REVOKE ALL ON FUNCTION pg_catalog.citus_set_coordinator_host(text,int,noderole,name) FROM PUBLIC;
@@ -196,6 +196,7 @@ extern char LookupDistributionMethod(Oid distributionMethodOid);
extern bool RelationExists(Oid relationId);

/* access WorkerNodeHash */
extern bool HasAnyNodes(void);
extern HTAB * GetWorkerNodeHash(void);
extern WorkerNode * LookupNodeByNodeId(uint32 nodeId);
extern WorkerNode * LookupNodeByNodeIdOrError(uint32 nodeId);

@@ -88,6 +88,7 @@ extern WorkerNode * FindWorkerNodeOrError(const char *nodeName, int32 nodePort);
extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort);
extern List * ReadDistNode(bool includeNodesFromOtherClusters);
extern void EnsureCoordinator(void);
extern void InsertCoordinatorIfClusterEmpty(void);
extern uint32 GroupForNode(char *nodeName, int32 nodePort);
extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes);
extern bool NodeIsPrimary(WorkerNode *worker);
@@ -1,13 +1,8 @@
SET citus.next_shard_id TO 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1;
SET citus.enable_object_propagation TO off; -- prevent object propagation on add node during setup
-- Tests functions related to cluster membership
-- before starting the test, lets try to create reference table and see a
-- meaningful error
CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');
ERROR: cannot create reference table "test_reference_table"
DETAIL: There are no active worker nodes.
-- add the nodes to the cluster
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker

@@ -127,6 +122,13 @@ SELECT master_get_active_worker_nodes();
(2 rows)

-- insert a row so that master_disable_node() exercises closing connections
CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');
 create_reference_table
---------------------------------------------------------------------

(1 row)

INSERT INTO test_reference_table VALUES (1, '1');
-- try to disable a node with active placements see that node is removed
-- observe that a notification is displayed

@@ -146,6 +148,24 @@ SELECT master_get_active_worker_nodes();
-- try to disable a node which does not exist and see that an error is thrown
SELECT master_disable_node('localhost.noexist', 2345);
ERROR: node at "localhost.noexist:2345" does not exist
-- drop the table without leaving a shard placement behind (messes up other tests)
SELECT master_activate_node('localhost', :worker_2_port);
WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker
DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created
NOTICE: Replicating reference table "test_reference_table" to the node localhost:xxxxx
 master_activate_node
---------------------------------------------------------------------
 3
(1 row)

DROP TABLE test_reference_table;
SELECT master_disable_node('localhost', :worker_2_port);
NOTICE: Node localhost:xxxxx has active shard placements. Some queries may fail after this operation. Use SELECT master_activate_node('localhost', 57638) to activate this node back.
 master_disable_node
---------------------------------------------------------------------

(1 row)

CREATE USER non_super_user;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
@@ -453,6 +453,7 @@ SELECT * FROM print_extension_changes();
 | function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
 | function alter_columnar_table_set(regclass,integer,integer,name,integer)
 | function citus_internal.columnar_ensure_objects_exist()
 | function citus_set_coordinator_host(text,integer,noderole,name)
 | function citus_total_relation_size(regclass,boolean)
 | function columnar.columnar_handler(internal)
 | function create_citus_local_table(regclass,boolean)

@@ -465,7 +466,7 @@ SELECT * FROM print_extension_changes();
 | table columnar.options
 | view citus_tables
 | view time_partitions
(20 rows)
(21 rows)

DROP TABLE prev_objects, extension_diff;
-- show running version

@@ -443,13 +443,14 @@ SELECT * FROM print_extension_changes();
-- Snapshot of state at 10.0-1
ALTER EXTENSION citus UPDATE TO '10.0-1';
SELECT * FROM print_extension_changes();
 previous_object | current_object
 previous_object | current_object
---------------------------------------------------------------------
 function citus_total_relation_size(regclass) |
 function create_citus_local_table(regclass) |
 function undistribute_table(regclass) |
 function upgrade_to_reference_table(regclass) |
 | function citus_internal.columnar_ensure_objects_exist()
 | function citus_set_coordinator_host(text,integer,noderole,name)
 | function citus_total_relation_size(regclass,boolean)
 | function create_citus_local_table(regclass,boolean)
 | function time_partition_range(regclass)

@@ -461,7 +462,7 @@ SELECT * FROM print_extension_changes();
 | table columnar.options
 | view citus_tables
 | view time_partitions
(16 rows)
(17 rows)

DROP TABLE prev_objects, extension_diff;
-- show running version
@@ -764,10 +764,10 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);

-- verify we cannot replicate reference tables in a transaction modifying pg_dist_node
BEGIN;
SELECT 1 FROM master_add_inactive_node('invalid-node-name', 9999);
 ?column?
SELECT citus_set_coordinator_host('127.0.0.1');
 citus_set_coordinator_host
---------------------------------------------------------------------
 1

(1 row)

SELECT replicate_reference_tables();

@@ -362,7 +362,6 @@ SELECT pg_reload_conf();
DROP TABLE test_recovery_ref;
DROP TABLE test_recovery;
DROP TABLE test_recovery_single;
DROP TABLE test_reference_table;
SELECT 1 FROM master_remove_node('localhost', :master_port);
 ?column?
---------------------------------------------------------------------
@@ -9,7 +9,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
ERROR: coordinator node cannot be added as inactive node
-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);
 ?column?
---------------------------------------------------------------------
 1

@@ -20,12 +20,61 @@ SELECT 1 FROM master_disable_node('localhost', :master_port);
ERROR: Disabling localhost:xxxxx failed
DETAIL: cannot change "isactive" field of the coordinator node
RESET client_min_messages;
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
SELECT 1 FROM master_remove_node('localhost', :master_port);
 ?column?
---------------------------------------------------------------------
 1
(1 row)

SELECT count(*) FROM pg_dist_node;
 count
---------------------------------------------------------------------
 0
(1 row)

-- there are no workers now, but we should still be able to create Citus tables
CREATE TABLE ref(x int, y int);
SELECT create_reference_table('ref');
 create_reference_table
---------------------------------------------------------------------

(1 row)

SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
 groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced
---------------------------------------------------------------------
 0 | localhost | 57636 | t | t | t | t
(1 row)

DROP TABLE ref;
-- remove the coordinator to try again with create_reference_table
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
 master_remove_node
---------------------------------------------------------------------

(1 row)

CREATE TABLE loc(x int, y int);
SELECT create_citus_local_table('loc');
 create_citus_local_table
---------------------------------------------------------------------

(1 row)

SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
 groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced
---------------------------------------------------------------------
 0 | localhost | 57636 | t | t | t | t
(1 row)

DROP TABLE loc;
-- remove the coordinator to try again with create_distributed_table
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;
 master_remove_node
---------------------------------------------------------------------

(1 row)

CREATE TABLE test(x int, y int);
SELECT create_distributed_table('test','x');
 create_distributed_table
@@ -33,6 +82,68 @@ SELECT create_distributed_table('test','x');

(1 row)

SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;
 groupid | nodename | nodeport | isactive | shouldhaveshards | hasmetadata | metadatasynced
---------------------------------------------------------------------
 0 | localhost | 57636 | t | t | t | t
(1 row)

-- cannot add workers with specific IP as long as I have a placeholder coordinator record
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
ERROR: cannot add a worker node when the coordinator hostname is set to localhost
DETAIL: Worker nodes need to be able to connect to the coordinator to transfer data.
HINT: Use SELECT citus_set_coordinator_host('<hostname>') to configure the coordinator hostname
-- adding localhost workers is ok
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
NOTICE: shards are still on the coordinator after adding the new node
HINT: Use SELECT rebalance_table_shards(); to balance shards data between workers and coordinator or SELECT citus_drain_node('localhost',57636); to permanently move shards away from the coordinator.
 ?column?
---------------------------------------------------------------------
 1
(1 row)

SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
 ?column?
---------------------------------------------------------------------
 1
(1 row)

-- set the coordinator host to something different than localhost
SELECT 1 FROM citus_set_coordinator_host('127.0.0.1');
 ?column?
---------------------------------------------------------------------
 1
(1 row)

-- adding workers with specific IP is ok now
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
NOTICE: shards are still on the coordinator after adding the new node
HINT: Use SELECT rebalance_table_shards(); to balance shards data between workers and coordinator or SELECT citus_drain_node('127.0.0.1',57636); to permanently move shards away from the coordinator.
 ?column?
---------------------------------------------------------------------
 1
(1 row)

SELECT 1 FROM master_remove_node('127.0.0.1', :worker_1_port);
 ?column?
---------------------------------------------------------------------
 1
(1 row)

-- set the coordinator host back to localhost for the remainder of tests
SELECT 1 FROM citus_set_coordinator_host('localhost');
 ?column?
---------------------------------------------------------------------
 1
(1 row)

-- should have shards setting should not really matter for a single node
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
 ?column?
---------------------------------------------------------------------
 1
(1 row)

CREATE TYPE new_type AS (n int, m text);
CREATE TABLE test_2(x int, y int, z new_type);
SELECT create_distributed_table('test_2','x');
@@ -125,21 +236,21 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *;
SET citus.log_remote_commands to true;
-- observe that there is a conflict and the following query does nothing
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *;
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col
 part_key | other_col | third_col
---------------------------------------------------------------------
(0 rows)

-- same as the above with different syntax
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *;
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col
 part_key | other_col | third_col
---------------------------------------------------------------------
(0 rows)

-- again the same query with another syntax
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630513 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630513 DO NOTHING RETURNING part_key, other_col, third_col
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630515 DO NOTHING RETURNING part_key, other_col, third_col
 part_key | other_col | third_col
---------------------------------------------------------------------
(0 rows)

@@ -147,7 +258,7 @@ NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_9063
BEGIN;
-- force local execution
SELECT count(*) FROM upsert_test WHERE part_key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630513 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1)
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630515 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1)
 count
---------------------------------------------------------------------
 1

@@ -233,10 +344,10 @@ SET search_path TO single_node;
DROP SCHEMA "Quoed.Schema" CASCADE;
NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table "Quoed.Schema".simple_table_name
drop cascades to table "Quoed.Schema".simple_table_name_90630518
drop cascades to table "Quoed.Schema".simple_table_name_90630519
drop cascades to table "Quoed.Schema".simple_table_name_90630520
drop cascades to table "Quoed.Schema".simple_table_name_90630521
drop cascades to table "Quoed.Schema".simple_table_name_90630522
drop cascades to table "Quoed.Schema".simple_table_name_90630523
-- test partitioned index creation with long name
CREATE TABLE test_index_creation1
(
@@ -423,7 +534,7 @@ EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE)
   ->  Task
     Tuple data received from node: 4 bytes
     Node: host=localhost port=xxxxx dbname=regression
     ->  Seq Scan on test_90630500 test (actual rows=2 loops=1)
     ->  Seq Scan on test_90630502 test (actual rows=2 loops=1)
(8 rows)

-- common utility command

@@ -1104,7 +1215,7 @@ END;$$;
SELECT * FROM pg_dist_node;
 nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards
---------------------------------------------------------------------
 1 | 0 | localhost | 57636 | default | t | t | primary | default | t | t
 4 | 0 | localhost | 57636 | default | t | t | primary | default | t | t
(1 row)

SELECT create_distributed_function('call_delegation(int)', '$1', 'test');
@@ -1251,56 +1362,56 @@ SELECT pg_sleep(0.1);

SET citus.executor_slow_start_interval TO 10;
SELECT count(*) from another_schema_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630509 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630510 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630513 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630514 another_schema_table WHERE true
 count
---------------------------------------------------------------------
 0
(1 row)

UPDATE another_schema_table SET b = b;
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630509 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630510 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630513 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630514 another_schema_table SET b = b
-- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning
-- not that we ignore INSERT .. SELECT via coordinator as it relies on
-- COPY command
INSERT INTO another_schema_table SELECT * FROM another_schema_table;
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630509 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630510 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE (a IS NOT NULL)
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630509_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630509_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630510_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630510_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630513_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630513_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630514_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630514_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
-- multi-row INSERTs
INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) VALUES (1,1), (5,5)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) VALUES (3,3), (4,4), (7,7)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) VALUES (6,6)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) VALUES (2,2)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) VALUES (1,1), (5,5)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) VALUES (3,3), (4,4), (7,7)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) VALUES (6,6)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) VALUES (2,2)
-- INSERT..SELECT with re-partitioning when using local execution
BEGIN;
INSERT INTO another_schema_table VALUES (1,100);
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 (a, b) VALUES (1, 100)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 (a, b) VALUES (1, 100)
INSERT INTO another_schema_table VALUES (2,100);
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 (a, b) VALUES (2, 100)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 (a, b) VALUES (2, 100)
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630509_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630509_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630510_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630510_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630509_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630510_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630509_to_2,repartitioned_results_xxxxx_from_90630511_to_2,repartitioned_results_xxxxx_from_90630512_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630512_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630513_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630513_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630514_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630514_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630511_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630512_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630511_to_2,repartitioned_results_xxxxx_from_90630513_to_2,repartitioned_results_xxxxx_from_90630514_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630514_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer)
SELECT * FROM another_schema_table WHERE a = 100 ORDER BY b;
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 100) ORDER BY b
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 100) ORDER BY b
 a | b
---------------------------------------------------------------------
 100 | 1

@@ -1311,10 +1422,10 @@ ROLLBACK;
-- intermediate results
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
SELECT count(*) FROM cte_1;
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
 count
---------------------------------------------------------------------
@@ -56,6 +56,7 @@ ORDER BY 1;
 function citus_relation_size(regclass)
 function citus_remote_connection_stats()
 function citus_server_id()
 function citus_set_coordinator_host(text,integer,noderole,name)
 function citus_set_default_rebalance_strategy(text)
 function citus_shard_allowed_on_node_true(bigint,integer)
 function citus_shard_cost_1(bigint)

@@ -219,5 +220,5 @@ ORDER BY 1;
 view citus_worker_stat_activity
 view pg_dist_shard_placement
 view time_partitions
(203 rows)
(204 rows)

@@ -53,6 +53,7 @@ ORDER BY 1;
 function citus_relation_size(regclass)
 function citus_remote_connection_stats()
 function citus_server_id()
 function citus_set_coordinator_host(text,integer,noderole,name)
 function citus_set_default_rebalance_strategy(text)
 function citus_shard_allowed_on_node_true(bigint,integer)
 function citus_shard_cost_1(bigint)

@@ -215,5 +216,5 @@ ORDER BY 1;
 view citus_worker_stat_activity
 view pg_dist_shard_placement
 view time_partitions
(199 rows)
(200 rows)
@@ -1,14 +1,10 @@
SET citus.next_shard_id TO 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1;
SET citus.enable_object_propagation TO off; -- prevent object propagation on add node during setup

-- Tests functions related to cluster membership

-- before starting the test, lets try to create reference table and see a
-- meaningful error
CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');

-- add the nodes to the cluster
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port);

@@ -49,6 +45,8 @@ SELECT master_remove_node('localhost', :worker_2_port);
SELECT master_get_active_worker_nodes();

-- insert a row so that master_disable_node() exercises closing connections
CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');
INSERT INTO test_reference_table VALUES (1, '1');

-- try to disable a node with active placements see that node is removed

@@ -59,6 +57,11 @@ SELECT master_get_active_worker_nodes();
-- try to disable a node which does not exist and see that an error is thrown
SELECT master_disable_node('localhost.noexist', 2345);

-- drop the table without leaving a shard placement behind (messes up other tests)
SELECT master_activate_node('localhost', :worker_2_port);
DROP TABLE test_reference_table;
SELECT master_disable_node('localhost', :worker_2_port);

CREATE USER non_super_user;
CREATE USER node_metadata_user;
GRANT EXECUTE ON FUNCTION master_activate_node(text,int) TO node_metadata_user;
@@ -506,7 +506,7 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);

-- verify we cannot replicate reference tables in a transaction modifying pg_dist_node
BEGIN;
SELECT 1 FROM master_add_inactive_node('invalid-node-name', 9999);
SELECT citus_set_coordinator_host('127.0.0.1');
SELECT replicate_reference_tables();
ROLLBACK;

@@ -200,6 +200,5 @@ SELECT pg_reload_conf();
DROP TABLE test_recovery_ref;
DROP TABLE test_recovery;
DROP TABLE test_recovery_single;
DROP TABLE test_reference_table;

SELECT 1 FROM master_remove_node('localhost', :master_port);
@@ -10,18 +10,64 @@ SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);

-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);

-- coordinator cannot be disabled
SELECT 1 FROM master_disable_node('localhost', :master_port);

RESET client_min_messages;

SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);
SELECT 1 FROM master_remove_node('localhost', :master_port);

SELECT count(*) FROM pg_dist_node;

-- there are no workers now, but we should still be able to create Citus tables

CREATE TABLE ref(x int, y int);
SELECT create_reference_table('ref');

SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;

DROP TABLE ref;

-- remove the coordinator to try again with create_reference_table
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;

CREATE TABLE loc(x int, y int);
SELECT create_citus_local_table('loc');

SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;

DROP TABLE loc;

-- remove the coordinator to try again with create_distributed_table
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node WHERE groupid = 0;

CREATE TABLE test(x int, y int);
SELECT create_distributed_table('test','x');

SELECT groupid, nodename, nodeport, isactive, shouldhaveshards, hasmetadata, metadatasynced FROM pg_dist_node;

-- cannot add workers with specific IP as long as I have a placeholder coordinator record
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);

-- adding localhost workers is ok
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);

-- set the coordinator host to something different than localhost
SELECT 1 FROM citus_set_coordinator_host('127.0.0.1');

-- adding workers with specific IP is ok now
SELECT 1 FROM master_add_node('127.0.0.1', :worker_1_port);
SELECT 1 FROM master_remove_node('127.0.0.1', :worker_1_port);

-- set the coordinator host back to localhost for the remainder of tests
SELECT 1 FROM citus_set_coordinator_host('localhost');

-- should have shards setting should not really matter for a single node
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true);

CREATE TYPE new_type AS (n int, m text);
CREATE TABLE test_2(x int, y int, z new_type);
SELECT create_distributed_table('test_2','x');