diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 93d1689a2..3b3559467 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -108,6 +108,8 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 	char *distributionColumnName = text_to_cstring(distributionColumnText);
 	char distributionMethod = LookupDistributionMethod(distributionMethodOid);
 
+	EnsureSchemaNode();
+
 	ConvertToDistributedTable(distributedRelationId, distributionColumnName,
 							  distributionMethod, INVALID_COLOCATION_ID,
 							  REPLICATION_MODEL_COORDINATOR);
@@ -133,6 +135,8 @@ create_distributed_table(PG_FUNCTION_ARGS)
 	text *colocateWithTableNameText = NULL;
 	char *colocateWithTableName = NULL;
 
+	EnsureSchemaNode();
+
 	/* guard against a binary update without a function update */
 	if (PG_NARGS() >= 4)
 	{
@@ -226,6 +230,8 @@ CreateReferenceTable(Oid relationId)
 	Oid distributionColumnType = InvalidOid;
 	char *distributionColumnName = NULL;
 
+	EnsureSchemaNode();
+
 	/* if there are no workers, error out */
 	if (replicationFactor == 0)
 	{
diff --git a/src/backend/distributed/commands/drop_distributed_table.c b/src/backend/distributed/commands/drop_distributed_table.c
index ebe40e9d8..ba71ed719 100644
--- a/src/backend/distributed/commands/drop_distributed_table.c
+++ b/src/backend/distributed/commands/drop_distributed_table.c
@@ -38,6 +38,8 @@ master_drop_distributed_table_metadata(PG_FUNCTION_ARGS)
 	char *schemaName = text_to_cstring(schemaNameText);
 	char *tableName = text_to_cstring(tableNameText);
 
+	EnsureSchemaNode();
+
 	CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName);
 
 	DeletePartitionRow(relationId);
diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c
index b7250746d..68e2c64ef 100644
--- a/src/backend/distributed/executor/multi_utility.c
+++ b/src/backend/distributed/executor/multi_utility.c
@@ -173,7 +173,6 @@ multi_ProcessUtility(Node *parsetree,
 					 char *completionTag)
 {
 	bool schemaNode = SchemaNode();
-	bool propagateChanges = schemaNode && EnableDDLPropagation;
 	bool commandMustRunAsOwner = false;
 	Oid savedUserId = InvalidOid;
 	int savedSecurityContext = 0;
@@ -233,7 +232,7 @@ multi_ProcessUtility(Node *parsetree,
 	 * DDL commands are propagated to workers only if EnableDDLPropagation is
 	 * set to true and the current node is the schema node
 	 */
-	if (propagateChanges)
+	if (EnableDDLPropagation)
 	{
 		bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
 
@@ -1977,6 +1976,7 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
 								  "modifications")));
 	}
 
+	EnsureSchemaNode();
 	ShowNoticeIfNotUsing2PC();
 
 	if (shouldSyncMetadata)
@@ -2018,6 +2018,7 @@ ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId,
 								  "modifications")));
 	}
 
+	EnsureSchemaNode();
 	ShowNoticeIfNotUsing2PC();
 
 	/*
diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c
index db7db578b..874bda931 100644
--- a/src/backend/distributed/master/master_create_shards.c
+++ b/src/backend/distributed/master/master_create_shards.c
@@ -70,6 +70,9 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
 	int32 replicationFactor = PG_GETARG_INT32(2);
 
 	Oid distributedTableId = ResolveRelationId(tableNameText);
+
+	EnsureSchemaNode();
+
 	CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount,
 									 replicationFactor);
 
 	PG_RETURN_VOID();
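The pattern the hunks above establish is a single guard call at the top of every coordinator-only UDF, before any distributed metadata is read or written. A minimal sketch of the shape each entry point takes after this change (my_metadata_udf is a hypothetical stand-in, not a function from this patch):

#include "postgres.h"
#include "fmgr.h"
#include "distributed/worker_manager.h"	/* declares EnsureSchemaNode() */

PG_FUNCTION_INFO_V1(my_metadata_udf);

Datum
my_metadata_udf(PG_FUNCTION_ARGS)
{
	/* reject the call before touching any distributed metadata */
	EnsureSchemaNode();

	/* ... the actual metadata work would go here ... */

	PG_RETURN_VOID();
}

Note the one deviation from this pattern in multi_utility.c: the schemaNode check is dropped from the propagateChanges flag, so workers no longer silently skip the DDL path; instead they enter it and hit the EnsureSchemaNode() call added to ExecuteDistributedDDLCommand, turning silent no-ops into explicit errors.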
diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c
index 6809e706f..411f1f763 100644
--- a/src/backend/distributed/master/master_delete_protocol.c
+++ b/src/backend/distributed/master/master_delete_protocol.c
@@ -108,6 +108,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
 	bool failOK = false;
 	bool isTopLevel = true;
 
+	EnsureSchemaNode();
 	PreventTransactionChain(isTopLevel, "master_apply_delete_command");
 
 	queryTreeNode = ParseTreeNode(queryString);
@@ -206,6 +207,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
 	char *schemaName = text_to_cstring(schemaNameText);
 	char *relationName = text_to_cstring(relationNameText);
 
+	EnsureSchemaNode();
 	PreventTransactionChain(isTopLevel, "DROP distributed table");
 
 	CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName);
@@ -240,6 +242,8 @@ master_drop_sequences(PG_FUNCTION_ARGS)
 		PG_RETURN_VOID();
 	}
 
+	EnsureSchemaNode();
+
 	/* iterate over sequence names to build single command to DROP them all */
 	sequenceIterator = array_create_iterator(sequenceNamesArray, 0, NULL);
 	while (array_iterate(sequenceIterator, &sequenceText, &isNull))
diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
index 45be9044c..60f84c738 100644
--- a/src/backend/distributed/master/master_modify_multiple_shards.c
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -88,6 +88,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 	List *taskList = NIL;
 	int32 affectedTupleCount = 0;
 
+	EnsureSchemaNode();
+
 	queryTreeNode = ParseTreeNode(queryString);
 	if (IsA(queryTreeNode, DeleteStmt))
 	{
diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c
index 43c3332bf..1f229261c 100644
--- a/src/backend/distributed/master/master_node_protocol.c
+++ b/src/backend/distributed/master/master_node_protocol.c
@@ -261,8 +261,13 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
 Datum
 master_get_new_shardid(PG_FUNCTION_ARGS)
 {
-	uint64 shardId = GetNextShardId();
-	Datum shardIdDatum = Int64GetDatum(shardId);
+	uint64 shardId = 0;
+	Datum shardIdDatum = 0;
+
+	EnsureSchemaNode();
+
+	shardId = GetNextShardId();
+	shardIdDatum = Int64GetDatum(shardId);
 
 	PG_RETURN_DATUM(shardIdDatum);
 }
@@ -321,6 +326,8 @@ master_get_new_placementid(PG_FUNCTION_ARGS)
 	int savedSecurityContext = 0;
 	Datum shardIdDatum = 0;
 
+	EnsureSchemaNode();
+
 	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
 	SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
 
diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c
index 549c895eb..b64c2bb40 100644
--- a/src/backend/distributed/master/master_repair_shards.c
+++ b/src/backend/distributed/master/master_repair_shards.c
@@ -90,6 +90,8 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
 						"is only supported on Citus Enterprise")));
 	}
 
+	EnsureSchemaNode();
+
 	/* RepairShardPlacement function repairs only given shard */
 	RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
 						 targetNodePort);
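The master_get_new_shardid rewrite above is larger than the other hunks for a mechanical reason: PostgreSQL C code of this era follows C89, so declarations must precede all statements in a block. Because the guard has to run before GetNextShardId() allocates a value, the initializers are split into declarations plus later assignments. In isolation, the transformation looks like this (the function name is a hypothetical wrapper for illustration; the two called functions are real):

/*
 * Sketch of the reordering applied to master_get_new_shardid(): under C89
 * rules the local cannot be declared after the guard, so its initializer
 * moves down into an assignment instead.
 */
static uint64
NextShardIdOnSchemaNodeOnly(void)
{
	uint64 shardId = 0;			/* declaration first (C89) */

	EnsureSchemaNode();			/* must run before the id is allocated */

	shardId = GetNextShardId();
	return shardId;
}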
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 03e3e7019..0bd24b33a 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -73,6 +73,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
 	List *dropMetadataCommandList = NIL;
 	List *createMetadataCommandList = NIL;
 
+	EnsureSchemaNode();
 	EnsureSuperUser();
 	PreventTransactionChain(true, "start_metadata_sync_to_node");
 
@@ -131,6 +132,7 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
 	char *nodeNameString = text_to_cstring(nodeName);
 	WorkerNode *workerNode = NULL;
 
+	EnsureSchemaNode();
 	EnsureSuperUser();
 
 	workerNode = FindWorkerNode(nodeNameString, nodePort);
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 11d62b7fb..f478484d4 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -26,6 +26,7 @@
 #include "distributed/deparse_shard_query.h"
 #include "distributed/distribution_column.h"
 #include "distributed/master_metadata_utility.h"
+#include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_logical_planner.h"
@@ -643,6 +644,8 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
 	/* we only do this check for INSERT ... SELECT queries */
 	AssertArg(InsertSelectQuery(queryTree));
 
+	EnsureSchemaNode();
+
 	subquery = subqueryRte->subquery;
 
 	if (contain_volatile_functions((Node *) queryTree))
@@ -1002,6 +1005,7 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
 	Oid distributedTableId = ExtractFirstDistributedTableId(queryTree);
 	uint32 rangeTableId = 1;
 	Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
+	bool schemaNode = SchemaNode();
 	List *rangeTableList = NIL;
 	ListCell *rangeTableCell = NULL;
 	bool hasValuesScan = false;
@@ -1045,8 +1049,32 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
 	foreach(rangeTableCell, rangeTableList)
 	{
 		RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
+		bool referenceTable = false;
+
 		if (rangeTableEntry->rtekind == RTE_RELATION)
 		{
+			/*
+			 * The table is already known to be distributed, so there is no
+			 * need to call IsDistributedTable() here; DistributedTableCacheEntry()
+			 * errors out if the table is not distributed.
+			 */
+			DistTableCacheEntry *distTableEntry =
+				DistributedTableCacheEntry(rangeTableEntry->relid);
+
+			if (distTableEntry->partitionMethod == DISTRIBUTE_BY_NONE)
+			{
+				referenceTable = true;
+			}
+
+			if (referenceTable && !schemaNode)
+			{
+				ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+								errmsg("cannot perform distributed planning for the given"
+									   " modification"),
+								errdetail("Modifications to reference tables are "
+										  "supported only from the schema node.")));
+			}
+
 			queryTableCount++;
 		}
 		else if (rangeTableEntry->rtekind == RTE_VALUES)
diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c
index 82b25040a..14122a232 100644
--- a/src/backend/distributed/utils/colocation_utils.c
+++ b/src/backend/distributed/utils/colocation_utils.c
@@ -74,6 +74,8 @@ mark_tables_colocated(PG_FUNCTION_ARGS)
 						"operation")));
 	}
 
+	EnsureSchemaNode();
+
 	relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject);
 
 	for (relationIndex = 0; relationIndex < relationCount; relationIndex++)
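The router-planner hunk above folds two behaviors into the range-table walk: every RTE_RELATION is now looked up in the metadata cache, and reference tables (partition method DISTRIBUTE_BY_NONE) may only be modified from the schema node. Pulled out of the loop, the detection reads roughly as follows (IsReferenceTableEntry is a hypothetical helper, not part of the patch):

/*
 * Hypothetical helper mirroring the new check: a distributed table whose
 * partition method is DISTRIBUTE_BY_NONE is a reference table. Note that
 * DistributedTableCacheEntry() itself errors out for regular tables, so
 * the lookup doubles as a "relation ... is not distributed" check.
 */
static bool
IsReferenceTableEntry(Oid relationId)
{
	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);

	return (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE);
}

That side effect of the cache lookup is also what changes the multi_shard_modify test expectations later in this diff.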
diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c
index 0471503a8..01911ed0b 100644
--- a/src/backend/distributed/utils/node_metadata.c
+++ b/src/backend/distributed/utils/node_metadata.c
@@ -102,6 +102,7 @@ master_remove_node(PG_FUNCTION_ARGS)
 	bool hasShardPlacements = false;
 	WorkerNode *workerNode = NULL;
 
+	EnsureSchemaNode();
 	EnsureSuperUser();
 
 	hasShardPlacements = NodeHasActiveShardPlacements(nodeNameString, nodePort);
@@ -345,6 +346,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, int32 groupId, char *nodeRack,
 	char *nodeInsertCommand = NULL;
 	List *workerNodeList = NIL;
 
+	EnsureSchemaNode();
 	EnsureSuperUser();
 
 	/* acquire a lock so that no one can do this concurrently */
@@ -543,6 +545,23 @@ GetNextNodeId()
 }
 
 
+/*
+ * EnsureSchemaNode checks whether the current node is the schema node and
+ * errors out if it is not.
+ */
+void
+EnsureSchemaNode(void)
+{
+	int localGroupId = GetLocalGroupId();
+
+	if (localGroupId != 0)
+	{
+		ereport(ERROR, (errmsg("operation is not allowed on this node"),
+						errhint("Connect to the schema node and run it again.")));
+	}
+}
+
+
 /*
 * InsertNodedRow opens the node system catalog, and inserts a new row with the
 * given values into that system catalog.
diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h
index 9590860d5..19b2ff229 100644
--- a/src/include/distributed/worker_manager.h
+++ b/src/include/distributed/worker_manager.h
@@ -62,6 +62,7 @@ extern uint32 WorkerGetLiveNodeCount(void);
 extern List * WorkerNodeList(void);
 extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort);
 extern List * ReadWorkerNodes(void);
+extern void EnsureSchemaNode(void);
 
 /* Function declarations for worker node utilities */
 extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);
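EnsureSchemaNode, defined in node_metadata.c above and exported through worker_manager.h, is the erroring counterpart of the SchemaNode() predicate that multi_utility.c and the router planner already consult; both hinge on the local group id, with group 0 denoting the schema node. Assuming SchemaNode() reduces to the same group-id comparison, the pair relates as in this sketch (not code from the patch):

/* sketch: the predicate form, assuming it wraps the same group-id test */
bool
SchemaNode(void)
{
	return (GetLocalGroupId() == 0);
}

/* the guard added by this patch, expressed in terms of the predicate */
void
EnsureSchemaNode(void)
{
	if (!SchemaNode())
	{
		ereport(ERROR, (errmsg("operation is not allowed on this node"),
						errhint("Connect to the schema node and run it again.")));
	}
}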
diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out
index f0ae3bc1d..98077f51a 100644
--- a/src/test/regress/expected/multi_shard_modify.out
+++ b/src/test/regress/expected/multi_shard_modify.out
@@ -67,8 +67,23 @@ SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE
  1
 (1 row)
 
+-- commands with a USING clause on a non-distributed table error out
+CREATE TABLE temp_nations(name text, key integer);
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test USING temp_nations WHERE multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''foobar'' ');
+ERROR:  relation temp_nations is not distributed
 -- commands with a USING clause are unsupported
-CREATE TEMP TABLE temp_nations(name text, key integer);
+SELECT master_create_distributed_table('temp_nations', 'name', 'hash');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+SELECT master_create_worker_shards('temp_nations', 4, 2);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
 SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test USING temp_nations WHERE multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''foobar'' ');
 ERROR:  cannot perform distributed planning for the given modification
 DETAIL:  Joins are not supported in distributed modifications.
diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out
new file mode 100644
index 000000000..10b27f766
--- /dev/null
+++ b/src/test/regress/expected/multi_unsupported_worker_operations.out
@@ -0,0 +1,361 @@
+--
+-- MULTI_UNSUPPORTED_WORKER_OPERATIONS
+--
+-- Tests to ensure that unsupported functions error out on metadata workers.
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1270000;
+ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1270000;
+-- Set the colocation id to a safe value so that
+-- it is not affected by future changes to the colocation id sequence
+SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000;
+-- Prepare the environment
+SET citus.shard_replication_factor TO 1;
+SET citus.shard_count TO 5;
+-- Create test tables
+CREATE TABLE mx_table (col_1 int, col_2 text, col_3 BIGSERIAL);
+SELECT create_distributed_table('mx_table', 'col_1');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+CREATE TABLE mx_table_2 (col_1 int, col_2 text, col_3 BIGSERIAL);
+SELECT create_distributed_table('mx_table_2', 'col_1');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+-- Check that the created tables are colocated MX tables
+SELECT logicalrelid, repmodel, colocationid
+FROM pg_dist_partition
+WHERE logicalrelid IN ('mx_table'::regclass, 'mx_table_2'::regclass)
+ORDER BY logicalrelid;
+ logicalrelid | repmodel | colocationid
+--------------+----------+--------------
+ mx_table     | s        |       150000
+ mx_table_2   | s        |       150000
+(2 rows)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
+ start_metadata_sync_to_node
+-----------------------------
+
+(1 row)
+
+COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv');
+SELECT * FROM mx_table ORDER BY col_1;
+ col_1 |  col_2   | col_3
+-------+----------+-------
+   -37 | 'lorem'  |     1
+    80 | 'dolor'  |     3
+  7344 | 'sit'    |     4
+ 65536 | 'ipsum'  |     2
+ 65832 | 'amet'   |     5
+(5 rows)
+
+-- Try commands from metadata worker
+\c - - - :worker_1_port
+CREATE TABLE mx_table_worker(col_1 text);
+-- master_create_distributed_table
+SELECT master_create_distributed_table('mx_table_worker', 'col_1', 'hash');
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+-- create_distributed_table
+SELECT create_distributed_table('mx_table_worker', 'col_1');
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+-- create_reference_table
+SELECT create_reference_table('mx_table_worker');
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT count(*) FROM pg_dist_partition WHERE logicalrelid='mx_table_worker'::regclass;
+ count
+-------
+     0
+(1 row)
+
+DROP TABLE mx_table_worker;
+-- master_create_worker_shards
+CREATE TEMP TABLE pg_dist_shard_temp AS
+SELECT * FROM pg_dist_shard WHERE logicalrelid = 'mx_table'::regclass;
+DELETE FROM pg_dist_shard WHERE logicalrelid = 'mx_table'::regclass;
+SELECT master_create_worker_shards('mx_table', 5, 1);
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass;
+ count
+-------
+     0
+(1 row)
+
+INSERT INTO pg_dist_shard SELECT * FROM pg_dist_shard_temp;
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass;
+ count
+-------
+     5
+(1 row)
+
+-- DDL commands
+\d mx_table
+     Table "public.mx_table"
+ Column |  Type   |                         Modifiers
+--------+---------+----------------------------------------------------------
+ col_1  | integer |
+ col_2  | text    |
+ col_3  | bigint  | not null default nextval('mx_table_col_3_seq'::regclass)
+
+CREATE INDEX mx_test_index ON mx_table(col_1);
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+ALTER TABLE mx_table ADD COLUMN col_4 int;
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+ALTER TABLE mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col_1) REFERENCES mx_table(col_1);
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+\d mx_table
+     Table "public.mx_table"
+ Column |  Type   |                         Modifiers
+--------+---------+----------------------------------------------------------
+ col_1  | integer |
+ col_2  | text    |
+ col_3  | bigint  | not null default nextval('mx_table_col_3_seq'::regclass)
+
+-- master_modify_multiple_shards
+SELECT master_modify_multiple_shards('UPDATE mx_table SET col_2=''none''');
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT count(*) FROM mx_table WHERE col_2='none';
+ count
+-------
+     0
+(1 row)
+
+SELECT count(*) FROM mx_table WHERE col_2!='none';
+ count
+-------
+     5
+(1 row)
+
+SELECT master_modify_multiple_shards('DELETE FROM mx_table');
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT count(*) FROM mx_table;
+ count
+-------
+     5
+(1 row)
+
+-- master_drop_all_shards
+SELECT master_drop_all_shards('mx_table'::regclass, 'public', 'mx_table');
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_table'::regclass;
+ count
+-------
+     5
+(1 row)
+
+-- master_apply_delete_command
+SELECT master_apply_delete_command('DELETE FROM mx_table');
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT count(*) FROM mx_table;
+ count
+-------
+     5
+(1 row)
+
+-- master_add_node
+SELECT master_add_node('localhost', 5432);
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
+ nodeid | groupid | nodename | nodeport | noderack | hasmetadata
+--------+---------+----------+----------+----------+-------------
+(0 rows)
+
+-- master_remove_node
+\c - - - :master_port
+SELECT master_add_node('localhost', 5432);
+        master_add_node
+--------------------------------
+ (3,3,localhost,5432,default,f)
+(1 row)
+
+\c - - - :worker_1_port
+SELECT master_remove_node('localhost', 5432);
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
+ nodeid | groupid | nodename  | nodeport | noderack | hasmetadata
+--------+---------+-----------+----------+----------+-------------
+      3 |       3 | localhost |     5432 | default  | f
+(1 row)
+
+\c - - - :master_port
+SELECT master_remove_node('localhost', 5432);
+ master_remove_node
+--------------------
+
+(1 row)
+
+-- TRUNCATE
+\c - - - :worker_1_port
+TRUNCATE mx_table;
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+CONTEXT:  SQL statement "SELECT master_modify_multiple_shards(commandText)"
+PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM
+SELECT count(*) FROM mx_table;
+ count
+-------
+     5
+(1 row)
+
+-- INSERT / SELECT
+INSERT INTO mx_table_2 SELECT * FROM mx_table;
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT count(*) FROM mx_table_2;
+ count
+-------
+     0
+(1 row)
+
+-- mark_tables_colocated
+UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass;
+SELECT mark_tables_colocated('mx_table', ARRAY['mx_table_2']);
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT colocationid FROM pg_dist_partition WHERE logicalrelid='mx_table_2'::regclass;
+ colocationid
+--------------
+            0
+(1 row)
+
+SELECT colocationid AS old_colocation_id
+FROM pg_dist_partition
+WHERE logicalrelid='mx_table'::regclass \gset
+UPDATE pg_dist_partition
+SET colocationid = :old_colocation_id
+WHERE logicalrelid='mx_table_2'::regclass;
+-- start_metadata_sync_to_node
+SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
+ hasmetadata
+-------------
+ f
+(1 row)
+
+-- stop_metadata_sync_to_node
+\c - - - :master_port
+SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
+ start_metadata_sync_to_node
+-----------------------------
+
+(1 row)
+
+\c - - - :worker_1_port
+SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+\c - - - :master_port
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
+ hasmetadata
+-------------
+ t
+(1 row)
+
+SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
+ stop_metadata_sync_to_node
+----------------------------
+
+(1 row)
+
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
+ hasmetadata
+-------------
+ f
+(1 row)
+
+\c - - - :worker_2_port
+DELETE FROM pg_dist_node;
+SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition;
+ worker_drop_distributed_table
+-------------------------------
+
+
+(2 rows)
+
+\c - - - :worker_1_port
+-- DROP TABLE
+DROP TABLE mx_table;
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+CONTEXT:  SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
+PL/pgSQL function citus_drop_trigger() line 21 at PERFORM
+SELECT count(*) FROM mx_table;
+ count
+-------
+     5
+(1 row)
+
+-- master_drop_distributed_table_metadata
+SELECT master_drop_distributed_table_metadata('mx_table'::regclass, 'public', 'mx_table');
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT count(*) FROM mx_table;
+ count
+-------
+     5
+(1 row)
+
+-- master_copy_shard_placement
+SELECT logicalrelid, shardid AS testshardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid = 'mx_table'::regclass AND nodeport=:worker_1_port
+ORDER BY shardid
+LIMIT 1 \gset
+INSERT INTO pg_dist_shard_placement (nodename, nodeport, shardid, shardstate, shardlength)
+VALUES ('localhost', :worker_2_port, :testshardid, 3, 0);
+SELECT master_copy_shard_placement(:testshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+SELECT shardid, nodename, nodeport, shardstate
+FROM pg_dist_shard_placement
+WHERE shardid = :testshardid
+ORDER BY nodeport;
+ shardid | nodename  | nodeport | shardstate
+---------+-----------+----------+------------
+ 1270000 | localhost |    57637 |          1
+ 1270000 | localhost |    57638 |          3
+(2 rows)
+
+DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port AND shardid = :testshardid;
+-- master_get_new_placementid
+SELECT master_get_new_placementid();
+ERROR:  operation is not allowed on this node
+HINT:  Connect to the schema node and run it again.
+-- Cleanup
+\c - - - :master_port
+DROP TABLE mx_table;
+DROP TABLE mx_table_2;
+SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
+ stop_metadata_sync_to_node
+----------------------------
+
+(1 row)
+
+\c - - - :worker_1_port
+DELETE FROM pg_dist_node;
+SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition;
+ worker_drop_distributed_table
+-------------------------------
+(0 rows)
+
+\c - - - :master_port
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index ec5b60acf..1029d0088 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -140,7 +140,6 @@ test: multi_data_types
 test: multi_repartition_udt
 test: multi_repartitioned_subquery_udf
 test: multi_modifying_xacts
-test: multi_metadata_sync
 test: multi_transaction_recovery
 
 # ---------
@@ -158,6 +157,13 @@ test: multi_router_planner
 # ----------
 test: multi_large_shardid
 
+# ----------
+# multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers
+# multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers
+# ----------
+test: multi_metadata_sync
+test: multi_unsupported_worker_operations
+
 # ----------
 # multi_drop_extension makes sure we can safely drop and recreate the extension
 # ----------
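The multi_shard_modify changes below follow from the router-planner hunk earlier in this diff: now that every relation in a modification is looked up in the metadata cache, the plain (formerly TEMP) temp_nations first trips the "relation temp_nations is not distributed" error, so the test then distributes the table to keep exercising the original "Joins are not supported" path. A sketch of the early exit the expected output appears to rely on (the helper shape is an assumption; of these names only IsDistributedTable() appears in the patch):

/*
 * Assumed shape of the early exit behind the metadata-cache lookup: a
 * regular table fails here before the USING/join restriction is reached.
 * EnsureTableDistributed is a hypothetical name for illustration.
 */
static void
EnsureTableDistributed(Oid relationId)
{
	if (!IsDistributedTable(relationId))
	{
		ereport(ERROR, (errmsg("relation %s is not distributed",
							   get_rel_name(relationId))));
	}
}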
diff --git a/src/test/regress/sql/multi_shard_modify.sql b/src/test/regress/sql/multi_shard_modify.sql
index 6564e6f11..908bc9f46 100644
--- a/src/test/regress/sql/multi_shard_modify.sql
+++ b/src/test/regress/sql/multi_shard_modify.sql
@@ -70,8 +70,13 @@ SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE
 -- DELETE with expression in WHERE clause
 SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = (3*18-40)');
 
+-- commands with a USING clause on a non-distributed table error out
+CREATE TABLE temp_nations(name text, key integer);
+SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test USING temp_nations WHERE multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''foobar'' ');
+
 -- commands with a USING clause are unsupported
-CREATE TEMP TABLE temp_nations(name text, key integer);
+SELECT master_create_distributed_table('temp_nations', 'name', 'hash');
+SELECT master_create_worker_shards('temp_nations', 4, 2);
 SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test USING temp_nations WHERE multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''foobar'' ');
 
 -- commands with a RETURNING clause are unsupported
diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql
new file mode 100644
index 000000000..9b30428d4
--- /dev/null
+++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql
@@ -0,0 +1,193 @@
+--
+-- MULTI_UNSUPPORTED_WORKER_OPERATIONS
+--
+
+-- Tests to ensure that unsupported functions error out on metadata workers.
+
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1270000;
+ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1270000;
+
+-- Set the colocation id to a safe value so that
+-- it is not affected by future changes to the colocation id sequence
+SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 150000;
+
+-- Prepare the environment
+SET citus.shard_replication_factor TO 1;
+SET citus.shard_count TO 5;
+
+-- Create test tables
+CREATE TABLE mx_table (col_1 int, col_2 text, col_3 BIGSERIAL);
+SELECT create_distributed_table('mx_table', 'col_1');
+
+CREATE TABLE mx_table_2 (col_1 int, col_2 text, col_3 BIGSERIAL);
+SELECT create_distributed_table('mx_table_2', 'col_1');
+
+-- Check that the created tables are colocated MX tables
+SELECT logicalrelid, repmodel, colocationid
+FROM pg_dist_partition
+WHERE logicalrelid IN ('mx_table'::regclass, 'mx_table_2'::regclass)
+ORDER BY logicalrelid;
+
+SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
+
+COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv');
+-37, 'lorem'
+65536, 'ipsum'
+80, 'dolor'
+7344, 'sit'
+65832, 'amet'
+\.
+
+SELECT * FROM mx_table ORDER BY col_1;
+
+-- Try commands from metadata worker
+\c - - - :worker_1_port
+
+CREATE TABLE mx_table_worker(col_1 text);
+
+-- master_create_distributed_table
+SELECT master_create_distributed_table('mx_table_worker', 'col_1', 'hash');
+
+-- create_distributed_table
+SELECT create_distributed_table('mx_table_worker', 'col_1');
+
+-- create_reference_table
+SELECT create_reference_table('mx_table_worker');
+
+SELECT count(*) FROM pg_dist_partition WHERE logicalrelid='mx_table_worker'::regclass;
+DROP TABLE mx_table_worker;
+
+-- master_create_worker_shards
+CREATE TEMP TABLE pg_dist_shard_temp AS
+SELECT * FROM pg_dist_shard WHERE logicalrelid = 'mx_table'::regclass;
+
+DELETE FROM pg_dist_shard WHERE logicalrelid = 'mx_table'::regclass;
+
+SELECT master_create_worker_shards('mx_table', 5, 1);
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass;
+
+INSERT INTO pg_dist_shard SELECT * FROM pg_dist_shard_temp;
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass;
+
+
+-- DDL commands
+\d mx_table
+CREATE INDEX mx_test_index ON mx_table(col_1);
+ALTER TABLE mx_table ADD COLUMN col_4 int;
+ALTER TABLE mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col_1) REFERENCES mx_table(col_1);
+\d mx_table
+
+-- master_modify_multiple_shards
+SELECT master_modify_multiple_shards('UPDATE mx_table SET col_2=''none''');
+SELECT count(*) FROM mx_table WHERE col_2='none';
+SELECT count(*) FROM mx_table WHERE col_2!='none';
+SELECT master_modify_multiple_shards('DELETE FROM mx_table');
+SELECT count(*) FROM mx_table;
+
+-- master_drop_all_shards
+SELECT master_drop_all_shards('mx_table'::regclass, 'public', 'mx_table');
+SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE logicalrelid='mx_table'::regclass;
+
+-- master_apply_delete_command
+SELECT master_apply_delete_command('DELETE FROM mx_table');
+SELECT count(*) FROM mx_table;
+
+-- master_add_node
+SELECT master_add_node('localhost', 5432);
+SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
+
+-- master_remove_node
+\c - - - :master_port
+SELECT master_add_node('localhost', 5432);
+
+\c - - - :worker_1_port
+SELECT master_remove_node('localhost', 5432);
+SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
+
+\c - - - :master_port
+SELECT master_remove_node('localhost', 5432);
+
+-- TRUNCATE
+\c - - - :worker_1_port
+TRUNCATE mx_table;
+SELECT count(*) FROM mx_table;
+
+-- INSERT / SELECT
+INSERT INTO mx_table_2 SELECT * FROM mx_table;
+SELECT count(*) FROM mx_table_2;
+
+-- mark_tables_colocated
+UPDATE pg_dist_partition SET colocationid = 0 WHERE logicalrelid='mx_table_2'::regclass;
+
+SELECT mark_tables_colocated('mx_table', ARRAY['mx_table_2']);
+SELECT colocationid FROM pg_dist_partition WHERE logicalrelid='mx_table_2'::regclass;
+
+SELECT colocationid AS old_colocation_id
+FROM pg_dist_partition
+WHERE logicalrelid='mx_table'::regclass \gset
+
+UPDATE pg_dist_partition
+SET colocationid = :old_colocation_id
+WHERE logicalrelid='mx_table_2'::regclass;
+
+-- start_metadata_sync_to_node
+SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
+
+-- stop_metadata_sync_to_node
+\c - - - :master_port
+SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
+\c - - - :worker_1_port
+
+SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
+
+\c - - - :master_port
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
+SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
+SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
+\c - - - :worker_2_port
+DELETE FROM pg_dist_node;
+SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition;
+\c - - - :worker_1_port
+
+-- DROP TABLE
+DROP TABLE mx_table;
+SELECT count(*) FROM mx_table;
+
+-- master_drop_distributed_table_metadata
+SELECT master_drop_distributed_table_metadata('mx_table'::regclass, 'public', 'mx_table');
+SELECT count(*) FROM mx_table;
+
+-- master_copy_shard_placement
+SELECT logicalrelid, shardid AS testshardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid = 'mx_table'::regclass AND nodeport=:worker_1_port
+ORDER BY shardid
+LIMIT 1 \gset
+
+INSERT INTO pg_dist_shard_placement (nodename, nodeport, shardid, shardstate, shardlength)
+VALUES ('localhost', :worker_2_port, :testshardid, 3, 0);
+
+SELECT master_copy_shard_placement(:testshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
+
+SELECT shardid, nodename, nodeport, shardstate
+FROM pg_dist_shard_placement
+WHERE shardid = :testshardid
+ORDER BY nodeport;
+
+DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port AND shardid = :testshardid;
+
+-- master_get_new_placementid
+SELECT master_get_new_placementid();
+
+-- Cleanup
+\c - - - :master_port
+DROP TABLE mx_table;
+DROP TABLE mx_table_2;
+SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
+\c - - - :worker_1_port
+DELETE FROM pg_dist_node;
+SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition;
+\c - - - :master_port
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;