Propagate Metadata to Workers on `create_reference_table` call.

pull/1938/head
Eren Basak 2017-01-06 11:17:25 +03:00
parent a7f8d4bc88
commit 86e772e4ae
6 changed files with 178 additions and 61 deletions
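
In short: create_reference_table() now ends by calling a new CreateTableMetadataOnWorkers() helper, so the reference table's DDL and its pg_dist_partition, pg_dist_shard, and pg_dist_shard_placement rows reach every worker with hasmetadata=true. A minimal usage sketch, reusing the table and ports from the regression test added below:

CREATE TABLE mx_ref (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref');
-- connect to a metadata worker; the table definition and catalog rows are now there
\c - - - :worker_1_port
\d mx_ref
SELECT logicalrelid, partmethod, repmodel
FROM pg_dist_partition WHERE logicalrelid = 'mx_ref'::regclass;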

View File

@ -79,7 +79,7 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN
char *colocateWithTableName,
int shardCount, int replicationFactor);
static Oid ColumnType(Oid relationId, char *columnName);
static void CreateTableMetadataOnWorkers(Oid relationId);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
@ -177,18 +177,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
if (ShouldSyncTableMetadata(relationId))
{
List *commandList = GetDistributedTableDDLEvents(relationId);
ListCell *commandCell = NULL;
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkers(WORKERS_WITH_METADATA, command);
}
CreateTableMetadataOnWorkers(relationId);
}
PG_RETURN_VOID();
@ -244,6 +233,8 @@ CreateReferenceTable(Oid relationId)
/* now, create the single shard replicated to all nodes */
CreateReferenceTableShard(relationId);
CreateTableMetadataOnWorkers(relationId);
}
@ -989,3 +980,29 @@ ColumnType(Oid relationId, char *columnName)
return columnType;
}
/*
* CreateTableMetadataOnWorkers creates the list of commands needed to create the
* given distributed table and sends these commands to all metadata workers, i.e.
* workers with hasmetadata=true. Before sending the commands, in order to prevent
* recursive propagation, DDL propagation on the workers is disabled with a
* `SET citus.enable_ddl_propagation TO off;` command.
*/
static void
CreateTableMetadataOnWorkers(Oid relationId)
{
List *commandList = GetDistributedTableDDLEvents(relationId);
ListCell *commandCell = NULL;
/* prevent recursive propagation */
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkers(WORKERS_WITH_METADATA, command);
}
}
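
For illustration, the command stream a metadata worker receives from this function looks roughly like the following; the concrete DDL events come from GetDistributedTableDDLEvents(), so the statements below are only a sketch:

-- sent first, so the worker does not propagate the DDL further itself
SET citus.enable_ddl_propagation TO off;
-- then each DDL event for the table, one command at a time, e.g.
CREATE TABLE public.mx_ref (col_1 integer, col_2 text);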

View File

@ -436,19 +436,30 @@ DistributionCreateCommand(DistTableCacheEntry *cacheEntry)
char *partitionKeyString = cacheEntry->partitionKeyString;
char *qualifiedRelationName =
generate_qualified_relation_name(relationId);
char *partitionKeyColumnName = ColumnNameToColumn(relationId, partitionKeyString);
uint32 colocationId = cacheEntry->colocationId;
char replicationModel = cacheEntry->replicationModel;
StringInfo tablePartitionKeyString = makeStringInfo();
if (distributionMethod == DISTRIBUTE_BY_NONE)
{
appendStringInfo(tablePartitionKeyString, "NULL");
}
else
{
char *partitionKeyColumnName = ColumnNameToColumn(relationId, partitionKeyString);
appendStringInfo(tablePartitionKeyString, "column_name_to_column(%s,%s)",
quote_literal_cstr(qualifiedRelationName),
quote_literal_cstr(partitionKeyColumnName));
}
appendStringInfo(insertDistributionCommand,
"INSERT INTO pg_dist_partition "
"(logicalrelid, partmethod, partkey, colocationid, repmodel) "
"VALUES "
"(%s::regclass, '%c', column_name_to_column(%s,%s), %d, '%c')",
"(%s::regclass, '%c', %s, %d, '%c')",
quote_literal_cstr(qualifiedRelationName),
distributionMethod,
quote_literal_cstr(qualifiedRelationName),
quote_literal_cstr(partitionKeyColumnName),
tablePartitionKeyString->data,
colocationId,
replicationModel);
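
With this change, the INSERT that DistributionCreateCommand() generates for a reference table carries a literal NULL partition key instead of a column_name_to_column() call, along these lines (the colocationid value is illustrative; partmethod 'n' and repmodel 't' match the test output below):

INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel)
VALUES ('public.mx_ref'::regclass, 'n', NULL, 10004, 't');

Hash-distributed tables still get the column_name_to_column(...) expression in the partkey position.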
@ -511,7 +522,6 @@ ShardListInsertCommand(List *shardIntervalList)
StringInfo insertShardCommand = makeStringInfo();
int shardCount = list_length(shardIntervalList);
int processedShardCount = 0;
int processedShardPlacementCount = 0;
/* if there are no shards, return empty list */
if (shardCount == 0)
@ -519,13 +529,6 @@ ShardListInsertCommand(List *shardIntervalList)
return commandList;
}
/* generate the shard placement query without any values yet */
appendStringInfo(insertPlacementCommand,
"INSERT INTO pg_dist_shard_placement "
"(shardid, shardstate, shardlength,"
" nodename, nodeport, placementid) "
"VALUES ");
/* add placements to insertPlacementCommand */
foreach(shardIntervalCell, shardIntervalList)
{
@ -533,25 +536,33 @@ ShardListInsertCommand(List *shardIntervalList)
uint64 shardId = shardInterval->shardId;
List *shardPlacementList = FinalizedShardPlacementList(shardId);
ShardPlacement *placement = NULL;
ListCell *shardPlacementCell = NULL;
/* the function only handles single placement per shard */
Assert(list_length(shardPlacementList) == 1);
placement = (ShardPlacement *) linitial(shardPlacementList);
appendStringInfo(insertPlacementCommand,
"(%lu, 1, %lu, %s, %d, %lu)",
shardId,
placement->shardLength,
quote_literal_cstr(placement->nodeName),
placement->nodePort,
placement->placementId);
processedShardPlacementCount++;
if (processedShardPlacementCount != shardCount)
foreach(shardPlacementCell, shardPlacementList)
{
appendStringInfo(insertPlacementCommand, ",");
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
if (insertPlacementCommand->len == 0)
{
/* generate the shard placement query without any values yet */
appendStringInfo(insertPlacementCommand,
"INSERT INTO pg_dist_shard_placement "
"(shardid, shardstate, shardlength,"
" nodename, nodeport, placementid) "
"VALUES ");
}
else
{
appendStringInfo(insertPlacementCommand, ",");
}
appendStringInfo(insertPlacementCommand,
"(%lu, 1, %lu, %s, %d, %lu)",
shardId,
placement->shardLength,
quote_literal_cstr(placement->nodeName),
placement->nodePort,
placement->placementId);
}
}
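
The old code asserted exactly one placement per shard; moving the header generation inside the loop lets a single shard contribute multiple placement tuples, which is exactly the reference-table case (one shard, one placement per node), and when no placements exist at all, no INSERT is built. Using the values from the regression test output below, the generated command would read roughly (shardlength 0 is an assumption):

INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid)
VALUES (1310183, 1, 0, 'localhost', 57637, 100183),
       (1310183, 1, 0, 'localhost', 57638, 100184);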
@ -573,17 +584,37 @@ ShardListInsertCommand(List *shardIntervalList)
Oid distributedRelationId = shardInterval->relationId;
char *qualifiedRelationName = generate_qualified_relation_name(
distributedRelationId);
StringInfo minHashToken = makeStringInfo();
StringInfo maxHashToken = makeStringInfo();
int minHashToken = DatumGetInt32(shardInterval->minValue);
int maxHashToken = DatumGetInt32(shardInterval->maxValue);
if (shardInterval->minValueExists)
{
appendStringInfo(minHashToken, "'%d'", DatumGetInt32(
shardInterval->minValue));
}
else
{
appendStringInfo(minHashToken, "NULL");
}
if (shardInterval->maxValueExists)
{
appendStringInfo(maxHashToken, "'%d'", DatumGetInt32(
shardInterval->maxValue));
}
else
{
appendStringInfo(maxHashToken, "NULL");
}
appendStringInfo(insertShardCommand,
"(%s::regclass, %lu, '%c', '%d', '%d')",
"(%s::regclass, %lu, '%c', %s, %s)",
quote_literal_cstr(qualifiedRelationName),
shardId,
shardInterval->storageType,
minHashToken,
maxHashToken);
minHashToken->data,
maxHashToken->data);
processedShardCount++;
if (processedShardCount != shardCount)
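
A reference table's single shard has no hash range, so minValueExists/maxValueExists are false and the generated pg_dist_shard row now carries NULLs in the min/max columns, roughly as follows (column names assumed from the pg_dist_shard schema, shard id taken from the test below):

INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
VALUES ('public.mx_ref'::regclass, 1310183, 't', NULL, NULL);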

View File

@ -23,8 +23,8 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
--------------+------------+---------+--------------+----------
(0 rows)
-- Show that, with no MX tables, metadata snapshot contains only the delete commands and
-- pg_dist_node entries
-- Show that, with no MX tables, metadata snapshot contains only the delete commands,
-- pg_dist_node entries and reference tables
SELECT unnest(master_metadata_snapshot());
unnest
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@ -1181,12 +1181,59 @@ DROP USER mx_user;
DROP USER mx_user;
\c - - - :worker_2_port
DROP USER mx_user;
-- Check that create_reference_table creates the metadata on workers
\c - - - :master_port
CREATE TABLE mx_ref (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref');
create_reference_table
------------------------
(1 row)
\d mx_ref
Table "public.mx_ref"
Column | Type | Modifiers
--------+---------+-----------
col_1 | integer |
col_2 | text |
\c - - - :worker_1_port
\d mx_ref
Table "public.mx_ref"
Column | Type | Modifiers
--------+---------+-----------
col_1 | integer |
col_2 | text |
SELECT
logicalrelid, partmethod, repmodel, shardid, placementid, nodename, nodeport
FROM
pg_dist_partition
NATURAL JOIN pg_dist_shard
NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_ref'::regclass;
logicalrelid | partmethod | repmodel | shardid | placementid | nodename | nodeport
--------------+------------+----------+---------+-------------+-----------+----------
mx_ref | n | t | 1310183 | 100184 | localhost | 57638
mx_ref | n | t | 1310183 | 100183 | localhost | 57637
(2 rows)
SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset
-- Cleanup
SELECT worker_drop_distributed_table('mx_ref'::regclass);
worker_drop_distributed_table
-------------------------------
(1 row)
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------

View File

@ -46,7 +46,7 @@ SELECT master_create_empty_shard('test_truncate_append');
(1 row)
-- verify 3 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass ORDER BY shardid;
shardid
---------
1210000
@ -113,7 +113,7 @@ SELECT count(*) FROM test_truncate_range;
(1 row)
-- verify 3 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid;
shardid
---------
1210003
@ -130,7 +130,7 @@ SELECT count(*) FROM test_truncate_range;
(1 row)
-- verify 3 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid;
shardid
---------
1210003
@ -190,7 +190,7 @@ SELECT count(*) FROM test_truncate_hash;
(1 row)
-- verify 4 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
shardid
---------
(0 rows)
@ -221,7 +221,7 @@ SELECT count(*) FROM test_truncate_hash;
(1 row)
-- verify 4 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
shardid
---------
1210006

View File

@ -25,8 +25,8 @@ COMMENT ON FUNCTION master_metadata_snapshot()
-- Show that none of the existing tables are qualified to be MX tables
SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
-- Show that, with no MX tables, metadata snapshot contains only the delete commands and
-- pg_dist_node entries
-- Show that, with no MX tables, metadata snapshot contains only the delete commands,
-- pg_dist_node entries and reference tables
SELECT unnest(master_metadata_snapshot());
-- Create a test table with constraints and SERIAL
@ -506,11 +506,33 @@ DROP USER mx_user;
\c - - - :worker_2_port
DROP USER mx_user;
-- Check that create_reference_table creates the metadata on workers
\c - - - :master_port
CREATE TABLE mx_ref (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref');
\d mx_ref
\c - - - :worker_1_port
\d mx_ref
SELECT
logicalrelid, partmethod, repmodel, shardid, placementid, nodename, nodeport
FROM
pg_dist_partition
NATURAL JOIN pg_dist_shard
NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_ref'::regclass;
SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset
-- Cleanup
SELECT worker_drop_distributed_table('mx_ref'::regclass);
\c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);

View File

@ -31,7 +31,7 @@ SELECT master_create_empty_shard('test_truncate_append');
SELECT master_create_empty_shard('test_truncate_append');
-- verify 3 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass ORDER BY shardid;
TRUNCATE TABLE test_truncate_append;
@ -79,7 +79,7 @@ INSERT INTO test_truncate_range values (100);
SELECT count(*) FROM test_truncate_range;
-- verify 3 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid;
TRUNCATE TABLE test_truncate_range;
@ -87,7 +87,7 @@ TRUNCATE TABLE test_truncate_range;
SELECT count(*) FROM test_truncate_range;
-- verify 3 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid;
-- verify that truncate can be aborted
INSERT INTO test_truncate_range VALUES (1);
@ -117,7 +117,7 @@ INSERT INTO test_truncate_hash values (100);
SELECT count(*) FROM test_truncate_hash;
-- verify 4 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
TRUNCATE TABLE test_truncate_hash;
@ -136,7 +136,7 @@ TRUNCATE TABLE test_truncate_hash;
SELECT count(*) FROM test_truncate_hash;
-- verify 4 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
-- verify that truncate can be aborted
INSERT INTO test_truncate_hash VALUES (1);