diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c
index a7890d627..5534de9aa 100644
--- a/src/backend/distributed/master/master_node_protocol.c
+++ b/src/backend/distributed/master/master_node_protocol.c
@@ -307,10 +307,9 @@ GetNextShardId()
 
 
 /*
- * master_get_new_placementid allocates and returns a unique placementId for
- * the placement to be created. This allocation occurs both in shared memory
- * and in write ahead logs; writing to logs avoids the risk of having shardId
- * collisions.
+ * master_get_new_placementid is a user-facing wrapper function around
+ * GetNextPlacementId() which allocates and returns a unique placement id for the
+ * placement to be created.
  *
  * NB: This can be called by any user; for now we have decided that that's
  * ok. We might want to restrict this to users part of a specific role or such
@@ -318,25 +317,51 @@ GetNextShardId()
  */
 Datum
 master_get_new_placementid(PG_FUNCTION_ARGS)
+{
+    uint64 placementId = 0;
+    Datum placementIdDatum = 0;
+
+    EnsureSchemaNode();
+
+    placementId = GetNextPlacementId();
+    placementIdDatum = Int64GetDatum(placementId);
+
+    PG_RETURN_DATUM(placementIdDatum);
+}
+
+
+/*
+ * GetNextPlacementId allocates and returns a unique placementId for
+ * the placement to be created. This allocation occurs both in shared memory
+ * and in write ahead logs; writing to logs avoids the risk of having placementId
+ * collisions.
+ *
+ * NB: This can be called by any user; for now we have decided that that's
+ * ok. We might want to restrict this to users part of a specific role or such
+ * at some later point.
+ */
+uint64
+GetNextPlacementId(void)
 {
     text *sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME);
     Oid sequenceId = ResolveRelationId(sequenceName);
     Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
     Oid savedUserId = InvalidOid;
     int savedSecurityContext = 0;
-    Datum shardIdDatum = 0;
-
-    EnsureSchemaNode();
+    Datum placementIdDatum = 0;
+    uint64 placementId = 0;
 
     GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
     SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
 
-    /* generate new and unique shardId from sequence */
-    shardIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
+    /* generate new and unique placement id from sequence */
+    placementIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
 
     SetUserIdAndSecContext(savedUserId, savedSecurityContext);
 
-    PG_RETURN_DATUM(shardIdDatum);
+    placementId = DatumGetInt64(placementIdDatum);
+
+    return placementId;
 }
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 1515003b2..9351b69b5 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -670,6 +670,24 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
 }
 
 
+/*
+ * PlacementUpsertCommand creates a SQL command for upserting a pg_dist_shard_placement
+ * entry with the given properties. In the case of a conflict on placementId, the command
+ * updates all properties (excluding the placementId) with the given ones.
+ */
+char *
+PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
+                       uint64 shardLength, char *nodeName, uint32 nodePort)
+{
+    StringInfo command = makeStringInfo();
+
+    appendStringInfo(command, UPSERT_PLACEMENT, shardId, shardState, shardLength,
+                     quote_literal_cstr(nodeName), nodePort, placementId);
+
+    return command->data;
+}
+
+
 /*
  * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
  * of a worker and returns the command in a string.
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index 3b67919f6..a368bccb2 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -20,6 +20,7 @@
 #include "distributed/master_protocol.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
+#include "distributed/metadata_sync.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/reference_table_utils.h"
 #include "distributed/resource_lock.h"
@@ -248,18 +249,45 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
                                         nodeName, nodePort, missingWorkerOk);
 
+    /*
+     * Although this function is used for reference tables and reference table shard
+     * placements always have shardState = FILE_FINALIZED, in case of an upgrade of
+     * a non-reference table to a reference table, unhealthy placements may exist. In
+     * this case, we repair the shard placement and update its state in the
+     * pg_dist_shard_placement table.
+     */
     if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED)
     {
+        uint64 placementId = 0;
+
        SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
                                                   ddlCommandList);
        if (targetPlacement == NULL)
        {
-            InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED, 0,
+            placementId = GetNextPlacementId();
+            InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0,
                                     nodeName, nodePort);
        }
        else
        {
-            UpdateShardPlacementState(targetPlacement->placementId, FILE_FINALIZED);
+            placementId = targetPlacement->placementId;
+            UpdateShardPlacementState(placementId, FILE_FINALIZED);
+        }
+
+        /*
+         * Although ReplicateShardToAllWorkers is used only for reference tables,
+         * during the upgrade phase, the placements are created before the table is
+         * marked as a reference table. All metadata (including the placement
+         * metadata) will be copied to workers after all reference table changes
+         * are finished.
+         */
+        if (ShouldSyncTableMetadata(shardInterval->relationId))
+        {
+            char *placementCommand = PlacementUpsertCommand(shardId, placementId,
+                                                            FILE_FINALIZED, 0,
+                                                            nodeName, nodePort);
+
+            SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand);
        }
    }
 }
diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h
index 3f7c13dc6..72a2baebf 100644
--- a/src/include/distributed/master_protocol.h
+++ b/src/include/distributed/master_protocol.h
@@ -97,6 +97,7 @@ extern bool SchemaNode(void);
 /* Function declarations local to the distributed module */
 extern bool CStoreTable(Oid relationId);
 extern uint64 GetNextShardId(void);
+extern uint64 GetNextPlacementId(void);
 extern Oid ResolveRelationId(text *relationName);
 extern List * GetTableDDLEvents(Oid relationId);
 extern List * GetTableForeignConstraintCommands(Oid relationId);
diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h
index 56e91e5f8..4c9a29544 100644
--- a/src/include/distributed/metadata_sync.h
+++ b/src/include/distributed/metadata_sync.h
@@ -31,6 +31,8 @@ extern List * ShardListInsertCommand(List *shardIntervalList);
 extern char * NodeDeleteCommand(uint32 nodeId);
 extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
 extern char * CreateSchemaDDLCommand(Oid schemaId);
+extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
+                                     uint64 shardLength, char *nodeName, uint32 nodePort);
 
 
 #define DELETE_ALL_NODES "TRUNCATE pg_dist_node"
@@ -38,6 +40,16 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
     "SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition"
 #define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
 #define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s)"
+#define UPSERT_PLACEMENT "INSERT INTO pg_dist_shard_placement " \
+    "(shardid, shardstate, shardlength, " \
+    "nodename, nodeport, placementid) " \
+    "VALUES (%lu, %d, %lu, %s, %d, %lu) " \
+    "ON CONFLICT (placementid) DO UPDATE SET " \
+    "shardid = EXCLUDED.shardid, " \
+    "shardstate = EXCLUDED.shardstate, " \
+    "shardlength = EXCLUDED.shardlength, " \
+    "nodename = EXCLUDED.nodename, " \
+    "nodeport = EXCLUDED.nodeport"
 
 
 #endif /* METADATA_SYNC_H */
diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out
index be0c3fe8d..64a908322 100644
--- a/src/test/regress/expected/multi_metadata_sync.out
+++ b/src/test/regress/expected/multi_metadata_sync.out
@@ -8,6 +8,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1310000;
 SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_placement_id
 \gset
 ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000;
+SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
+SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
 -- Create the necessary test utility function
 CREATE FUNCTION master_metadata_snapshot()
     RETURNS text[]
@@ -1120,7 +1122,7 @@ SELECT create_distributed_table('mx_table', 'a');
 SELECT master_add_node('localhost', :worker_2_port);
         master_add_node         
 ---------------------------------
- (3,3,localhost,57638,default,f)
+ (4,4,localhost,57638,default,f)
 (1 row)
 
 SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
@@ -1147,20 +1149,20 @@ SELECT * FROM mx_table ORDER BY a;
 
 \c - mx_user - :worker_2_port
 SELECT nextval('mx_table_b_seq');
-     nextval     
------------------
- 844424930131969
+     nextval      
+------------------
+ 1125899906842625
 (1 row)
 
 INSERT INTO mx_table (a) VALUES (39);
 INSERT INTO mx_table (a) VALUES (40);
 SELECT * FROM mx_table ORDER BY a;
- a  |        b        
-----+-----------------
- 37 | 281474976710658
- 38 | 281474976710659
- 39 | 844424930131970
- 40 | 844424930131971
+ a  |        b         
+----+------------------
+ 37 | 281474976710658
+ 38 | 281474976710659
+ 39 | 1125899906842626
+ 40 | 1125899906842627
 (4 rows)
 
 \c - mx_user - :master_port
@@ -1265,12 +1267,77 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid;
 ---------+------------+-------------+----------+----------+-------------
 (0 rows)
 
+-- Check that master_add_node propagates the metadata about new placements of a reference table
+\c - - - :master_port
+CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
+DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
+SELECT master_remove_node('localhost', :worker_2_port);
+ master_remove_node 
+--------------------
+ 
+(1 row)
+
+CREATE TABLE mx_ref (col_1 int, col_2 text);
+SELECT create_reference_table('mx_ref');
+ create_reference_table 
+------------------------
+ 
+(1 row)
+
+SELECT shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_ref'::regclass;
+ shardid | nodename  | nodeport 
+---------+-----------+----------
+ 1310184 | localhost |    57637
+(1 row)
+
+\c - - - :worker_1_port
+SELECT shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_ref'::regclass;
+ shardid | nodename  | nodeport 
+---------+-----------+----------
+ 1310184 | localhost |    57637
+(1 row)
+
+\c - - - :master_port
+SELECT master_add_node('localhost', :worker_2_port);
+NOTICE: Replicating reference table "mx_ref" to all workers
+         master_add_node         
+---------------------------------
+ (5,5,localhost,57638,default,f)
+(1 row)
+
+SELECT shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_ref'::regclass;
+ shardid | nodename  | nodeport 
+---------+-----------+----------
+ 1310184 | localhost |    57637
+ 1310184 | localhost |    57638
+(2 rows)
+
+\c - - - :worker_1_port
+SELECT shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_ref'::regclass;
+ shardid | nodename  | nodeport 
+---------+-----------+----------
+ 1310184 | localhost |    57637
+ 1310184 | localhost |    57638
+(2 rows)
+
+\c - - - :master_port
+INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
+DROP TABLE tmp_shard_placement;
 -- Cleanup
 \c - - - :master_port
 DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
 NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
 DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
 DROP TABLE mx_testing_schema.mx_test_table;
+DROP TABLE mx_ref;
 SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
  stop_metadata_sync_to_node 
 ----------------------------
 
 (1 row)
 
@@ -1286,5 +1353,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
 RESET citus.shard_count;
 RESET citus.shard_replication_factor;
 RESET citus.multi_shard_commit_protocol;
+ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
+ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
 ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
 ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;
diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql
index 795e48247..394397d32 100644
--- a/src/test/regress/sql/multi_metadata_sync.sql
+++ b/src/test/regress/sql/multi_metadata_sync.sql
@@ -13,6 +13,9 @@ SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_pla
 \gset
 ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000;
 
+SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
+SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
+
 -- Create the necessary test utility function
 CREATE FUNCTION master_metadata_snapshot()
     RETURNS text[]
@@ -544,11 +547,45 @@ DROP TABLE mx_ref;
 SELECT * FROM pg_dist_shard WHERE shardid=:ref_table_shardid;
 SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid;
 
+-- Check that master_add_node propagates the metadata about new placements of a reference table
+\c - - - :master_port
+CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
+DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
+SELECT master_remove_node('localhost', :worker_2_port);
+CREATE TABLE mx_ref (col_1 int, col_2 text);
+SELECT create_reference_table('mx_ref');
+
+SELECT shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_ref'::regclass;
+
+\c - - - :worker_1_port
+SELECT shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_ref'::regclass;
+
+\c - - - :master_port
+SELECT master_add_node('localhost', :worker_2_port);
+
+SELECT shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_ref'::regclass;
+
+\c - - - :worker_1_port
+SELECT shardid, nodename, nodeport
+FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
+WHERE logicalrelid='mx_ref'::regclass;
+
+\c - - - :master_port
+INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
+DROP TABLE tmp_shard_placement;
+
 -- Cleanup
 \c - - - :master_port
 DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
 DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
 DROP TABLE mx_testing_schema.mx_test_table;
+DROP TABLE mx_ref;
 
 SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
 SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
@@ -556,5 +593,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
 RESET citus.shard_count;
 RESET citus.shard_replication_factor;
 RESET citus.multi_shard_commit_protocol;
+ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
+ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
 ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
 ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;
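
For illustration, the UPSERT_PLACEMENT template added in metadata_sync.h expands to a statement of roughly the following shape once PlacementUpsertCommand() fills in its format arguments. This is a sketch only: the shard id, placement id, node name, and port below are hypothetical example values, and shard state 1 is assumed to correspond to FILE_FINALIZED.

    -- hypothetical expansion of UPSERT_PLACEMENT for one reference table placement
    INSERT INTO pg_dist_shard_placement
        (shardid, shardstate, shardlength, nodename, nodeport, placementid)
    VALUES (1310184, 1, 0, 'localhost', 57638, 100005)
    ON CONFLICT (placementid) DO UPDATE SET
        shardid = EXCLUDED.shardid,
        shardstate = EXCLUDED.shardstate,
        shardlength = EXCLUDED.shardlength,
        nodename = EXCLUDED.nodename,
        nodeport = EXCLUDED.nodeport;

Because the conflict target is placementid, re-sending the same command to a worker that already has the row only overwrites the non-key columns, so repeated propagation (for example after repairing an unhealthy placement) stays idempotent.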