From 76137e967f2d1e12f3e24d988b0a022bed66421c Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Fri, 9 Sep 2022 09:58:33 +0200 Subject: [PATCH] Create all foreign keys quickly at the end of a shard move (#6148) Previously we would create foreign keys to reference table in an extra fast way at the end of a shard move. This uses that same logic to also do it for foreign keys between distributed tables. Fixes #6141 --- .../distributed/commands/foreign_constraint.c | 120 ------------------ .../distributed/operations/repair_shards.c | 8 +- .../replication/multi_logical_replication.c | 88 +++---------- ...foreign_key_to_reference_table_rebalance.c | 87 ------------- .../distributed/coordinator_protocol.h | 1 - ...reign_key_to_reference_shard_rebalance.out | 17 --- ...reign_key_to_reference_shard_rebalance.sql | 7 - 7 files changed, 25 insertions(+), 303 deletions(-) delete mode 100644 src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c diff --git a/src/backend/distributed/commands/foreign_constraint.c b/src/backend/distributed/commands/foreign_constraint.c index 45aa8e5e9..3be12f928 100644 --- a/src/backend/distributed/commands/foreign_constraint.c +++ b/src/backend/distributed/commands/foreign_constraint.c @@ -1192,126 +1192,6 @@ IsTableTypeIncluded(Oid relationId, int flags) } -/* - * GetForeignConstraintCommandsToReferenceTable takes in a shardInterval, and - * returns the list of commands that are required to create the foreign - * constraints for that shardInterval. - * - * The function adds a "SET LOCAL citus.skip_constraint_validation TO ON" - * command to command list, to prevent the validation for foreign keys. - * - * We skip the validation phase of foreign keys to reference tables because - * the validation is pretty costly and given that the source placements are - * already valid, the validation in the target nodes is useless. - * - * The function does not apply the same logic for the already invalid foreign - * constraints. - */ -List * -GetForeignConstraintCommandsToReferenceTable(ShardInterval *shardInterval) -{ - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - uint64 shardId = shardInterval->shardId; - Oid relationId = shardInterval->relationId; - - List *commandList = list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"); - - /* - * Set search_path to NIL so that all objects outside of pg_catalog will be - * schema-prefixed. pg_catalog will be added automatically when we call - * PushOverrideSearchPath(), since we set addCatalog to true; - */ - OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext); - overridePath->schemas = NIL; - overridePath->addCatalog = true; - PushOverrideSearchPath(overridePath); - - /* open system catalog and scan all constraints that belong to this table */ - Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock); - ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, - relationId); - - SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, - ConstraintRelidTypidNameIndexId, - true, NULL, scanKeyCount, scanKey); - - HeapTuple heapTuple = systable_getnext(scanDescriptor); - while (HeapTupleIsValid(heapTuple)) - { - Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); - - if (constraintForm->contype != CONSTRAINT_FOREIGN) - { - heapTuple = systable_getnext(scanDescriptor); - continue; - } - - Oid referencedRelationId = constraintForm->confrelid; - if (PartitionMethod(referencedRelationId) != DISTRIBUTE_BY_NONE) - { - heapTuple = systable_getnext(scanDescriptor); - continue; - } - - Oid constraintId = get_relation_constraint_oid(relationId, - constraintForm->conname.data, - true); - - int64 referencedShardId = GetFirstShardId(referencedRelationId); - Oid referencedSchemaId = get_rel_namespace(referencedRelationId); - char *referencedSchemaName = get_namespace_name(referencedSchemaId); - char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName); - - Oid schemaId = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(schemaId); - char *escapedSchemaName = quote_literal_cstr(schemaName); - - char *constraintDefinition = pg_get_constraintdef_command(constraintId); - - StringInfo applyForeignConstraintCommand = makeStringInfo(); - appendStringInfo(applyForeignConstraintCommand, - WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, - escapedSchemaName, referencedShardId, - escapedReferencedSchemaName, - quote_literal_cstr(constraintDefinition)); - - commandList = lappend(commandList, applyForeignConstraintCommand->data); - - /* mark the constraint as valid again on the shard */ - if (constraintForm->convalidated == true) - { - StringInfo markConstraintValid = makeStringInfo(); - char *qualifiedReferencingShardName = - ConstructQualifiedShardName(shardInterval); - - char *shardConstraintName = pstrdup(constraintForm->conname.data); - AppendShardIdToName(&shardConstraintName, shardId); - - appendStringInfo(markConstraintValid, - "UPDATE pg_constraint SET convalidated = true WHERE " - "conrelid = %s::regclass AND conname = '%s'", - quote_literal_cstr(qualifiedReferencingShardName), - shardConstraintName); - commandList = lappend(commandList, markConstraintValid->data); - } - - heapTuple = systable_getnext(scanDescriptor); - } - - /* clean up scan and close system catalog */ - systable_endscan(scanDescriptor); - table_close(pgConstraint, AccessShareLock); - - /* revert back to original search_path */ - PopOverrideSearchPath(); - - commandList = lappend(commandList, "RESET citus.skip_constraint_validation;"); - - return commandList; -} - - /* * EnableSkippingConstraintValidation is simply a C interface for setting the following: * SET LOCAL citus.skip_constraint_validation TO on; diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 4388b86fd..f42925ff8 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1721,10 +1721,10 @@ CopyShardForeignConstraintCommandList(ShardInterval *shardInterval) List *colocatedShardForeignConstraintCommandList = NIL; List *referenceTableForeignConstraintList = NIL; - CopyShardForeignConstraintCommandListGrouped(shardInterval, - & - colocatedShardForeignConstraintCommandList, - &referenceTableForeignConstraintList); + CopyShardForeignConstraintCommandListGrouped( + shardInterval, + &colocatedShardForeignConstraintCommandList, + &referenceTableForeignConstraintList); return list_concat(colocatedShardForeignConstraintCommandList, referenceTableForeignConstraintList); diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 7ee70f7f3..54efd08db 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -114,7 +114,7 @@ bool PlacementMovedUsingLogicalReplicationInTX = false; static int logicalReplicationProgressReportTimeout = 10 * 1000; -static void CreateForeignConstraintsToReferenceTable(List *logicalRepTargetList); +static void CreateForeignKeyConstraints(List *logicalRepTargetList); static List * PrepareReplicationSubscriptionList(List *shardList); static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId); static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId, @@ -139,8 +139,6 @@ static void ExecuteRemainingPostLoadTableCommands(List *shardList, char *targetN int targetNodePort); static void CreatePartitioningHierarchy(List *shardList, char *targetNodeName, int targetNodePort); -static void CreateColocatedForeignKeys(List *shardList, char *targetNodeName, - int targetNodePort); static char * escape_param_str(const char *str); static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); static bool RelationSubscriptionsAreReady( @@ -323,7 +321,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo * cascade to the hash distributed tables' shards if we had created * the constraints earlier. */ - CreateForeignConstraintsToReferenceTable(logicalRepTargetList); + CreateForeignKeyConstraints(logicalRepTargetList); /* we're done, cleanup the publication and subscription */ DropSubscriptions(logicalRepTargetList); @@ -775,9 +773,6 @@ CreatePostLogicalReplicationDataLoadObjects(List *shardList, char *targetNodeNam /* create partitioning hierarchy, if any */ CreatePartitioningHierarchy(shardList, targetNodeName, targetNodePort); - - /* create colocated foreign keys, if any */ - CreateColocatedForeignKeys(shardList, targetNodeName, targetNodePort); } @@ -1129,64 +1124,21 @@ CreatePartitioningHierarchy(List *shardList, char *targetNodeName, int targetNod /* - * CreateColocatedForeignKeys gets a shardList and creates the colocated foreign - * keys between the shardList, if any, + * CreateForeignKeyConstraints is used to create the foreign constraints + * on the logical replication target without checking that they are actually + * valid. + * + * We skip the validation phase of foreign keys to after a shard + * move/copy/split because the validation is pretty costly and given that the + * source placements are already valid, the validation in the target nodes is + * useless. */ static void -CreateColocatedForeignKeys(List *shardList, char *targetNodeName, int targetNodePort) -{ - ereport(DEBUG1, (errmsg("Creating post logical replication objects " - "(co-located foreign keys) on node %s:%d", targetNodeName, - targetNodePort))); - - MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, - "CreateColocatedForeignKeys", - ALLOCSET_DEFAULT_SIZES); - MemoryContext oldContext = MemoryContextSwitchTo(localContext); - - ListCell *shardCell = NULL; - foreach(shardCell, shardList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell); - - List *shardForeignConstraintCommandList = NIL; - List *referenceTableForeignConstraintList = NIL; - CopyShardForeignConstraintCommandListGrouped(shardInterval, - &shardForeignConstraintCommandList, - &referenceTableForeignConstraintList); - - if (shardForeignConstraintCommandList == NIL) - { - /* no colocated foreign keys, skip */ - continue; - } - - /* - * Creating foreign keys may acquire conflicting locks when done in - * parallel. Hence we create foreign keys one at a time. - * - */ - char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, - tableOwner, - shardForeignConstraintCommandList); - MemoryContextReset(localContext); - } - - MemoryContextSwitchTo(oldContext); -} - - -/* - * CreateForeignConstraintsToReferenceTable is used to create the foreign constraints - * from distributed to reference tables in the newly created shard replicas. - */ -static void -CreateForeignConstraintsToReferenceTable(List *logicalRepTargetList) +CreateForeignKeyConstraints(List *logicalRepTargetList) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, - "CreateForeignConstraintsToReferenceTable", + "CreateKeyForeignConstraints", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); @@ -1204,16 +1156,18 @@ CreateForeignConstraintsToReferenceTable(List *logicalRepTargetList) */ foreach_ptr(shardInterval, target->newShards) { - List *commandList = GetForeignConstraintCommandsToReferenceTable( + List *commandList = CopyShardForeignConstraintCommandList( shardInterval); + commandList = list_concat( + list_make1("SET LOCAL citus.skip_constraint_validation TO ON;"), + commandList); - char *command = NULL; + SendCommandListToWorkerOutsideTransaction( + target->superuserConnection->hostname, + target->superuserConnection->port, + target->superuserConnection->user, + commandList); - /* iterate over the commands and execute them in the same connection */ - foreach_ptr(command, commandList) - { - ExecuteCriticalRemoteCommand(target->superuserConnection, command); - } MemoryContextReset(localContext); } } diff --git a/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c b/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c deleted file mode 100644 index 80c6fe338..000000000 --- a/src/backend/distributed/test/foreign_key_to_reference_table_rebalance.c +++ /dev/null @@ -1,87 +0,0 @@ -/*------------------------------------------------------------------------- - * - * foreign_key_relationship_query.c - * - * This file contains UDFs for getting foreign constraint relationship between - * distributed tables. - * - * Copyright (c) 2018, Citus Data, Inc. - * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" -#include "fmgr.h" -#include "funcapi.h" - -#include "distributed/coordinator_protocol.h" -#include "distributed/listutils.h" -#include "distributed/metadata_cache.h" -#include "utils/builtins.h" - - -/* these functions are only exported in the regression tests */ -PG_FUNCTION_INFO_V1(get_foreign_key_to_reference_table_commands); - -/* - * get_foreign_key_to_reference_table_commands returns the list of commands - * for creating foreign keys to reference tables. - */ -Datum -get_foreign_key_to_reference_table_commands(PG_FUNCTION_ARGS) -{ - CheckCitusVersion(ERROR); - - FuncCallContext *functionContext = NULL; - ListCell *commandsCell = NULL; - - /* for the first we call this UDF, we need to populate the result to return set */ - if (SRF_IS_FIRSTCALL()) - { - Oid relationId = PG_GETARG_OID(0); - - /* create a function context for cross-call persistence */ - functionContext = SRF_FIRSTCALL_INIT(); - - /* switch to memory context appropriate for multiple function calls */ - MemoryContext oldContext = MemoryContextSwitchTo( - functionContext->multi_call_memory_ctx); - - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - ShardInterval *firstShardInterval = cacheEntry->sortedShardIntervalArray[0]; - ListCellAndListWrapper *wrapper = palloc0(sizeof(ListCellAndListWrapper)); - List *commandsList = - GetForeignConstraintCommandsToReferenceTable(firstShardInterval); - - commandsCell = list_head(commandsList); - wrapper->list = commandsList; - wrapper->listCell = commandsCell; - functionContext->user_fctx = wrapper; - MemoryContextSwitchTo(oldContext); - } - - /* - * On every call to this function, we get the current position in the - * statement list. We then iterate to the next position in the list and - * return the current statement, if we have not yet reached the end of - * list. - */ - functionContext = SRF_PERCALL_SETUP(); - - ListCellAndListWrapper *wrapper = - (ListCellAndListWrapper *) functionContext->user_fctx; - - if (wrapper->listCell != NULL) - { - char *command = (char *) lfirst(wrapper->listCell); - text *commandText = cstring_to_text(command); - - wrapper->listCell = lnext(wrapper->list, wrapper->listCell); - - SRF_RETURN_NEXT(functionContext, PointerGetDatum(commandText)); - } - else - { - SRF_RETURN_DONE(functionContext); - } -} diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 501f5c233..73709a864 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -309,7 +309,6 @@ extern void ErrorIfTargetNodeIsNotSafeToMove(const char *targetNodeName, int extern char LookupShardTransferMode(Oid shardReplicationModeOid); extern void BlockWritesToShardList(List *shardList); extern List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId); -extern List * GetForeignConstraintCommandsToReferenceTable(ShardInterval *shardInterval); #endif /* COORDINATOR_PROTOCOL_H */ diff --git a/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out b/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out index 25049473a..54054d99b 100644 --- a/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out +++ b/src/test/regress/expected/foreign_key_to_reference_shard_rebalance.out @@ -147,11 +147,6 @@ SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_r referencing_table_id_fkey_15000008 | fkey_to_reference_shard_rebalance.referencing_table_15000008 | fkey_to_reference_shard_rebalance.referenced_table_15000000 (30 rows) --- create a function to show the -CREATE FUNCTION get_foreign_key_to_reference_table_commands(Oid) - RETURNS SETOF text - LANGUAGE C STABLE STRICT - AS 'citus', $$get_foreign_key_to_reference_table_commands$$; CREATE TABLE reference_table_commands (id int UNIQUE); CREATE TABLE referenceing_dist_table (id int, col1 int, col2 int, col3 int); SELECT create_reference_table('reference_table_commands'); @@ -170,18 +165,6 @@ ALTER TABLE referenceing_dist_table ADD CONSTRAINT c1 FOREIGN KEY (col1) REFEREN ALTER TABLE referenceing_dist_table ADD CONSTRAINT c2 FOREIGN KEY (col2) REFERENCES reference_table_commands(id) ON UPDATE CASCADE NOT VALID; ALTER TABLE referenceing_dist_table ADD CONSTRAINT very_very_very_very_very_very_very_very_very_very_very_very_very_long FOREIGN KEY (col3) REFERENCES reference_table_commands(id) ON UPDATE CASCADE; NOTICE: identifier "very_very_very_very_very_very_very_very_very_very_very_very_very_long" will be truncated to "very_very_very_very_very_very_very_very_very_very_very_very_ver" -SELECT * FROM get_foreign_key_to_reference_table_commands('referenceing_dist_table'::regclass); - get_foreign_key_to_reference_table_commands ---------------------------------------------------------------------- - SET LOCAL citus.skip_constraint_validation TO ON; - SELECT worker_apply_inter_shard_ddl_command (15000018, 'fkey_to_reference_shard_rebalance', 15000017, 'fkey_to_reference_shard_rebalance', 'ALTER TABLE fkey_to_reference_shard_rebalance.referenceing_dist_table ADD CONSTRAINT c1 FOREIGN KEY (col1) REFERENCES fkey_to_reference_shard_rebalance.reference_table_commands(id) ON UPDATE CASCADE') - UPDATE pg_constraint SET convalidated = true WHERE conrelid = 'fkey_to_reference_shard_rebalance.referenceing_dist_table_15000018'::regclass AND conname = 'c1_15000018' - SELECT worker_apply_inter_shard_ddl_command (15000018, 'fkey_to_reference_shard_rebalance', 15000017, 'fkey_to_reference_shard_rebalance', 'ALTER TABLE fkey_to_reference_shard_rebalance.referenceing_dist_table ADD CONSTRAINT c2 FOREIGN KEY (col2) REFERENCES fkey_to_reference_shard_rebalance.reference_table_commands(id) ON UPDATE CASCADE NOT VALID') - SELECT worker_apply_inter_shard_ddl_command (15000018, 'fkey_to_reference_shard_rebalance', 15000017, 'fkey_to_reference_shard_rebalance', 'ALTER TABLE fkey_to_reference_shard_rebalance.referenceing_dist_table ADD CONSTRAINT very_very_very_very_very_very_very_very_very_very_very_very_ver FOREIGN KEY (col3) REFERENCES fkey_to_reference_shard_rebalance.reference_table_commands(id) ON UPDATE CASCADE') - UPDATE pg_constraint SET convalidated = true WHERE conrelid = 'fkey_to_reference_shard_rebalance.referenceing_dist_table_15000018'::regclass AND conname = 'very_very_very_very_very_very_very_very_very__754e8716_15000018' - RESET citus.skip_constraint_validation; -(7 rows) - -- and show that rebalancer works fine SELECT master_move_shard_placement(15000018, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); master_move_shard_placement diff --git a/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql b/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql index 99fb7e73c..dc70f8563 100644 --- a/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql +++ b/src/test/regress/sql/foreign_key_to_reference_shard_rebalance.sql @@ -55,12 +55,6 @@ SELECT count(*) FROM referencing_table2; CALL citus_cleanup_orphaned_shards(); SELECT * FROM table_fkeys_in_workers WHERE relid LIKE 'fkey_to_reference_shard_rebalance.%' AND refd_relid LIKE 'fkey_to_reference_shard_rebalance.%' ORDER BY 1,2,3; --- create a function to show the -CREATE FUNCTION get_foreign_key_to_reference_table_commands(Oid) - RETURNS SETOF text - LANGUAGE C STABLE STRICT - AS 'citus', $$get_foreign_key_to_reference_table_commands$$; - CREATE TABLE reference_table_commands (id int UNIQUE); CREATE TABLE referenceing_dist_table (id int, col1 int, col2 int, col3 int); SELECT create_reference_table('reference_table_commands'); @@ -68,7 +62,6 @@ SELECT create_distributed_table('referenceing_dist_table', 'id'); ALTER TABLE referenceing_dist_table ADD CONSTRAINT c1 FOREIGN KEY (col1) REFERENCES reference_table_commands(id) ON UPDATE CASCADE; ALTER TABLE referenceing_dist_table ADD CONSTRAINT c2 FOREIGN KEY (col2) REFERENCES reference_table_commands(id) ON UPDATE CASCADE NOT VALID; ALTER TABLE referenceing_dist_table ADD CONSTRAINT very_very_very_very_very_very_very_very_very_very_very_very_very_long FOREIGN KEY (col3) REFERENCES reference_table_commands(id) ON UPDATE CASCADE; -SELECT * FROM get_foreign_key_to_reference_table_commands('referenceing_dist_table'::regclass); -- and show that rebalancer works fine SELECT master_move_shard_placement(15000018, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');