Merge pull request #6092 from citusdata/fix_relation_acess

Fix relation access tracking for local-only transactions
pull/6095/head
Önder Kalacı 2022-07-28 11:35:40 +02:00 committed by GitHub
commit 5670dffd33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 150 additions and 16 deletions

View File

@ -969,7 +969,6 @@ ResetPlacementConnectionManagement(void)
hash_delete_all(ConnectionPlacementHash); hash_delete_all(ConnectionPlacementHash);
hash_delete_all(ConnectionShardHash); hash_delete_all(ConnectionShardHash);
hash_delete_all(ColocatedPlacementsHash); hash_delete_all(ColocatedPlacementsHash);
ResetRelationAccessHash();
/* /*
* NB: memory for ConnectionReference structs and subordinate data is * NB: memory for ConnectionReference structs and subordinate data is
@ -1089,9 +1088,6 @@ InitPlacementConnectionManagement(void)
ConnectionShardHash = hash_create("citus connection cache (shardid)", ConnectionShardHash = hash_create("citus connection cache (shardid)",
64, &info, hashFlags); 64, &info, hashFlags);
/* (relationId) = [relationAccessMode] hash */
AllocateRelationAccessHash();
} }

View File

@ -390,6 +390,7 @@ _PG_init(void)
InitializeBackendManagement(); InitializeBackendManagement();
InitializeConnectionManagement(); InitializeConnectionManagement();
InitPlacementConnectionManagement(); InitPlacementConnectionManagement();
InitRelationAccessHash();
InitializeCitusQueryStats(); InitializeCitusQueryStats();
InitializeSharedConnectionStats(); InitializeSharedConnectionStats();
InitializeLocallyReservedSharedConnections(); InitializeLocallyReservedSharedConnections();

View File

@ -47,6 +47,8 @@ bool EnforceForeignKeyRestrictions = true;
(1 << (PLACEMENT_ACCESS_DDL + \ (1 << (PLACEMENT_ACCESS_DDL + \
PARALLEL_MODE_FLAG_OFFSET))) PARALLEL_MODE_FLAG_OFFSET)))
MemoryContext RelationAcessContext = NULL;
/* /*
* Hash table mapping relations to the * Hash table mapping relations to the
@ -84,8 +86,8 @@ typedef struct RelationAccessHashEntry
static HTAB *RelationAccessHash; static HTAB *RelationAccessHash;
/* functions related to access recording */ /* functions related to access recording */
static void AllocateRelationAccessHash(void);
static void RecordRelationAccessBase(Oid relationId, ShardPlacementAccessType accessType); static void RecordRelationAccessBase(Oid relationId, ShardPlacementAccessType accessType);
static void RecordPlacementAccessToCache(Oid relationId, static void RecordPlacementAccessToCache(Oid relationId,
ShardPlacementAccessType accessType); ShardPlacementAccessType accessType);
@ -120,6 +122,18 @@ static bool HoldsConflictingLockWithReferencedRelations(Oid relationId,
conflictingAccessMode); conflictingAccessMode);
/*
* InitRelationAccessHash performs initialization of the
* infrastructure in this file at backend start.
*/
void
InitRelationAccessHash(void)
{
/* allocate (relationId) = [relationAccessMode] hash */
AllocateRelationAccessHash();
}
/* /*
* Empty RelationAccessHash, without destroying the hash table itself. * Empty RelationAccessHash, without destroying the hash table itself.
*/ */
@ -133,19 +147,29 @@ ResetRelationAccessHash()
/* /*
* Allocate RelationAccessHash. * Allocate RelationAccessHash.
*/ */
void static void
AllocateRelationAccessHash(void) AllocateRelationAccessHash(void)
{ {
HASHCTL info; /*
* Create a single context for relation access related memory
* management. Doing so, instead of allocating in TopMemoryContext, makes
* it easier to attribute memory usage to relation access tracking.
*/
RelationAcessContext = AllocSetContextCreateExtended(TopMemoryContext,
"Relation Access Context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
HASHCTL info;
memset(&info, 0, sizeof(info)); memset(&info, 0, sizeof(info));
info.keysize = sizeof(RelationAccessHashKey); info.keysize = sizeof(RelationAccessHashKey);
info.entrysize = sizeof(RelationAccessHashEntry); info.entrysize = sizeof(RelationAccessHashEntry);
info.hash = tag_hash; info.hash = tag_hash;
info.hcxt = ConnectionContext; info.hcxt = RelationAcessContext;
uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
RelationAccessHash = hash_create("citus connection cache (relationid)", RelationAccessHash = hash_create("citus relation access cache (relationid)",
8, &info, hashFlags); 8, &info, hashFlags);
} }

View File

@ -36,6 +36,7 @@
#include "distributed/repartition_join_execution.h" #include "distributed/repartition_join_execution.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/placement_connection.h" #include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
@ -307,6 +308,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
} }
ResetGlobalVariables(); ResetGlobalVariables();
ResetRelationAccessHash();
/* /*
* Make sure that we give the shared connections back to the shared * Make sure that we give the shared connections back to the shared
@ -376,6 +378,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
AfterXactConnectionHandling(false); AfterXactConnectionHandling(false);
ResetGlobalVariables(); ResetGlobalVariables();
ResetRelationAccessHash();
/* /*
* Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus * Clear MetadataCache table if we're aborting from a CREATE EXTENSION Citus

View File

@ -34,7 +34,7 @@ typedef enum RelationAccessMode
RELATION_PARALLEL_ACCESSED RELATION_PARALLEL_ACCESSED
} RelationAccessMode; } RelationAccessMode;
extern void AllocateRelationAccessHash(void); extern void InitRelationAccessHash(void);
extern void ResetRelationAccessHash(void); extern void ResetRelationAccessHash(void);
extern void RecordRelationAccessIfNonDistTable(Oid relationId, extern void RecordRelationAccessIfNonDistTable(Oid relationId,
ShardPlacementAccessType accessType); ShardPlacementAccessType accessType);

View File

@ -503,6 +503,82 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONST
(0 rows) (0 rows)
DROP TABLE upsert_test; DROP TABLE upsert_test;
CREATE TABLE relation_tracking_table_1(id int, nonid int);
SELECT create_distributed_table('relation_tracking_table_1', 'id', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO relation_tracking_table_1 select generate_series(6, 10000, 1), 0;
CREATE or REPLACE function foo()
returns setof relation_tracking_table_1
AS $$
BEGIN
RETURN query select * from relation_tracking_table_1 order by 1 limit 10;
end;
$$ language plpgsql;
CREATE TABLE relation_tracking_table_2 (id int, nonid int);
-- use the relation-access in this session
select foo();
foo
---------------------------------------------------------------------
(6,0)
(7,0)
(8,0)
(9,0)
(10,0)
(11,0)
(12,0)
(13,0)
(14,0)
(15,0)
(10 rows)
-- we should be able to use sequential mode, as the previous multi-shard
-- relation access has been cleaned-up
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0;
SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$single_node.relation_tracking_table_2$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM relation_tracking_table_2;
count
---------------------------------------------------------------------
995
(1 row)
ROLLBACK;
BEGIN;
INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0;
SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$single_node.relation_tracking_table_2$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM relation_tracking_table_2;
count
---------------------------------------------------------------------
995
(1 row)
COMMIT;
SET client_min_messages TO ERROR;
DROP TABLE relation_tracking_table_2, relation_tracking_table_1 CASCADE;
RESET client_min_messages;
CREATE SCHEMA "Quoed.Schema"; CREATE SCHEMA "Quoed.Schema";
SET search_path TO "Quoed.Schema"; SET search_path TO "Quoed.Schema";
CREATE TABLE "long_constraint_upsert\_test" CREATE TABLE "long_constraint_upsert\_test"
@ -541,13 +617,9 @@ NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quot
ERROR: renaming constraints belonging to distributed tables is currently unsupported ERROR: renaming constraints belonging to distributed tables is currently unsupported
--INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *; --INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *;
SET search_path TO single_node; SET search_path TO single_node;
SET client_min_messages TO ERROR;
DROP SCHEMA "Quoed.Schema" CASCADE; DROP SCHEMA "Quoed.Schema" CASCADE;
NOTICE: drop cascades to 5 other objects RESET client_min_messages;
DETAIL: drop cascades to table "Quoed.Schema".simple_table_name
drop cascades to table "Quoed.Schema".simple_table_name_90630528
drop cascades to table "Quoed.Schema".simple_table_name_90630529
drop cascades to table "Quoed.Schema".simple_table_name_90630530
drop cascades to table "Quoed.Schema".simple_table_name_90630531
-- test partitioned index creation with long name -- test partitioned index creation with long name
CREATE TABLE test_index_creation1 CREATE TABLE test_index_creation1
( (

View File

@ -252,6 +252,42 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONST
DROP TABLE upsert_test; DROP TABLE upsert_test;
CREATE TABLE relation_tracking_table_1(id int, nonid int);
SELECT create_distributed_table('relation_tracking_table_1', 'id', colocate_with := 'none');
INSERT INTO relation_tracking_table_1 select generate_series(6, 10000, 1), 0;
CREATE or REPLACE function foo()
returns setof relation_tracking_table_1
AS $$
BEGIN
RETURN query select * from relation_tracking_table_1 order by 1 limit 10;
end;
$$ language plpgsql;
CREATE TABLE relation_tracking_table_2 (id int, nonid int);
-- use the relation-access in this session
select foo();
-- we should be able to use sequential mode, as the previous multi-shard
-- relation access has been cleaned-up
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0;
SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none');
SELECT count(*) FROM relation_tracking_table_2;
ROLLBACK;
BEGIN;
INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0;
SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none');
SELECT count(*) FROM relation_tracking_table_2;
COMMIT;
SET client_min_messages TO ERROR;
DROP TABLE relation_tracking_table_2, relation_tracking_table_1 CASCADE;
RESET client_min_messages;
CREATE SCHEMA "Quoed.Schema"; CREATE SCHEMA "Quoed.Schema";
SET search_path TO "Quoed.Schema"; SET search_path TO "Quoed.Schema";
@ -280,7 +316,9 @@ ALTER TABLE simple_table_name RENAME CONSTRAINT "looo oooo ooooo ooooooooooooooo
--INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *; --INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *;
SET search_path TO single_node; SET search_path TO single_node;
SET client_min_messages TO ERROR;
DROP SCHEMA "Quoed.Schema" CASCADE; DROP SCHEMA "Quoed.Schema" CASCADE;
RESET client_min_messages;
-- test partitioned index creation with long name -- test partitioned index creation with long name
CREATE TABLE test_index_creation1 CREATE TABLE test_index_creation1