From 78e495e030b857943125e2b9d69cf31b373a1c72 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Tue, 22 Oct 2019 16:47:16 +0200 Subject: [PATCH] Add shouldhaveshards to pg_dist_node (#2960) This is an improvement over #2512. This adds the boolean shouldhaveshards column to pg_dist_node. When it's false, create_distributed_table for new collocation groups will not create shards on that node. Reference tables will still be created on nodes where it is false. --- src/backend/distributed/citus.control | 2 +- .../distributed/master/master_create_shards.c | 2 +- .../distributed/master/shard_rebalancer.c | 1 + .../distributed/master/worker_node_manager.c | 67 ++-- .../distributed/metadata/metadata_sync.c | 18 + .../distributed/sql/citus--9.0-1--9.1-1.sql | 6 + .../sql/udfs/master_drain_node/9.1-1.sql | 14 + .../sql/udfs/master_drain_node/latest.sql | 14 + .../udfs/master_set_node_property/9.1-1.sql | 21 ++ .../udfs/master_set_node_property/latest.sql | 21 ++ src/backend/distributed/utils/hash_helpers.c | 15 + .../distributed/utils/metadata_cache.c | 29 +- src/backend/distributed/utils/node_metadata.c | 342 ++++++++++++------ src/include/distributed/hash_helpers.h | 15 + src/include/distributed/metadata_sync.h | 1 + src/include/distributed/pg_dist_node.h | 3 +- .../distributed/reference_table_utils.h | 4 + src/include/distributed/worker_manager.h | 8 +- .../isolation_dump_global_wait_edges.out | 18 +- .../expected/isolation_shouldhaveshards.out | 168 +++++++++ .../expected/multi_cluster_management.out | 248 +++++++++++-- .../expected/multi_metadata_attributes.out | 19 +- .../regress/expected/multi_metadata_sync.out | 229 +++++++----- .../expected/multi_mx_create_table.out | 55 ++- .../expected/multi_mx_hide_shard_names.out | 1 - .../multi_unsupported_worker_operations.out | 25 +- src/test/regress/isolation_schedule | 1 + .../specs/isolation_shouldhaveshards.spec | 73 ++++ .../regress/sql/multi_cluster_management.sql | 124 ++++++- .../regress/sql/multi_metadata_attributes.sql | 8 +- src/test/regress/sql/multi_metadata_sync.sql | 166 +++++---- .../regress/sql/multi_mx_create_table.sql | 11 +- .../regress/sql/multi_mx_hide_shard_names.sql | 1 - .../multi_unsupported_worker_operations.sql | 22 +- src/test/regress/upgrade/config.py | 2 +- 35 files changed, 1312 insertions(+), 442 deletions(-) create mode 100644 src/backend/distributed/sql/citus--9.0-1--9.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/master_drain_node/9.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/master_drain_node/latest.sql create mode 100644 src/backend/distributed/sql/udfs/master_set_node_property/9.1-1.sql create mode 100644 src/backend/distributed/sql/udfs/master_set_node_property/latest.sql create mode 100644 src/test/regress/expected/isolation_shouldhaveshards.out create mode 100644 src/test/regress/specs/isolation_shouldhaveshards.spec diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 9aef9deaf..247ef7b1d 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '9.0-1' +default_version = '9.1-1' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 011e65496..42333379d 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ 
b/src/backend/distributed/master/master_create_shards.c @@ -177,7 +177,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, LockRelationOid(DistNodeRelationId(), RowShareLock); /* load and sort the worker node list for deterministic placement */ - workerNodeList = ActivePrimaryNodeList(NoLock); + workerNodeList = ActivePrimaryShouldHaveShardsNodeList(NoLock); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* diff --git a/src/backend/distributed/master/shard_rebalancer.c b/src/backend/distributed/master/shard_rebalancer.c index a040f150a..74bbbcf9c 100644 --- a/src/backend/distributed/master/shard_rebalancer.c +++ b/src/backend/distributed/master/shard_rebalancer.c @@ -17,3 +17,4 @@ NOT_SUPPORTED_IN_COMMUNITY(rebalance_table_shards); NOT_SUPPORTED_IN_COMMUNITY(replicate_table_shards); NOT_SUPPORTED_IN_COMMUNITY(get_rebalance_table_shards_plan); NOT_SUPPORTED_IN_COMMUNITY(get_rebalance_progress); +NOT_SUPPORTED_IN_COMMUNITY(master_drain_node); diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index 030e599b8..c53009785 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -15,9 +15,10 @@ #include "miscadmin.h" #include "commands/dbcommands.h" -#include "distributed/worker_manager.h" +#include "distributed/hash_helpers.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" +#include "distributed/worker_manager.h" #include "libpq/hba.h" #include "common/ip.h" #include "libpq/libpq-be.h" @@ -319,19 +320,20 @@ ActiveReadableNodeCount(void) /* - * ActivePrimaryNodeList returns a list of all the active primary nodes in workerNodeHash + * ActiveNodeListFilterFunc returns a list of all active nodes that checkFunction + * returns true for. 
* lockMode specifies which lock to use on pg_dist_node, this is necessary when * the caller wouldn't want nodes to be added concurrent to their use of this list */ -List * -ActivePrimaryNodeList(LOCKMODE lockMode) +static List * +FilterActiveNodeListFunc(LOCKMODE lockMode, bool (*checkFunction)(WorkerNode *)) { List *workerNodeList = NIL; WorkerNode *workerNode = NULL; HTAB *workerNodeHash = NULL; HASH_SEQ_STATUS status; - EnsureModificationsCanRun(); + Assert(checkFunction != NULL); if (lockMode != NoLock) { @@ -343,7 +345,7 @@ ActivePrimaryNodeList(LOCKMODE lockMode) while ((workerNode = hash_seq_search(&status)) != NULL) { - if (workerNode->isActive && WorkerNodeIsPrimary(workerNode)) + if (workerNode->isActive && checkFunction(workerNode)) { WorkerNode *workerNodeCopy = palloc0(sizeof(WorkerNode)); memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode)); @@ -355,39 +357,38 @@ ActivePrimaryNodeList(LOCKMODE lockMode) } +/* + * ActivePrimaryNodeList returns a list of all the active primary nodes in workerNodeHash + * lockMode specifies which lock to use on pg_dist_node, this is necessary when + * the caller wouldn't want nodes to be added concurrent to their use of this list + */ +List * +ActivePrimaryNodeList(LOCKMODE lockMode) +{ + EnsureModificationsCanRun(); + return FilterActiveNodeListFunc(lockMode, WorkerNodeIsPrimary); +} + + +/* + * ActivePrimaryShouldHaveShardsNodeList returns a list of all active, primary + * worker nodes that can store new data, i.e shouldstoreshards is 'true' + */ +List * +ActivePrimaryShouldHaveShardsNodeList(LOCKMODE lockMode) +{ + EnsureModificationsCanRun(); + return FilterActiveNodeListFunc(lockMode, WorkerNodeIsPrimaryShouldHaveShardsNode); +} + + /* * ActiveReadableNodeList returns a list of all nodes in workerNodeHash we can read from. */ List * ActiveReadableNodeList(void) { - List *workerNodeList = NIL; - WorkerNode *workerNode = NULL; - HTAB *workerNodeHash = GetWorkerNodeHash(); - HASH_SEQ_STATUS status; - - hash_seq_init(&status, workerNodeHash); - - while ((workerNode = hash_seq_search(&status)) != NULL) - { - WorkerNode *workerNodeCopy; - - if (!workerNode->isActive) - { - continue; - } - - if (!WorkerNodeIsReadable(workerNode)) - { - continue; - } - - workerNodeCopy = palloc0(sizeof(WorkerNode)); - memcpy(workerNodeCopy, workerNode, sizeof(WorkerNode)); - workerNodeList = lappend(workerNodeList, workerNodeCopy); - } - - return workerNodeList; + return FilterActiveNodeListFunc(NoLock, WorkerNodeIsReadable); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 683ba0046..0079e183a 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -899,6 +899,24 @@ NodeStateUpdateCommand(uint32 nodeId, bool isActive) } +/* + * ShouldHaveShardsUpdateCommand generates a command that can be executed to + * update the shouldhaveshards column of a node in pg_dist_node table. + */ +char * +ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards) +{ + StringInfo nodeStateUpdateCommand = makeStringInfo(); + char *shouldHaveShardsString = shouldHaveShards ? 
"TRUE" : "FALSE"; + + appendStringInfo(nodeStateUpdateCommand, + "UPDATE pg_catalog.pg_dist_node SET shouldhaveshards = %s " + "WHERE nodeid = %u", shouldHaveShardsString, nodeId); + + return nodeStateUpdateCommand->data; +} + + /* * ColocationIdUpdateCommand creates the SQL command to change the colocationId * of the table with the given name to the given colocationId in pg_dist_partition diff --git a/src/backend/distributed/sql/citus--9.0-1--9.1-1.sql b/src/backend/distributed/sql/citus--9.0-1--9.1-1.sql new file mode 100644 index 000000000..4636e0d4b --- /dev/null +++ b/src/backend/distributed/sql/citus--9.0-1--9.1-1.sql @@ -0,0 +1,6 @@ +ALTER TABLE pg_catalog.pg_dist_node ADD shouldhaveshards bool NOT NULL DEFAULT true; +COMMENT ON COLUMN pg_catalog.pg_dist_node.shouldhaveshards IS + 'indicates whether the node is eligible to contain data from distributed tables'; + +#include "udfs/master_set_node_property/9.1-1.sql" +#include "udfs/master_drain_node/9.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/master_drain_node/9.1-1.sql b/src/backend/distributed/sql/udfs/master_drain_node/9.1-1.sql new file mode 100644 index 000000000..44ab5b03d --- /dev/null +++ b/src/backend/distributed/sql/udfs/master_drain_node/9.1-1.sql @@ -0,0 +1,14 @@ +CREATE FUNCTION pg_catalog.master_drain_node( + nodename text, + nodeport integer, + threshold float4 default 0, + max_shard_moves int default 1000000, + excluded_shard_list bigint[] default '{}', + shard_transfer_mode citus.shard_transfer_mode default 'auto') + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_drain_node$$; +COMMENT ON FUNCTION pg_catalog.master_drain_node(text,int,float4,int,bigint[],citus.shard_transfer_mode) + IS 'mark a node to be drained of data and actually drain it as well'; + +REVOKE ALL ON FUNCTION pg_catalog.master_drain_node(text,int,float4,int,bigint[],citus.shard_transfer_mode) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/master_drain_node/latest.sql b/src/backend/distributed/sql/udfs/master_drain_node/latest.sql new file mode 100644 index 000000000..44ab5b03d --- /dev/null +++ b/src/backend/distributed/sql/udfs/master_drain_node/latest.sql @@ -0,0 +1,14 @@ +CREATE FUNCTION pg_catalog.master_drain_node( + nodename text, + nodeport integer, + threshold float4 default 0, + max_shard_moves int default 1000000, + excluded_shard_list bigint[] default '{}', + shard_transfer_mode citus.shard_transfer_mode default 'auto') + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_drain_node$$; +COMMENT ON FUNCTION pg_catalog.master_drain_node(text,int,float4,int,bigint[],citus.shard_transfer_mode) + IS 'mark a node to be drained of data and actually drain it as well'; + +REVOKE ALL ON FUNCTION pg_catalog.master_drain_node(text,int,float4,int,bigint[],citus.shard_transfer_mode) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/master_set_node_property/9.1-1.sql b/src/backend/distributed/sql/udfs/master_set_node_property/9.1-1.sql new file mode 100644 index 000000000..c21b9e507 --- /dev/null +++ b/src/backend/distributed/sql/udfs/master_set_node_property/9.1-1.sql @@ -0,0 +1,21 @@ +CREATE FUNCTION pg_catalog.master_set_node_property( + nodename text, + nodeport integer, + property text, + value boolean) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', 'master_set_node_property'; +COMMENT ON FUNCTION pg_catalog.master_set_node_property( + nodename text, + nodeport integer, + property text, + value boolean) + IS 'set a property of a node in pg_dist_node'; + +REVOKE ALL ON 
FUNCTION pg_catalog.master_set_node_property( + nodename text, + nodeport integer, + property text, + value boolean) + FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/master_set_node_property/latest.sql b/src/backend/distributed/sql/udfs/master_set_node_property/latest.sql new file mode 100644 index 000000000..c21b9e507 --- /dev/null +++ b/src/backend/distributed/sql/udfs/master_set_node_property/latest.sql @@ -0,0 +1,21 @@ +CREATE FUNCTION pg_catalog.master_set_node_property( + nodename text, + nodeport integer, + property text, + value boolean) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', 'master_set_node_property'; +COMMENT ON FUNCTION pg_catalog.master_set_node_property( + nodename text, + nodeport integer, + property text, + value boolean) + IS 'set a property of a node in pg_dist_node'; + +REVOKE ALL ON FUNCTION pg_catalog.master_set_node_property( + nodename text, + nodeport integer, + property text, + value boolean) + FROM PUBLIC; diff --git a/src/backend/distributed/utils/hash_helpers.c b/src/backend/distributed/utils/hash_helpers.c index 0ed090dca..6bbf14938 100644 --- a/src/backend/distributed/utils/hash_helpers.c +++ b/src/backend/distributed/utils/hash_helpers.c @@ -32,3 +32,18 @@ hash_delete_all(HTAB *htab) Assert(found); } } + + +/* + * foreach_htab_cleanup cleans up the hash iteration state after the iteration + * is done. This is only needed when break statements are present in the + * foreach block. + */ +void +foreach_htab_cleanup(void *var, HASH_SEQ_STATUS *status) +{ + if ((var) != NULL) + { + hash_seq_term(status); + } +} diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 3672f7371..7a75c919a 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -2385,14 +2385,14 @@ CurrentUserName(void) /* - * LookupNodeRoleValueId returns the Oid of the "pg_catalog.noderole" type, or InvalidOid - * if it does not exist. + * LookupTypeOid returns the Oid of the "pg_catalog.{typeNameString}" type, or + * InvalidOid if it does not exist. */ static Oid -LookupNodeRoleTypeOid() +LookupTypeOid(char *typeNameString) { Value *schemaName = makeString("pg_catalog"); - Value *typeName = makeString("noderole"); + Value *typeName = makeString(typeNameString); List *qualifiedName = list_make2(schemaName, typeName); TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName); @@ -2417,21 +2417,21 @@ LookupNodeRoleTypeOid() /* - * LookupNodeRoleValueId returns the Oid of the value in "pg_catalog.noderole" which - * matches the provided name, or InvalidOid if the noderole enum doesn't exist yet. + * LookupStringEnumValueId returns the Oid of the value in "pg_catalog.{enumName}" + * which matches the provided valueName, or InvalidOid if the enum doesn't exist yet. 
*/ static Oid -LookupNodeRoleValueId(char *valueName) +LookupStringEnumValueId(char *enumName, char *valueName) { - Oid nodeRoleTypId = LookupNodeRoleTypeOid(); + Oid enumTypeId = LookupTypeOid(enumName); - if (nodeRoleTypId == InvalidOid) + if (enumTypeId == InvalidOid) { return InvalidOid; } else { - Oid valueId = LookupEnumValueId(nodeRoleTypId, valueName); + Oid valueId = LookupEnumValueId(enumTypeId, valueName); return valueId; } } @@ -2458,7 +2458,7 @@ PrimaryNodeRoleId(void) { if (!MetadataCache.primaryNodeRoleId) { - MetadataCache.primaryNodeRoleId = LookupNodeRoleValueId("primary"); + MetadataCache.primaryNodeRoleId = LookupStringEnumValueId("noderole", "primary"); } return MetadataCache.primaryNodeRoleId; @@ -2471,7 +2471,8 @@ SecondaryNodeRoleId(void) { if (!MetadataCache.secondaryNodeRoleId) { - MetadataCache.secondaryNodeRoleId = LookupNodeRoleValueId("secondary"); + MetadataCache.secondaryNodeRoleId = LookupStringEnumValueId("noderole", + "secondary"); } return MetadataCache.secondaryNodeRoleId; @@ -2484,7 +2485,8 @@ UnavailableNodeRoleId(void) { if (!MetadataCache.unavailableNodeRoleId) { - MetadataCache.unavailableNodeRoleId = LookupNodeRoleValueId("unavailable"); + MetadataCache.unavailableNodeRoleId = LookupStringEnumValueId("noderole", + "unavailable"); } return MetadataCache.unavailableNodeRoleId; @@ -3037,6 +3039,7 @@ InitializeWorkerNodeCache(void) workerNode->metadataSynced = currentNode->metadataSynced; workerNode->isActive = currentNode->isActive; workerNode->nodeRole = currentNode->nodeRole; + workerNode->shouldHaveShards = currentNode->shouldHaveShards; strlcpy(workerNode->nodeCluster, currentNode->nodeCluster, NAMEDATALEN); newWorkerNodeArray[workerNodeIndex++] = workerNode; diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 33d5502de..3662309ce 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -64,6 +64,7 @@ typedef struct NodeMetadata bool metadataSynced; bool isActive; Oid nodeRole; + bool shouldHaveShards; char *nodeCluster; } NodeMetadata; @@ -72,22 +73,26 @@ static int ActivateNode(char *nodeName, int nodePort); static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata, bool *nodeAlreadyExists); -static void SetNodeState(char *nodeName, int32 nodePort, bool isActive); -static HeapTuple GetNodeTuple(char *nodeName, int32 nodePort); +static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive); +static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort); static int32 GetNextGroupId(void); static int GetNextNodeId(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); +static void SetUpDistributedTableDependencies(WorkerNode *workerNode); static List * ParseWorkerNodeFileAndRename(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); +static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static bool UnsetMetadataSyncedForAll(void); +static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_add_node); PG_FUNCTION_INFO_V1(master_add_inactive_node); 
PG_FUNCTION_INFO_V1(master_add_secondary_node); +PG_FUNCTION_INFO_V1(master_set_node_property); PG_FUNCTION_INFO_V1(master_remove_node); PG_FUNCTION_INFO_V1(master_disable_node); PG_FUNCTION_INFO_V1(master_activate_node); @@ -105,6 +110,7 @@ DefaultNodeMetadata() { NodeMetadata nodeMetadata = { .nodeRack = WORKER_DEFAULT_RACK, + .shouldHaveShards = true, }; return nodeMetadata; } @@ -237,13 +243,12 @@ master_add_secondary_node(PG_FUNCTION_ARGS) Datum master_remove_node(PG_FUNCTION_ARGS) { - text *nodeName = PG_GETARG_TEXT_P(0); + text *nodeNameText = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); - char *nodeNameString = text_to_cstring(nodeName); CheckCitusVersion(ERROR); - RemoveNodeFromCluster(nodeNameString, nodePort); + RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort); PG_RETURN_VOID(); } @@ -264,41 +269,32 @@ master_remove_node(PG_FUNCTION_ARGS) Datum master_disable_node(PG_FUNCTION_ARGS) { - const bool onlyConsiderActivePlacements = true; text *nodeNameText = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); - char *nodeName = text_to_cstring(nodeNameText); + WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); bool isActive = false; - - WorkerNode *workerNode = NULL; - - CheckCitusVersion(ERROR); - - EnsureCoordinator(); - - /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ - LockRelationOid(DistNodeRelationId(), ExclusiveLock); - - workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); - if (workerNode == NULL) - { - ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort))); - } + bool onlyConsiderActivePlacements = false; if (WorkerNodeIsPrimary(workerNode)) { + /* + * Delete reference table placements so they are not taken into account + * for the check if there are placements after this + */ DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId); - } - if (WorkerNodeIsPrimary(workerNode) && - NodeGroupHasShardPlacements(workerNode->groupId, onlyConsiderActivePlacements)) - { - ereport(NOTICE, (errmsg("Node %s:%d has active shard placements. Some queries " - "may fail after this operation. Use " - "SELECT master_activate_node('%s', %d) to activate this " - "node back.", - nodeName, nodePort, nodeName, nodePort))); + if (NodeGroupHasShardPlacements(workerNode->groupId, + onlyConsiderActivePlacements)) + { + ereport(NOTICE, (errmsg( + "Node %s:%d has active shard placements. Some queries " + "may fail after this operation. 
Use " + "SELECT master_activate_node('%s', %d) to activate this " + "node back.", + workerNode->workerName, nodePort, workerNode->workerName, + nodePort))); + } } SetNodeState(nodeName, nodePort, isActive); @@ -313,6 +309,95 @@ master_disable_node(PG_FUNCTION_ARGS) } +/* + * master_set_node_property sets a property of the node + */ +Datum +master_set_node_property(PG_FUNCTION_ARGS) +{ + text *nodeNameText = PG_GETARG_TEXT_P(0); + int32 nodePort = PG_GETARG_INT32(1); + text *propertyText = PG_GETARG_TEXT_P(2); + bool value = PG_GETARG_BOOL(3); + + WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText), + nodePort); + + if (strcmp(text_to_cstring(propertyText), "shouldhaveshards") == 0) + { + SetShouldHaveShards(workerNode, value); + } + else + { + ereport(ERROR, (errmsg( + "only the 'shouldhaveshards' property can be set using this function" + ))); + } + + + PG_RETURN_VOID(); +} + + +/* + * SetUpDistributedTableDependencies sets up up the following on a node if it's + * a primary node that currently stores data: + * - All dependencies (e.g., types, schemas) + * - Reference tables, because they are needed to handle queries efficiently. + * - Distributed functions + */ +static void +SetUpDistributedTableDependencies(WorkerNode *newWorkerNode) +{ + if (WorkerNodeIsPrimary(newWorkerNode)) + { + EnsureNoModificationsHaveBeenDone(); + ReplicateAllDependenciesToNode(newWorkerNode->workerName, + newWorkerNode->workerPort); + ReplicateAllReferenceTablesToNode(newWorkerNode->workerName, + newWorkerNode->workerPort); + + /* + * Let the maintanince deamon do the hard work of syncing the metadata. + * We prefer this because otherwise node activation might fail within + * transaction blocks. + */ + if (ClusterHasDistributedFunctionWithDistArgument()) + { + MarkNodeHasMetadata(newWorkerNode->workerName, newWorkerNode->workerPort, + true); + TriggerMetadataSync(MyDatabaseId); + } + } +} + + +/* + * ModifiableWorkerNode gets the requested WorkerNode and also gets locks + * required for modifying it. This fails if the node does not exist. + */ +static WorkerNode * +ModifiableWorkerNode(const char *nodeName, int32 nodePort) +{ + WorkerNode *workerNode = NULL; + + CheckCitusVersion(ERROR); + + EnsureCoordinator(); + + /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ + LockRelationOid(DistNodeRelationId(), ExclusiveLock); + + workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); + if (workerNode == NULL) + { + ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort))); + } + + return workerNode; +} + + /* * master_activate_node UDF activates the given node. It sets the node's isactive * value to active and replicates all reference tables to that node. 
@@ -320,19 +405,14 @@ master_disable_node(PG_FUNCTION_ARGS) Datum master_activate_node(PG_FUNCTION_ARGS) { - text *nodeName = PG_GETARG_TEXT_P(0); + text *nodeNameText = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); - char *nodeNameString = text_to_cstring(nodeName); - int nodeId = 0; + WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText), + nodePort); + ActivateNode(workerNode->workerName, workerNode->workerPort); - CheckCitusVersion(ERROR); - - EnsureCoordinator(); - - nodeId = ActivateNode(nodeNameString, nodePort); - - PG_RETURN_INT32(nodeId); + PG_RETURN_INT32(workerNode->nodeId); } @@ -391,6 +471,22 @@ WorkerNodeIsSecondary(WorkerNode *worker) } +/* + * WorkerNodeIsPrimaryShouldHaveShardsNode returns whether the argument represents a + * primary node that is a eligible for new data. + */ +bool +WorkerNodeIsPrimaryShouldHaveShardsNode(WorkerNode *worker) +{ + if (!WorkerNodeIsPrimary(worker)) + { + return false; + } + + return worker->shouldHaveShards; +} + + /* * WorkerNodeIsReadable returns whether we're allowed to send SELECT queries to this * node. @@ -461,34 +557,16 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes) static int ActivateNode(char *nodeName, int nodePort) { - WorkerNode *workerNode = NULL; + WorkerNode *newWorkerNode = NULL; bool isActive = true; /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); - SetNodeState(nodeName, nodePort, isActive); + newWorkerNode = SetNodeState(nodeName, nodePort, isActive); - workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); - - if (WorkerNodeIsPrimary(workerNode)) - { - EnsureNoModificationsHaveBeenDone(); - ReplicateAllDependenciesToNode(nodeName, nodePort); - ReplicateAllReferenceTablesToNode(nodeName, nodePort); - - /* - * Let the maintanince deamon do the hard work of syncing the metadata. We prefer - * this because otherwise node activation might fail withing transaction blocks. - */ - if (ClusterHasDistributedFunctionWithDistArgument()) - { - MarkNodeHasMetadata(nodeName, nodePort, true); - TriggerMetadataSync(MyDatabaseId); - } - } - - return workerNode->nodeId; + SetUpDistributedTableDependencies(newWorkerNode); + return newWorkerNode->nodeId; } @@ -845,7 +923,7 @@ FindWorkerNode(char *nodeName, int32 nodePort) * clusters do not exist. 
*/ WorkerNode * -FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort) +FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort) { WorkerNode *workerNode = NULL; @@ -924,40 +1002,28 @@ ReadWorkerNodes(bool includeNodesFromOtherClusters) static void RemoveNodeFromCluster(char *nodeName, int32 nodePort) { - const bool onlyConsiderActivePlacements = false; char *nodeDeleteCommand = NULL; - WorkerNode *workerNode = NULL; - uint32 deletedNodeId = INVALID_PLACEMENT_ID; - - EnsureCoordinator(); - - /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ - LockRelationOid(DistNodeRelationId(), ExclusiveLock); - - workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); - if (workerNode == NULL) - { - ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort))); - } - - if (workerNode != NULL) - { - deletedNodeId = workerNode->nodeId; - } + WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); if (WorkerNodeIsPrimary(workerNode)) { + bool onlyConsiderActivePlacements = false; + + /* + * Delete reference table placements so they are not taken into account + * for the check if there are placements after this + */ DeleteAllReferenceTablePlacementsFromNodeGroup(workerNode->groupId); + + if (NodeGroupHasShardPlacements(workerNode->groupId, + onlyConsiderActivePlacements)) + { + ereport(ERROR, (errmsg("you cannot remove the primary node of a node group " + "which has shard placements"))); + } } - if (WorkerNodeIsPrimary(workerNode) && - NodeGroupHasShardPlacements(workerNode->groupId, onlyConsiderActivePlacements)) - { - ereport(ERROR, (errmsg("you cannot remove the primary node of a node group " - "which has shard placements"))); - } - - DeleteNodeRow(nodeName, nodePort); + DeleteNodeRow(workerNode->workerName, nodePort); if (WorkerNodeIsPrimary(workerNode)) { @@ -965,10 +1031,10 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) ActivePrimaryNodeCount()); } - nodeDeleteCommand = NodeDeleteCommand(deletedNodeId); + nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); /* make sure we don't have any lingering session lifespan connections */ - CloseNodeConnectionsAfterTransaction(nodeName, nodePort); + CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort); SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand); } @@ -1088,38 +1154,62 @@ AddNodeMetadata(char *nodeName, int32 nodePort, SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand); } - return nextNodeIdInt; + return workerNode->nodeId; } /* - * SetNodeState function sets the isactive column of the specified worker in - * pg_dist_node to isActive. + * SetWorkerColumn function sets the column with the specified index + * (see pg_dist_node.h) on the worker in pg_dist_node. + * It returns the new worker node after the modification. 
*/ -static void -SetNodeState(char *nodeName, int32 nodePort, bool isActive) +static WorkerNode * +SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) { Relation pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); - HeapTuple heapTuple = GetNodeTuple(nodeName, nodePort); + HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort); + WorkerNode *newWorkerNode = NULL; Datum values[Natts_pg_dist_node]; bool isnull[Natts_pg_dist_node]; bool replace[Natts_pg_dist_node]; + char *metadataSyncCommand = NULL; - char *nodeStateUpdateCommand = NULL; - WorkerNode *workerNode = NULL; + + switch (columnIndex) + { + case Anum_pg_dist_node_isactive: + { + metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId, + DatumGetBool(value)); + break; + } + + case Anum_pg_dist_node_shouldhaveshards: + { + metadataSyncCommand = ShouldHaveShardsUpdateCommand(workerNode->nodeId, + DatumGetBool(value)); + break; + } + + default: + { + ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", + workerNode->workerName, workerNode->workerPort))); + } + } if (heapTuple == NULL) { ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", - nodeName, nodePort))); + workerNode->workerName, workerNode->workerPort))); } memset(replace, 0, sizeof(replace)); - values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(isActive); - isnull[Anum_pg_dist_node_isactive - 1] = false; - replace[Anum_pg_dist_node_isactive - 1] = true; + values[columnIndex - 1] = value; + isnull[columnIndex - 1] = false; + replace[columnIndex - 1] = true; heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); @@ -1128,13 +1218,40 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive) CitusInvalidateRelcacheByRelid(DistNodeRelationId()); CommandCounterIncrement(); - workerNode = TupleToWorkerNode(tupleDescriptor, heapTuple); + newWorkerNode = TupleToWorkerNode(tupleDescriptor, heapTuple); heap_close(pgDistNode, NoLock); - /* we also update isactive column at worker nodes */ - nodeStateUpdateCommand = NodeStateUpdateCommand(workerNode->nodeId, isActive); - SendCommandToWorkers(WORKERS_WITH_METADATA, nodeStateUpdateCommand); + /* we also update the column at worker nodes */ + SendCommandToWorkers(WORKERS_WITH_METADATA, metadataSyncCommand); + return newWorkerNode; +} + + +/* + * SetShouldHaveShards function sets the shouldhaveshards column of the + * specified worker in pg_dist_node. + * It returns the new worker node after the modification. + */ +static WorkerNode * +SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards) +{ + return SetWorkerColumn(workerNode, Anum_pg_dist_node_shouldhaveshards, + BoolGetDatum(shouldHaveShards)); +} + + +/* + * SetNodeState function sets the isactive column of the specified worker in + * pg_dist_node to isActive. + * It returns the new worker node after the modification. + */ +static WorkerNode * +SetNodeState(char *nodeName, int nodePort, bool isActive) +{ + WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); + return SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive, + BoolGetDatum(isActive)); } @@ -1145,7 +1262,7 @@ SetNodeState(char *nodeName, int32 nodePort, bool isActive) * This function may return worker nodes from other clusters. 
*/ static HeapTuple -GetNodeTuple(char *nodeName, int32 nodePort) +GetNodeTuple(const char *nodeName, int32 nodePort) { Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); const int scanKeyCount = 2; @@ -1296,6 +1413,8 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta values[Anum_pg_dist_node_isactive - 1] = BoolGetDatum(nodeMetadata->isActive); values[Anum_pg_dist_node_noderole - 1] = ObjectIdGetDatum(nodeMetadata->nodeRole); values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum; + values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum( + nodeMetadata->shouldHaveShards); pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock); @@ -1562,6 +1681,9 @@ TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple) DatumGetBool(datumArray[Anum_pg_dist_node_metadatasynced - 1]); workerNode->isActive = DatumGetBool(datumArray[Anum_pg_dist_node_isactive - 1]); workerNode->nodeRole = DatumGetObjectId(datumArray[Anum_pg_dist_node_noderole - 1]); + workerNode->shouldHaveShards = DatumGetBool( + datumArray[Anum_pg_dist_node_shouldhaveshards - + 1]); /* * nodecluster column can be missing. In the case of extension creation/upgrade, diff --git a/src/include/distributed/hash_helpers.h b/src/include/distributed/hash_helpers.h index 024db708b..d39255aba 100644 --- a/src/include/distributed/hash_helpers.h +++ b/src/include/distributed/hash_helpers.h @@ -9,6 +9,8 @@ #ifndef HASH_HELPERS_H #define HASH_HELPERS_H +#include "postgres.h" + #include "utils/hsearch.h" /* pg12 includes this exact implementation of hash_combine */ @@ -33,4 +35,17 @@ hash_combine(uint32 a, uint32 b) extern void hash_delete_all(HTAB *htab); +/* + * foreach_htab - + * a convenience macro which loops through a HTAB + */ + +#define foreach_htab(var, status, htab) \ + hash_seq_init((status), (htab)); \ + for ((var) = hash_seq_search(status); \ + (var) != NULL; \ + (var) = hash_seq_search(status)) + +extern void foreach_htab_cleanup(void *var, HASH_SEQ_STATUS *status); + #endif diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e92600e56..9e00df41a 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -43,6 +43,7 @@ extern List * ShardListInsertCommand(List *shardIntervalList); extern List * ShardDeleteCommandList(ShardInterval *shardInterval); extern char * NodeDeleteCommand(uint32 nodeId); extern char * NodeStateUpdateCommand(uint32 nodeId, bool isActive); +extern char * ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards); extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); extern char * CreateSchemaDDLCommand(Oid schemaId); extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, diff --git a/src/include/distributed/pg_dist_node.h b/src/include/distributed/pg_dist_node.h index e6fd709f0..002df073d 100644 --- a/src/include/distributed/pg_dist_node.h +++ b/src/include/distributed/pg_dist_node.h @@ -20,7 +20,7 @@ * in particular their OUT parameters) must be changed whenever the definition of * pg_dist_node changes. 
*/ -#define Natts_pg_dist_node 10 +#define Natts_pg_dist_node 11 #define Anum_pg_dist_node_nodeid 1 #define Anum_pg_dist_node_groupid 2 #define Anum_pg_dist_node_nodename 3 @@ -31,6 +31,7 @@ #define Anum_pg_dist_node_noderole 8 #define Anum_pg_dist_node_nodecluster 9 #define Anum_pg_dist_node_metadatasynced 10 +#define Anum_pg_dist_node_shouldhaveshards 11 #define GROUPID_SEQUENCE_NAME "pg_dist_groupid_seq" #define NODEID_SEQUENCE_NAME "pg_dist_node_nodeid_seq" diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 480e77a1b..0aeaf1c64 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -12,6 +12,10 @@ #ifndef REFERENCE_TABLE_UTILS_H_ #define REFERENCE_TABLE_UTILS_H_ +#include "postgres.h" + +#include "listutils.h" + extern uint32 CreateReferenceTableColocationId(void); extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort); extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 51fc67690..26aec0666 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -16,6 +16,7 @@ #include "postgres.h" +#include "storage/lmgr.h" #include "storage/lockdefs.h" #include "nodes/pg_list.h" @@ -38,6 +39,8 @@ /* * In memory representation of pg_dist_node table elements. The elements are hold in * WorkerNodeHash table. + * IMPORTANT: The order of the fields in this definition should match the + * column order of pg_dist_node */ typedef struct WorkerNode { @@ -51,6 +54,7 @@ typedef struct WorkerNode Oid nodeRole; /* the node's role in its group */ char nodeCluster[NAMEDATALEN]; /* the cluster the node is a part of */ bool metadataSynced; /* node has the most recent metadata */ + bool shouldHaveShards; /* if the node should have distributed table shards on it or not */ } WorkerNode; @@ -68,17 +72,19 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, extern WorkerNode * WorkerGetLocalFirstCandidateNode(List *currentNodeList); extern uint32 ActivePrimaryNodeCount(void); extern List * ActivePrimaryNodeList(LOCKMODE lockMode); +extern List * ActivePrimaryShouldHaveShardsNodeList(LOCKMODE lockMode); extern uint32 ActiveReadableNodeCount(void); extern List * ActiveReadableNodeList(void); extern WorkerNode * GetWorkerNodeByNodeId(int nodeId); extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); -extern WorkerNode * FindWorkerNodeAnyCluster(char *nodeName, int32 nodePort); +extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort); extern List * ReadWorkerNodes(bool includeNodesFromOtherClusters); extern void EnsureCoordinator(void); extern uint32 GroupForNode(char *nodeName, int32 nodePorT); extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes); extern bool WorkerNodeIsPrimary(WorkerNode *worker); extern bool WorkerNodeIsSecondary(WorkerNode *worker); +extern bool WorkerNodeIsPrimaryShouldHaveShardsNode(WorkerNode *worker); extern bool WorkerNodeIsReadable(WorkerNode *worker); extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 26d41b18f..16de25852 100644 --- 
a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -276 275 f +290 289 f transactionnumberwaitingtransactionnumbers -275 -276 275 +289 +290 289 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -280 279 f -281 279 f -281 280 t +294 293 f +295 293 f +295 294 t transactionnumberwaitingtransactionnumbers -279 -280 279 -281 279,280 +293 +294 293 +295 293,294 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_shouldhaveshards.out b/src/test/regress/expected/isolation_shouldhaveshards.out new file mode 100644 index 000000000..faa82b86a --- /dev/null +++ b/src/test/regress/expected/isolation_shouldhaveshards.out @@ -0,0 +1,168 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-add-second-node s1-begin s2-begin s2-create-distributed-table s1-noshards s2-commit s1-commit s2-shardcounts +?column? + +1 +step s1-add-second-node: + SELECT 1 FROM master_add_node('localhost', 57638); + +?column? + +1 +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-create-distributed-table: + CREATE TABLE t1 (a int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +create_distributed_table + + +step s1-noshards: + SELECT * from master_set_node_property('localhost', 57637, 'shouldhaveshards', false); + +step s2-commit: + COMMIT; + +step s1-noshards: <... completed> +master_set_node_property + + +step s1-commit: + COMMIT; + +step s2-shardcounts: + SELECT nodeport, count(*) + FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) + WHERE logicalrelid = 't1'::regclass GROUP BY nodeport ORDER BY nodeport; + +nodeport count + +57637 2 +57638 2 +master_remove_node + + + + +starting permutation: s1-add-second-node s1-begin s2-begin s1-noshards s2-create-distributed-table s1-commit s2-commit s2-shardcounts +?column? + +1 +step s1-add-second-node: + SELECT 1 FROM master_add_node('localhost', 57638); + +?column? + +1 +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-noshards: + SELECT * from master_set_node_property('localhost', 57637, 'shouldhaveshards', false); + +master_set_node_property + + +step s2-create-distributed-table: + CREATE TABLE t1 (a int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +step s1-commit: + COMMIT; + +step s2-create-distributed-table: <... completed> +create_distributed_table + + +step s2-commit: + COMMIT; + +step s2-shardcounts: + SELECT nodeport, count(*) + FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) + WHERE logicalrelid = 't1'::regclass GROUP BY nodeport ORDER BY nodeport; + +nodeport count + +57638 4 +master_remove_node + + + + +starting permutation: s1-begin s2-begin s1-noshards s2-update-node s1-commit s2-commit +?column? 
+ +1 +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-noshards: + SELECT * from master_set_node_property('localhost', 57637, 'shouldhaveshards', false); + +master_set_node_property + + +step s2-update-node: + select * from master_update_node((select nodeid from pg_dist_node where nodeport = 57637), 'localhost', 57638) + +step s1-commit: + COMMIT; + +step s2-update-node: <... completed> +master_update_node + + +step s2-commit: + COMMIT; + +master_remove_node + + + +starting permutation: s1-begin s2-begin s2-update-node s1-noshards s2-commit s1-commit +?column? + +1 +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-update-node: + select * from master_update_node((select nodeid from pg_dist_node where nodeport = 57637), 'localhost', 57638) + +master_update_node + + +step s1-noshards: + SELECT * from master_set_node_property('localhost', 57637, 'shouldhaveshards', false); + +step s2-commit: + COMMIT; + +step s1-noshards: <... completed> +error in steps s2-commit s1-noshards: ERROR: node at "localhost:57637" does not exist +step s1-commit: + COMMIT; + +master_remove_node + + diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 47243b81d..2196b00d5 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -1,6 +1,7 @@ SET citus.next_shard_id TO 1220000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; -- Tests functions related to cluster membership --- before starting the test, lets try to create reference table and see a +-- before starting the test, lets try to create reference table and see a -- meaningful error CREATE TABLE test_reference_table (y int primary key, name text); SELECT create_reference_table('test_reference_table'); @@ -43,7 +44,7 @@ SELECT master_get_active_worker_nodes(); (2 rows) -- try to remove a node (with no placements) -SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); master_remove_node -------------------- @@ -63,7 +64,7 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port); 1 (1 row) -SELECT master_disable_node('localhost', :worker_2_port); +SELECT master_disable_node('localhost', :worker_2_port); master_disable_node --------------------- @@ -106,7 +107,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHER (8 rows) -- try to remove a node with active placements and see that node removal is failed -SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); ERROR: you cannot remove the primary node of a node group which has shard placements SELECT master_get_active_worker_nodes(); master_get_active_worker_nodes @@ -119,7 +120,7 @@ SELECT master_get_active_worker_nodes(); INSERT INTO test_reference_table VALUES (1, '1'); -- try to disable a node with active placements see that node is removed -- observe that a notification is displayed -SELECT master_disable_node('localhost', :worker_2_port); +SELECT master_disable_node('localhost', :worker_2_port); NOTICE: Node localhost:57638 has active shard placements. Some queries may fail after this operation. Use SELECT master_activate_node('localhost', 57638) to activate this node back. 
master_disable_node --------------------- @@ -236,7 +237,7 @@ SELECT * FROM master_activate_node('localhost', :worker_2_port); (1 row) -- try to remove a node with active placements and see that node removal is failed -SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); ERROR: you cannot remove the primary node of a node group which has shard placements -- mark all placements in the candidate node as inactive SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset @@ -255,7 +256,7 @@ SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHER (8 rows) -- try to remove a node with only inactive placements and see that removal still fails -SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); ERROR: you cannot remove the primary node of a node group which has shard placements SELECT master_get_active_worker_nodes(); master_get_active_worker_nodes @@ -362,8 +363,8 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep \c - - - :master_port -- check that removing two nodes in the same transaction works -SELECT - master_remove_node('localhost', :worker_1_port), +SELECT + master_remove_node('localhost', :worker_1_port), master_remove_node('localhost', :worker_2_port); master_remove_node | master_remove_node --------------------+-------------------- @@ -386,10 +387,10 @@ SELECT (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced ---------+---------+-----------+----------+----------+-------------+----------+----------+-------------+---------------- - 11 | 9 | localhost | 57637 | default | f | t | primary | default | f - 12 | 10 | localhost | 57638 | default | f | t | primary | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards +--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------ + 11 | 9 | localhost | 57637 | default | f | t | primary | default | f | t + 12 | 10 | localhost | 57638 | default | f | t | primary | default | f | t (2 rows) -- check that mixed add/remove node commands work fine inside transaction @@ -509,11 +510,11 @@ SELECT col1, col2 FROM temp ORDER BY col1; row2 | 2 (2 rows) -SELECT - count(*) -FROM - pg_dist_shard_placement, pg_dist_shard -WHERE +SELECT + count(*) +FROM + pg_dist_shard_placement, pg_dist_shard +WHERE pg_dist_shard_placement.shardid = pg_dist_shard.shardid AND pg_dist_shard.logicalrelid = 'temp'::regclass AND pg_dist_shard_placement.nodeport = :worker_2_port; @@ -522,7 +523,6 @@ WHERE 4 (1 row) - DROP TABLE temp; \c - - - :worker_1_port DELETE FROM pg_dist_partition; @@ -608,11 +608,11 @@ CONTEXT: PL/pgSQL function citus_internal.pg_dist_node_trigger_func() line 18 a INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole, nodecluster) VALUES ('localhost', 5000, 1000, 'primary', 'olap'); ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster" -DETAIL: Failing row contains (24, 1000, localhost, 5000, default, f, t, primary, olap, f). +DETAIL: Failing row contains (24, 1000, localhost, 5000, default, f, t, primary, olap, f, t). 
UPDATE pg_dist_node SET nodecluster = 'olap' WHERE nodeport = :worker_1_port; ERROR: new row for relation "pg_dist_node" violates check constraint "primaries_are_only_allowed_in_the_default_cluster" -DETAIL: Failing row contains (16, 14, localhost, 57637, default, f, t, primary, olap, f). +DETAIL: Failing row contains (16, 14, localhost, 57637, default, f, t, primary, olap, f, t). -- check that you /can/ add a secondary node to a non-default cluster SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset SELECT master_add_node('localhost', 8888, groupid => :worker_1_group, noderole => 'secondary', nodecluster=> 'olap'); @@ -635,9 +635,9 @@ SELECT master_add_node('localhost', 8887, groupid => :worker_1_group, noderole = (1 row) SELECT * FROM pg_dist_node WHERE nodeport=8887; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced ---------+---------+-----------+----------+----------+-------------+----------+-----------+-----------------------------------------------------------------+---------------- - 26 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards +--------+---------+-----------+----------+----------+-------------+----------+-----------+-----------------------------------------------------------------+----------------+------------------ + 26 | 14 | localhost | 8887 | default | f | t | secondary | thisisasixtyfourcharacterstringrepeatedfourtimestomake256chars. | f | t (1 row) -- don't remove the secondary and unavailable nodes, check that no commands are sent to @@ -678,9 +678,9 @@ SELECT master_update_node(:worker_1_node, 'somehost', 9000); (1 row) SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced ---------+---------+----------+----------+----------+-------------+----------+----------+-------------+---------------- - 16 | 14 | somehost | 9000 | default | f | t | primary | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards +--------+---------+----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------ + 16 | 14 | somehost | 9000 | default | f | t | primary | default | f | t (1 row) -- cleanup @@ -691,8 +691,196 @@ SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port); (1 row) SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced ---------+---------+-----------+----------+----------+-------------+----------+----------+-------------+---------------- - 16 | 14 | localhost | 57637 | default | f | t | primary | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards +--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------ + 16 | 14 | localhost | 57637 | default | f | t | primary | default | f | t (1 row) +SET citus.shard_replication_factor TO 1; +CREATE TABLE test_dist (x int, y int); +SELECT 
create_distributed_table('test_dist', 'x'); + create_distributed_table +-------------------------- + +(1 row) + +-- testing behaviour when setting shouldhaveshards to false on partially empty node +SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); + master_set_node_property +-------------------------- + +(1 row) + +CREATE TABLE test_dist_colocated (x int, y int); +CREATE TABLE test_dist_non_colocated (x int, y int); +CREATE TABLE test_dist_colocated_with_non_colocated (x int, y int); +CREATE TABLE test_ref (a int, b int); +SELECT create_distributed_table('test_dist_colocated', 'x'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('test_dist_non_colocated', 'x', colocate_with => 'none'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('test_dist_colocated_with_non_colocated', 'x', colocate_with => 'test_dist_non_colocated'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_reference_table('test_ref'); + create_reference_table +------------------------ + +(1 row) + +-- colocated tables should still be placed on shouldhaveshards false nodes for safety +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist_colocated'::regclass GROUP BY nodeport ORDER BY nodeport; + nodeport | count +----------+------- + 57637 | 2 + 57638 | 2 +(2 rows) + +-- non colocated tables should not be placed on shouldhaveshards false nodes anymore +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport; + nodeport | count +----------+------- + 57637 | 4 +(1 row) + +-- this table should be colocated with the test_dist_non_colocated table +-- correctly only on nodes with shouldhaveshards true +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist_colocated_with_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport; + nodeport | count +----------+------- + 57637 | 4 +(1 row) + +-- reference tables should be placed on with shouldhaveshards false +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport; + nodeport | count +----------+------- + 57637 | 1 + 57638 | 1 +(2 rows) + +-- cleanup for next test +DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated, test_dist_colocated_with_non_colocated; +-- testing behaviour when setting shouldhaveshards to false on fully empty node +SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); + master_set_node_property +-------------------------- + +(1 row) + +CREATE TABLE test_dist (x int, y int); +CREATE TABLE test_dist_colocated (x int, y int); +CREATE TABLE test_dist_non_colocated (x int, y int); +CREATE TABLE test_ref (a int, b int); +SELECT create_distributed_table('test_dist', 'x'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_reference_table('test_ref'); + create_reference_table +------------------------ + +(1 row) + +-- distributed tables should not be placed on nodes with shouldhaveshards false +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 
'test_dist'::regclass GROUP BY nodeport ORDER BY nodeport; + nodeport | count +----------+------- + 57637 | 4 +(1 row) + +-- reference tables should be placed on nodes with shouldhaveshards false +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport; + nodeport | count +----------+------- + 57637 | 1 + 57638 | 1 +(2 rows) + +SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); + master_set_node_property +-------------------------- + +(1 row) + +-- distributed tables should still not be placed on nodes that were switched to +-- shouldhaveshards true +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist'::regclass GROUP BY nodeport ORDER BY nodeport; + nodeport | count +----------+------- + 57637 | 4 +(1 row) + +-- reference tables should still be placed on all nodes with isdatanode 'true' +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport; + nodeport | count +----------+------- + 57637 | 1 + 57638 | 1 +(2 rows) + +SELECT create_distributed_table('test_dist_colocated', 'x'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('test_dist_non_colocated', 'x', colocate_with => 'none'); + create_distributed_table +-------------------------- + +(1 row) + +-- colocated tables should not be placed on nodedes that were switched to +-- shouldhaveshards true +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist_colocated'::regclass GROUP BY nodeport ORDER BY nodeport; + nodeport | count +----------+------- + 57637 | 4 +(1 row) + +-- non colocated tables should be placed on nodedes that were switched to +-- shouldhaveshards true +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport; + nodeport | count +----------+------- + 57637 | 2 + 57638 | 2 +(2 rows) + +SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false); +ERROR: only the 'shouldhaveshards' property can be set using this function +DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated; diff --git a/src/test/regress/expected/multi_metadata_attributes.out b/src/test/regress/expected/multi_metadata_attributes.out index 1309c510e..abcef2178 100644 --- a/src/test/regress/expected/multi_metadata_attributes.out +++ b/src/test/regress/expected/multi_metadata_attributes.out @@ -1,15 +1,14 @@ -- if the output of following query changes, we might need to change --- some heap_getattr() calls to heap_deform_tuple(). +-- some heap_getattr() calls to heap_deform_tuple(). This errors out in +-- postgres versions before 11. If this test fails check out +-- https://github.com/citusdata/citus/pull/2464 for an explanation of what to +-- do. Once you used the new code for the table you can add it to the NOT IN +-- part of the query so new changes to it won't affect this test. 
SELECT attrelid::regclass, attname, atthasmissing, attmissingval FROM pg_attribute -WHERE atthasmissing +WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass) ORDER BY attrelid, attname; - attrelid | attname | atthasmissing | attmissingval ---------------+----------------+---------------+--------------- - pg_dist_node | hasmetadata | t | {f} - pg_dist_node | isactive | t | {t} - pg_dist_node | metadatasynced | t | {f} - pg_dist_node | nodecluster | t | {default} - pg_dist_node | noderole | t | {primary} -(5 rows) + attrelid | attname | atthasmissing | attmissingval +----------+---------+---------------+--------------- +(0 rows) diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index ae910b488..5bd752c00 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -14,10 +14,8 @@ CREATE FUNCTION master_metadata_snapshot() RETURNS text[] LANGUAGE C STRICT AS 'citus'; - COMMENT ON FUNCTION master_metadata_snapshot() IS 'commands to create the metadata snapshot'; - -- Show that none of the existing tables are qualified to be MX tables SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; logicalrelid | partmethod | partkey | colocationid | repmodel @@ -48,7 +46,7 @@ SELECT master_create_worker_shards('mx_test_table', 8, 1); (1 row) --- Set the replication model of the test table to streaming replication so that it is +-- Set the replication model of the test table to streaming replication so that it is -- considered as an MX table UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::regclass; -- Show that the created MX table is included in the metadata snapshot @@ -79,7 +77,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1; ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE public.mx_test_table OWNER TO postgres ALTER TABLE public.mx_test_table OWNER TO postgres - CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) TABLESPACE pg_default + CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default') INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's') @@ -103,7 +101,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1; ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres - CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default + CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) INSERT INTO pg_dist_node (nodeid, 
groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default') INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's') @@ -131,7 +129,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1; ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres - CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default + CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default') INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's') @@ -152,7 +150,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1; ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres - CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default + CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default') INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's') @@ -233,12 +231,12 @@ SELECT * FROM pg_dist_local_group; (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced ---------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+---------------- - 1 | 1 | localhost | 57637 | default | t | t | primary | default | f - 2 | 2 | localhost | 57638 | default | f | t | primary | default | f - 4 | 1 | localhost | 8888 | default | f | t | secondary | default | f - 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f + nodeid | 
groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards +--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+----------------+------------------ + 1 | 1 | localhost | 57637 | default | t | t | primary | default | f | t + 2 | 2 | localhost | 57638 | default | f | t | primary | default | f | t + 4 | 1 | localhost | 8888 | default | f | t | secondary | default | f | t + 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t (4 rows) SELECT * FROM pg_dist_partition ORDER BY logicalrelid; @@ -316,7 +314,7 @@ SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'streaming'; CREATE SCHEMA mx_testing_schema_2; CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3)); -CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, +CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1 (col1, col3)); SELECT create_distributed_table('mx_testing_schema.fk_test_1', 'col1'); create_distributed_table @@ -330,7 +328,6 @@ SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1'); (1 row) - SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node ----------------------------- @@ -340,9 +337,9 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -- Check that foreign key metadata exists on the worker \c - - - :worker_1_port SELECT "Constraint", "Definition" FROM table_fkeys WHERE relid='mx_testing_schema_2.fk_test_2'::regclass; - Constraint | Definition ----------------------+----------------------------------------------------------------------------- - fk_test_2_col1_fkey | FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1(col1, col3) + Constraint | Definition +--------------------------+----------------------------------------------------------------------------- + fk_test_2_col1_col2_fkey | FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1(col1, col3) (1 row) \c - - - :master_port @@ -372,12 +369,12 @@ SELECT * FROM pg_dist_local_group; (1 row) SELECT * FROM pg_dist_node ORDER BY nodeid; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced ---------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+---------------- - 1 | 1 | localhost | 57637 | default | t | t | primary | default | t - 2 | 2 | localhost | 57638 | default | f | t | primary | default | f - 4 | 1 | localhost | 8888 | default | f | t | secondary | default | f - 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards +--------+---------+-----------+----------+----------+-------------+----------+-----------+----------------+----------------+------------------ + 1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t + 2 | 2 | localhost | 57638 | default | f | t | primary | default | f | t + 4 | 1 | localhost | 8888 | default | f | t | secondary | default | f | t + 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t (4 rows) SELECT * FROM pg_dist_partition ORDER BY logicalrelid; @@ -507,7 +504,7 @@ SELECT * FROM mx_query_test ORDER BY a; \c - - 
- :master_port DROP TABLE mx_query_test; --- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false +-- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node @@ -606,14 +603,14 @@ SELECT create_distributed_table('mx_test_schema_2.mx_table_2', 'col1'); (1 row) -- Check that created tables are marked as streaming replicated tables -SELECT - logicalrelid, repmodel -FROM - pg_dist_partition -WHERE +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass -ORDER BY +ORDER BY logicalrelid; logicalrelid | repmodel -----------------------------+---------- @@ -621,15 +618,15 @@ ORDER BY mx_test_schema_2.mx_table_2 | s (2 rows) --- See the shards and placements of the mx tables -SELECT +-- See the shards and placements of the mx tables +SELECT logicalrelid, shardid, nodename, nodeport -FROM +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE +WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass -ORDER BY +ORDER BY logicalrelid, shardid; logicalrelid | shardid | nodename | nodeport -----------------------------+---------+-----------+---------- @@ -645,7 +642,6 @@ ORDER BY mx_test_schema_2.mx_table_2 | 1310029 | localhost | 57637 (10 rows) - -- Check that metadata of MX tables exist on the metadata worker \c - - - :worker_1_port -- Check that tables are created @@ -658,11 +654,11 @@ ORDER BY (2 rows) -- Check that table metadata are created -SELECT - logicalrelid, repmodel -FROM - pg_dist_partition -WHERE +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; logicalrelid | repmodel @@ -672,14 +668,14 @@ WHERE (2 rows) -- Check that shard and placement data are created -SELECT +SELECT logicalrelid, shardid, nodename, nodeport -FROM +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE +WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass -ORDER BY +ORDER BY logicalrelid, shardid; logicalrelid | shardid | nodename | nodeport -----------------------------+---------+-----------+---------- @@ -750,11 +746,11 @@ LINE 2: relid = 'mx_test_schema_2.mx_index_3'::regclass; SET citus.multi_shard_commit_protocol TO '2pc'; ALTER TABLE mx_test_schema_1.mx_table_1 ADD COLUMN col3 NUMERIC; ALTER TABLE mx_test_schema_1.mx_table_1 ALTER COLUMN col3 SET DATA TYPE INT; -ALTER TABLE - mx_test_schema_1.mx_table_1 -ADD CONSTRAINT - mx_fk_constraint -FOREIGN KEY +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT + mx_fk_constraint +FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1); @@ -776,12 +772,12 @@ SELECT "Constraint", "Definition" FROM table_fkeys WHERE relid='mx_test_schema_1 -- Check that foreign key constraint with NOT VALID works as well \c - - - :master_port SET citus.multi_shard_commit_protocol TO '2pc'; -ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint; -ALTER TABLE - mx_test_schema_1.mx_table_1 -ADD CONSTRAINT +ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint; +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT mx_fk_constraint_2 -FOREIGN KEY +FOREIGN KEY (col1) 
REFERENCES mx_test_schema_2.mx_table_2(col1) @@ -793,7 +789,7 @@ SELECT "Constraint", "Definition" FROM table_fkeys WHERE relid='mx_test_schema_1 mx_fk_constraint_2 | FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1) (1 row) --- Check that mark_tables_colocated call propagates the changes to the workers +-- Check that mark_tables_colocated call propagates the changes to the workers \c - - - :master_port SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000; @@ -815,10 +811,10 @@ SELECT create_distributed_table('mx_colocation_test_2', 'a'); (1 row) -- Check the colocation IDs of the created tables -SELECT +SELECT logicalrelid, colocationid -FROM - pg_dist_partition +FROM + pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass @@ -829,22 +825,21 @@ ORDER BY logicalrelid; mx_colocation_test_2 | 10000 (2 rows) - -- Reset the colocation IDs of the test tables -DELETE FROM +DELETE FROM pg_dist_colocation WHERE EXISTS ( - SELECT 1 - FROM pg_dist_partition - WHERE - colocationid = pg_dist_partition.colocationid + SELECT 1 + FROM pg_dist_partition + WHERE + colocationid = pg_dist_partition.colocationid AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass); -UPDATE - pg_dist_partition -SET +UPDATE + pg_dist_partition +SET colocationid = 0 -WHERE - logicalrelid = 'mx_colocation_test_1'::regclass +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass; -- Mark tables colocated and see the changes on the master and the worker SELECT mark_tables_colocated('mx_colocation_test_1', ARRAY['mx_colocation_test_2']); @@ -853,10 +848,10 @@ SELECT mark_tables_colocated('mx_colocation_test_1', ARRAY['mx_colocation_test_2 (1 row) -SELECT - logicalrelid, colocationid -FROM - pg_dist_partition +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass; @@ -867,10 +862,10 @@ WHERE (2 rows) \c - - - :worker_1_port -SELECT - logicalrelid, colocationid -FROM - pg_dist_partition +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass; @@ -880,7 +875,7 @@ WHERE mx_colocation_test_2 | 10001 (2 rows) -\c - - - :master_port +\c - - - :master_port -- Check that DROP TABLE on MX tables works DROP TABLE mx_colocation_test_1; DROP TABLE mx_colocation_test_2; @@ -889,9 +884,8 @@ DROP TABLE mx_colocation_test_2; \c - - - :worker_1_port \d mx_colocation_test_1 \d mx_colocation_test_2 - -- Check that dropped MX table can be recreated again -\c - - - :master_port +\c - - - :master_port SET citus.shard_count TO 7; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'streaming'; @@ -924,7 +918,7 @@ SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_te DROP TABLE mx_temp_drop_test; -- Check that MX tables can be created with SERIAL columns -\c - - - :master_port +\c - - - :master_port SET citus.shard_count TO 3; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'streaming'; @@ -1295,7 +1289,7 @@ SELECT count(*) FROM pg_dist_colocation WHERE distributioncolumntype = 0; SELECT logicalrelid, partmethod, repmodel, shardid, placementid, nodename, nodeport FROM - pg_dist_partition + pg_dist_partition NATURAL JOIN 
pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE @@ -1308,7 +1302,6 @@ ORDER BY mx_ref | n | t | 1310072 | 100073 | localhost | 57638 (2 rows) - SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset -- Check that DDL commands are propagated to reference tables on workers \c - - - :master_port @@ -1345,7 +1338,6 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE col_1 | integer | col_1 (1 row) - -- Check that metada is cleaned successfully upon drop table \c - - - :master_port DROP TABLE mx_ref; @@ -1391,7 +1383,7 @@ SELECT create_reference_table('mx_ref'); (1 row) -SELECT shardid, nodename, nodeport +SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_ref'::regclass; shardid | nodename | nodeport @@ -1400,7 +1392,7 @@ WHERE logicalrelid='mx_ref'::regclass; (1 row) \c - - - :worker_1_port -SELECT shardid, nodename, nodeport +SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_ref'::regclass; shardid | nodename | nodeport @@ -1416,7 +1408,7 @@ NOTICE: Replicating reference table "mx_ref" to the node localhost:57638 7 (1 row) -SELECT shardid, nodename, nodeport +SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_ref'::regclass ORDER BY shardid, nodeport; @@ -1427,7 +1419,7 @@ ORDER BY shardid, nodeport; (2 rows) \c - - - :worker_1_port -SELECT shardid, nodename, nodeport +SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_ref'::regclass ORDER BY shardid, nodeport; @@ -1448,6 +1440,63 @@ UPDATE pg_dist_placement UPDATE pg_dist_placement SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port) WHERE groupid = :old_worker_2_group; +-- Confirm that shouldhaveshards is 'true' +\c - - - :master_port +select shouldhaveshards from pg_dist_node where nodeport = 8888; + shouldhaveshards +------------------ + t +(1 row) + +\c - postgres - :worker_1_port +select shouldhaveshards from pg_dist_node where nodeport = 8888; + shouldhaveshards +------------------ + t +(1 row) + +-- Check that setting shouldhaveshards to false is correctly transferred to other mx nodes +\c - - - :master_port +SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', false); + master_set_node_property +-------------------------- + +(1 row) + +select shouldhaveshards from pg_dist_node where nodeport = 8888; + shouldhaveshards +------------------ + f +(1 row) + +\c - postgres - :worker_1_port +select shouldhaveshards from pg_dist_node where nodeport = 8888; + shouldhaveshards +------------------ + f +(1 row) + +-- Check that setting shouldhaveshards to true is correctly transferred to other mx nodes +\c - postgres - :master_port +SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', true); + master_set_node_property +-------------------------- + +(1 row) + +select shouldhaveshards from pg_dist_node where nodeport = 8888; + shouldhaveshards +------------------ + t +(1 row) + +\c - postgres - :worker_1_port +select shouldhaveshards from pg_dist_node where nodeport = 8888; + shouldhaveshards +------------------ + t +(1 row) + -- Cleanup \c - - - :master_port DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; diff --git a/src/test/regress/expected/multi_mx_create_table.out b/src/test/regress/expected/multi_mx_create_table.out index 67e474ab9..bf9d5a0ce 100644 
--- a/src/test/regress/expected/multi_mx_create_table.out +++ b/src/test/regress/expected/multi_mx_create_table.out @@ -2,7 +2,6 @@ -- MULTI_MX_CREATE_TABLE -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000; -ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node ----------------------------- @@ -233,7 +232,7 @@ SELECT create_distributed_table('citus_mx_test_schema.nation_hash_composite_type -- insert some data to verify composite type queries \COPY citus_mx_test_schema.nation_hash_composite_types FROM STDIN with delimiter '|'; --- now create tpch tables +-- now create tpch tables -- Create new table definitions for use in testing in distributed planning and -- execution functionality. Also create indexes to boost performance. SET search_path TO public; @@ -444,7 +443,7 @@ SELECT create_distributed_table('articles_single_shard_hash_mx', 'author_id'); (1 row) SET citus.shard_count TO 4; -CREATE TABLE company_employees_mx (company_id int, employee_id int, manager_id int); +CREATE TABLE company_employees_mx (company_id int, employee_id int, manager_id int); SELECT create_distributed_table('company_employees_mx', 'company_id'); create_distributed_table -------------------------- @@ -454,33 +453,33 @@ SELECT create_distributed_table('company_employees_mx', 'company_id'); WITH shard_counts AS ( SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid ) -SELECT logicalrelid, colocationid, shard_count, partmethod, repmodel -FROM pg_dist_partition NATURAL JOIN shard_counts +SELECT logicalrelid, colocationid, shard_count, partmethod, repmodel +FROM pg_dist_partition NATURAL JOIN shard_counts ORDER BY colocationid, logicalrelid; logicalrelid | colocationid | shard_count | partmethod | repmodel --------------------------------------------------------+--------------+-------------+------------+---------- - nation_hash | 1390000 | 16 | h | s - citus_mx_test_schema.nation_hash | 1390000 | 16 | h | s - citus_mx_test_schema_join_1.nation_hash | 1390001 | 4 | h | s - citus_mx_test_schema_join_1.nation_hash_2 | 1390001 | 4 | h | s - citus_mx_test_schema_join_2.nation_hash | 1390001 | 4 | h | s - citus_mx_test_schema.nation_hash_collation_search_path | 1390001 | 4 | h | s - citus_mx_test_schema.nation_hash_composite_types | 1390001 | 4 | h | s - mx_ddl_table | 1390001 | 4 | h | s - app_analytics_events_mx | 1390001 | 4 | h | s - company_employees_mx | 1390001 | 4 | h | s - lineitem_mx | 1390002 | 16 | h | s - orders_mx | 1390002 | 16 | h | s - customer_mx | 1390003 | 1 | n | t - nation_mx | 1390003 | 1 | n | t - part_mx | 1390003 | 1 | n | t - supplier_mx | 1390003 | 1 | n | t - limit_orders_mx | 1390004 | 2 | h | s - articles_hash_mx | 1390004 | 2 | h | s - multiple_hash_mx | 1390005 | 2 | h | s - researchers_mx | 1390006 | 2 | h | s - labs_mx | 1390007 | 1 | h | s - objects_mx | 1390007 | 1 | h | s - articles_single_shard_hash_mx | 1390007 | 1 | h | s + citus_mx_test_schema_join_1.nation_hash | 1390002 | 4 | h | s + citus_mx_test_schema_join_1.nation_hash_2 | 1390002 | 4 | h | s + citus_mx_test_schema_join_2.nation_hash | 1390002 | 4 | h | s + citus_mx_test_schema.nation_hash_collation_search_path | 1390002 | 4 | h | s + citus_mx_test_schema.nation_hash_composite_types | 1390002 | 4 | h | s + mx_ddl_table | 1390002 | 4 | h | s + app_analytics_events_mx | 1390002 | 4 | h | s + company_employees_mx | 1390002 | 4 | h | s + customer_mx | 1390004 | 1 | n | t + 
nation_mx | 1390004 | 1 | n | t + part_mx | 1390004 | 1 | n | t + supplier_mx | 1390004 | 1 | n | t + nation_hash | 1390006 | 16 | h | s + citus_mx_test_schema.nation_hash | 1390006 | 16 | h | s + lineitem_mx | 1390007 | 16 | h | s + orders_mx | 1390007 | 16 | h | s + limit_orders_mx | 1390008 | 2 | h | s + articles_hash_mx | 1390008 | 2 | h | s + multiple_hash_mx | 1390009 | 2 | h | s + researchers_mx | 1390010 | 2 | h | s + labs_mx | 1390011 | 1 | h | s + objects_mx | 1390011 | 1 | h | s + articles_single_shard_hash_mx | 1390011 | 1 | h | s (23 rows) diff --git a/src/test/regress/expected/multi_mx_hide_shard_names.out b/src/test/regress/expected/multi_mx_hide_shard_names.out index 9a803565f..409d97fff 100644 --- a/src/test/regress/expected/multi_mx_hide_shard_names.out +++ b/src/test/regress/expected/multi_mx_hide_shard_names.out @@ -59,7 +59,6 @@ SELECT * FROM citus_shard_indexes_on_worker; -- now show that we see the shards, but not the -- indexes as there are no indexes \c - - - :worker_1_port -SET citus.next_shard_id TO 1330000; SET search_path TO 'mx_hide_shard_names'; SELECT * FROM citus_shards_on_worker ORDER BY 2; Schema | Name | Type | Owner diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index a1ff3db7b..67ee3ad47 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -5,7 +5,7 @@ SET citus.next_shard_id TO 1270000; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1370000; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1370000; --- Set the colocation id to a safe value so that +-- Set the colocation id to a safe value so that -- it is not affected by future changes to colocation id sequence SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000; @@ -36,8 +36,8 @@ SELECT create_reference_table('mx_ref_table'); (1 row) -- Check that the created tables are colocated MX tables -SELECT logicalrelid, repmodel, colocationid -FROM pg_dist_partition +SELECT logicalrelid, repmodel, colocationid +FROM pg_dist_partition WHERE logicalrelid IN ('mx_table'::regclass, 'mx_table_2'::regclass) ORDER BY logicalrelid; logicalrelid | repmodel | colocationid @@ -89,7 +89,7 @@ SELECT count(*) FROM pg_dist_partition WHERE logicalrelid='mx_table_worker'::reg DROP TABLE mx_table_worker; -- master_create_worker_shards -CREATE TEMP TABLE pg_dist_shard_temp AS +CREATE TEMP TABLE pg_dist_shard_temp AS SELECT * FROM pg_dist_shard WHERE logicalrelid = 'mx_table'::regclass; DELETE FROM pg_dist_shard WHERE logicalrelid = 'mx_table'::regclass; SELECT master_create_worker_shards('mx_table', 5, 1); @@ -112,6 +112,13 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; DROP TABLE mx_ref_table; CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1); \c - - - :worker_1_port +-- changing isdatanode +SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', false); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. +SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', true); +ERROR: operation is not allowed on this node +HINT: Connect to the coordinator and run it again. 
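-- Editor's illustrative aside (not part of the recorded regression output):
-- the two errors above show that master_set_node_property() is rejected when
-- run on a worker. A minimal sketch of the intended flow, reusing the
-- connection conventions of these tests, is to issue the same call from the
-- coordinator instead:
--
--   \c - - - :master_port
--   SELECT * FROM master_set_node_property('localhost', 8888, 'shouldhaveshards', false);
--   \c - - - :worker_1_port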
-- DDL commands SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass; Column | Type | Modifiers @@ -208,10 +215,10 @@ SELECT colocationid FROM pg_dist_partition WHERE logicalrelid='mx_table_2'::regc (1 row) SELECT colocationid AS old_colocation_id -FROM pg_dist_partition +FROM pg_dist_partition WHERE logicalrelid='mx_table'::regclass \gset -UPDATE pg_dist_partition -SET colocationid = :old_colocation_id +UPDATE pg_dist_partition +SET colocationid = :old_colocation_id WHERE logicalrelid='mx_table_2'::regclass; -- start_metadata_sync_to_node SELECT start_metadata_sync_to_node('localhost', :worker_2_port); @@ -290,7 +297,7 @@ SELECT count(*) FROM mx_table; (1 row) -- master_copy_shard_placement -SELECT logicalrelid, shardid AS testshardid, nodename, nodeport +SELECT logicalrelid, shardid AS testshardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'mx_table'::regclass AND nodeport=:worker_1_port ORDER BY shardid @@ -301,7 +308,7 @@ VALUES (:worker_2_group, :testshardid, 3, 0); SELECT master_copy_shard_placement(:testshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. -SELECT shardid, nodename, nodeport, shardstate +SELECT shardid, nodename, nodeport, shardstate FROM pg_dist_shard_placement WHERE shardid = :testshardid ORDER BY nodeport; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index b5ac59f50..a058a6ef4 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -5,6 +5,7 @@ test: isolation_add_node_vs_reference_table_operations test: isolation_create_table_vs_add_remove_node test: isolation_master_update_node test: isolation_ensure_dependency_activate_node +test: isolation_shouldhaveshards # tests that change node metadata should precede # isolation_cluster_management such that tests diff --git a/src/test/regress/specs/isolation_shouldhaveshards.spec b/src/test/regress/specs/isolation_shouldhaveshards.spec new file mode 100644 index 000000000..fd75fa422 --- /dev/null +++ b/src/test/regress/specs/isolation_shouldhaveshards.spec @@ -0,0 +1,73 @@ +# the test expects to have zero nodes in pg_dist_node at the beginning +# add single one of the nodes for the purpose of the test +setup +{ + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; + SELECT 1 FROM master_add_node('localhost', 57637); +} + +teardown +{ + DROP TABLE IF EXISTS t1 CASCADE; + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; +} + +session "s1" + +step "s1-add-second-node" { + SELECT 1 FROM master_add_node('localhost', 57638); +} + +step "s1-begin" +{ + BEGIN; +} + +step "s1-noshards" +{ + SELECT * from master_set_node_property('localhost', 57637, 'shouldhaveshards', false); +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-create-distributed-table" +{ + CREATE TABLE t1 (a int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); +} + +step "s2-update-node" +{ + select * from master_update_node((select nodeid from pg_dist_node where nodeport = 57637), 'localhost', 57638) +} + + +step "s2-commit" +{ + COMMIT; +} + +step "s2-shardcounts" +{ + SELECT nodeport, count(*) + FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) + WHERE logicalrelid = 
't1'::regclass GROUP BY nodeport ORDER BY nodeport; +} + + +permutation "s1-add-second-node" "s1-begin" "s2-begin" "s2-create-distributed-table" "s1-noshards" "s2-commit" "s1-commit" "s2-shardcounts" +permutation "s1-add-second-node" "s1-begin" "s2-begin" "s1-noshards" "s2-create-distributed-table" "s1-commit" "s2-commit" "s2-shardcounts" +permutation "s1-begin" "s2-begin" "s1-noshards" "s2-update-node" "s1-commit" "s2-commit" +permutation "s1-begin" "s2-begin" "s2-update-node" "s1-noshards" "s2-commit" "s1-commit" diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index 6bb0181b8..577d195ee 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -1,8 +1,9 @@ SET citus.next_shard_id TO 1220000; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; -- Tests functions related to cluster membership --- before starting the test, lets try to create reference table and see a +-- before starting the test, lets try to create reference table and see a -- meaningful error CREATE TABLE test_reference_table (y int primary key, name text); SELECT create_reference_table('test_reference_table'); @@ -21,14 +22,14 @@ SELECT * FROM master_add_node('localhost', :worker_1_port); SELECT master_get_active_worker_nodes(); -- try to remove a node (with no placements) -SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); -- verify that the node has been deleted SELECT master_get_active_worker_nodes(); -- try to disable a node with no placements see that node is removed SELECT 1 FROM master_add_node('localhost', :worker_2_port); -SELECT master_disable_node('localhost', :worker_2_port); +SELECT master_disable_node('localhost', :worker_2_port); SELECT master_get_active_worker_nodes(); -- add some shard placements to the cluster @@ -43,7 +44,7 @@ SELECT create_distributed_table('cluster_management_test', 'col_1', 'hash'); SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; -- try to remove a node with active placements and see that node removal is failed -SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); SELECT master_get_active_worker_nodes(); -- insert a row so that master_disable_node() exercises closing connections @@ -51,7 +52,7 @@ INSERT INTO test_reference_table VALUES (1, '1'); -- try to disable a node with active placements see that node is removed -- observe that a notification is displayed -SELECT master_disable_node('localhost', :worker_2_port); +SELECT master_disable_node('localhost', :worker_2_port); SELECT master_get_active_worker_nodes(); -- try to disable a node which does not exist and see that an error is thrown @@ -98,7 +99,7 @@ SELECT master_get_active_worker_nodes(); SELECT * FROM master_activate_node('localhost', :worker_2_port); -- try to remove a node with active placements and see that node removal is failed -SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); -- mark all placements in the candidate node as inactive SELECT groupid AS worker_2_group FROM pg_dist_node WHERE nodeport=:worker_2_port \gset @@ -106,7 +107,7 @@ UPDATE pg_dist_placement SET shardstate=3 WHERE groupid=:worker_2_group; SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; -- try to remove a 
node with only inactive placements and see that removal still fails -SELECT master_remove_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_2_port); SELECT master_get_active_worker_nodes(); -- clean-up @@ -154,8 +155,8 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep \c - - - :master_port -- check that removing two nodes in the same transaction works -SELECT - master_remove_node('localhost', :worker_1_port), +SELECT + master_remove_node('localhost', :worker_1_port), master_remove_node('localhost', :worker_2_port); SELECT count(1) FROM pg_dist_node; @@ -205,15 +206,15 @@ COMMIT; SELECT col1, col2 FROM temp ORDER BY col1; -SELECT - count(*) -FROM - pg_dist_shard_placement, pg_dist_shard -WHERE +SELECT + count(*) +FROM + pg_dist_shard_placement, pg_dist_shard +WHERE pg_dist_shard_placement.shardid = pg_dist_shard.shardid AND pg_dist_shard.logicalrelid = 'temp'::regclass AND pg_dist_shard_placement.nodeport = :worker_2_port; - + DROP TABLE temp; \c - - - :worker_1_port @@ -295,3 +296,96 @@ SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node; -- cleanup SELECT master_update_node(:worker_1_node, 'localhost', :worker_1_port); SELECT * FROM pg_dist_node WHERE nodeid = :worker_1_node; + + +SET citus.shard_replication_factor TO 1; + +CREATE TABLE test_dist (x int, y int); +SELECT create_distributed_table('test_dist', 'x'); + +-- testing behaviour when setting shouldhaveshards to false on partially empty node +SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); +CREATE TABLE test_dist_colocated (x int, y int); +CREATE TABLE test_dist_non_colocated (x int, y int); +CREATE TABLE test_dist_colocated_with_non_colocated (x int, y int); +CREATE TABLE test_ref (a int, b int); +SELECT create_distributed_table('test_dist_colocated', 'x'); +SELECT create_distributed_table('test_dist_non_colocated', 'x', colocate_with => 'none'); +SELECT create_distributed_table('test_dist_colocated_with_non_colocated', 'x', colocate_with => 'test_dist_non_colocated'); +SELECT create_reference_table('test_ref'); + +-- colocated tables should still be placed on shouldhaveshards false nodes for safety +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist_colocated'::regclass GROUP BY nodeport ORDER BY nodeport; + +-- non colocated tables should not be placed on shouldhaveshards false nodes anymore +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport; + +-- this table should be colocated with the test_dist_non_colocated table +-- and thus only be placed on nodes with shouldhaveshards true +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist_colocated_with_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport; + +-- reference tables should be placed on nodes with shouldhaveshards false +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport; + +-- cleanup for next test +DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated, test_dist_colocated_with_non_colocated; + +-- testing behaviour when setting shouldhaveshards to false on fully empty node +SELECT * from master_set_node_property('localhost', :worker_2_port, 
'shouldhaveshards', false); +CREATE TABLE test_dist (x int, y int); +CREATE TABLE test_dist_colocated (x int, y int); +CREATE TABLE test_dist_non_colocated (x int, y int); +CREATE TABLE test_ref (a int, b int); +SELECT create_distributed_table('test_dist', 'x'); +SELECT create_reference_table('test_ref'); + +-- distributed tables should not be placed on nodes with shouldhaveshards false +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist'::regclass GROUP BY nodeport ORDER BY nodeport; + +-- reference tables should be placed on nodes with shouldhaveshards false +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport; + +SELECT * from master_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); + +-- distributed tables should still not be placed on nodes that were switched to +-- shouldhaveshards true +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist'::regclass GROUP BY nodeport ORDER BY nodeport; + +-- reference tables should still be placed on all nodes with shouldhaveshards 'true' +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_ref'::regclass GROUP BY nodeport ORDER BY nodeport; + +SELECT create_distributed_table('test_dist_colocated', 'x'); +SELECT create_distributed_table('test_dist_non_colocated', 'x', colocate_with => 'none'); + +-- colocated tables should not be placed on nodes that were switched to +-- shouldhaveshards true +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist_colocated'::regclass GROUP BY nodeport ORDER BY nodeport; + + +-- non colocated tables should be placed on nodes that were switched to +-- shouldhaveshards true +SELECT nodeport, count(*) +FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid) +WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER BY nodeport; + +SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false); + +DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated; diff --git a/src/test/regress/sql/multi_metadata_attributes.sql b/src/test/regress/sql/multi_metadata_attributes.sql index 7f7c12cad..1845a4f53 100644 --- a/src/test/regress/sql/multi_metadata_attributes.sql +++ b/src/test/regress/sql/multi_metadata_attributes.sql @@ -1,7 +1,11 @@ -- if the output of following query changes, we might need to change --- some heap_getattr() calls to heap_deform_tuple(). +-- some heap_getattr() calls to heap_deform_tuple(). This errors out in +-- postgres versions before 11. If this test fails check out +-- https://github.com/citusdata/citus/pull/2464 for an explanation of what to +-- do. Once you have used the new code for the table you can add it to the NOT IN +-- part of the query so new changes to it won't affect this test. 
SELECT attrelid::regclass, attname, atthasmissing, attmissingval FROM pg_attribute -WHERE atthasmissing +WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass) ORDER BY attrelid, attname; diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 490f548cb..53d10c11c 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -20,10 +20,10 @@ CREATE FUNCTION master_metadata_snapshot() RETURNS text[] LANGUAGE C STRICT AS 'citus'; - + COMMENT ON FUNCTION master_metadata_snapshot() IS 'commands to create the metadata snapshot'; - + -- Show that none of the existing tables are qualified to be MX tables SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; @@ -36,7 +36,7 @@ CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 BIGSERI SELECT master_create_distributed_table('mx_test_table', 'col_1', 'hash'); SELECT master_create_worker_shards('mx_test_table', 8, 1); --- Set the replication model of the test table to streaming replication so that it is +-- Set the replication model of the test table to streaming replication so that it is -- considered as an MX table UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::regclass; @@ -112,12 +112,12 @@ SET citus.replication_model TO 'streaming'; CREATE SCHEMA mx_testing_schema_2; CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3)); -CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, +CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text, FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1 (col1, col3)); SELECT create_distributed_table('mx_testing_schema.fk_test_1', 'col1'); SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1'); - + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -- Check that foreign key metadata exists on the worker @@ -184,7 +184,7 @@ SELECT * FROM mx_query_test ORDER BY a; \c - - - :master_port DROP TABLE mx_query_test; --- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false +-- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false \c - - - :master_port SELECT start_metadata_sync_to_node('localhost', :worker_1_port); SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port; @@ -224,27 +224,27 @@ SELECT create_distributed_table('mx_test_schema_1.mx_table_1', 'col1'); SELECT create_distributed_table('mx_test_schema_2.mx_table_2', 'col1'); -- Check that created tables are marked as streaming replicated tables -SELECT - logicalrelid, repmodel -FROM - pg_dist_partition -WHERE +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass -ORDER BY +ORDER BY logicalrelid; --- See the shards and placements of the mx tables -SELECT +-- See the shards and placements of the mx tables +SELECT logicalrelid, shardid, nodename, nodeport -FROM +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE +WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass -ORDER BY +ORDER BY logicalrelid, shardid; - + -- Check that metadata of MX tables exist on the metadata worker \c - - - :worker_1_port @@ -252,23 +252,23 @@ ORDER BY \dt mx_test_schema_?.mx_table_? 
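-- Editor's illustrative aside (not part of the original test file): because
-- pg_dist_node is included in the synced metadata, the node rows, including
-- the new shouldhaveshards column, can also be inspected from this metadata
-- worker, for example:
--
--   SELECT nodeid, nodeport, hasmetadata, shouldhaveshards
--   FROM pg_dist_node
--   ORDER BY nodeid;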
-- Check that table metadata are created -SELECT - logicalrelid, repmodel -FROM - pg_dist_partition -WHERE +SELECT + logicalrelid, repmodel +FROM + pg_dist_partition +WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass; -- Check that shard and placement data are created -SELECT +SELECT logicalrelid, shardid, nodename, nodeport -FROM +FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement -WHERE +WHERE logicalrelid = 'mx_test_schema_1.mx_table_1'::regclass OR logicalrelid = 'mx_test_schema_2.mx_table_2'::regclass -ORDER BY +ORDER BY logicalrelid, shardid; -- Check that metadata of MX tables don't exist on the non-metadata worker @@ -306,11 +306,11 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE SET citus.multi_shard_commit_protocol TO '2pc'; ALTER TABLE mx_test_schema_1.mx_table_1 ADD COLUMN col3 NUMERIC; ALTER TABLE mx_test_schema_1.mx_table_1 ALTER COLUMN col3 SET DATA TYPE INT; -ALTER TABLE - mx_test_schema_1.mx_table_1 -ADD CONSTRAINT - mx_fk_constraint -FOREIGN KEY +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT + mx_fk_constraint +FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1); @@ -321,12 +321,12 @@ SELECT "Constraint", "Definition" FROM table_fkeys WHERE relid='mx_test_schema_1 -- Check that foreign key constraint with NOT VALID works as well \c - - - :master_port SET citus.multi_shard_commit_protocol TO '2pc'; -ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint; -ALTER TABLE - mx_test_schema_1.mx_table_1 -ADD CONSTRAINT +ALTER TABLE mx_test_schema_1.mx_table_1 DROP CONSTRAINT mx_fk_constraint; +ALTER TABLE + mx_test_schema_1.mx_table_1 +ADD CONSTRAINT mx_fk_constraint_2 -FOREIGN KEY +FOREIGN KEY (col1) REFERENCES mx_test_schema_2.mx_table_2(col1) @@ -334,7 +334,7 @@ NOT VALID; \c - - - :worker_1_port SELECT "Constraint", "Definition" FROM table_fkeys WHERE relid='mx_test_schema_1.mx_table_1'::regclass; --- Check that mark_tables_colocated call propagates the changes to the workers +-- Check that mark_tables_colocated call propagates the changes to the workers \c - - - :master_port SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000; @@ -349,51 +349,51 @@ CREATE TABLE mx_colocation_test_2 (a int); SELECT create_distributed_table('mx_colocation_test_2', 'a'); -- Check the colocation IDs of the created tables -SELECT +SELECT logicalrelid, colocationid -FROM - pg_dist_partition +FROM + pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass ORDER BY logicalrelid; - + -- Reset the colocation IDs of the test tables -DELETE FROM +DELETE FROM pg_dist_colocation WHERE EXISTS ( - SELECT 1 - FROM pg_dist_partition - WHERE - colocationid = pg_dist_partition.colocationid + SELECT 1 + FROM pg_dist_partition + WHERE + colocationid = pg_dist_partition.colocationid AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass); -UPDATE - pg_dist_partition -SET +UPDATE + pg_dist_partition +SET colocationid = 0 -WHERE - logicalrelid = 'mx_colocation_test_1'::regclass +WHERE + logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass; -- Mark tables colocated and see the changes on the master and the worker SELECT mark_tables_colocated('mx_colocation_test_1', ARRAY['mx_colocation_test_2']); -SELECT - logicalrelid, colocationid -FROM - pg_dist_partition +SELECT + 
logicalrelid, colocationid +FROM + pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass; \c - - - :worker_1_port -SELECT - logicalrelid, colocationid -FROM - pg_dist_partition +SELECT + logicalrelid, colocationid +FROM + pg_dist_partition WHERE logicalrelid = 'mx_colocation_test_1'::regclass OR logicalrelid = 'mx_colocation_test_2'::regclass; -\c - - - :master_port +\c - - - :master_port -- Check that DROP TABLE on MX tables works DROP TABLE mx_colocation_test_1; @@ -404,9 +404,9 @@ DROP TABLE mx_colocation_test_2; \c - - - :worker_1_port \d mx_colocation_test_1 \d mx_colocation_test_2 - + -- Check that dropped MX table can be recreated again -\c - - - :master_port +\c - - - :master_port SET citus.shard_count TO 7; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'streaming'; @@ -424,7 +424,7 @@ SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_te DROP TABLE mx_temp_drop_test; -- Check that MX tables can be created with SERIAL columns -\c - - - :master_port +\c - - - :master_port SET citus.shard_count TO 3; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'streaming'; @@ -593,14 +593,14 @@ SELECT count(*) FROM pg_dist_colocation WHERE distributioncolumntype = 0; SELECT logicalrelid, partmethod, repmodel, shardid, placementid, nodename, nodeport FROM - pg_dist_partition + pg_dist_partition NATURAL JOIN pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'mx_ref'::regclass ORDER BY nodeport; - + SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset -- Check that DDL commands are propagated to reference tables on workers @@ -616,7 +616,7 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_ref'::regclass; SELECT "Column", "Type", "Definition" FROM index_attrs WHERE relid = 'mx_ref_index'::regclass; - + -- Check that metada is cleaned successfully upon drop table \c - - - :master_port DROP TABLE mx_ref; @@ -641,25 +641,25 @@ SELECT master_remove_node('localhost', :worker_2_port); CREATE TABLE mx_ref (col_1 int, col_2 text); SELECT create_reference_table('mx_ref'); -SELECT shardid, nodename, nodeport +SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_ref'::regclass; \c - - - :worker_1_port -SELECT shardid, nodename, nodeport +SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_ref'::regclass; \c - - - :master_port SELECT master_add_node('localhost', :worker_2_port); -SELECT shardid, nodename, nodeport +SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_ref'::regclass ORDER BY shardid, nodeport; \c - - - :worker_1_port -SELECT shardid, nodename, nodeport +SELECT shardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_ref'::regclass ORDER BY shardid, nodeport; @@ -678,6 +678,30 @@ UPDATE pg_dist_placement SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port) WHERE groupid = :old_worker_2_group; +-- Confirm that shouldhaveshards is 'true' +\c - - - :master_port +select shouldhaveshards from pg_dist_node where nodeport = 8888; +\c - postgres - :worker_1_port +select shouldhaveshards from pg_dist_node where nodeport = 8888; + + +-- Check that setting 
shouldhaveshards to false is correctly transferred to other mx nodes +\c - - - :master_port +SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', false); +select shouldhaveshards from pg_dist_node where nodeport = 8888; + +\c - postgres - :worker_1_port +select shouldhaveshards from pg_dist_node where nodeport = 8888; + +-- Check that setting shouldhaveshards to true is correctly transferred to other mx nodes +\c - postgres - :master_port +SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', true); +select shouldhaveshards from pg_dist_node where nodeport = 8888; + +\c - postgres - :worker_1_port +select shouldhaveshards from pg_dist_node where nodeport = 8888; + + -- Cleanup \c - - - :master_port DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; diff --git a/src/test/regress/sql/multi_mx_create_table.sql b/src/test/regress/sql/multi_mx_create_table.sql index 09b798981..9e7030921 100644 --- a/src/test/regress/sql/multi_mx_create_table.sql +++ b/src/test/regress/sql/multi_mx_create_table.sql @@ -3,7 +3,6 @@ -- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000; -ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); SELECT start_metadata_sync_to_node('localhost', :worker_2_port); @@ -220,7 +219,7 @@ SELECT create_distributed_table('nation_hash_collation_search_path', 'n_nationke \COPY nation_hash_collation_search_path FROM STDIN with delimiter '|'; 0|ALGERIA|0|haggle. carefully final deposits detect slyly agai 1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon -2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special +2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special 3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold 4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d 5|ETHIOPIA|0|ven packages wake quickly. regu @@ -246,7 +245,7 @@ SELECT create_distributed_table('citus_mx_test_schema.nation_hash_composite_type 5|ETHIOPIA|0|ven packages wake quickly. regu|(a,f) \. --- now create tpch tables +-- now create tpch tables -- Create new table definitions for use in testing in distributed planning and -- execution functionality. Also create indexes to boost performance. 
SET search_path TO public; @@ -423,12 +422,12 @@ SET citus.shard_count TO 1; SELECT create_distributed_table('articles_single_shard_hash_mx', 'author_id'); SET citus.shard_count TO 4; -CREATE TABLE company_employees_mx (company_id int, employee_id int, manager_id int); +CREATE TABLE company_employees_mx (company_id int, employee_id int, manager_id int); SELECT create_distributed_table('company_employees_mx', 'company_id'); WITH shard_counts AS ( SELECT logicalrelid, count(*) AS shard_count FROM pg_dist_shard GROUP BY logicalrelid ) -SELECT logicalrelid, colocationid, shard_count, partmethod, repmodel -FROM pg_dist_partition NATURAL JOIN shard_counts +SELECT logicalrelid, colocationid, shard_count, partmethod, repmodel +FROM pg_dist_partition NATURAL JOIN shard_counts ORDER BY colocationid, logicalrelid; diff --git a/src/test/regress/sql/multi_mx_hide_shard_names.sql b/src/test/regress/sql/multi_mx_hide_shard_names.sql index 76e8a0345..7d6f1fd6a 100644 --- a/src/test/regress/sql/multi_mx_hide_shard_names.sql +++ b/src/test/regress/sql/multi_mx_hide_shard_names.sql @@ -39,7 +39,6 @@ SELECT * FROM citus_shard_indexes_on_worker; -- now show that we see the shards, but not the -- indexes as there are no indexes \c - - - :worker_1_port -SET citus.next_shard_id TO 1330000; SET search_path TO 'mx_hide_shard_names'; SELECT * FROM citus_shards_on_worker ORDER BY 2; SELECT * FROM citus_shard_indexes_on_worker ORDER BY 2; diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 10a86ae21..b167de557 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -8,7 +8,7 @@ SET citus.next_shard_id TO 1270000; ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1370000; ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1370000; --- Set the colocation id to a safe value so that +-- Set the colocation id to a safe value so that -- it is not affected by future changes to colocation id sequence SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000; @@ -29,8 +29,8 @@ CREATE TABLE mx_ref_table (col_1 int, col_2 text); SELECT create_reference_table('mx_ref_table'); -- Check that the created tables are colocated MX tables -SELECT logicalrelid, repmodel, colocationid -FROM pg_dist_partition +SELECT logicalrelid, repmodel, colocationid +FROM pg_dist_partition WHERE logicalrelid IN ('mx_table'::regclass, 'mx_table_2'::regclass) ORDER BY logicalrelid; @@ -68,7 +68,7 @@ SELECT count(*) FROM pg_dist_partition WHERE logicalrelid='mx_table_worker'::reg DROP TABLE mx_table_worker; -- master_create_worker_shards -CREATE TEMP TABLE pg_dist_shard_temp AS +CREATE TEMP TABLE pg_dist_shard_temp AS SELECT * FROM pg_dist_shard WHERE logicalrelid = 'mx_table'::regclass; DELETE FROM pg_dist_shard WHERE logicalrelid = 'mx_table'::regclass; @@ -84,6 +84,10 @@ DROP TABLE mx_ref_table; CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1); \c - - - :worker_1_port +-- changing isdatanode +SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', false); +SELECT * from master_set_node_property('localhost', 8888, 'shouldhaveshards', true); + -- DDL commands SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass; CREATE INDEX mx_test_index ON mx_table(col_2); @@ -126,11 +130,11 @@ SELECT 
mark_tables_colocated('mx_table', ARRAY['mx_table_2']); SELECT colocationid FROM pg_dist_partition WHERE logicalrelid='mx_table_2'::regclass; SELECT colocationid AS old_colocation_id -FROM pg_dist_partition +FROM pg_dist_partition WHERE logicalrelid='mx_table'::regclass \gset -UPDATE pg_dist_partition -SET colocationid = :old_colocation_id +UPDATE pg_dist_partition +SET colocationid = :old_colocation_id WHERE logicalrelid='mx_table_2'::regclass; -- start_metadata_sync_to_node @@ -166,7 +170,7 @@ SELECT master_remove_partition_metadata('mx_table'::regclass, 'public', 'mx_tabl SELECT count(*) FROM mx_table; -- master_copy_shard_placement -SELECT logicalrelid, shardid AS testshardid, nodename, nodeport +SELECT logicalrelid, shardid AS testshardid, nodename, nodeport FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid = 'mx_table'::regclass AND nodeport=:worker_1_port ORDER BY shardid @@ -178,7 +182,7 @@ VALUES (:worker_2_group, :testshardid, 3, 0); SELECT master_copy_shard_placement(:testshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); -SELECT shardid, nodename, nodeport, shardstate +SELECT shardid, nodename, nodeport, shardstate FROM pg_dist_shard_placement WHERE shardid = :testshardid ORDER BY nodeport; diff --git a/src/test/regress/upgrade/config.py b/src/test/regress/upgrade/config.py index bee7e1a3d..e83640d72 100644 --- a/src/test/regress/upgrade/config.py +++ b/src/test/regress/upgrade/config.py @@ -9,7 +9,7 @@ BEFORE_CITUS_UPGRADE_COORD_SCHEDULE = './before_citus_upgrade_coord_schedule' MASTER = 'master' # This should be updated when citus version changes -MASTER_VERSION = '9.0' +MASTER_VERSION = '9.1' HOME = expanduser("~")
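
A minimal usage sketch of the behaviour exercised by the tests above (an editor's illustration, not part of the patch): the events table name is hypothetical, and the ports mirror the two-worker regression setup on 57637/57638. New colocation groups avoid a node whose shouldhaveshards is false, while reference tables are still placed on every node.

-- inspect the new column
SELECT nodename, nodeport, shouldhaveshards FROM pg_dist_node ORDER BY nodeid;
-- stop placing shards of new colocation groups on one worker
SELECT * FROM master_set_node_property('localhost', 57638, 'shouldhaveshards', false);
-- a table in a new colocation group only gets shards on the remaining worker
CREATE TABLE events (event_id bigint, payload text);
SELECT create_distributed_table('events', 'event_id', colocate_with => 'none');
SELECT nodeport, count(*)
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
WHERE logicalrelid = 'events'::regclass
GROUP BY nodeport ORDER BY nodeport;
-- allow new shards on the node again
SELECT * FROM master_set_node_property('localhost', 57638, 'shouldhaveshards', true);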