mirror of https://github.com/citusdata/citus.git
Make start_metadata_sync_to_node UDF to propagate foreign-key constraints
parent
5e96e4f60e
commit
fb08093b00
|
@ -158,6 +158,7 @@ MetadataCreateCommands(void)
|
||||||
{
|
{
|
||||||
List *metadataSnapshotCommandList = NIL;
|
List *metadataSnapshotCommandList = NIL;
|
||||||
List *distributedTableList = DistributedTableList();
|
List *distributedTableList = DistributedTableList();
|
||||||
|
List *mxTableList = NIL;
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = WorkerNodeList();
|
||||||
ListCell *distributedTableCell = NULL;
|
ListCell *distributedTableCell = NULL;
|
||||||
char *nodeListInsertCommand = NULL;
|
char *nodeListInsertCommand = NULL;
|
||||||
|
@ -167,26 +168,67 @@ MetadataCreateCommands(void)
|
||||||
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
||||||
nodeListInsertCommand);
|
nodeListInsertCommand);
|
||||||
|
|
||||||
/* iterate over the distributed tables */
|
/* create the list of mx tables */
|
||||||
foreach(distributedTableCell, distributedTableList)
|
foreach(distributedTableCell, distributedTableList)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry =
|
DistTableCacheEntry *cacheEntry =
|
||||||
(DistTableCacheEntry *) lfirst(distributedTableCell);
|
(DistTableCacheEntry *) lfirst(distributedTableCell);
|
||||||
List *clusteredTableDDLEvents = NIL;
|
if (ShouldSyncTableMetadata(cacheEntry->relationId))
|
||||||
|
{
|
||||||
|
mxTableList = lappend(mxTableList, cacheEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* create the mx tables, but not the metadata */
|
||||||
|
foreach(distributedTableCell, mxTableList)
|
||||||
|
{
|
||||||
|
DistTableCacheEntry *cacheEntry =
|
||||||
|
(DistTableCacheEntry *) lfirst(distributedTableCell);
|
||||||
|
Oid relationId = cacheEntry->relationId;
|
||||||
|
|
||||||
|
List *commandList = GetTableDDLEvents(relationId);
|
||||||
|
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
|
||||||
|
|
||||||
|
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
||||||
|
commandList);
|
||||||
|
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
||||||
|
tableOwnerResetCommand);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* construct the foreign key constraints after all tables are created */
|
||||||
|
foreach(distributedTableCell, mxTableList)
|
||||||
|
{
|
||||||
|
DistTableCacheEntry *cacheEntry =
|
||||||
|
(DistTableCacheEntry *) lfirst(distributedTableCell);
|
||||||
|
|
||||||
|
List *foreignConstraintCommands =
|
||||||
|
GetTableForeignConstraintCommands(cacheEntry->relationId);
|
||||||
|
|
||||||
|
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
||||||
|
foreignConstraintCommands);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* after all tables are created, create the metadata */
|
||||||
|
foreach(distributedTableCell, mxTableList)
|
||||||
|
{
|
||||||
|
DistTableCacheEntry *cacheEntry =
|
||||||
|
(DistTableCacheEntry *) lfirst(distributedTableCell);
|
||||||
List *shardIntervalList = NIL;
|
List *shardIntervalList = NIL;
|
||||||
List *shardCreateCommandList = NIL;
|
List *shardCreateCommandList = NIL;
|
||||||
|
char *metadataCommand = NULL;
|
||||||
|
char *truncateTriggerCreateCommand = NULL;
|
||||||
Oid clusteredTableId = cacheEntry->relationId;
|
Oid clusteredTableId = cacheEntry->relationId;
|
||||||
|
|
||||||
/* add only clustered tables */
|
/* add the table metadata command first*/
|
||||||
if (!ShouldSyncTableMetadata(clusteredTableId))
|
metadataCommand = DistributionCreateCommand(cacheEntry);
|
||||||
{
|
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
||||||
continue;
|
metadataCommand);
|
||||||
}
|
|
||||||
|
|
||||||
/* add the DDL events first */
|
/* add the truncate trigger command after the table became distributed */
|
||||||
clusteredTableDDLEvents = GetDistributedTableDDLEvents(cacheEntry);
|
truncateTriggerCreateCommand =
|
||||||
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
TruncateTriggerCreateCommand(cacheEntry->relationId);
|
||||||
clusteredTableDDLEvents);
|
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
||||||
|
truncateTriggerCreateCommand);
|
||||||
|
|
||||||
/* add the pg_dist_shard{,placement} entries */
|
/* add the pg_dist_shard{,placement} entries */
|
||||||
shardIntervalList = LoadShardIntervalList(clusteredTableId);
|
shardIntervalList = LoadShardIntervalList(clusteredTableId);
|
||||||
|
@ -476,34 +518,6 @@ NodeDeleteCommand(uint32 nodeId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* GetDistributedTableDDLEvents returns the full set of DDL commands necessary to
|
|
||||||
* create this relation on a worker. This includes setting up any sequences,
|
|
||||||
* setting the owner of the table, and inserting into metadata tables.
|
|
||||||
*/
|
|
||||||
List *
|
|
||||||
GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry)
|
|
||||||
{
|
|
||||||
char *ownerResetCommand = NULL;
|
|
||||||
char *metadataCommand = NULL;
|
|
||||||
char *truncateTriggerCreateCommand = NULL;
|
|
||||||
Oid relationId = cacheEntry->relationId;
|
|
||||||
|
|
||||||
List *commandList = GetTableDDLEvents(relationId);
|
|
||||||
|
|
||||||
ownerResetCommand = TableOwnerResetCommand(relationId);
|
|
||||||
commandList = lappend(commandList, ownerResetCommand);
|
|
||||||
|
|
||||||
metadataCommand = DistributionCreateCommand(cacheEntry);
|
|
||||||
commandList = lappend(commandList, metadataCommand);
|
|
||||||
|
|
||||||
truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
|
|
||||||
commandList = lappend(commandList, truncateTriggerCreateCommand);
|
|
||||||
|
|
||||||
return commandList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
|
* LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
|
||||||
* of a worker and returns the command in a string.
|
* of a worker and returns the command in a string.
|
||||||
|
|
|
@ -42,6 +42,8 @@ static void DeletePartitionRow(Oid distributedRelationId);
|
||||||
* not dropped as in the case of "DROP TABLE distributed_table;" command.
|
* not dropped as in the case of "DROP TABLE distributed_table;" command.
|
||||||
*
|
*
|
||||||
* The function errors out if the input relation Oid is not a regular or foreign table.
|
* The function errors out if the input relation Oid is not a regular or foreign table.
|
||||||
|
* The function is meant to be called only by the coordinator, therefore requires
|
||||||
|
* superuser privileges.
|
||||||
*/
|
*/
|
||||||
Datum
|
Datum
|
||||||
worker_drop_distributed_table(PG_FUNCTION_ARGS)
|
worker_drop_distributed_table(PG_FUNCTION_ARGS)
|
||||||
|
@ -55,6 +57,8 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
|
||||||
ListCell *shardCell = NULL;
|
ListCell *shardCell = NULL;
|
||||||
char relationKind = '\0';
|
char relationKind = '\0';
|
||||||
|
|
||||||
|
EnsureSuperUser();
|
||||||
|
|
||||||
/* first check the relation type */
|
/* first check the relation type */
|
||||||
distributedRelation = relation_open(relationId, AccessShareLock);
|
distributedRelation = relation_open(relationId, AccessShareLock);
|
||||||
relationKind = distributedRelation->rd_rel->relkind;
|
relationKind = distributedRelation->rd_rel->relkind;
|
||||||
|
@ -96,8 +100,8 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* drop the table only */
|
/* drop the table with cascade since other tables may be referring to it */
|
||||||
performDeletion(&distributedTableObject, DROP_RESTRICT,
|
performDeletion(&distributedTableObject, DROP_CASCADE,
|
||||||
PERFORM_DELETION_INTERNAL);
|
PERFORM_DELETION_INTERNAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,6 @@ extern char * TableOwnerResetCommand(Oid distributedRelationId);
|
||||||
extern char * NodeListInsertCommand(List *workerNodeList);
|
extern char * NodeListInsertCommand(List *workerNodeList);
|
||||||
extern List * ShardListInsertCommand(List *shardIntervalList);
|
extern List * ShardListInsertCommand(List *shardIntervalList);
|
||||||
extern char * NodeDeleteCommand(uint32 nodeId);
|
extern char * NodeDeleteCommand(uint32 nodeId);
|
||||||
extern List * GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry);
|
|
||||||
|
|
||||||
|
|
||||||
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node"
|
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node"
|
||||||
|
|
|
@ -249,6 +249,50 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table':
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Make sure that start_metadata_sync_to_node considers foreign key constraints
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
CREATE SCHEMA mx_testing_schema_2;
|
||||||
|
CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3));
|
||||||
|
CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text,
|
||||||
|
FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1 (col1, col3));
|
||||||
|
SELECT create_distributed_table('mx_testing_schema.fk_test_1', 'col1');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE
|
||||||
|
pg_dist_partition SET repmodel='s'
|
||||||
|
WHERE
|
||||||
|
logicalrelid='mx_testing_schema.fk_test_1'::regclass
|
||||||
|
OR logicalrelid='mx_testing_schema_2.fk_test_2'::regclass;
|
||||||
|
|
||||||
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
start_metadata_sync_to_node
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Check that foreign key metadata exists on the worker
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
\d mx_testing_schema_2.fk_test_2
|
||||||
|
Table "mx_testing_schema_2.fk_test_2"
|
||||||
|
Column | Type | Modifiers
|
||||||
|
--------+---------+-----------
|
||||||
|
col1 | integer |
|
||||||
|
col2 | integer |
|
||||||
|
col3 | text |
|
||||||
|
Foreign-key constraints:
|
||||||
|
"fk_test_2_col1_fkey" FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1(col1, col3)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
RESET citus.shard_replication_factor;
|
||||||
-- Check that repeated calls to start_metadata_sync_to_node has no side effects
|
-- Check that repeated calls to start_metadata_sync_to_node has no side effects
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
|
|
@ -83,6 +83,33 @@ SELECT * FROM pg_dist_colocation ORDER BY colocationid;
|
||||||
-- Make sure that truncate trigger has been set for the MX table on worker
|
-- Make sure that truncate trigger has been set for the MX table on worker
|
||||||
SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass;
|
SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass;
|
||||||
|
|
||||||
|
-- Make sure that start_metadata_sync_to_node considers foreign key constraints
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
|
||||||
|
CREATE SCHEMA mx_testing_schema_2;
|
||||||
|
|
||||||
|
CREATE TABLE mx_testing_schema.fk_test_1 (col1 int, col2 text, col3 int, UNIQUE(col1, col3));
|
||||||
|
CREATE TABLE mx_testing_schema_2.fk_test_2 (col1 int, col2 int, col3 text,
|
||||||
|
FOREIGN KEY (col1, col2) REFERENCES mx_testing_schema.fk_test_1 (col1, col3));
|
||||||
|
|
||||||
|
SELECT create_distributed_table('mx_testing_schema.fk_test_1', 'col1');
|
||||||
|
SELECT create_distributed_table('mx_testing_schema_2.fk_test_2', 'col1');
|
||||||
|
|
||||||
|
UPDATE
|
||||||
|
pg_dist_partition SET repmodel='s'
|
||||||
|
WHERE
|
||||||
|
logicalrelid='mx_testing_schema.fk_test_1'::regclass
|
||||||
|
OR logicalrelid='mx_testing_schema_2.fk_test_2'::regclass;
|
||||||
|
|
||||||
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
|
||||||
|
-- Check that foreign key metadata exists on the worker
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
\d mx_testing_schema_2.fk_test_2
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
RESET citus.shard_replication_factor;
|
||||||
|
|
||||||
-- Check that repeated calls to start_metadata_sync_to_node has no side effects
|
-- Check that repeated calls to start_metadata_sync_to_node has no side effects
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
|
Loading…
Reference in New Issue