mirror of https://github.com/citusdata/citus.git

Fix relation access tracking for local only transactions on release-11.0 (#6182)

Co-authored-by: Onder Kalaci <onderkalaci@gmail.com>
branch release-11.0.onder.18.agu
parent 7df8588107
commit 56939f0d14
@@ -971,7 +971,6 @@ ResetPlacementConnectionManagement(void)
 	hash_delete_all(ConnectionPlacementHash);
 	hash_delete_all(ConnectionShardHash);
 	hash_delete_all(ColocatedPlacementsHash);
-	ResetRelationAccessHash();
 
 	/*
 	 * NB: memory for ConnectionReference structs and subordinate data is
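Context for the hunk above, not taken from this commit: hash_delete_all() is a Citus helper that empties a backend-local dynahash table without destroying it, and ResetRelationAccessHash() does the same for the relation access hash. A minimal sketch of the standard PostgreSQL dynahash idiom such a helper presumably wraps, using the hypothetical name DeleteAllEntries:

#include "postgres.h"
#include "utils/hsearch.h"

/*
 * Hypothetical helper: remove every entry from an HTAB while keeping the
 * table itself alive. dynahash explicitly allows deleting the entry that
 * hash_seq_search() just returned.
 */
static void
DeleteAllEntries(HTAB *htab)
{
	HASH_SEQ_STATUS status;
	void *entry = NULL;

	hash_seq_init(&status, htab);
	while ((entry = hash_seq_search(&status)) != NULL)
	{
		/* dynahash entries start with their key, so the entry doubles as the key */
		bool found = false;
		hash_search(htab, entry, HASH_REMOVE, &found);
	}
}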
@@ -1091,9 +1090,6 @@ InitPlacementConnectionManagement(void)
 
 	ConnectionShardHash = hash_create("citus connection cache (shardid)",
 									  64, &info, hashFlags);
-
-	/* (relationId) = [relationAccessMode] hash */
-	AllocateRelationAccessHash();
 }
 
 
@@ -346,6 +346,7 @@ _PG_init(void)
 	InitializeBackendManagement();
 	InitializeConnectionManagement();
 	InitPlacementConnectionManagement();
+	InitRelationAccessHash();
 	InitializeCitusQueryStats();
 	InitializeSharedConnectionStats();
 	InitializeLocallyReservedSharedConnections();
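With the line above, InitRelationAccessHash() runs once per backend from _PG_init(), while the per-transaction code further down only empties the hash it created. A trimmed, hypothetical extension skeleton illustrating that split (Demo* names are placeholders, not Citus code):

#include "postgres.h"
#include "fmgr.h"

PG_MODULE_MAGIC;

void _PG_init(void);
static void InitDemoAccessHash(void);

/* runs once per backend: create long-lived, backend-local state here */
void
_PG_init(void)
{
	InitDemoAccessHash();	/* analogous to InitRelationAccessHash() */
}

static void
InitDemoAccessHash(void)
{
	/* would allocate the (relationId) -> access-mode hash, as this commit does */
}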
@@ -49,6 +49,8 @@ bool EnforceForeignKeyRestrictions = true;
 									   (1 << (PLACEMENT_ACCESS_DDL + \
 											  PARALLEL_MODE_FLAG_OFFSET)))
 
+MemoryContext RelationAcessContext = NULL;
+
 
 /*
  * Hash table mapping relations to the
@@ -86,8 +88,8 @@ typedef struct RelationAccessHashEntry
 
 static HTAB *RelationAccessHash;
 
 
 /* functions related to access recording */
+static void AllocateRelationAccessHash(void);
 static void RecordRelationAccessBase(Oid relationId, ShardPlacementAccessType accessType);
 static void RecordPlacementAccessToCache(Oid relationId,
 										 ShardPlacementAccessType accessType);
@@ -122,6 +124,18 @@ static bool HoldsConflictingLockWithReferencedRelations(Oid relationId,
 														conflictingAccessMode);
 
 
+/*
+ * InitRelationAccessHash performs initialization of the
+ * infrastructure in this file at backend start.
+ */
+void
+InitRelationAccessHash(void)
+{
+	/* allocate (relationId) = [relationAccessMode] hash */
+	AllocateRelationAccessHash();
+}
+
+
 /*
  * Empty RelationAccessHash, without destroying the hash table itself.
  */
@@ -135,19 +149,29 @@ ResetRelationAccessHash()
 /*
  * Allocate RelationAccessHash.
  */
-void
+static void
 AllocateRelationAccessHash(void)
 {
-	HASHCTL info;
+	/*
+	 * Create a single context for relation access related memory
+	 * management. Doing so, instead of allocating in TopMemoryContext, makes
+	 * it easier to associate used memory.
+	 */
+	RelationAcessContext = AllocSetContextCreateExtended(TopMemoryContext,
+														 "Relation Access Context",
+														 ALLOCSET_DEFAULT_MINSIZE,
+														 ALLOCSET_DEFAULT_INITSIZE,
+														 ALLOCSET_DEFAULT_MAXSIZE);
+
+	HASHCTL info;
 	memset(&info, 0, sizeof(info));
 	info.keysize = sizeof(RelationAccessHashKey);
 	info.entrysize = sizeof(RelationAccessHashEntry);
 	info.hash = tag_hash;
-	info.hcxt = ConnectionContext;
+	info.hcxt = RelationAcessContext;
 	uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
-	RelationAccessHash = hash_create("citus connection cache (relationid)",
+	RelationAccessHash = hash_create("citus relation access cache (relationid)",
 									 8, &info, hashFlags);
 }
 
 
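The rewritten AllocateRelationAccessHash() above moves the hash out of ConnectionContext into a dedicated memory context. A self-contained sketch of that pattern, assuming only PostgreSQL's standard memory-context and dynahash APIs, with hypothetical Demo* names in place of the Citus types and the AllocSetContextCreate convenience form instead of AllocSetContextCreateExtended:

#include "postgres.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"

/* hypothetical stand-ins for RelationAccessHashKey / RelationAccessHashEntry */
typedef struct DemoKey
{
	Oid relationId;
} DemoKey;

typedef struct DemoEntry
{
	DemoKey key;				/* dynahash requires the key to come first */
	int accessMode;
} DemoEntry;

static MemoryContext DemoAccessContext = NULL;
static HTAB *DemoAccessHash = NULL;

static void
AllocateDemoAccessHash(void)
{
	/* a dedicated child of TopMemoryContext keeps the hash's memory attributable */
	DemoAccessContext = AllocSetContextCreate(TopMemoryContext,
											  "Demo Access Context",
											  ALLOCSET_DEFAULT_SIZES);

	HASHCTL info;
	memset(&info, 0, sizeof(info));
	info.keysize = sizeof(DemoKey);
	info.entrysize = sizeof(DemoEntry);
	info.hcxt = DemoAccessContext;	/* honored because HASH_CONTEXT is passed */

	/* HASH_BLOBS selects a hash function suited to fixed-size binary keys */
	DemoAccessHash = hash_create("demo access cache (relationid)", 8, &info,
								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}

Allocating in a named child context rather than directly in TopMemoryContext matches the comment in the hunk: the hash's allocations then appear under their own context name in MemoryContextStats() output, which is what makes the memory easier to associate.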
@@ -36,6 +36,7 @@
 #include "distributed/repartition_join_execution.h"
 #include "distributed/transaction_management.h"
 #include "distributed/placement_connection.h"
+#include "distributed/relation_access_tracking.h"
 #include "distributed/shared_connection_stats.h"
 #include "distributed/subplan_execution.h"
 #include "distributed/version_compat.h"
@@ -306,6 +307,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 			}
 
 			ResetGlobalVariables();
+			ResetRelationAccessHash();
 
 			/*
 			 * Make sure that we give the shared connections back to the shared
@@ -364,6 +366,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
 			AfterXactConnectionHandling(false);
 
 			ResetGlobalVariables();
+			ResetRelationAccessHash();
 
 			/*
 			 * Make sure that we give the shared connections back to the shared
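With the two insertions above, ResetRelationAccessHash() now runs from CoordinatedTransactionCallback() at every commit and abort, so the access hash is emptied even for local-only transactions that never reset placement connection state. A minimal sketch of the underlying PostgreSQL mechanism, RegisterXactCallback(), with hypothetical Demo* names standing in for the Citus functions:

#include "postgres.h"
#include "access/xact.h"

static void DemoXactCallback(XactEvent event, void *arg);
static void ResetDemoAccessState(void);

void
DemoInitTransactionHooks(void)
{
	/* typically called once per backend, e.g. from _PG_init() */
	RegisterXactCallback(DemoXactCallback, NULL);
}

/* per-transaction cleanup, analogous to the ResetRelationAccessHash() calls here */
static void
DemoXactCallback(XactEvent event, void *arg)
{
	switch (event)
	{
		case XACT_EVENT_COMMIT:
		case XACT_EVENT_ABORT:
			ResetDemoAccessState();
			break;

		default:
			/* PRE_COMMIT, PREPARE, parallel-worker events: nothing to do */
			break;
	}
}

static void
ResetDemoAccessState(void)
{
	/* would empty the backend-local access hash without destroying it */
}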
@@ -34,7 +34,7 @@ typedef enum RelationAccessMode
 	RELATION_PARALLEL_ACCESSED
 } RelationAccessMode;
 
-extern void AllocateRelationAccessHash(void);
+extern void InitRelationAccessHash(void);
 extern void ResetRelationAccessHash(void);
 extern void RecordRelationAccessIfNonDistTable(Oid relationId,
 											   ShardPlacementAccessType accessType);
@@ -503,6 +503,82 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONST
 (0 rows)
 
 DROP TABLE upsert_test;
+CREATE TABLE relation_tracking_table_1(id int, nonid int);
+SELECT create_distributed_table('relation_tracking_table_1', 'id', colocate_with := 'none');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO relation_tracking_table_1 select generate_series(6, 10000, 1), 0;
+CREATE or REPLACE function foo()
+returns setof relation_tracking_table_1
+AS $$
+BEGIN
+RETURN query select * from relation_tracking_table_1 order by 1 limit 10;
+end;
+$$ language plpgsql;
+CREATE TABLE relation_tracking_table_2 (id int, nonid int);
+-- use the relation-access in this session
+select foo();
+   foo
+---------------------------------------------------------------------
+ (6,0)
+ (7,0)
+ (8,0)
+ (9,0)
+ (10,0)
+ (11,0)
+ (12,0)
+ (13,0)
+ (14,0)
+ (15,0)
+(10 rows)
+
+-- we should be able to use sequential mode, as the previous multi-shard
+-- relation access has been cleaned-up
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO sequential;
+INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0;
+SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none');
+NOTICE:  Copying data from local table...
+NOTICE:  copying the data has completed
+DETAIL:  The local data in the table is no longer visible, but is still on disk.
+HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$single_node.relation_tracking_table_2$$)
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT count(*) FROM relation_tracking_table_2;
+ count
+---------------------------------------------------------------------
+   995
+(1 row)
+
+ROLLBACK;
+BEGIN;
+INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0;
+SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none');
+NOTICE:  Copying data from local table...
+NOTICE:  copying the data has completed
+DETAIL:  The local data in the table is no longer visible, but is still on disk.
+HINT:  To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$single_node.relation_tracking_table_2$$)
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT count(*) FROM relation_tracking_table_2;
+ count
+---------------------------------------------------------------------
+   995
+(1 row)
+
+COMMIT;
+SET client_min_messages TO ERROR;
+DROP TABLE relation_tracking_table_2, relation_tracking_table_1 CASCADE;
+RESET client_min_messages;
 CREATE SCHEMA "Quoed.Schema";
 SET search_path TO "Quoed.Schema";
 CREATE TABLE "long_constraint_upsert\_test"
@@ -541,13 +617,9 @@ NOTICE: identifier "looo oooo ooooo ooooooooooooooooo oooooooo oooooooo ng quot
 ERROR:  renaming constraints belonging to distributed tables is currently unsupported
 --INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *;
 SET search_path TO single_node;
+SET client_min_messages TO ERROR;
 DROP SCHEMA "Quoed.Schema" CASCADE;
-NOTICE:  drop cascades to 5 other objects
-DETAIL:  drop cascades to table "Quoed.Schema".simple_table_name
-drop cascades to table "Quoed.Schema".simple_table_name_90630528
-drop cascades to table "Quoed.Schema".simple_table_name_90630529
-drop cascades to table "Quoed.Schema".simple_table_name_90630530
-drop cascades to table "Quoed.Schema".simple_table_name_90630531
+RESET client_min_messages;
 -- test partitioned index creation with long name
 CREATE TABLE test_index_creation1
 (
@@ -252,6 +252,42 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONST
 
 DROP TABLE upsert_test;
 
+CREATE TABLE relation_tracking_table_1(id int, nonid int);
+SELECT create_distributed_table('relation_tracking_table_1', 'id', colocate_with := 'none');
+INSERT INTO relation_tracking_table_1 select generate_series(6, 10000, 1), 0;
+
+CREATE or REPLACE function foo()
+returns setof relation_tracking_table_1
+AS $$
+BEGIN
+RETURN query select * from relation_tracking_table_1 order by 1 limit 10;
+end;
+$$ language plpgsql;
+
+CREATE TABLE relation_tracking_table_2 (id int, nonid int);
+
+-- use the relation-access in this session
+select foo();
+
+-- we should be able to use sequential mode, as the previous multi-shard
+-- relation access has been cleaned-up
+BEGIN;
+SET LOCAL citus.multi_shard_modify_mode TO sequential;
+INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0;
+SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none');
+SELECT count(*) FROM relation_tracking_table_2;
+ROLLBACK;
+
+BEGIN;
+INSERT INTO relation_tracking_table_2 select generate_series(6, 1000, 1), 0;
+SELECT create_distributed_table('relation_tracking_table_2', 'id', colocate_with := 'none');
+SELECT count(*) FROM relation_tracking_table_2;
+COMMIT;
+
+SET client_min_messages TO ERROR;
+DROP TABLE relation_tracking_table_2, relation_tracking_table_1 CASCADE;
+RESET client_min_messages;
+
 CREATE SCHEMA "Quoed.Schema";
 SET search_path TO "Quoed.Schema";
 
@@ -280,7 +316,9 @@ ALTER TABLE simple_table_name RENAME CONSTRAINT "looo oooo ooooo ooooooooooooooo
 --INSERT INTO simple_table_name (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT simple_constraint_name DO NOTHING RETURNING *;
 
 SET search_path TO single_node;
+SET client_min_messages TO ERROR;
 DROP SCHEMA "Quoed.Schema" CASCADE;
+RESET client_min_messages;
 
 -- test partitioned index creation with long name
 CREATE TABLE test_index_creation1