diff --git a/src/backend/distributed/commands/drop_distributed_table.c b/src/backend/distributed/commands/drop_distributed_table.c index 736bc060a..303e2c899 100644 --- a/src/backend/distributed/commands/drop_distributed_table.c +++ b/src/backend/distributed/commands/drop_distributed_table.c @@ -146,5 +146,5 @@ MasterRemoveDistributedTableMetadataFromWorkers(Oid relationId, char *schemaName /* drop the distributed table metadata on the workers */ char *deleteDistributionCommand = DistributionDeleteCommand(schemaName, tableName); - SendCommandToWorkers(WORKERS_WITH_METADATA, deleteDistributionCommand); + SendCommandToWorkersWithMetadata(deleteDistributionCommand); } diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 7b791fa96..c3c0309b1 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -103,7 +103,7 @@ ProcessDropTableStmt(DropStmt *dropTableStatement) continue; } - SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); foreach(partitionCell, partitionList) { @@ -111,7 +111,7 @@ ProcessDropTableStmt(DropStmt *dropTableStatement) char *detachPartitionCommand = GenerateDetachPartitionCommand(partitionRelationId); - SendCommandToWorkers(WORKERS_WITH_METADATA, detachPartitionCommand); + SendCommandToWorkersWithMetadata(detachPartitionCommand); } } } diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 888a72ac9..8dd4f09c1 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -415,8 +415,7 @@ ProcessAlterEnumStmt(AlterEnumStmt *stmt, const char *queryString) List *commands = list_make2(DISABLE_DDL_PROPAGATION, (void *) alterEnumStmtSql); - int result = SendBareOptionalCommandListToWorkersAsUser(ALL_WORKERS, commands, - NULL); + int result = SendBareOptionalCommandListToAllWorkersAsUser(commands, NULL); if (result != RESPONSE_OKAY) { diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index cc4026060..d9703f849 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -1015,7 +1015,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) { char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand(); - SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); /* * Given that we're relaying the query to the worker nodes directly, @@ -1023,10 +1023,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) */ if (setSearchPathCommand != NULL) { - SendCommandToWorkers(WORKERS_WITH_METADATA, setSearchPathCommand); + SendCommandToWorkersWithMetadata(setSearchPathCommand); } - SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString); + SendCommandToWorkersWithMetadata((char *) ddlJob->commandString); } /* use adaptive executor when enabled */ @@ -1060,7 +1060,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) commandList = lappend(commandList, (char *) ddlJob->commandString); - SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList); + SendBareCommandListToMetadataWorkers(commandList); } } PG_CATCH(); diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index f54f40f15..9abc8241a 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -316,8 +316,8 @@ master_drop_sequences(PG_FUNCTION_ARGS) { appendStringInfoString(dropSeqCommand, " CASCADE"); - SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); - SendCommandToWorkers(WORKERS_WITH_METADATA, dropSeqCommand->data); + SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); + SendCommandToWorkersWithMetadata(dropSeqCommand->data); } PG_RETURN_VOID(); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 9b28dd3e8..488fb1f15 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1188,14 +1188,14 @@ CreateTableMetadataOnWorkers(Oid relationId) ListCell *commandCell = NULL; /* prevent recursive propagation */ - SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); + SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); /* send the commands one by one */ foreach(commandCell, commandList) { char *command = (char *) lfirst(commandCell); - SendCommandToWorkers(WORKERS_WITH_METADATA, command); + SendCommandToWorkersWithMetadata(command); } } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 8625a36ca..dd6e26119 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -990,7 +990,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) /* make sure we don't have any lingering session lifespan connections */ CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort); - SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand); + SendCommandToWorkersWithMetadata(nodeDeleteCommand); } @@ -1098,7 +1098,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, /* send the delete command to all primary nodes with metadata */ char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); - SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand); + SendCommandToWorkersWithMetadata(nodeDeleteCommand); /* finally prepare the insert command and send it to all primary nodes */ uint32 primariesWithMetadata = CountPrimariesWithMetadata(); @@ -1107,7 +1107,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort, List *workerNodeList = list_make1(workerNode); char *nodeInsertCommand = NodeListInsertCommand(workerNodeList); - SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand); + SendCommandToWorkersWithMetadata(nodeInsertCommand); } return workerNode->nodeId; @@ -1178,7 +1178,7 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value) heap_close(pgDistNode, NoLock); /* we also update the column at worker nodes */ - SendCommandToWorkers(WORKERS_WITH_METADATA, metadataSyncCommand); + SendCommandToWorkersWithMetadata(metadataSyncCommand); return newWorkerNode; } diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 1d5dcc0b4..11a4160b8 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -32,6 +32,18 @@ #include "utils/memutils.h" +static void SendCommandToMetadataWorkersParams(const char *command, + const char *user, int parameterCount, + const Oid *parameterTypes, const + char *const *parameterValues); +static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, + const char *command, const char *user, + int parameterCount, const + Oid *parameterTypes, + const char *const *parameterValues); +static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList); + + /* * SendCommandToWorker sends a command to a particular worker as part of the * 2PC. @@ -98,10 +110,10 @@ SendCommandToWorkerAsUser(char *nodeName, int32 nodePort, const char *nodeUser, * owner to ensure write access to the Citus metadata tables. */ void -SendCommandToWorkers(TargetWorkerSet targetWorkerSet, const char *command) +SendCommandToWorkersWithMetadata(const char *command) { - SendCommandToWorkersParams(targetWorkerSet, command, CitusExtensionOwnerName(), - 0, NULL, NULL); + SendCommandToMetadataWorkersParams(command, CitusExtensionOwnerName(), + 0, NULL, NULL); } @@ -120,8 +132,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); - if (targetWorkerSet == WORKERS_WITH_METADATA && - (!workerNode->hasMetadata || !workerNode->metadataSynced)) + if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata) { continue; } @@ -139,20 +150,23 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) /* - * SendBareCommandListToWorkers sends a list of commands to a set of target + * SendBareCommandListToMetadataWorkers sends a list of commands to metadata * workers in serial. Commands are committed immediately: new connections are * always used and no transaction block is used (hence "bare"). The connections * are made as the extension owner to ensure write access to the Citus metadata * tables. Primarly useful for INDEX commands using CONCURRENTLY. */ void -SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) +SendBareCommandListToMetadataWorkers(List *commandList) { + TargetWorkerSet targetWorkerSet = WORKERS_WITH_METADATA; List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); ListCell *workerNodeCell = NULL; char *nodeUser = CitusExtensionOwnerName(); ListCell *commandCell = NULL; + ErrorIfAnyMetadataNodeOutOfSync(workerNodeList); + /* run commands serially */ foreach(workerNodeCell, workerNodeList) { @@ -180,14 +194,14 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList) /* - * SendBareOptionalCommandListToWorkersAsUser sends a list of commands to a set of target - * workers in serial. Commands are committed immediately: new connections are - * always used and no transaction block is used (hence "bare"). + * SendBareOptionalCommandListToAllWorkersAsUser sends a list of commands + * to all workers in serial. Commands are committed immediately: new + * connections are always used and no transaction block is used (hence "bare"). */ int -SendBareOptionalCommandListToWorkersAsUser(TargetWorkerSet targetWorkerSet, - List *commandList, const char *user) +SendBareOptionalCommandListToAllWorkersAsUser(List *commandList, const char *user) { + TargetWorkerSet targetWorkerSet = ALL_WORKERS; List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock); ListCell *workerNodeCell = NULL; ListCell *commandCell = NULL; @@ -227,17 +241,38 @@ SendBareOptionalCommandListToWorkersAsUser(TargetWorkerSet targetWorkerSet, /* - * SendCommandToWorkersParams sends a command to all workers in parallel. + * SendCommandToMetadataWorkersParams is a wrapper around + * SendCommandToWorkersParamsInternal() enforcing some extra checks. + */ +static void +SendCommandToMetadataWorkersParams(const char *command, + const char *user, int parameterCount, + const Oid *parameterTypes, const + char *const *parameterValues) +{ + List *workerNodeList = TargetWorkerSetNodeList(WORKERS_WITH_METADATA, ShareLock); + + ErrorIfAnyMetadataNodeOutOfSync(workerNodeList); + + SendCommandToWorkersParamsInternal(WORKERS_WITH_METADATA, command, user, + parameterCount, parameterTypes, + parameterValues); +} + + +/* + * SendCommandToWorkersParamsInternal sends a command to all workers in parallel. * Commands are committed on the workers when the local transaction commits. The * connection are made as the extension owner to ensure write access to the Citus * metadata tables. Parameters can be specified as for PQexecParams, except that * paramLengths, paramFormats and resultFormat are hard-coded to NULL, NULL and 0 * respectively. */ -void -SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, const char *command, - const char *user, int parameterCount, - const Oid *parameterTypes, const char *const *parameterValues) +static void +SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *command, + const char *user, int parameterCount, + const Oid *parameterTypes, const + char *const *parameterValues) { List *connectionList = NIL; ListCell *connectionCell = NULL; @@ -347,3 +382,42 @@ SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort, RemoteTransactionCommit(workerConnection); CloseConnection(workerConnection); } + + +/* + * ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given + * metadata nodes are out of sync. It is safer to avoid metadata changing + * commands (e.g. DDL or node addition) until all metadata nodes have + * been synced. + * + * An example of we could get in a bad situation without doing so is: + * 1. Create a reference table + * 2. After the node becomes out of sync, add a new active node + * 3. Insert into the reference table from the out of sync node + * + * Since the out-of-sync might not know about the new node, it won't propagate + * the changes to the new node and replicas will be in an inconsistent state. + */ +static void +ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList) +{ + ListCell *workerNodeCell = NULL; + + foreach(workerNodeCell, metadataNodeList) + { + WorkerNode *metadataNode = lfirst(workerNodeCell); + + Assert(metadataNode->hasMetadata); + + if (!metadataNode->metadataSynced) + { + const char *workerName = metadataNode->workerName; + int workerPort = metadataNode->workerPort; + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("%s:%d is a metadata node, but is out of sync", + workerName, workerPort), + errhint("If the node is up, wait until metadata" + " gets synced to it and try again."))); + } + } +} diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 5c7f4a890..c1ed7ae05 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -662,7 +662,7 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId) char *updateColocationIdCommand = ColocationIdUpdateCommand(distributedRelationId, colocationId); - SendCommandToWorkers(WORKERS_WITH_METADATA, updateColocationIdCommand); + SendCommandToWorkersWithMetadata(updateColocationIdCommand); } } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 48d134112..8a1255ef0 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -329,7 +329,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) FILE_FINALIZED, 0, groupId); - SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand); + SendCommandToWorkersWithMetadata(placementCommand); } } } @@ -448,7 +448,7 @@ DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId) "DELETE FROM pg_dist_placement WHERE placementid = " UINT64_FORMAT, placement->placementId); - SendCommandToWorkers(WORKERS_WITH_METADATA, deletePlacementCommand->data); + SendCommandToWorkersWithMetadata(deletePlacementCommand->data); } } diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 79d8cf342..a6240fc2c 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -286,7 +286,7 @@ LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList) appendStringInfo(lockCommand, "])"); - SendCommandToWorkers(WORKERS_WITH_METADATA, lockCommand->data); + SendCommandToWorkersWithMetadata(lockCommand->data); } diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index a86945ccf..66c522c2f 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -37,16 +37,10 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet, const const char *command); extern void SendCommandToWorkerAsUser(char *nodeName, int32 nodePort, const char *nodeUser, const char *command); -extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, const char *command); -extern void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, - List *commandList); -extern int SendBareOptionalCommandListToWorkersAsUser(TargetWorkerSet targetWorkerSet, - List *commandList, - const char *user); -extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, - const char *command, const char *user, - int parameterCount, const Oid *parameterTypes, - const char *const *parameterValues); +extern void SendCommandToWorkersWithMetadata(const char *command); +extern void SendBareCommandListToMetadataWorkers(List *commandList); +extern int SendBareOptionalCommandListToAllWorkersAsUser(List *commandList, + const char *user); extern void EnsureNoModificationsHaveBeenDone(void); extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort, diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 41a84f27f..84ebb350c 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -624,6 +624,12 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$1', coloc ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test" DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model. HINT: When distributing tables make sure that citus.replication_model = 'streaming' +SELECT wait_until_metadata_sync(); + wait_until_metadata_sync +-------------------------- + +(1 row) + -- a function can be colocated with a different distribution argument type -- as long as there is a coercion path SET citus.shard_replication_factor TO 1; @@ -752,6 +758,9 @@ SELECT wait_until_metadata_sync(); (1 row) +SET client_min_messages TO error; -- suppress cascading objects dropping +DROP SCHEMA function_tests CASCADE; +DROP SCHEMA function_tests2 CASCADE; -- clear objects SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; stop_metadata_sync_to_node @@ -760,41 +769,23 @@ SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isa (2 rows) -SET client_min_messages TO error; -- suppress cascading objects dropping -DROP SCHEMA function_tests CASCADE; -DROP SCHEMA function_tests2 CASCADE; -- This is hacky, but we should clean-up the resources as below \c - - - :worker_1_port -SET client_min_messages TO error; -- suppress cascading objects dropping UPDATE pg_dist_local_group SET groupid = 0; -SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; - worker_drop_distributed_table -------------------------------- - - - -(3 rows) - +TRUNCATE pg_dist_node; +SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; -TRUNCATE pg_dist_node; +SET search_path TO function_tests, function_tests2; \c - - - :worker_2_port -SET client_min_messages TO error; -- suppress cascading objects dropping UPDATE pg_dist_local_group SET groupid = 0; -SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; - worker_drop_distributed_table -------------------------------- - - - -(3 rows) - +TRUNCATE pg_dist_node; +SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; -TRUNCATE pg_dist_node; \c - - - :master_port DROP USER functionuser; -SELECT run_command_on_workers($$DROP USER functionuser;$$); +SELECT run_command_on_workers($$DROP USER functionuser$$); run_command_on_workers --------------------------------- (localhost,57637,t,"DROP ROLE") diff --git a/src/test/regress/expected/distributed_procedure.out b/src/test/regress/expected/distributed_procedure.out index cf438ed5d..31258530c 100644 --- a/src/test/regress/expected/distributed_procedure.out +++ b/src/test/regress/expected/distributed_procedure.out @@ -20,6 +20,19 @@ BEGIN RAISE INFO 'information message %', $1; END; $proc$; +-- set sync intervals to less than 15s so wait_until_metadata_sync never times out +ALTER SYSTEM SET citus.metadata_sync_interval TO 3000; +ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) + RETURNS void + LANGUAGE C STRICT + AS 'citus'; -- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists. CREATE TABLE colocation_table(id text); SET citus.replication_model TO 'streaming'; @@ -36,6 +49,12 @@ SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'c (1 row) +SELECT wait_until_metadata_sync(); + wait_until_metadata_sync +-------------------------- + +(1 row) + SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; nodename | nodeport | success | result -----------+----------+---------+-------- diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 08d1e5fac..f690f436b 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -1041,6 +1041,8 @@ DROP TABLE table1_groupd; DROP TABLE table2_groupd; DROP TABLE table1_groupf; DROP TABLE table2_groupf; +DROP TABLE table1_groupg; +DROP TABLE table2_groupg; DROP TABLE table1_groupe; DROP TABLE table2_groupe; DROP TABLE table3_groupe; diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 5bd752c00..3c446fd03 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -1497,13 +1497,86 @@ select shouldhaveshards from pg_dist_node where nodeport = 8888; t (1 row) --- Cleanup \c - - - :master_port -DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; -NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1 -DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; -DROP TABLE mx_testing_schema.mx_test_table; -DROP TABLE mx_ref; +-- +-- Check that metadata commands error out if any nodes are out-of-sync +-- +-- increase metadata_sync intervals to avoid metadata sync while we test +ALTER SYSTEM SET citus.metadata_sync_interval TO 300000; +ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 300000; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SET citus.replication_model TO 'streaming'; +SET citus.shard_replication_factor TO 1; +CREATE TABLE dist_table_1(a int); +SELECT create_distributed_table('dist_table_1', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +UPDATE pg_dist_node SET metadatasynced=false WHERE nodeport=:worker_1_port; +SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport=:worker_1_port; + hasmetadata | metadatasynced +-------------+---------------- + t | f +(1 row) + +CREATE TABLE dist_table_2(a int); +SELECT create_distributed_table('dist_table_2', 'a'); +ERROR: localhost:57637 is a metadata node, but is out of sync +HINT: If the node is up, wait until metadata gets synced to it and try again. +SELECT create_reference_table('dist_table_2'); +ERROR: localhost:57637 is a metadata node, but is out of sync +HINT: If the node is up, wait until metadata gets synced to it and try again. +ALTER TABLE dist_table_1 ADD COLUMN b int; +ERROR: localhost:57637 is a metadata node, but is out of sync +HINT: If the node is up, wait until metadata gets synced to it and try again. +SELECT master_add_node('localhost', :master_port, groupid => 0); +ERROR: localhost:57637 is a metadata node, but is out of sync +HINT: If the node is up, wait until metadata gets synced to it and try again. +SELECT master_disable_node('localhost', :worker_1_port); +ERROR: Disabling localhost:57637 failed +DETAIL: localhost:57637 is a metadata node, but is out of sync +HINT: If you are using MX, try stop_metadata_sync_to_node(hostname, port) for nodes that are down before disabling them. +SELECT master_disable_node('localhost', :worker_2_port); +ERROR: Disabling localhost:57638 failed +DETAIL: localhost:57637 is a metadata node, but is out of sync +HINT: If you are using MX, try stop_metadata_sync_to_node(hostname, port) for nodes that are down before disabling them. +SELECT master_remove_node('localhost', :worker_1_port); +ERROR: localhost:57637 is a metadata node, but is out of sync +HINT: If the node is up, wait until metadata gets synced to it and try again. +SELECT master_remove_node('localhost', :worker_2_port); +ERROR: localhost:57637 is a metadata node, but is out of sync +HINT: If the node is up, wait until metadata gets synced to it and try again. +-- master_update_node should succeed +SELECT nodeid AS worker_2_nodeid FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT master_update_node(:worker_2_nodeid, 'localhost', 4444); + master_update_node +-------------------- + +(1 row) + +SELECT master_update_node(:worker_2_nodeid, 'localhost', :worker_2_port); + master_update_node +-------------------- + +(1 row) + +ALTER SYSTEM SET citus.metadata_sync_interval TO DEFAULT; +ALTER SYSTEM SET citus.metadata_sync_retry_interval TO DEFAULT; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +UPDATE pg_dist_node SET metadatasynced=true WHERE nodeport=:worker_1_port; +-- Cleanup SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); stop_metadata_sync_to_node ---------------------------- @@ -1516,6 +1589,12 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); (1 row) +DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; +NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1 +DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; +DROP TABLE mx_testing_schema.mx_test_table; +DROP TABLE mx_ref; +DROP TABLE dist_table_1, dist_table_2; RESET citus.shard_count; RESET citus.shard_replication_factor; RESET citus.replication_model; diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index c14d2309f..1672b04ac 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -361,6 +361,8 @@ SET citus.replication_model TO "statement"; SELECT create_distributed_table('replicated_table_func_test', 'a'); SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test'); +SELECT wait_until_metadata_sync(); + -- a function can be colocated with a different distribution argument type -- as long as there is a coercion path SET citus.shard_replication_factor TO 1; @@ -429,32 +431,29 @@ SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); -- sync metadata to workers for consistent results when clearing objects SELECT wait_until_metadata_sync(); --- clear objects -SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; +-- clear objects +SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; -- This is hacky, but we should clean-up the resources as below \c - - - :worker_1_port -SET client_min_messages TO error; -- suppress cascading objects dropping UPDATE pg_dist_local_group SET groupid = 0; -SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; +TRUNCATE pg_dist_node; +SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; -TRUNCATE pg_dist_node; - +SET search_path TO function_tests, function_tests2; \c - - - :worker_2_port -SET client_min_messages TO error; -- suppress cascading objects dropping UPDATE pg_dist_local_group SET groupid = 0; -SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%'; +TRUNCATE pg_dist_node; +SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; -TRUNCATE pg_dist_node; - \c - - - :master_port DROP USER functionuser; -SELECT run_command_on_workers($$DROP USER functionuser;$$); +SELECT run_command_on_workers($$DROP USER functionuser$$); diff --git a/src/test/regress/sql/distributed_procedure.sql b/src/test/regress/sql/distributed_procedure.sql index 3ea35c7b9..1bed22750 100644 --- a/src/test/regress/sql/distributed_procedure.sql +++ b/src/test/regress/sql/distributed_procedure.sql @@ -17,6 +17,16 @@ BEGIN END; $proc$; +-- set sync intervals to less than 15s so wait_until_metadata_sync never times out +ALTER SYSTEM SET citus.metadata_sync_interval TO 3000; +ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500; +SELECT pg_reload_conf(); + +CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) + RETURNS void + LANGUAGE C STRICT + AS 'citus'; + -- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists. CREATE TABLE colocation_table(id text); SET citus.replication_model TO 'streaming'; @@ -24,6 +34,8 @@ SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('colocation_table','id'); SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table'); +SELECT wait_until_metadata_sync(); + SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2; SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)'); diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index fa2297b51..501546373 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -439,6 +439,8 @@ DROP TABLE table1_groupd; DROP TABLE table2_groupd; DROP TABLE table1_groupf; DROP TABLE table2_groupf; +DROP TABLE table1_groupg; +DROP TABLE table2_groupg; DROP TABLE table1_groupe; DROP TABLE table2_groupe; DROP TABLE table3_groupe; diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 53d10c11c..c0f7652b1 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -701,15 +701,56 @@ select shouldhaveshards from pg_dist_node where nodeport = 8888; \c - postgres - :worker_1_port select shouldhaveshards from pg_dist_node where nodeport = 8888; +\c - - - :master_port +-- +-- Check that metadata commands error out if any nodes are out-of-sync +-- + +-- increase metadata_sync intervals to avoid metadata sync while we test +ALTER SYSTEM SET citus.metadata_sync_interval TO 300000; +ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 300000; +SELECT pg_reload_conf(); + +SET citus.replication_model TO 'streaming'; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE dist_table_1(a int); +SELECT create_distributed_table('dist_table_1', 'a'); + +UPDATE pg_dist_node SET metadatasynced=false WHERE nodeport=:worker_1_port; +SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport=:worker_1_port; + +CREATE TABLE dist_table_2(a int); +SELECT create_distributed_table('dist_table_2', 'a'); +SELECT create_reference_table('dist_table_2'); + +ALTER TABLE dist_table_1 ADD COLUMN b int; + +SELECT master_add_node('localhost', :master_port, groupid => 0); +SELECT master_disable_node('localhost', :worker_1_port); +SELECT master_disable_node('localhost', :worker_2_port); +SELECT master_remove_node('localhost', :worker_1_port); +SELECT master_remove_node('localhost', :worker_2_port); + +-- master_update_node should succeed +SELECT nodeid AS worker_2_nodeid FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT master_update_node(:worker_2_nodeid, 'localhost', 4444); +SELECT master_update_node(:worker_2_nodeid, 'localhost', :worker_2_port); + +ALTER SYSTEM SET citus.metadata_sync_interval TO DEFAULT; +ALTER SYSTEM SET citus.metadata_sync_retry_interval TO DEFAULT; +SELECT pg_reload_conf(); + +UPDATE pg_dist_node SET metadatasynced=true WHERE nodeport=:worker_1_port; -- Cleanup -\c - - - :master_port +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_testing_schema.mx_test_table; DROP TABLE mx_ref; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); +DROP TABLE dist_table_1, dist_table_2; RESET citus.shard_count; RESET citus.shard_replication_factor;