mirror of https://github.com/citusdata/citus.git
Create all foreign keys quickly at the end of a shard move (#6148)
Previously we would create foreign keys to reference table in an extra fast way at the end of a shard move. This uses that same logic to also do it for foreign keys between distributed tables. Fixes #6141pull/6296/head^2
parent
cc0eeea4c5
commit
76137e967f
|
@ -1192,126 +1192,6 @@ IsTableTypeIncluded(Oid relationId, int flags)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetForeignConstraintCommandsToReferenceTable takes in a shardInterval, and
|
||||
* returns the list of commands that are required to create the foreign
|
||||
* constraints for that shardInterval.
|
||||
*
|
||||
* The function adds a "SET LOCAL citus.skip_constraint_validation TO ON"
|
||||
* command to command list, to prevent the validation for foreign keys.
|
||||
*
|
||||
* We skip the validation phase of foreign keys to reference tables because
|
||||
* the validation is pretty costly and given that the source placements are
|
||||
* already valid, the validation in the target nodes is useless.
|
||||
*
|
||||
* The function does not apply the same logic for the already invalid foreign
|
||||
* constraints.
|
||||
*/
|
||||
List *
|
||||
GetForeignConstraintCommandsToReferenceTable(ShardInterval *shardInterval)
|
||||
{
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
Oid relationId = shardInterval->relationId;
|
||||
|
||||
List *commandList = list_make1("SET LOCAL citus.skip_constraint_validation TO ON;");
|
||||
|
||||
/*
|
||||
* Set search_path to NIL so that all objects outside of pg_catalog will be
|
||||
* schema-prefixed. pg_catalog will be added automatically when we call
|
||||
* PushOverrideSearchPath(), since we set addCatalog to true;
|
||||
*/
|
||||
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
|
||||
overridePath->schemas = NIL;
|
||||
overridePath->addCatalog = true;
|
||||
PushOverrideSearchPath(overridePath);
|
||||
|
||||
/* open system catalog and scan all constraints that belong to this table */
|
||||
Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
relationId);
|
||||
|
||||
SysScanDesc scanDescriptor = systable_beginscan(pgConstraint,
|
||||
ConstraintRelidTypidNameIndexId,
|
||||
true, NULL, scanKeyCount, scanKey);
|
||||
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
|
||||
if (constraintForm->contype != CONSTRAINT_FOREIGN)
|
||||
{
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
continue;
|
||||
}
|
||||
|
||||
Oid referencedRelationId = constraintForm->confrelid;
|
||||
if (PartitionMethod(referencedRelationId) != DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
continue;
|
||||
}
|
||||
|
||||
Oid constraintId = get_relation_constraint_oid(relationId,
|
||||
constraintForm->conname.data,
|
||||
true);
|
||||
|
||||
int64 referencedShardId = GetFirstShardId(referencedRelationId);
|
||||
Oid referencedSchemaId = get_rel_namespace(referencedRelationId);
|
||||
char *referencedSchemaName = get_namespace_name(referencedSchemaId);
|
||||
char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
|
||||
|
||||
Oid schemaId = get_rel_namespace(relationId);
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
char *escapedSchemaName = quote_literal_cstr(schemaName);
|
||||
|
||||
char *constraintDefinition = pg_get_constraintdef_command(constraintId);
|
||||
|
||||
StringInfo applyForeignConstraintCommand = makeStringInfo();
|
||||
appendStringInfo(applyForeignConstraintCommand,
|
||||
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId,
|
||||
escapedSchemaName, referencedShardId,
|
||||
escapedReferencedSchemaName,
|
||||
quote_literal_cstr(constraintDefinition));
|
||||
|
||||
commandList = lappend(commandList, applyForeignConstraintCommand->data);
|
||||
|
||||
/* mark the constraint as valid again on the shard */
|
||||
if (constraintForm->convalidated == true)
|
||||
{
|
||||
StringInfo markConstraintValid = makeStringInfo();
|
||||
char *qualifiedReferencingShardName =
|
||||
ConstructQualifiedShardName(shardInterval);
|
||||
|
||||
char *shardConstraintName = pstrdup(constraintForm->conname.data);
|
||||
AppendShardIdToName(&shardConstraintName, shardId);
|
||||
|
||||
appendStringInfo(markConstraintValid,
|
||||
"UPDATE pg_constraint SET convalidated = true WHERE "
|
||||
"conrelid = %s::regclass AND conname = '%s'",
|
||||
quote_literal_cstr(qualifiedReferencingShardName),
|
||||
shardConstraintName);
|
||||
commandList = lappend(commandList, markConstraintValid->data);
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
/* clean up scan and close system catalog */
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgConstraint, AccessShareLock);
|
||||
|
||||
/* revert back to original search_path */
|
||||
PopOverrideSearchPath();
|
||||
|
||||
commandList = lappend(commandList, "RESET citus.skip_constraint_validation;");
|
||||
|
||||
return commandList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnableSkippingConstraintValidation is simply a C interface for setting the following:
|
||||
* SET LOCAL citus.skip_constraint_validation TO on;
|
||||
|
|
|
@ -1721,9 +1721,9 @@ CopyShardForeignConstraintCommandList(ShardInterval *shardInterval)
|
|||
List *colocatedShardForeignConstraintCommandList = NIL;
|
||||
List *referenceTableForeignConstraintList = NIL;
|
||||
|
||||
CopyShardForeignConstraintCommandListGrouped(shardInterval,
|
||||
&
|
||||
colocatedShardForeignConstraintCommandList,
|
||||
CopyShardForeignConstraintCommandListGrouped(
|
||||
shardInterval,
|
||||
&colocatedShardForeignConstraintCommandList,
|
||||
&referenceTableForeignConstraintList);
|
||||
|
||||
return list_concat(colocatedShardForeignConstraintCommandList,
|
||||
|
|
|
@ -114,7 +114,7 @@ bool PlacementMovedUsingLogicalReplicationInTX = false;
|
|||
static int logicalReplicationProgressReportTimeout = 10 * 1000;
|
||||
|
||||
|
||||
static void CreateForeignConstraintsToReferenceTable(List *logicalRepTargetList);
|
||||
static void CreateForeignKeyConstraints(List *logicalRepTargetList);
|
||||
static List * PrepareReplicationSubscriptionList(List *shardList);
|
||||
static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId);
|
||||
static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId,
|
||||
|
@ -139,8 +139,6 @@ static void ExecuteRemainingPostLoadTableCommands(List *shardList, char *targetN
|
|||
int targetNodePort);
|
||||
static void CreatePartitioningHierarchy(List *shardList, char *targetNodeName,
|
||||
int targetNodePort);
|
||||
static void CreateColocatedForeignKeys(List *shardList, char *targetNodeName,
|
||||
int targetNodePort);
|
||||
static char * escape_param_str(const char *str);
|
||||
static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
|
||||
static bool RelationSubscriptionsAreReady(
|
||||
|
@ -323,7 +321,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo
|
|||
* cascade to the hash distributed tables' shards if we had created
|
||||
* the constraints earlier.
|
||||
*/
|
||||
CreateForeignConstraintsToReferenceTable(logicalRepTargetList);
|
||||
CreateForeignKeyConstraints(logicalRepTargetList);
|
||||
|
||||
/* we're done, cleanup the publication and subscription */
|
||||
DropSubscriptions(logicalRepTargetList);
|
||||
|
@ -775,9 +773,6 @@ CreatePostLogicalReplicationDataLoadObjects(List *shardList, char *targetNodeNam
|
|||
|
||||
/* create partitioning hierarchy, if any */
|
||||
CreatePartitioningHierarchy(shardList, targetNodeName, targetNodePort);
|
||||
|
||||
/* create colocated foreign keys, if any */
|
||||
CreateColocatedForeignKeys(shardList, targetNodeName, targetNodePort);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1129,64 +1124,21 @@ CreatePartitioningHierarchy(List *shardList, char *targetNodeName, int targetNod
|
|||
|
||||
|
||||
/*
|
||||
* CreateColocatedForeignKeys gets a shardList and creates the colocated foreign
|
||||
* keys between the shardList, if any,
|
||||
*/
|
||||
static void
|
||||
CreateColocatedForeignKeys(List *shardList, char *targetNodeName, int targetNodePort)
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("Creating post logical replication objects "
|
||||
"(co-located foreign keys) on node %s:%d", targetNodeName,
|
||||
targetNodePort)));
|
||||
|
||||
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"CreateColocatedForeignKeys",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
|
||||
|
||||
ListCell *shardCell = NULL;
|
||||
foreach(shardCell, shardList)
|
||||
{
|
||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell);
|
||||
|
||||
List *shardForeignConstraintCommandList = NIL;
|
||||
List *referenceTableForeignConstraintList = NIL;
|
||||
CopyShardForeignConstraintCommandListGrouped(shardInterval,
|
||||
&shardForeignConstraintCommandList,
|
||||
&referenceTableForeignConstraintList);
|
||||
|
||||
if (shardForeignConstraintCommandList == NIL)
|
||||
{
|
||||
/* no colocated foreign keys, skip */
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* Creating foreign keys may acquire conflicting locks when done in
|
||||
* parallel. Hence we create foreign keys one at a time.
|
||||
* CreateForeignKeyConstraints is used to create the foreign constraints
|
||||
* on the logical replication target without checking that they are actually
|
||||
* valid.
|
||||
*
|
||||
*/
|
||||
char *tableOwner = TableOwner(shardInterval->relationId);
|
||||
SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
|
||||
tableOwner,
|
||||
shardForeignConstraintCommandList);
|
||||
MemoryContextReset(localContext);
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateForeignConstraintsToReferenceTable is used to create the foreign constraints
|
||||
* from distributed to reference tables in the newly created shard replicas.
|
||||
* We skip the validation phase of foreign keys to after a shard
|
||||
* move/copy/split because the validation is pretty costly and given that the
|
||||
* source placements are already valid, the validation in the target nodes is
|
||||
* useless.
|
||||
*/
|
||||
static void
|
||||
CreateForeignConstraintsToReferenceTable(List *logicalRepTargetList)
|
||||
CreateForeignKeyConstraints(List *logicalRepTargetList)
|
||||
{
|
||||
MemoryContext localContext =
|
||||
AllocSetContextCreate(CurrentMemoryContext,
|
||||
"CreateForeignConstraintsToReferenceTable",
|
||||
"CreateKeyForeignConstraints",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
|
||||
|
||||
|
@ -1204,16 +1156,18 @@ CreateForeignConstraintsToReferenceTable(List *logicalRepTargetList)
|
|||
*/
|
||||
foreach_ptr(shardInterval, target->newShards)
|
||||
{
|
||||
List *commandList = GetForeignConstraintCommandsToReferenceTable(
|
||||
List *commandList = CopyShardForeignConstraintCommandList(
|
||||
shardInterval);
|
||||
commandList = list_concat(
|
||||
list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"),
|
||||
commandList);
|
||||
|
||||
char *command = NULL;
|
||||
SendCommandListToWorkerOutsideTransaction(
|
||||
target->superuserConnection->hostname,
|
||||
target->superuserConnection->port,
|
||||
target->superuserConnection->user,
|
||||
commandList);
|
||||
|
||||
/* iterate over the commands and execute them in the same connection */
|
||||
foreach_ptr(command, commandList)
|
||||
{
|
||||
ExecuteCriticalRemoteCommand(target->superuserConnection, command);
|
||||
}
|
||||
MemoryContextReset(localContext);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,87 +0,0 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* foreign_key_relationship_query.c
|
||||
*
|
||||
* This file contains UDFs for getting foreign constraint relationship between
|
||||
* distributed tables.
|
||||
*
|
||||
* Copyright (c) 2018, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "fmgr.h"
|
||||
#include "funcapi.h"
|
||||
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "utils/builtins.h"
|
||||
|
||||
|
||||
/* these functions are only exported in the regression tests */
|
||||
PG_FUNCTION_INFO_V1(get_foreign_key_to_reference_table_commands);
|
||||
|
||||
/*
|
||||
* get_foreign_key_to_reference_table_commands returns the list of commands
|
||||
* for creating foreign keys to reference tables.
|
||||
*/
|
||||
Datum
|
||||
get_foreign_key_to_reference_table_commands(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
FuncCallContext *functionContext = NULL;
|
||||
ListCell *commandsCell = NULL;
|
||||
|
||||
/* for the first we call this UDF, we need to populate the result to return set */
|
||||
if (SRF_IS_FIRSTCALL())
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
|
||||
/* create a function context for cross-call persistence */
|
||||
functionContext = SRF_FIRSTCALL_INIT();
|
||||
|
||||
/* switch to memory context appropriate for multiple function calls */
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(
|
||||
functionContext->multi_call_memory_ctx);
|
||||
|
||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||
ShardInterval *firstShardInterval = cacheEntry->sortedShardIntervalArray[0];
|
||||
ListCellAndListWrapper *wrapper = palloc0(sizeof(ListCellAndListWrapper));
|
||||
List *commandsList =
|
||||
GetForeignConstraintCommandsToReferenceTable(firstShardInterval);
|
||||
|
||||
commandsCell = list_head(commandsList);
|
||||
wrapper->list = commandsList;
|
||||
wrapper->listCell = commandsCell;
|
||||
functionContext->user_fctx = wrapper;
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
|
||||
/*
|
||||
* On every call to this function, we get the current position in the
|
||||
* statement list. We then iterate to the next position in the list and
|
||||
* return the current statement, if we have not yet reached the end of
|
||||
* list.
|
||||
*/
|
||||
functionContext = SRF_PERCALL_SETUP();
|
||||
|
||||
ListCellAndListWrapper *wrapper =
|
||||
(ListCellAndListWrapper *) functionContext->user_fctx;
|
||||
|
||||
if (wrapper->listCell != NULL)
|
||||
{
|
||||
char *command = (char *) lfirst(wrapper->listCell);
|
||||
text *commandText = cstring_to_text(command);
|
||||
|
||||
wrapper->listCell = lnext(wrapper->list, wrapper->listCell);
|
||||
|
||||
SRF_RETURN_NEXT(functionContext, PointerGetDatum(commandText));
|
||||
}
|
||||
else
|
||||
{
|
||||
SRF_RETURN_DONE(functionContext);
|
||||
}
|
||||
}
|
|
@ -309,7 +309,6 @@ extern void ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int
|
|||
extern char LookupShardTransferMode(Oid shardReplicationModeOid);
|
||||
extern void BlockWritesToShardList(List *shardList);
|
||||
extern List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId);
|
||||
extern List * GetForeignConstraintCommandsToReferenceTable(ShardInterval *shardInterval);
|
||||
|
||||
|
||||
#endif /* COORDINATOR_PROTOCOL_H */
|
||||
|
|
|
@ -147,11 +147,6 @@ SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_r
|
|||
referencing_table_id_fkey_15000008 | fkey_to_reference_shard_rebalance.referencing_table_15000008 | fkey_to_reference_shard_rebalance.referenced_table_15000000
|
||||
(30 rows)
|
||||
|
||||
-- create a function to show the
|
||||
CREATE FUNCTION get_foreign_key_to_reference_table_commands(Oid)
|
||||
RETURNS SETOF text
|
||||
LANGUAGE C STABLE STRICT
|
||||
AS 'citus', $$get_foreign_key_to_reference_table_commands$$;
|
||||
CREATE TABLE reference_table_commands (id int UNIQUE);
|
||||
CREATE TABLE referenceing_dist_table (id int, col1 int, col2 int, col3 int);
|
||||
SELECT create_reference_table('reference_table_commands');
|
||||
|
@ -170,18 +165,6 @@ ALTER TABLE referenceing_dist_table ADD CONSTRAINT c1 FOREIGN KEY (col1) REFEREN
|
|||
ALTER TABLE referenceing_dist_table ADD CONSTRAINT c2 FOREIGN KEY (col2) REFERENCES reference_table_commands(id) ON UPDATE CASCADE NOT VALID;
|
||||
ALTER TABLE referenceing_dist_table ADD CONSTRAINT very_very_very_very_very_very_very_very_very_very_very_very_very_long FOREIGN KEY (col3) REFERENCES reference_table_commands(id) ON UPDATE CASCADE;
|
||||
NOTICE: identifier "very_very_very_very_very_very_very_very_very_very_very_very_very_long" will be truncated to "very_very_very_very_very_very_very_very_very_very_very_very_ver"
|
||||
SELECT * FROM get_foreign_key_to_reference_table_commands('referenceing_dist_table'::regclass);
|
||||
get_foreign_key_to_reference_table_commands
|
||||
---------------------------------------------------------------------
|
||||
SET LOCAL citus.skip_constraint_validation TO ON;
|
||||
SELECT worker_apply_inter_shard_ddl_command (15000018, 'fkey_to_reference_shard_rebalance', 15000017, 'fkey_to_reference_shard_rebalance', 'ALTER TABLE fkey_to_reference_shard_rebalance.referenceing_dist_table ADD CONSTRAINT c1 FOREIGN KEY (col1) REFERENCES fkey_to_reference_shard_rebalance.reference_table_commands(id) ON UPDATE CASCADE')
|
||||
UPDATE pg_constraint SET convalidated = true WHERE conrelid = 'fkey_to_reference_shard_rebalance.referenceing_dist_table_15000018'::regclass AND conname = 'c1_15000018'
|
||||
SELECT worker_apply_inter_shard_ddl_command (15000018, 'fkey_to_reference_shard_rebalance', 15000017, 'fkey_to_reference_shard_rebalance', 'ALTER TABLE fkey_to_reference_shard_rebalance.referenceing_dist_table ADD CONSTRAINT c2 FOREIGN KEY (col2) REFERENCES fkey_to_reference_shard_rebalance.reference_table_commands(id) ON UPDATE CASCADE NOT VALID')
|
||||
SELECT worker_apply_inter_shard_ddl_command (15000018, 'fkey_to_reference_shard_rebalance', 15000017, 'fkey_to_reference_shard_rebalance', 'ALTER TABLE fkey_to_reference_shard_rebalance.referenceing_dist_table ADD CONSTRAINT very_very_very_very_very_very_very_very_very_very_very_very_ver FOREIGN KEY (col3) REFERENCES fkey_to_reference_shard_rebalance.reference_table_commands(id) ON UPDATE CASCADE')
|
||||
UPDATE pg_constraint SET convalidated = true WHERE conrelid = 'fkey_to_reference_shard_rebalance.referenceing_dist_table_15000018'::regclass AND conname = 'very_very_very_very_very_very_very_very_very__754e8716_15000018'
|
||||
RESET citus.skip_constraint_validation;
|
||||
(7 rows)
|
||||
|
||||
-- and show that rebalancer works fine
|
||||
SELECT master_move_shard_placement(15000018, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
||||
master_move_shard_placement
|
||||
|
|
|
@ -55,12 +55,6 @@ SELECT count(*) FROM referencing_table2;
|
|||
CALL citus_cleanup_orphaned_shards();
|
||||
SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3;
|
||||
|
||||
-- create a function to show the
|
||||
CREATE FUNCTION get_foreign_key_to_reference_table_commands(Oid)
|
||||
RETURNS SETOF text
|
||||
LANGUAGE C STABLE STRICT
|
||||
AS 'citus', $$get_foreign_key_to_reference_table_commands$$;
|
||||
|
||||
CREATE TABLE reference_table_commands (id int UNIQUE);
|
||||
CREATE TABLE referenceing_dist_table (id int, col1 int, col2 int, col3 int);
|
||||
SELECT create_reference_table('reference_table_commands');
|
||||
|
@ -68,7 +62,6 @@ SELECT create_distributed_table('referenceing_dist_table', 'id');
|
|||
ALTER TABLE referenceing_dist_table ADD CONSTRAINT c1 FOREIGN KEY (col1) REFERENCES reference_table_commands(id) ON UPDATE CASCADE;
|
||||
ALTER TABLE referenceing_dist_table ADD CONSTRAINT c2 FOREIGN KEY (col2) REFERENCES reference_table_commands(id) ON UPDATE CASCADE NOT VALID;
|
||||
ALTER TABLE referenceing_dist_table ADD CONSTRAINT very_very_very_very_very_very_very_very_very_very_very_very_very_long FOREIGN KEY (col3) REFERENCES reference_table_commands(id) ON UPDATE CASCADE;
|
||||
SELECT * FROM get_foreign_key_to_reference_table_commands('referenceing_dist_table'::regclass);
|
||||
|
||||
-- and show that rebalancer works fine
|
||||
SELECT master_move_shard_placement(15000018, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
||||
|
|
Loading…
Reference in New Issue