Propagate new reference table placement metadata on `master_add_node`

pull/1103/head
Eren Basak 2017-01-06 18:10:15 +03:00
parent 23b2619412
commit be78769ae4
7 changed files with 214 additions and 22 deletions

View File

@ -307,10 +307,9 @@ GetNextShardId()
/*
* master_get_new_placementid allocates and returns a unique placementId for
* the placement to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having shardId
* collisions.
* master_get_new_placementid is a user-facing wrapper function around
* GetNextPlacementId(), which allocates and returns a unique placement id for
* the placement to be created.
*
* NB: This can be called by any user; for now we have decided that that's
* ok. We might want to restrict this to users that are part of a specific
* role at some later point.
@ -318,25 +317,51 @@ GetNextShardId()
*/
Datum
master_get_new_placementid(PG_FUNCTION_ARGS)
{
uint64 placementId = 0;
Datum placementIdDatum = 0;
EnsureSchemaNode();
placementId = GetNextPlacementId();
placementIdDatum = Int64GetDatum(placementId);
PG_RETURN_DATUM(placementIdDatum);
}
/*
* GetNextPlacementId allocates and returns a unique placementId for
* the placement to be created. This allocation occurs both in shared memory
* and in write-ahead logs; writing to logs avoids the risk of having
* placementId collisions.
*
* NB: This can be called by any user; for now we have decided that that's
* ok. We might want to restrict this to users that are part of a specific
* role at some later point.
*/
uint64
GetNextPlacementId(void)
{
text *sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum shardIdDatum = 0;
EnsureSchemaNode();
Datum placementIdDatum = 0;
uint64 placementId = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique shardId from sequence */
shardIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
/* generate new and unique placement id from sequence */
placementIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
PG_RETURN_DATUM(shardIdDatum);
placementId = DatumGetInt64(placementIdDatum);
return placementId;
}
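
Since master_get_new_placementid stays SQL-callable, the refactoring is easy to sanity-check from psql; a minimal sketch (the returned value depends on the current state of the placementid sequence):

-- illustrative; allocates and returns the next unique placement id
SELECT master_get_new_placementid();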

View File

@ -670,6 +670,24 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
}
/*
* PlacementUpsertCommand creates a SQL command for upserting a pg_dist_shard_placement
* entry with the given properties. In the case of a conflict on placementId, the command
* updates all properties (excluding the placementId) with the given ones.
*/
char *
PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, char *nodeName, uint32 nodePort)
{
StringInfo command = makeStringInfo();
appendStringInfo(command, UPSERT_PLACEMENT, shardId, shardState, shardLength,
quote_literal_cstr(nodeName), nodePort, placementId);
return command->data;
}
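
To make the generated command concrete: using the UPSERT_PLACEMENT template from the metadata_sync.h hunk below, a call such as PlacementUpsertCommand(1310184, 100000, 1, 0, "localhost", 57638) — all values illustrative, assuming FILE_FINALIZED maps to shard state 1 — renders to roughly:

INSERT INTO pg_dist_shard_placement
    (shardid, shardstate, shardlength, nodename, nodeport, placementid)
VALUES (1310184, 1, 0, 'localhost', 57638, 100000)
ON CONFLICT (placementid) DO UPDATE SET
    shardid = EXCLUDED.shardid,
    shardstate = EXCLUDED.shardstate,
    shardlength = EXCLUDED.shardlength,
    nodename = EXCLUDED.nodename,
    nodeport = EXCLUDED.nodeport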
/*
* LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
* of a worker and returns the command in a string.

View File

@ -20,6 +20,7 @@
#include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h"
@ -248,18 +249,45 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
nodeName, nodePort,
missingWorkerOk);
/*
* Although this function is used for reference tables, and reference table
* shard placements always have shardState = FILE_FINALIZED, unhealthy
* placements may exist when a non-reference table is upgraded to a
* reference table. In that case, we repair the shard placement and update
* its state in the pg_dist_shard_placement table.
*/
if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED)
{
uint64 placementId = 0;
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
ddlCommandList);
if (targetPlacement == NULL)
{
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED, 0,
placementId = GetNextPlacementId();
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0,
nodeName, nodePort);
}
else
{
UpdateShardPlacementState(targetPlacement->placementId, FILE_FINALIZED);
placementId = targetPlacement->placementId;
UpdateShardPlacementState(placementId, FILE_FINALIZED);
}
/*
* Although ReplicateShardToAllWorkers is used only for reference tables,
* during the upgrade phase the placements are created before the table is
* marked as a reference table. All metadata (including the placement
* metadata) will be copied to the workers after all reference table
* changes are finished.
*/
if (ShouldSyncTableMetadata(shardInterval->relationId))
{
char *placementCommand = PlacementUpsertCommand(shardId, placementId,
FILE_FINALIZED, 0,
nodeName, nodePort);
SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand);
}
}
}
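
After this code path runs, the upserted placement row should be visible on every worker that carries metadata, not only on the coordinator. A quick check, mirroring the regression test below (the mx_ref table name and :worker_1_port are the test harness defaults assumed here):

\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid = 'mx_ref'::regclass;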

View File

@ -97,6 +97,7 @@ extern bool SchemaNode(void);
/* Function declarations local to the distributed module */
extern bool CStoreTable(Oid relationId);
extern uint64 GetNextShardId(void);
extern uint64 GetNextPlacementId(void);
extern Oid ResolveRelationId(text *relationName);
extern List * GetTableDDLEvents(Oid relationId);
extern List * GetTableForeignConstraintCommands(Oid relationId);

View File

@ -31,6 +31,8 @@ extern List * ShardListInsertCommand(List *shardIntervalList);
extern char * NodeDeleteCommand(uint32 nodeId);
extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
extern char * CreateSchemaDDLCommand(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, char *nodeName, uint32 nodePort);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node"
@ -38,6 +40,16 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
"SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
#define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s)"
#define UPSERT_PLACEMENT "INSERT INTO pg_dist_shard_placement " \
"(shardid, shardstate, shardlength, " \
"nodename, nodeport, placementid) " \
"VALUES (%lu, %d, %lu, %s, %d, %lu) " \
"ON CONFLICT (placementid) DO UPDATE SET " \
"shardid = EXCLUDED.shardid, " \
"shardstate = EXCLUDED.shardstate, " \
"shardlength = EXCLUDED.shardlength, " \
"nodename = EXCLUDED.nodename, " \
"nodeport = EXCLUDED.nodeport"
#endif /* METADATA_SYNC_H */

View File

@ -8,6 +8,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1310000;
SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_placement_id
\gset
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
-- Create the necessary test utility function
CREATE FUNCTION master_metadata_snapshot()
RETURNS text[]
@ -1120,7 +1122,7 @@ SELECT create_distributed_table('mx_table', 'a');
SELECT master_add_node('localhost', :worker_2_port);
master_add_node
---------------------------------
(3,3,localhost,57638,default,f)
(4,4,localhost,57638,default,f)
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
@ -1148,19 +1150,19 @@ SELECT * FROM mx_table ORDER BY a;
\c - mx_user - :worker_2_port
SELECT nextval('mx_table_b_seq');
nextval
-----------------
844424930131969
------------------
1125899906842625
(1 row)
INSERT INTO mx_table (a) VALUES (39);
INSERT INTO mx_table (a) VALUES (40);
SELECT * FROM mx_table ORDER BY a;
a | b
----+-----------------
----+------------------
37 | 281474976710658
38 | 281474976710659
39 | 844424930131970
40 | 844424930131971
39 | 1125899906842626
40 | 1125899906842627
(4 rows)
\c - mx_user - :master_port
@ -1265,12 +1267,77 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid;
---------+------------+-------------+----------+----------+-------------
(0 rows)
-- Check that master_add_node propagates the metadata about new placements of a reference table
\c - - - :master_port
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE mx_ref (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref');
create_reference_table
------------------------
(1 row)
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
(1 row)
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
(1 row)
\c - - - :master_port
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "mx_ref" to all workers
master_add_node
---------------------------------
(5,5,localhost,57638,default,f)
(1 row)
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
1310184 | localhost | 57638
(2 rows)
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
1310184 | localhost | 57638
(2 rows)
\c - - - :master_port
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------
@ -1286,5 +1353,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
RESET citus.shard_count;
RESET citus.shard_replication_factor;
RESET citus.multi_shard_commit_protocol;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;

View File

@ -13,6 +13,9 @@ SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_pla
\gset
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
-- Create the necessary test utility function
CREATE FUNCTION master_metadata_snapshot()
RETURNS text[]
@ -544,11 +547,45 @@ DROP TABLE mx_ref;
SELECT * FROM pg_dist_shard WHERE shardid=:ref_table_shardid;
SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid;
-- Check that master_add_node propagates the metadata about new placements of a reference table
\c - - - :master_port
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT master_remove_node('localhost', :worker_2_port);
CREATE TABLE mx_ref (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref');
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
\c - - - :master_port
SELECT master_add_node('localhost', :worker_2_port);
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
\c - - - :master_port
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
-- Cleanup
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
@ -556,5 +593,7 @@ RESET citus.shard_count;
RESET citus.shard_replication_factor;
RESET citus.multi_shard_commit_protocol;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;