Error for metadata commands if any metadata node is out-of-sync (#3226)

* Error for metadata commands if any metadata node is out-of-sync

* Make the functions have separate APIs for all workers/metadata workers
Branch: pull/3233/head
Author: Hadi Moshayedi 2019-11-27 00:52:57 -08:00, committed by Önder Kalacı
Parent: 2329157406
Commit: 2268a9cae6
20 changed files with 304 additions and 92 deletions
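Taken together, the change replaces the TargetWorkerSet-parameterised senders with audience-specific helpers (SendCommandToWorkersWithMetadata, SendBareCommandListToMetadataWorkers, SendBareOptionalCommandListToAllWorkersAsUser) and makes the metadata-worker paths refuse to run while any metadata node is out of sync. As a quick, illustrative sketch of the new behaviour, distilled from the metadata-sync regression test below (it assumes the standard regression setup with workers on ports 57637/57638 and psql variables such as :worker_1_port; the out-of-sync state is forced by hand purely for the test):

-- simulate a metadata worker falling out of sync (test-only; the background
-- sync normally keeps metadatasynced up to date)
UPDATE pg_dist_node SET metadatasynced = false WHERE nodeport = :worker_1_port;

-- metadata-changing commands now error out instead of silently skipping the node
ALTER TABLE dist_table_1 ADD COLUMN b int;
-- ERROR: localhost:57637 is a metadata node, but is out of sync
-- HINT: If the node is up, wait until metadata gets synced to it and try again.

-- once the node is marked synced again, the same command goes through
UPDATE pg_dist_node SET metadatasynced = true WHERE nodeport = :worker_1_port;

As the same test verifies, master_update_node remains allowed while a node is out of sync.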


@@ -146,5 +146,5 @@ MasterRemoveDistributedTableMetadataFromWorkers(Oid relationId, char *schemaName
 	/* drop the distributed table metadata on the workers */
 	char *deleteDistributionCommand = DistributionDeleteCommand(schemaName, tableName);
-	SendCommandToWorkers(WORKERS_WITH_METADATA, deleteDistributionCommand);
+	SendCommandToWorkersWithMetadata(deleteDistributionCommand);
 }


@@ -103,7 +103,7 @@ ProcessDropTableStmt(DropStmt *dropTableStatement)
 			continue;
 		}
 
-		SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
+		SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
 
 		foreach(partitionCell, partitionList)
 		{
@@ -111,7 +111,7 @@ ProcessDropTableStmt(DropStmt *dropTableStatement)
 			char *detachPartitionCommand =
 				GenerateDetachPartitionCommand(partitionRelationId);
 
-			SendCommandToWorkers(WORKERS_WITH_METADATA, detachPartitionCommand);
+			SendCommandToWorkersWithMetadata(detachPartitionCommand);
 		}
 	}
 }


@@ -415,8 +415,7 @@ ProcessAlterEnumStmt(AlterEnumStmt *stmt, const char *queryString)
 		List *commands = list_make2(DISABLE_DDL_PROPAGATION, (void *) alterEnumStmtSql);
 
-		int result = SendBareOptionalCommandListToWorkersAsUser(ALL_WORKERS, commands,
-																 NULL);
+		int result = SendBareOptionalCommandListToAllWorkersAsUser(commands, NULL);
 
 		if (result != RESPONSE_OKAY)
 		{


@@ -1015,7 +1015,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 	{
 		char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
 
-		SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
+		SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
 
 		/*
 		 * Given that we're relaying the query to the worker nodes directly,
@@ -1023,10 +1023,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 		 */
 		if (setSearchPathCommand != NULL)
 		{
-			SendCommandToWorkers(WORKERS_WITH_METADATA, setSearchPathCommand);
+			SendCommandToWorkersWithMetadata(setSearchPathCommand);
 		}
 
-		SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
+		SendCommandToWorkersWithMetadata((char *) ddlJob->commandString);
 	}
 
 	/* use adaptive executor when enabled */
@@ -1060,7 +1060,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
 		commandList = lappend(commandList, (char *) ddlJob->commandString);
 
-		SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList);
+		SendBareCommandListToMetadataWorkers(commandList);
 		}
 	}
 	PG_CATCH();


@@ -316,8 +316,8 @@ master_drop_sequences(PG_FUNCTION_ARGS)
 	{
 		appendStringInfoString(dropSeqCommand, " CASCADE");
-		SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
-		SendCommandToWorkers(WORKERS_WITH_METADATA, dropSeqCommand->data);
+		SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
+		SendCommandToWorkersWithMetadata(dropSeqCommand->data);
 	}
 
 	PG_RETURN_VOID();


@@ -1188,14 +1188,14 @@ CreateTableMetadataOnWorkers(Oid relationId)
 	ListCell *commandCell = NULL;
 
 	/* prevent recursive propagation */
-	SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
+	SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
 
 	/* send the commands one by one */
 	foreach(commandCell, commandList)
 	{
 		char *command = (char *) lfirst(commandCell);
-		SendCommandToWorkers(WORKERS_WITH_METADATA, command);
+		SendCommandToWorkersWithMetadata(command);
 	}
 }


@@ -990,7 +990,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
 	/* make sure we don't have any lingering session lifespan connections */
 	CloseNodeConnectionsAfterTransaction(workerNode->workerName, nodePort);
 
-	SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
+	SendCommandToWorkersWithMetadata(nodeDeleteCommand);
 }
@@ -1098,7 +1098,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
 	/* send the delete command to all primary nodes with metadata */
 	char *nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId);
-	SendCommandToWorkers(WORKERS_WITH_METADATA, nodeDeleteCommand);
+	SendCommandToWorkersWithMetadata(nodeDeleteCommand);
 
 	/* finally prepare the insert command and send it to all primary nodes */
 	uint32 primariesWithMetadata = CountPrimariesWithMetadata();
@@ -1107,7 +1107,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
 		List *workerNodeList = list_make1(workerNode);
 		char *nodeInsertCommand = NodeListInsertCommand(workerNodeList);
-		SendCommandToWorkers(WORKERS_WITH_METADATA, nodeInsertCommand);
+		SendCommandToWorkersWithMetadata(nodeInsertCommand);
 	}
 
 	return workerNode->nodeId;
@@ -1178,7 +1178,7 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
 	heap_close(pgDistNode, NoLock);
 
 	/* we also update the column at worker nodes */
-	SendCommandToWorkers(WORKERS_WITH_METADATA, metadataSyncCommand);
+	SendCommandToWorkersWithMetadata(metadataSyncCommand);
 	return newWorkerNode;
 }


@@ -32,6 +32,18 @@
 #include "utils/memutils.h"
 
 
+static void SendCommandToMetadataWorkersParams(const char *command,
+											   const char *user, int parameterCount,
+											   const Oid *parameterTypes, const
+											   char *const *parameterValues);
+static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
+											   const char *command, const char *user,
+											   int parameterCount, const
+											   Oid *parameterTypes,
+											   const char *const *parameterValues);
+static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList);
+
+
 /*
  * SendCommandToWorker sends a command to a particular worker as part of the
  * 2PC.
@@ -98,9 +110,9 @@ SendCommandToWorkerAsUser(char *nodeName, int32 nodePort, const char *nodeUser,
  * owner to ensure write access to the Citus metadata tables.
  */
 void
-SendCommandToWorkers(TargetWorkerSet targetWorkerSet, const char *command)
+SendCommandToWorkersWithMetadata(const char *command)
 {
-	SendCommandToWorkersParams(targetWorkerSet, command, CitusExtensionOwnerName(),
-							   0, NULL, NULL);
+	SendCommandToMetadataWorkersParams(command, CitusExtensionOwnerName(),
+									   0, NULL, NULL);
 }
@@ -120,8 +132,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
 	{
 		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
 
-		if (targetWorkerSet == WORKERS_WITH_METADATA &&
-			(!workerNode->hasMetadata || !workerNode->metadataSynced))
+		if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata)
 		{
 			continue;
 		}
@@ -139,20 +150,23 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
 /*
- * SendBareCommandListToWorkers sends a list of commands to a set of target
+ * SendBareCommandListToMetadataWorkers sends a list of commands to metadata
  * workers in serial. Commands are committed immediately: new connections are
  * always used and no transaction block is used (hence "bare"). The connections
  * are made as the extension owner to ensure write access to the Citus metadata
  * tables. Primarly useful for INDEX commands using CONCURRENTLY.
  */
 void
-SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
+SendBareCommandListToMetadataWorkers(List *commandList)
 {
+	TargetWorkerSet targetWorkerSet = WORKERS_WITH_METADATA;
 	List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
 	ListCell *workerNodeCell = NULL;
 	char *nodeUser = CitusExtensionOwnerName();
 	ListCell *commandCell = NULL;
 
+	ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
+
 	/* run commands serially */
 	foreach(workerNodeCell, workerNodeList)
 	{
@@ -180,14 +194,14 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
 /*
- * SendBareOptionalCommandListToWorkersAsUser sends a list of commands to a set of target
- * workers in serial. Commands are committed immediately: new connections are
- * always used and no transaction block is used (hence "bare").
+ * SendBareOptionalCommandListToAllWorkersAsUser sends a list of commands
+ * to all workers in serial. Commands are committed immediately: new
+ * connections are always used and no transaction block is used (hence "bare").
  */
 int
-SendBareOptionalCommandListToWorkersAsUser(TargetWorkerSet targetWorkerSet,
-										   List *commandList, const char *user)
+SendBareOptionalCommandListToAllWorkersAsUser(List *commandList, const char *user)
 {
+	TargetWorkerSet targetWorkerSet = ALL_WORKERS;
 	List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, ShareLock);
 	ListCell *workerNodeCell = NULL;
 	ListCell *commandCell = NULL;
@@ -227,17 +241,38 @@ SendBareOptionalCommandListToWorkersAsUser(TargetWorkerSet targetWorkerSet,
 /*
- * SendCommandToWorkersParams sends a command to all workers in parallel.
+ * SendCommandToMetadataWorkersParams is a wrapper around
+ * SendCommandToWorkersParamsInternal() enforcing some extra checks.
+ */
+static void
+SendCommandToMetadataWorkersParams(const char *command,
+								   const char *user, int parameterCount,
+								   const Oid *parameterTypes, const
+								   char *const *parameterValues)
+{
+	List *workerNodeList = TargetWorkerSetNodeList(WORKERS_WITH_METADATA, ShareLock);
+
+	ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
+
+	SendCommandToWorkersParamsInternal(WORKERS_WITH_METADATA, command, user,
+									   parameterCount, parameterTypes,
+									   parameterValues);
+}
+
+
+/*
+ * SendCommandToWorkersParamsInternal sends a command to all workers in parallel.
  * Commands are committed on the workers when the local transaction commits. The
 * connection are made as the extension owner to ensure write access to the Citus
 * metadata tables. Parameters can be specified as for PQexecParams, except that
 * paramLengths, paramFormats and resultFormat are hard-coded to NULL, NULL and 0
 * respectively.
 */
-void
-SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, const char *command,
-						   const char *user, int parameterCount,
-						   const Oid *parameterTypes, const char *const *parameterValues)
+static void
+SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *command,
+								   const char *user, int parameterCount,
+								   const Oid *parameterTypes, const
+								   char *const *parameterValues)
 {
 	List *connectionList = NIL;
 	ListCell *connectionCell = NULL;
@@ -347,3 +382,42 @@ SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort,
 	RemoteTransactionCommit(workerConnection);
 	CloseConnection(workerConnection);
 }
+
+
+/*
+ * ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given
+ * metadata nodes are out of sync. It is safer to avoid metadata changing
+ * commands (e.g. DDL or node addition) until all metadata nodes have
+ * been synced.
+ *
+ * An example of we could get in a bad situation without doing so is:
+ * 1. Create a reference table
+ * 2. After the node becomes out of sync, add a new active node
+ * 3. Insert into the reference table from the out of sync node
+ *
+ * Since the out-of-sync might not know about the new node, it won't propagate
+ * the changes to the new node and replicas will be in an inconsistent state.
+ */
+static void
+ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList)
+{
+	ListCell *workerNodeCell = NULL;
+
+	foreach(workerNodeCell, metadataNodeList)
+	{
+		WorkerNode *metadataNode = lfirst(workerNodeCell);
+
+		Assert(metadataNode->hasMetadata);
+		if (!metadataNode->metadataSynced)
+		{
+			const char *workerName = metadataNode->workerName;
+			int workerPort = metadataNode->workerPort;
+
+			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("%s:%d is a metadata node, but is out of sync",
+								   workerName, workerPort),
+							errhint("If the node is up, wait until metadata"
+									" gets synced to it and try again.")));
+		}
+	}
+}
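The comment above sketches the failure mode this check closes. To make it concrete, here is an illustrative SQL walk-through of that scenario (hypothetical table name ref_table and placeholder :new_node_port; the out-of-sync flag is flipped by hand only to simulate the condition, as the regression tests below do):

-- 1. create a reference table while all metadata nodes are in sync
CREATE TABLE ref_table(a int);
SELECT create_reference_table('ref_table');

-- 2. a metadata worker falls out of sync (simulated), then we try to add a new
--    active node; with this commit the node addition is rejected up front
UPDATE pg_dist_node SET metadatasynced = false WHERE nodeport = :worker_1_port;
SELECT master_add_node('localhost', :new_node_port);
-- ERROR: localhost:57637 is a metadata node, but is out of sync
-- HINT: If the node is up, wait until metadata gets synced to it and try again.

-- 3. an insert into the reference table issued through the out-of-sync worker can
--    therefore no longer miss a node that worker does not know about, because
--    step 2 never completes while any metadata node is out of sync

Previously the node addition would go through, and writes to the reference table issued from the out-of-sync worker would not reach the new node's replica, leaving replicas inconsistent.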


@@ -662,7 +662,7 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId)
 		char *updateColocationIdCommand = ColocationIdUpdateCommand(distributedRelationId,
 																	 colocationId);
-		SendCommandToWorkers(WORKERS_WITH_METADATA, updateColocationIdCommand);
+		SendCommandToWorkersWithMetadata(updateColocationIdCommand);
 	}
 }


@@ -329,7 +329,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
 													FILE_FINALIZED, 0,
 													groupId);
 
-			SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand);
+			SendCommandToWorkersWithMetadata(placementCommand);
 		}
 	}
 }
@@ -448,7 +448,7 @@ DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId)
 						 "DELETE FROM pg_dist_placement WHERE placementid = "
 						 UINT64_FORMAT,
 						 placement->placementId);
-		SendCommandToWorkers(WORKERS_WITH_METADATA, deletePlacementCommand->data);
+		SendCommandToWorkersWithMetadata(deletePlacementCommand->data);
 	}
 }


@@ -286,7 +286,7 @@ LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList)
 	appendStringInfo(lockCommand, "])");
 
-	SendCommandToWorkers(WORKERS_WITH_METADATA, lockCommand->data);
+	SendCommandToWorkersWithMetadata(lockCommand->data);
 }


@@ -37,16 +37,10 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet, const
 									   const char *command);
 extern void SendCommandToWorkerAsUser(char *nodeName, int32 nodePort,
 									  const char *nodeUser, const char *command);
-extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, const char *command);
-extern void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet,
-										 List *commandList);
-extern int SendBareOptionalCommandListToWorkersAsUser(TargetWorkerSet targetWorkerSet,
-													   List *commandList,
-													   const char *user);
-extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet,
-									   const char *command, const char *user,
-									   int parameterCount, const Oid *parameterTypes,
-									   const char *const *parameterValues);
+extern void SendCommandToWorkersWithMetadata(const char *command);
+extern void SendBareCommandListToMetadataWorkers(List *commandList);
+extern int SendBareOptionalCommandListToAllWorkersAsUser(List *commandList,
+														  const char *user);
 extern void EnsureNoModificationsHaveBeenDone(void);
 extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName,
 													   int32 nodePort,


@@ -624,6 +624,12 @@ SELECT create_distributed_function('add_with_param_names(int, int)', '$1', coloc
 ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test"
 DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
 HINT: When distributing tables make sure that citus.replication_model = 'streaming'
+SELECT wait_until_metadata_sync();
+ wait_until_metadata_sync
+--------------------------
+
+(1 row)
+
 -- a function can be colocated with a different distribution argument type
 -- as long as there is a coercion path
 SET citus.shard_replication_factor TO 1;
@@ -752,6 +758,9 @@ SELECT wait_until_metadata_sync();
 (1 row)
 
+SET client_min_messages TO error; -- suppress cascading objects dropping
+DROP SCHEMA function_tests CASCADE;
+DROP SCHEMA function_tests2 CASCADE;
 -- clear objects
 SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
  stop_metadata_sync_to_node
@@ -760,41 +769,23 @@ SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isa
 (2 rows)
 
-SET client_min_messages TO error; -- suppress cascading objects dropping
-DROP SCHEMA function_tests CASCADE;
-DROP SCHEMA function_tests2 CASCADE;
 -- This is hacky, but we should clean-up the resources as below
 \c - - - :worker_1_port
-SET client_min_messages TO error; -- suppress cascading objects dropping
 UPDATE pg_dist_local_group SET groupid = 0;
-SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
- worker_drop_distributed_table
--------------------------------
-(3 rows)
+TRUNCATE pg_dist_node;
+SET client_min_messages TO error; -- suppress cascading objects dropping
 DROP SCHEMA function_tests CASCADE;
 DROP SCHEMA function_tests2 CASCADE;
-TRUNCATE pg_dist_node;
+SET search_path TO function_tests, function_tests2;
 \c - - - :worker_2_port
-SET client_min_messages TO error; -- suppress cascading objects dropping
 UPDATE pg_dist_local_group SET groupid = 0;
-SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
- worker_drop_distributed_table
--------------------------------
-(3 rows)
+TRUNCATE pg_dist_node;
+SET client_min_messages TO error; -- suppress cascading objects dropping
 DROP SCHEMA function_tests CASCADE;
 DROP SCHEMA function_tests2 CASCADE;
-TRUNCATE pg_dist_node;
 \c - - - :master_port
 DROP USER functionuser;
-SELECT run_command_on_workers($$DROP USER functionuser;$$);
+SELECT run_command_on_workers($$DROP USER functionuser$$);
  run_command_on_workers
 ---------------------------------
  (localhost,57637,t,"DROP ROLE")


@@ -20,6 +20,19 @@
 	RAISE INFO 'information message %', $1;
 END;
 $proc$;
+-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
+ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
+ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
+CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'citus';
 -- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
 CREATE TABLE colocation_table(id text);
 SET citus.replication_model TO 'streaming';
@@ -36,6 +49,12 @@ SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'c
 (1 row)
 
+SELECT wait_until_metadata_sync();
+ wait_until_metadata_sync
+--------------------------
+
+(1 row)
+
 SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
  nodename  | nodeport | success | result
 -----------+----------+---------+--------


@@ -1041,6 +1041,8 @@ DROP TABLE table1_groupd;
 DROP TABLE table2_groupd;
 DROP TABLE table1_groupf;
 DROP TABLE table2_groupf;
+DROP TABLE table1_groupg;
+DROP TABLE table2_groupg;
 DROP TABLE table1_groupe;
 DROP TABLE table2_groupe;
 DROP TABLE table3_groupe;


@@ -1497,13 +1497,86 @@ select shouldhaveshards from pg_dist_node where nodeport = 8888;
  t
 (1 row)
 
--- Cleanup
 \c - - - :master_port
-DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
-NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
-DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
-DROP TABLE mx_testing_schema.mx_test_table;
-DROP TABLE mx_ref;
+--
+-- Check that metadata commands error out if any nodes are out-of-sync
+--
+-- increase metadata_sync intervals to avoid metadata sync while we test
+ALTER SYSTEM SET citus.metadata_sync_interval TO 300000;
+ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 300000;
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
+SET citus.replication_model TO 'streaming';
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE dist_table_1(a int);
+SELECT create_distributed_table('dist_table_1', 'a');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+UPDATE pg_dist_node SET metadatasynced=false WHERE nodeport=:worker_1_port;
+SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport=:worker_1_port;
+ hasmetadata | metadatasynced
+-------------+----------------
+ t           | f
+(1 row)
+
+CREATE TABLE dist_table_2(a int);
+SELECT create_distributed_table('dist_table_2', 'a');
+ERROR: localhost:57637 is a metadata node, but is out of sync
+HINT: If the node is up, wait until metadata gets synced to it and try again.
+SELECT create_reference_table('dist_table_2');
+ERROR: localhost:57637 is a metadata node, but is out of sync
+HINT: If the node is up, wait until metadata gets synced to it and try again.
+ALTER TABLE dist_table_1 ADD COLUMN b int;
+ERROR: localhost:57637 is a metadata node, but is out of sync
+HINT: If the node is up, wait until metadata gets synced to it and try again.
+SELECT master_add_node('localhost', :master_port, groupid => 0);
+ERROR: localhost:57637 is a metadata node, but is out of sync
+HINT: If the node is up, wait until metadata gets synced to it and try again.
+SELECT master_disable_node('localhost', :worker_1_port);
+ERROR: Disabling localhost:57637 failed
+DETAIL: localhost:57637 is a metadata node, but is out of sync
+HINT: If you are using MX, try stop_metadata_sync_to_node(hostname, port) for nodes that are down before disabling them.
+SELECT master_disable_node('localhost', :worker_2_port);
+ERROR: Disabling localhost:57638 failed
+DETAIL: localhost:57637 is a metadata node, but is out of sync
+HINT: If you are using MX, try stop_metadata_sync_to_node(hostname, port) for nodes that are down before disabling them.
+SELECT master_remove_node('localhost', :worker_1_port);
+ERROR: localhost:57637 is a metadata node, but is out of sync
+HINT: If the node is up, wait until metadata gets synced to it and try again.
+SELECT master_remove_node('localhost', :worker_2_port);
+ERROR: localhost:57637 is a metadata node, but is out of sync
+HINT: If the node is up, wait until metadata gets synced to it and try again.
+-- master_update_node should succeed
+SELECT nodeid AS worker_2_nodeid FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
+SELECT master_update_node(:worker_2_nodeid, 'localhost', 4444);
+ master_update_node
+--------------------
+
+(1 row)
+
+SELECT master_update_node(:worker_2_nodeid, 'localhost', :worker_2_port);
+ master_update_node
+--------------------
+
+(1 row)
+
+ALTER SYSTEM SET citus.metadata_sync_interval TO DEFAULT;
+ALTER SYSTEM SET citus.metadata_sync_retry_interval TO DEFAULT;
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
+UPDATE pg_dist_node SET metadatasynced=true WHERE nodeport=:worker_1_port;
+-- Cleanup
 SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
  stop_metadata_sync_to_node
 ----------------------------
@@ -1516,6 +1589,12 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
 (1 row)
 
+DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
+NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
+DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
+DROP TABLE mx_testing_schema.mx_test_table;
+DROP TABLE mx_ref;
+DROP TABLE dist_table_1, dist_table_2;
 RESET citus.shard_count;
 RESET citus.shard_replication_factor;
 RESET citus.replication_model;


@@ -361,6 +361,8 @@ SET citus.replication_model TO "statement";
 SELECT create_distributed_table('replicated_table_func_test', 'a');
 SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
+SELECT wait_until_metadata_sync();
+
 -- a function can be colocated with a different distribution argument type
 -- as long as there is a coercion path
 SET citus.shard_replication_factor TO 1;
@@ -429,32 +431,29 @@ SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
 -- sync metadata to workers for consistent results when clearing objects
 SELECT wait_until_metadata_sync();
--- clear objects
-SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
 SET client_min_messages TO error; -- suppress cascading objects dropping
 DROP SCHEMA function_tests CASCADE;
 DROP SCHEMA function_tests2 CASCADE;
+
+-- clear objects
+SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
+
 -- This is hacky, but we should clean-up the resources as below
 \c - - - :worker_1_port
-SET client_min_messages TO error; -- suppress cascading objects dropping
 UPDATE pg_dist_local_group SET groupid = 0;
-SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
+TRUNCATE pg_dist_node;
+SET client_min_messages TO error; -- suppress cascading objects dropping
 DROP SCHEMA function_tests CASCADE;
 DROP SCHEMA function_tests2 CASCADE;
-TRUNCATE pg_dist_node;
+SET search_path TO function_tests, function_tests2;
 \c - - - :worker_2_port
-SET client_min_messages TO error; -- suppress cascading objects dropping
 UPDATE pg_dist_local_group SET groupid = 0;
-SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
+TRUNCATE pg_dist_node;
+SET client_min_messages TO error; -- suppress cascading objects dropping
 DROP SCHEMA function_tests CASCADE;
 DROP SCHEMA function_tests2 CASCADE;
-TRUNCATE pg_dist_node;
 \c - - - :master_port
 DROP USER functionuser;
-SELECT run_command_on_workers($$DROP USER functionuser;$$);
+SELECT run_command_on_workers($$DROP USER functionuser$$);


@@ -17,6 +17,16 @@ BEGIN
 END;
 $proc$;
 
+-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
+ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
+ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
+SELECT pg_reload_conf();
+
+CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'citus';
+
 -- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
 CREATE TABLE colocation_table(id text);
 SET citus.replication_model TO 'streaming';
@@ -24,6 +34,8 @@ SET citus.shard_replication_factor TO 1;
 SELECT create_distributed_table('colocation_table','id');
 SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table');
+SELECT wait_until_metadata_sync();
+
 SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
 SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');


@@ -439,6 +439,8 @@ DROP TABLE table1_groupd;
 DROP TABLE table2_groupd;
 DROP TABLE table1_groupf;
 DROP TABLE table2_groupf;
+DROP TABLE table1_groupg;
+DROP TABLE table2_groupg;
 DROP TABLE table1_groupe;
 DROP TABLE table2_groupe;
 DROP TABLE table3_groupe;


@@ -701,15 +701,56 @@ select shouldhaveshards from pg_dist_node where nodeport = 8888;
 \c - postgres - :worker_1_port
 select shouldhaveshards from pg_dist_node where nodeport = 8888;
 
+\c - - - :master_port
+--
+-- Check that metadata commands error out if any nodes are out-of-sync
+--
+-- increase metadata_sync intervals to avoid metadata sync while we test
+ALTER SYSTEM SET citus.metadata_sync_interval TO 300000;
+ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 300000;
+SELECT pg_reload_conf();
+
+SET citus.replication_model TO 'streaming';
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE dist_table_1(a int);
+SELECT create_distributed_table('dist_table_1', 'a');
+
+UPDATE pg_dist_node SET metadatasynced=false WHERE nodeport=:worker_1_port;
+SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport=:worker_1_port;
+
+CREATE TABLE dist_table_2(a int);
+SELECT create_distributed_table('dist_table_2', 'a');
+SELECT create_reference_table('dist_table_2');
+ALTER TABLE dist_table_1 ADD COLUMN b int;
+
+SELECT master_add_node('localhost', :master_port, groupid => 0);
+SELECT master_disable_node('localhost', :worker_1_port);
+SELECT master_disable_node('localhost', :worker_2_port);
+SELECT master_remove_node('localhost', :worker_1_port);
+SELECT master_remove_node('localhost', :worker_2_port);
+
+-- master_update_node should succeed
+SELECT nodeid AS worker_2_nodeid FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
+SELECT master_update_node(:worker_2_nodeid, 'localhost', 4444);
+SELECT master_update_node(:worker_2_nodeid, 'localhost', :worker_2_port);
+
+ALTER SYSTEM SET citus.metadata_sync_interval TO DEFAULT;
+ALTER SYSTEM SET citus.metadata_sync_retry_interval TO DEFAULT;
+SELECT pg_reload_conf();
+
+UPDATE pg_dist_node SET metadatasynced=true WHERE nodeport=:worker_1_port;
+
 -- Cleanup
-\c - - - :master_port
+SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
+SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
 
 DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
 DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
 DROP TABLE mx_testing_schema.mx_test_table;
 DROP TABLE mx_ref;
-SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
-SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
+DROP TABLE dist_table_1, dist_table_2;
 
 RESET citus.shard_count;
 RESET citus.shard_replication_factor;