From d67cf907a2ed20c9f8a1d190d1eeec69d3d9a75d Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 27 Jul 2022 18:03:53 +0200 Subject: [PATCH 1/3] Detach relation access tracking from connection management --- .../connection/placement_connection.c | 3 -- src/backend/distributed/shared_library_init.c | 1 + .../transaction/relation_access_tracking.c | 34 ++++++++++++++++--- .../distributed/relation_access_tracking.h | 2 +- 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index d3929f4b9..2aa3994a0 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -1089,9 +1089,6 @@ InitPlacementConnectionManagement(void) ConnectionShardHash = hash_create("citus connection cache (shardid)", 64, &info, hashFlags); - - /* (relationId) = [relationAccessMode] hash */ - AllocateRelationAccessHash(); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 8ea60fe74..600b62b69 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -390,6 +390,7 @@ _PG_init(void) InitializeBackendManagement(); InitializeConnectionManagement(); InitPlacementConnectionManagement(); + InitRelationAccessHash(); InitializeCitusQueryStats(); InitializeSharedConnectionStats(); InitializeLocallyReservedSharedConnections(); diff --git a/src/backend/distributed/transaction/relation_access_tracking.c b/src/backend/distributed/transaction/relation_access_tracking.c index 9fdb226e1..f69de6f8a 100644 --- a/src/backend/distributed/transaction/relation_access_tracking.c +++ b/src/backend/distributed/transaction/relation_access_tracking.c @@ -47,6 +47,8 @@ bool EnforceForeignKeyRestrictions = true; (1 << (PLACEMENT_ACCESS_DDL + \ PARALLEL_MODE_FLAG_OFFSET))) +MemoryContext RelationAcessContext = NULL; + /* * Hash table mapping relations to the @@ -84,8 +86,8 @@ typedef struct RelationAccessHashEntry static HTAB *RelationAccessHash; - /* functions related to access recording */ +static void AllocateRelationAccessHash(void); static void RecordRelationAccessBase(Oid relationId, ShardPlacementAccessType accessType); static void RecordPlacementAccessToCache(Oid relationId, ShardPlacementAccessType accessType); @@ -120,6 +122,18 @@ static bool HoldsConflictingLockWithReferencedRelations(Oid relationId, conflictingAccessMode); +/* + * InitRelationAccessHash performs initialization of the + * infrastructure in this file at backend start. + */ +void +InitRelationAccessHash(void) +{ + /* allocate (relationId) = [relationAccessMode] hash */ + AllocateRelationAccessHash(); +} + + /* * Empty RelationAccessHash, without destroying the hash table itself. */ @@ -133,19 +147,29 @@ ResetRelationAccessHash() /* * Allocate RelationAccessHash. */ -void +static void AllocateRelationAccessHash(void) { - HASHCTL info; + /* + * Create a single context for relation access related memory + * management. Doing so, instead of allocating in TopMemoryContext, makes + * it easier to associate used memory. + */ + RelationAcessContext = AllocSetContextCreateExtended(TopMemoryContext, + "Relation Access Context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + HASHCTL info; memset(&info, 0, sizeof(info)); info.keysize = sizeof(RelationAccessHashKey); info.entrysize = sizeof(RelationAccessHashEntry); info.hash = tag_hash; - info.hcxt = ConnectionContext; + info.hcxt = RelationAcessContext; uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - RelationAccessHash = hash_create("citus connection cache (relationid)", + RelationAccessHash = hash_create("citus relation access cache (relationid)", 8, &info, hashFlags); } diff --git a/src/include/distributed/relation_access_tracking.h b/src/include/distributed/relation_access_tracking.h index deacdec94..295d54351 100644 --- a/src/include/distributed/relation_access_tracking.h +++ b/src/include/distributed/relation_access_tracking.h @@ -34,7 +34,7 @@ typedef enum RelationAccessMode RELATION_PARALLEL_ACCESSED } RelationAccessMode; -extern void AllocateRelationAccessHash(void); +extern void InitRelationAccessHash(void); extern void ResetRelationAccessHash(void); extern void RecordRelationAccessIfNonDistTable(Oid relationId, ShardPlacementAccessType accessType); From 0a5112964de19ae4571a3a4f3f75800b661d369d Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 27 Jul 2022 18:06:53 +0200 Subject: [PATCH 2/3] Call relation access hash clean-up irrespective of remote transaction state Mainly because local-only transactions should be cleaned up --- src/backend/distributed/connection/placement_connection.c | 1 - src/backend/distributed/transaction/transaction_management.c | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 2aa3994a0..225bf9708 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -969,7 +969,6 @@ ResetPlacementConnectionManagement(void) hash_delete_all(ConnectionPlacementHash); hash_delete_all(ConnectionShardHash); hash_delete_all(ColocatedPlacementsHash); - ResetRelationAccessHash(); /* * NB: memory for ConnectionReference structs and subordinate data is diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 66525e6ac..0778a34fa 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -36,6 +36,7 @@ #include "distributed/repartition_join_execution.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" +#include "distributed/relation_access_tracking.h" #include "distributed/shared_connection_stats.h" #include "distributed/subplan_execution.h" #include "distributed/version_compat.h" @@ -307,6 +308,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) } ResetGlobalVariables(); + ResetRelationAccessHash(); /* * Make sure that we give the shared connections back to the shared @@ -376,6 +378,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) AfterXactConnectionHandling(false); ResetGlobalVariables(); + ResetRelationAccessHash(); /* * Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus From b41c3fd30dbe06614b95df2ff0bfe82af90c4806 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 27 Jul 2022 18:16:21 +0200 Subject: [PATCH 3/3] Add tests --- src/test/regress/expected/single_node.out | 84 +++++++++++++++++++++-- src/test/regress/sql/single_node.sql | 38 ++++++++++ 2 files changed, 116 insertions(+), 6 deletions(-) diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 6177e215e..0490d1848 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -503,6 +503,82 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONST (0 rows) DROP TABLE upsert_test; +CREATE TABLE relation_tracking_table_1(id int, nonid int); +SELECT create_distributed_table('relation_tracking_table_1', 'id', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO relation_tracking_table_1 select generate_series(6, 10000, 1), 0; +CREATE or REPLACE function foo() +returns setof relation_tracking_table_1 +AS $$ +BEGIN +RETURN query select * from relation_tracking_table_1 order by 1 limit 10; +end; +$$ language plpgsql; +CREATE TABLE relation_tracking_table_2 (id int, nonid int); +-- use the relation-access in this session +select foo(); + foo +--------------------------------------------------------------------- + (6,0) + (7,0) + (8,0) + (9,0) + (10,0) + (11,0) + (12,0) + (13,0) + (14,0) + (15,0) +(10 rows) + +-- we should be able to use sequential mode, as the previous multi-shard +-- relation access has been cleaned-up +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO sequential; +INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0; +SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$single_node.relation_tracking_table_2$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM relation_tracking_table_2; + count +--------------------------------------------------------------------- + 995 +(1 row) + +ROLLBACK; +BEGIN; +INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0; +SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$single_node.relation_tracking_table_2$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM relation_tracking_table_2; + count +--------------------------------------------------------------------- + 995 +(1 row) + +COMMIT; +SET client_min_messages TO ERROR; +DROP TABLE relation_tracking_table_2, relation_tracking_table_1 CASCADE; +RESET client_min_messages; CREATE SCHEMA "Quoed.Schema"; SET search_path TO "Quoed.Schema"; CREATE TABLE "long_constraint_upsert\_test" @@ -541,13 +617,9 @@ NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quot ERROR: renaming constraints belonging to distributed tables is currently unsupported --INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *; SET search_path TO single_node; +SET client_min_messages TO ERROR; DROP SCHEMA "Quoed.Schema" CASCADE; -NOTICE: drop cascades to 5 other objects -DETAIL: drop cascades to table "Quoed.Schema".simple_table_name -drop cascades to table "Quoed.Schema".simple_table_name_90630528 -drop cascades to table "Quoed.Schema".simple_table_name_90630529 -drop cascades to table "Quoed.Schema".simple_table_name_90630530 -drop cascades to table "Quoed.Schema".simple_table_name_90630531 +RESET client_min_messages; -- test partitioned index creation with long name CREATE TABLE test_index_creation1 ( diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 21ae9e3ac..09a8c9870 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -252,6 +252,42 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONST DROP TABLE upsert_test; +CREATE TABLE relation_tracking_table_1(id int, nonid int); +SELECT create_distributed_table('relation_tracking_table_1', 'id', colocate_with := 'none'); +INSERT INTO relation_tracking_table_1 select generate_series(6, 10000, 1), 0; + +CREATE or REPLACE function foo() +returns setof relation_tracking_table_1 +AS $$ +BEGIN +RETURN query select * from relation_tracking_table_1 order by 1 limit 10; +end; +$$ language plpgsql; + +CREATE TABLE relation_tracking_table_2 (id int, nonid int); + +-- use the relation-access in this session +select foo(); + +-- we should be able to use sequential mode, as the previous multi-shard +-- relation access has been cleaned-up +BEGIN; +SET LOCAL citus.multi_shard_modify_mode TO sequential; +INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0; +SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none'); +SELECT count(*) FROM relation_tracking_table_2; +ROLLBACK; + +BEGIN; +INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0; +SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none'); +SELECT count(*) FROM relation_tracking_table_2; +COMMIT; + +SET client_min_messages TO ERROR; +DROP TABLE relation_tracking_table_2, relation_tracking_table_1 CASCADE; +RESET client_min_messages; + CREATE SCHEMA "Quoed.Schema"; SET search_path TO "Quoed.Schema"; @@ -280,7 +316,9 @@ ALTER TABLE simple_table_name RENAME CONSTRAINT "looo oooo ooooo ooooooooooooooo --INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *; SET search_path TO single_node; +SET client_min_messages TO ERROR; DROP SCHEMA "Quoed.Schema" CASCADE; +RESET client_min_messages; -- test partitioned index creation with long name CREATE TABLE test_index_creation1