diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c
index 54f87f9a9..1f8a86e40 100644
--- a/src/backend/distributed/commands/function.c
+++ b/src/backend/distributed/commands/function.c
@@ -44,6 +44,7 @@
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_executor.h"
 #include "distributed/namespace_utils.h"
+#include "distributed/pg_dist_node.h"
 #include "distributed/reference_table_utils.h"
 #include "distributed/relation_access_tracking.h"
 #include "distributed/version_compat.h"
@@ -1109,7 +1110,8 @@ TriggerSyncMetadataToPrimaryNodes(void)
 		 * this because otherwise node activation might fail withing transaction blocks.
 		 */
 		LockRelationOid(DistNodeRelationId(), ExclusiveLock);
-		MarkNodeHasMetadata(workerNode->workerName, workerNode->workerPort, true);
+		SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata,
+								 BoolGetDatum(true));
 		triggerMetadataSync = true;
 	}
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index a4b2e8492..f3e920733 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -83,8 +83,6 @@ char *EnableManualMetadataChangesForUser = "";
 static void EnsureSequentialModeMetadataOperations(void);
 static List * GetDistributedTableDDLEvents(Oid relationId);
 static char * LocalGroupIdUpdateCommand(int32 groupId);
-static void UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort,
-								   int attrNum, bool value);
 static List * SequenceDependencyCommandList(Oid relationId);
 static char * TruncateTriggerCreateCommand(Oid relationId);
 static char * SchemaOwnerName(Oid objectId);
@@ -170,9 +168,6 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
 {
 	char *escapedNodeName = quote_literal_cstr(nodeNameString);
 
-	/* fail if metadata synchronization doesn't succeed */
-	bool raiseInterrupts = true;
-
 	CheckCitusVersion(ERROR);
 	EnsureCoordinator();
 	EnsureSuperUser();
@@ -209,7 +204,21 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
 	}
 
 	UseCoordinatedTransaction();
-	MarkNodeHasMetadata(nodeNameString, nodePort, true);
+
+	/*
+	 * One would normally expect to set hasmetadata first and then sync the metadata.
+	 * However, at this point we do it in the reverse order: we first set
+	 * metadatasynced and then hasmetadata, since setting columns for nodes with
+	 * metadatasynced==false could cause errors
+	 * (see ErrorIfAnyMetadataNodeOutOfSync).
+	 * We can safely do that because we are in a coordinated transaction and the
+	 * changes are only visible to our own transaction.
+	 * If anything goes wrong, we roll back all the changes.
+ */ + workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, + BoolGetDatum(true)); + workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum( + true)); if (!NodeIsPrimary(workerNode)) { @@ -220,8 +229,9 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort) return; } + /* fail if metadata synchronization doesn't succeed */ + bool raiseInterrupts = true; SyncMetadataSnapshotToNode(workerNode, raiseInterrupts); - MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true); } @@ -303,9 +313,6 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } - MarkNodeHasMetadata(nodeNameString, nodePort, false); - MarkNodeMetadataSynced(nodeNameString, nodePort, false); - if (clearMetadata) { if (NodeIsPrimary(workerNode)) @@ -326,6 +333,11 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS) } } + workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum( + false)); + workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, + BoolGetDatum(false)); + PG_RETURN_VOID(); } @@ -1115,83 +1127,6 @@ LocalGroupIdUpdateCommand(int32 groupId) } -/* - * MarkNodeHasMetadata function sets the hasmetadata column of the specified worker in - * pg_dist_node to hasMetadata. - */ -void -MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata) -{ - UpdateDistNodeBoolAttr(nodeName, nodePort, - Anum_pg_dist_node_hasmetadata, - hasMetadata); -} - - -/* - * MarkNodeMetadataSynced function sets the metadatasynced column of the - * specified worker in pg_dist_node to the given value. - */ -void -MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced) -{ - UpdateDistNodeBoolAttr(nodeName, nodePort, - Anum_pg_dist_node_metadatasynced, - synced); -} - - -/* - * UpdateDistNodeBoolAttr updates a boolean attribute of the specified worker - * to the given value. - */ -static void -UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort, int attrNum, bool value) -{ - const bool indexOK = false; - - ScanKeyData scanKey[2]; - Datum values[Natts_pg_dist_node]; - bool isnull[Natts_pg_dist_node]; - bool replace[Natts_pg_dist_node]; - - Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock); - TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); - - ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename, - BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName)); - ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport, - BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodePort)); - - SysScanDesc scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK, - NULL, 2, scanKey); - - HeapTuple heapTuple = systable_getnext(scanDescriptor); - if (!HeapTupleIsValid(heapTuple)) - { - ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", - nodeName, nodePort))); - } - - memset(replace, 0, sizeof(replace)); - - values[attrNum - 1] = BoolGetDatum(value); - isnull[attrNum - 1] = false; - replace[attrNum - 1] = true; - - heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); - - CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple); - - CitusInvalidateRelcacheByRelid(DistNodeRelationId()); - - CommandCounterIncrement(); - - systable_endscan(scanDescriptor); - table_close(pgDistNode, NoLock); -} - - /* * SequenceDDLCommandsForTable returns a list of commands which create sequences (and * their schemas) to run on workers before creating the relation. 
The sequence creation
@@ -1840,6 +1775,7 @@ SyncMetadataToNodes(void)
 		return METADATA_SYNC_FAILED_LOCK;
 	}
 
+	List *syncedWorkerList = NIL;
 	List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
 	WorkerNode *workerNode = NULL;
 	foreach_ptr(workerNode, workerList)
@@ -1847,7 +1783,6 @@ SyncMetadataToNodes(void)
 		if (workerNode->hasMetadata && !workerNode->metadataSynced)
 		{
 			bool raiseInterrupts = false;
-
 			if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
 			{
 				ereport(WARNING, (errmsg("failed to sync metadata to %s:%d",
@@ -1857,12 +1792,27 @@ SyncMetadataToNodes(void)
 			}
 			else
 			{
				/* remember successfully synced nodes; their metadatasynced column is set below */
+				syncedWorkerList = lappend(syncedWorkerList, workerNode);
 			}
 		}
 	}
 
+	foreach_ptr(workerNode, syncedWorkerList)
+	{
+		SetWorkerColumnOptional(workerNode, Anum_pg_dist_node_metadatasynced,
+								BoolGetDatum(true));
+
+		/* we fetch the same node again to check if it's synced or not */
+		WorkerNode *nodeUpdated = FindWorkerNode(workerNode->workerName,
+												 workerNode->workerPort);
+		if (!nodeUpdated->metadataSynced)
+		{
+			/* set the result to FAILED to trigger the sync again */
+			result = METADATA_SYNC_FAILED_SYNC;
+		}
+	}
+
 	return result;
 }
diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c
index 284d78944..7f593087a 100644
--- a/src/backend/distributed/metadata/node_metadata.c
+++ b/src/backend/distributed/metadata/node_metadata.c
@@ -109,6 +109,11 @@ static bool NodeIsLocal(WorkerNode *worker);
 static void SetLockTimeoutLocally(int32 lock_cooldown);
 static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
 static bool UnsetMetadataSyncedForAll(void);
+static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
+													int columnIndex,
+													Datum value);
+static char * NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata);
+static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced);
 static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
 											   char *field);
 static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
@@ -579,8 +584,8 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
 	 */
 	if (ClusterHasDistributedFunctionWithDistArgument())
 	{
-		MarkNodeHasMetadata(newWorkerNode->workerName, newWorkerNode->workerPort,
-							true);
+		SetWorkerColumnLocalOnly(newWorkerNode, Anum_pg_dist_node_hasmetadata,
+								 BoolGetDatum(true));
 		TriggerMetadataSyncOnCommit();
 	}
 }
@@ -1554,11 +1559,85 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
 
 /*
  * SetWorkerColumn function sets the column with the specified index
+ * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
+ * It also sends the same update command to the other metadata nodes.
+ * If anything fails during the transaction, we roll it back.
+ * Returns the new worker node after the modification.
+ */
+WorkerNode *
+SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
+{
+	workerNode = SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
+
+	char *metadataSyncCommand = GetMetadataSyncCommandToSetNodeColumn(workerNode,
+																	  columnIndex,
+																	  value);
+
+	SendCommandToWorkersWithMetadata(metadataSyncCommand);
+
+	return workerNode;
+}
+
+
+/*
+ * SetWorkerColumnOptional function sets the column with the specified index
+ * on the worker in pg_dist_node, by calling SetWorkerColumnLocalOnly.
+ * It also sends the same update command to the other metadata nodes; unlike
+ * SetWorkerColumn, failures there are ignored. Returns the new worker node after the modification.
+ */
+WorkerNode *
+SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value)
+{
+	char *metadataSyncCommand = GetMetadataSyncCommandToSetNodeColumn(workerNode,
+																	  columnIndex,
+																	  value);
+
+	List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
+												   ShareLock);
+
+	/* open connections in parallel */
+	WorkerNode *worker = NULL;
+	foreach_ptr(worker, workerNodeList)
+	{
+		bool success = SendOptionalCommandListToWorkerInCoordinatedTransaction(
+			worker->workerName, worker->workerPort,
+			CurrentUserName(),
+			list_make1(metadataSyncCommand));
+
+		if (!success)
+		{
+			/* metadata out of sync, mark the worker as not synced */
+			ereport(WARNING, (errmsg("failed to update the metadata of node %s:%d "
+									 "on node %s:%d; metadata on %s:%d is marked "
+									 "as out of sync",
+									 workerNode->workerName, workerNode->workerPort,
+									 worker->workerName, worker->workerPort,
+									 worker->workerName, worker->workerPort)));
+
+			SetWorkerColumnLocalOnly(worker, Anum_pg_dist_node_metadatasynced,
+									 BoolGetDatum(false));
+		}
+		else if (workerNode->nodeId == worker->nodeId)
+		{
+			/*
+			 * If this is the node we want to update and it is updated successfully,
+			 * then we can safely update the flag on the coordinator as well.
+			 */
+			SetWorkerColumnLocalOnly(workerNode, columnIndex, value);
+		}
+	}
+
+	return FindWorkerNode(workerNode->workerName, workerNode->workerPort);
+}
+
+
+/*
+ * SetWorkerColumnLocalOnly function sets the column with the specified index
  * (see pg_dist_node.h) on the worker in pg_dist_node.
  * It returns the new worker node after the modification.
  */
-static WorkerNode *
-SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
+WorkerNode *
+SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
 {
 	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
@@ -1567,47 +1646,6 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
 	Datum values[Natts_pg_dist_node];
 	bool isnull[Natts_pg_dist_node];
 	bool replace[Natts_pg_dist_node];
-	char *metadataSyncCommand = NULL;
-
-
-	switch (columnIndex)
-	{
-		case Anum_pg_dist_node_hasmetadata:
-		{
-			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata");
-
-			break;
-		}
-
-		case Anum_pg_dist_node_isactive:
-		{
-			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive");
-
-			metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId,
-														 DatumGetBool(value));
-			break;
-		}
-
-		case Anum_pg_dist_node_shouldhaveshards:
-		{
-			metadataSyncCommand = ShouldHaveShardsUpdateCommand(workerNode->nodeId,
-																DatumGetBool(value));
-			break;
-		}
-
-		case Anum_pg_dist_node_metadatasynced:
-		{
-			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced");
-
-			break;
-		}
-
-		default:
-		{
-			ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
-								   workerNode->workerName, workerNode->workerPort)));
-		}
-	}
 
 	if (heapTuple == NULL)
 	{
@@ -1631,12 +1669,99 @@ SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value)
 
 	table_close(pgDistNode, NoLock);
 
-	/* we also update the column at worker nodes */
-	SendCommandToWorkersWithMetadata(metadataSyncCommand);
 
 	return newWorkerNode;
 }
 
 
+/*
+ * GetMetadataSyncCommandToSetNodeColumn checks whether the given workerNode and
+ * value are valid, and then returns the necessary metadata sync command as a string.
+ */
+static char *
+GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, int columnIndex, Datum
+									  value)
+{
+	char *metadataSyncCommand = NULL;
+
+	switch (columnIndex)
+	{
+		case Anum_pg_dist_node_hasmetadata:
+		{
+			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "hasmetadata");
+			metadataSyncCommand = NodeHasmetadataUpdateCommand(workerNode->nodeId,
+															   DatumGetBool(value));
+			break;
+		}
+
+		case Anum_pg_dist_node_isactive:
+		{
+			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "isactive");
+
+			metadataSyncCommand = NodeStateUpdateCommand(workerNode->nodeId,
+														 DatumGetBool(value));
+			break;
+		}
+
+		case Anum_pg_dist_node_shouldhaveshards:
+		{
+			metadataSyncCommand = ShouldHaveShardsUpdateCommand(workerNode->nodeId,
+																DatumGetBool(value));
+			break;
+		}
+
+		case Anum_pg_dist_node_metadatasynced:
+		{
+			ErrorIfCoordinatorMetadataSetFalse(workerNode, value, "metadatasynced");
+			metadataSyncCommand = NodeMetadataSyncedUpdateCommand(workerNode->nodeId,
+																  DatumGetBool(value));
+			break;
+		}
+
+		default:
+		{
+			ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
+								   workerNode->workerName, workerNode->workerPort)));
+		}
+	}
+
+	return metadataSyncCommand;
+}
+
+
+/*
+ * NodeHasmetadataUpdateCommand generates and returns a SQL UPDATE command
+ * that updates the hasmetadata column of pg_dist_node, for the given nodeid.
+ */
+static char *
+NodeHasmetadataUpdateCommand(uint32 nodeId, bool hasMetadata)
+{
+	StringInfo updateCommand = makeStringInfo();
+	char *hasMetadataString = hasMetadata ? "TRUE" : "FALSE";
+	appendStringInfo(updateCommand,
+					 "UPDATE pg_dist_node SET hasmetadata = %s "
+					 "WHERE nodeid = %u",
+					 hasMetadataString, nodeId);
+	return updateCommand->data;
+}
+
+
+/*
+ * NodeMetadataSyncedUpdateCommand generates and returns a SQL UPDATE command
+ * that updates the metadatasynced column of pg_dist_node, for the given nodeid.
+ */
+static char *
+NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced)
+{
+	StringInfo updateCommand = makeStringInfo();
+	char *metadataSyncedString = metadataSynced ? "TRUE" : "FALSE";
+	appendStringInfo(updateCommand,
+					 "UPDATE pg_dist_node SET metadatasynced = %s "
+					 "WHERE nodeid = %u",
+					 metadataSyncedString, nodeId);
+	return updateCommand->data;
+}
+
+
 /*
  * ErrorIfCoordinatorMetadataSetFalse throws an error if the input node
  * is the coordinator and the value is false.
@@ -1655,28 +1780,28 @@ ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *fi
 
 /*
  * SetShouldHaveShards function sets the shouldhaveshards column of the
- * specified worker in pg_dist_node.
+ * specified worker in pg_dist_node. It also propagates this to the other metadata nodes.
  * It returns the new worker node after the modification.
  */
 static WorkerNode *
 SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards)
 {
-	return SetWorkerColumn(workerNode, Anum_pg_dist_node_shouldhaveshards,
-						   BoolGetDatum(shouldHaveShards));
+	return SetWorkerColumn(workerNode, Anum_pg_dist_node_shouldhaveshards, BoolGetDatum(
+								shouldHaveShards));
 }
 
 
 /*
  * SetNodeState function sets the isactive column of the specified worker in
- * pg_dist_node to isActive.
+ * pg_dist_node to isActive. It also propagates this to the other metadata nodes.
  * It returns the new worker node after the modification.
*/ static WorkerNode * SetNodeState(char *nodeName, int nodePort, bool isActive) { WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); - return SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive, - BoolGetDatum(isActive)); + return SetWorkerColumn(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum( + isActive)); } diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 2917ff39a..89a00aa2b 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -48,8 +48,6 @@ extern List * GrantOnSchemaDDLCommands(Oid schemaId); extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, uint64 shardLength, int32 groupId); extern void CreateTableMetadataOnWorkers(Oid relationId); -extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata); -extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced); extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner); extern void SyncMetadataToNodesMain(Datum main_arg); extern void SignalMetadataSyncDaemon(Oid database, int sig); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 158c5a7ce..82118f103 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -96,6 +96,11 @@ extern bool NodeIsPrimary(WorkerNode *worker); extern bool NodeIsSecondary(WorkerNode *worker); extern bool NodeIsReadable(WorkerNode *worker); extern bool NodeIsCoordinator(WorkerNode *node); +extern WorkerNode * SetWorkerColumn(WorkerNode *workerNode, int columnIndex, Datum value); +extern WorkerNode * SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum + value); +extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, + Datum value); extern uint32 CountPrimariesWithMetadata(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void); diff --git a/src/test/regress/expected/drop_column_partitioned_table.out b/src/test/regress/expected/drop_column_partitioned_table.out index 51ab90c6f..a32602511 100644 --- a/src/test/regress/expected/drop_column_partitioned_table.out +++ b/src/test/regress/expected/drop_column_partitioned_table.out @@ -248,7 +248,7 @@ EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2000 WHERE measureid = 3; -> Task Node: host=localhost port=xxxxx dbname=regression -> Aggregate - -> Bitmap Heap Scan on sensors_2000_2580005 sensors_2000 + -> Bitmap Heap Scan on sensors_2000_2580005 sensors_xxx Recheck Cond: (measureid = 3) -> Bitmap Index Scan on sensors_2000_pkey_2580005 Index Cond: (measureid = 3) @@ -263,7 +263,7 @@ EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2001 WHERE measureid = 3; -> Task Node: host=localhost port=xxxxx dbname=regression -> Aggregate - -> Bitmap Heap Scan on sensors_2001_2580009 sensors_2001 + -> Bitmap Heap Scan on sensors_2001_2580009 sensors_xxx Recheck Cond: (measureid = 3) -> Bitmap Index Scan on sensors_2001_pkey_2580009 Index Cond: (measureid = 3) @@ -278,7 +278,7 @@ EXPLAIN (COSTS FALSE) SELECT count(*) FROM sensors_2002 WHERE measureid = 3; -> Task Node: host=localhost port=xxxxx dbname=regression -> Aggregate - -> Bitmap Heap Scan on sensors_2002_2580013 sensors_2002 + -> Bitmap Heap Scan on sensors_2002_2580013 sensors_xxx Recheck Cond: (measureid = 3) -> Bitmap Index Scan on sensors_2002_pkey_2580013 Index Cond: (measureid = 3) @@ -293,7 +293,7 @@ EXPLAIN (COSTS FALSE) 
SELECT count(*) FROM sensors_2003 WHERE measureid = 3; -> Task Node: host=localhost port=xxxxx dbname=regression -> Aggregate - -> Bitmap Heap Scan on sensors_2003_2580017 sensors_2003 + -> Bitmap Heap Scan on sensors_2003_2580017 sensors_xxx Recheck Cond: (measureid = 3) -> Bitmap Index Scan on sensors_2003_pkey_2580017 Index Cond: (measureid = 3) diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 02bdfd78b..97d5f1c9d 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -266,7 +266,7 @@ SELECT * FROM pg_dist_local_group; SELECT * FROM pg_dist_node ORDER BY nodeid; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards --------------------------------------------------------------------- - 1 | 1 | localhost | 57637 | default | t | t | primary | default | f | t + 1 | 1 | localhost | 57637 | default | t | t | primary | default | t | t 2 | 2 | localhost | 57638 | default | f | t | primary | default | f | t 4 | 1 | localhost | 8888 | default | f | t | secondary | default | f | t 5 | 1 | localhost | 8889 | default | f | t | secondary | second-cluster | f | t diff --git a/src/test/regress/expected/multi_test_helpers_superuser.out b/src/test/regress/expected/multi_test_helpers_superuser.out index 01676131c..0d01ead01 100644 --- a/src/test/regress/expected/multi_test_helpers_superuser.out +++ b/src/test/regress/expected/multi_test_helpers_superuser.out @@ -26,7 +26,7 @@ WITH dist_node_summary AS ( ARRAY[dist_node_summary.query, dist_node_summary.query], false) ), dist_placement_summary AS ( - SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query + SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY placementid) FROM pg_dist_placement' AS query ), dist_placement_check AS ( SELECT count(distinct result) = 1 AS matches FROM dist_placement_summary CROSS JOIN LATERAL diff --git a/src/test/regress/expected/start_stop_metadata_sync.out b/src/test/regress/expected/start_stop_metadata_sync.out index accfd9ae5..15a1fcf18 100644 --- a/src/test/regress/expected/start_stop_metadata_sync.out +++ b/src/test/regress/expected/start_stop_metadata_sync.out @@ -271,6 +271,148 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND rel (1 row) \c - - - :master_port +-- test synchronization for pg_dist_node flags +SELECT citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + hasmetadata | metadatasynced | shouldhaveshards +--------------------------------------------------------------------- + 
t | t | f + t | t | f +(2 rows) + +\c - - - :worker_2_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + hasmetadata | metadatasynced | shouldhaveshards +--------------------------------------------------------------------- + t | t | f + t | t | f +(2 rows) + +\c - - - :master_port +SELECT citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +NOTICE: dropping metadata on the node (localhost,57637) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); + citus_set_node_property +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + hasmetadata | metadatasynced | shouldhaveshards +--------------------------------------------------------------------- +(0 rows) + +\c - - - :worker_2_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + hasmetadata | metadatasynced | shouldhaveshards +--------------------------------------------------------------------- + f | f | t + t | t | t +(2 rows) + +\c - - - :master_port +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); +NOTICE: dropping metadata on the node (localhost,57638) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_1_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + hasmetadata | metadatasynced | shouldhaveshards +--------------------------------------------------------------------- +(0 rows) + +\c - - - :worker_2_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + hasmetadata | metadatasynced | shouldhaveshards +--------------------------------------------------------------------- +(0 rows) + +\c - - - :master_port +-- verify that mx workers are updated when disabling/activating nodes +SELECT citus_disable_node('localhost', :worker_1_port); +NOTICE: Node localhost:xxxxx has active shard placements. Some queries may fail after this operation. Use SELECT master_activate_node('localhost', 57637) to activate this node back. 
+ citus_disable_node +--------------------------------------------------------------------- + +(1 row) + +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_2_port +SELECT nodeport, isactive FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + nodeport | isactive +--------------------------------------------------------------------- + 57637 | f + 57638 | t +(2 rows) + +\c - - - :master_port +SET client_min_messages TO ERROR; +SELECT citus_activate_node('localhost', :worker_1_port); + citus_activate_node +--------------------------------------------------------------------- + 17 +(1 row) + +\c - - - :worker_2_port +SELECT nodeport, isactive FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + nodeport | isactive +--------------------------------------------------------------------- + 57637 | t + 57638 | t +(2 rows) + +\c - - - :master_port +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); +NOTICE: dropping metadata on the node (localhost,57638) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + SET search_path TO "start_stop_metadata_sync"; -- both start & stop metadata sync operations can be transactional BEGIN; diff --git a/src/test/regress/sql/multi_test_helpers_superuser.sql b/src/test/regress/sql/multi_test_helpers_superuser.sql index 0bd360b12..e1110d962 100644 --- a/src/test/regress/sql/multi_test_helpers_superuser.sql +++ b/src/test/regress/sql/multi_test_helpers_superuser.sql @@ -23,7 +23,7 @@ WITH dist_node_summary AS ( ARRAY[dist_node_summary.query, dist_node_summary.query], false) ), dist_placement_summary AS ( - SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query + SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY placementid) FROM pg_dist_placement' AS query ), dist_placement_check AS ( SELECT count(distinct result) = 1 AS matches FROM dist_placement_summary CROSS JOIN LATERAL diff --git a/src/test/regress/sql/start_stop_metadata_sync.sql b/src/test/regress/sql/start_stop_metadata_sync.sql index 2fbdd1ec7..dc01cf87c 100644 --- a/src/test/regress/sql/start_stop_metadata_sync.sql +++ b/src/test/regress/sql/start_stop_metadata_sync.sql @@ -120,6 +120,55 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND r SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync'); \c - - - :master_port +-- test synchronization for pg_dist_node flags +SELECT citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false); +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); +SELECT citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false); + +\c - - - :worker_1_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + +\c - - - :worker_2_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + +\c - - - :master_port +SELECT citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true); +SELECT 
stop_metadata_sync_to_node('localhost', :worker_1_port); +SELECT citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true); +\c - - - :worker_1_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + +\c - - - :worker_2_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + +\c - - - :master_port +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); + +\c - - - :worker_1_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + +\c - - - :worker_2_port +SELECT hasmetadata, metadatasynced, shouldhaveshards FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + +\c - - - :master_port +-- verify that mx workers are updated when disabling/activating nodes +SELECT citus_disable_node('localhost', :worker_1_port); +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + +\c - - - :worker_2_port +SELECT nodeport, isactive FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + +\c - - - :master_port +SET client_min_messages TO ERROR; +SELECT citus_activate_node('localhost', :worker_1_port); + +\c - - - :worker_2_port +SELECT nodeport, isactive FROM pg_dist_node WHERE nodeport IN (:worker_1_port, :worker_2_port) ORDER BY nodeport; + +\c - - - :master_port +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); + SET search_path TO "start_stop_metadata_sync"; -- both start & stop metadata sync operations can be transactional
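
For reviewers, a minimal sketch (not part of the patch) contrasting the two propagation helpers introduced above. FlipNodeFlagsSketch and its flag choices are hypothetical; FindWorkerNode, SetWorkerColumn, and SetWorkerColumnOptional are the functions added or touched by this diff.

#include "postgres.h"

#include "distributed/pg_dist_node.h"
#include "distributed/worker_manager.h"

/* hypothetical caller, for illustration only */
static void
FlipNodeFlagsSketch(char *nodeName, int nodePort)
{
	WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);

	/*
	 * SetWorkerColumn updates pg_dist_node locally and sends the same UPDATE
	 * to every metadata node; any failure aborts the coordinated transaction.
	 */
	workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_shouldhaveshards,
								 BoolGetDatum(false));

	/*
	 * SetWorkerColumnOptional sends the UPDATE over optional connections
	 * instead: an unreachable metadata node only produces a WARNING and is
	 * marked metadatasynced = false locally, so the maintenance daemon's
	 * SyncMetadataToNodes loop retries it later.
	 */
	(void) SetWorkerColumnOptional(workerNode, Anum_pg_dist_node_metadatasynced,
								   BoolGetDatum(true));
}

The design split is what lets stop_metadata_sync_to_node and citus_activate_node stay strict (transactional, all-or-nothing) while the background sync daemon degrades gracefully when a metadata worker is down.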