From fcd150c7c8b77224c235e61bdb26fb66957de36d Mon Sep 17 00:00:00 2001
From: Andres Freund
Date: Wed, 26 Oct 2016 03:36:35 -0700
Subject: [PATCH] Invalidate relcache after pg_dist_shard_placement changes.

This forces prepared statements to be re-planned after changes to the
placement metadata. There are some locking issues remaining, but those
are a separate task.

Also add regression tests verifying that invalidations take effect on
prepared statements.
---
 src/backend/distributed/Makefile              |   4 +-
 .../distributed/citus--6.0-14--6.0-15.sql     |  14 ++
 src/backend/distributed/citus.control         |   2 +-
 .../master/master_metadata_utility.c          |  13 ++
 .../distributed/utils/metadata_cache.c        | 128 ++++++++++++++++++
 src/include/distributed/metadata_cache.h      |   1 +
 src/test/regress/expected/multi_extension.out |   1 +
 .../regress/expected/multi_prepare_sql.out    |  90 +++++++++++++
 src/test/regress/sql/multi_extension.sql      |   1 +
 src/test/regress/sql/multi_prepare_sql.sql    |  51 +++++++
 10 files changed, 303 insertions(+), 2 deletions(-)
 create mode 100644 src/backend/distributed/citus--6.0-14--6.0-15.sql

diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index ecd86bc48..2e4d9ff3a 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -8,7 +8,7 @@ EXTENSION = citus
 EXTVERSIONS = 5.0 5.0-1 5.0-2 \
 	5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
 	5.2-1 5.2-2 5.2-3 5.2-4 \
-	6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14
+	6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15
 
 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@@ -86,6 +86,8 @@ $(EXTENSION)--6.0-13.sql: $(EXTENSION)--6.0-12.sql $(EXTENSION)--6.0-12--6.0-13.sql
 	cat $^ > $@
 $(EXTENSION)--6.0-14.sql: $(EXTENSION)--6.0-13.sql $(EXTENSION)--6.0-13--6.0-14.sql
 	cat $^ > $@
+$(EXTENSION)--6.0-15.sql: $(EXTENSION)--6.0-14.sql $(EXTENSION)--6.0-14--6.0-15.sql
+	cat $^ > $@
 
 NO_PGXS = 1
 
diff --git a/src/backend/distributed/citus--6.0-14--6.0-15.sql b/src/backend/distributed/citus--6.0-14--6.0-15.sql
new file mode 100644
index 000000000..56a165fbd
--- /dev/null
+++ b/src/backend/distributed/citus--6.0-14--6.0-15.sql
@@ -0,0 +1,14 @@
+/* citus--6.0-14--6.0-15.sql */
+
+
+CREATE FUNCTION pg_catalog.master_dist_placement_cache_invalidate()
+    RETURNS trigger
+    LANGUAGE C
+    AS 'MODULE_PATHNAME', $$master_dist_placement_cache_invalidate$$;
+COMMENT ON FUNCTION master_dist_placement_cache_invalidate()
+    IS 'register relcache invalidation for changed placements';
+
+CREATE TRIGGER dist_placement_cache_invalidate
+    AFTER INSERT OR UPDATE OR DELETE
+    ON pg_catalog.pg_dist_shard_placement
+    FOR EACH ROW EXECUTE PROCEDURE master_dist_placement_cache_invalidate();
diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control
index c2ea5fe3c..557f71bc8 100644
--- a/src/backend/distributed/citus.control
+++ b/src/backend/distributed/citus.control
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '6.0-14'
+default_version = '6.0-15'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c
index f0e067a77..86449369a 100644
--- a/src/backend/distributed/master/master_metadata_utility.c
+++ b/src/backend/distributed/master/master_metadata_utility.c
@@ -451,6 +451,8 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
 	simple_heap_insert(pgDistShardPlacement, heapTuple);
 	CatalogUpdateIndexes(pgDistShardPlacement, heapTuple);
 
+	CitusInvalidateRelcacheByShardId(shardId);
+
 	CommandCounterIncrement();
 	heap_close(pgDistShardPlacement, RowExclusiveLock);
 }
@@ -565,6 +567,8 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
 	simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self);
 	systable_endscan(scanDescriptor);
 
+	CitusInvalidateRelcacheByShardId(shardId);
+
 	CommandCounterIncrement();
 
 	heap_close(pgDistShardPlacement, RowExclusiveLock);
@@ -589,6 +593,8 @@ UpdateShardPlacementState(uint64 placementId, char shardState)
 	Datum values[Natts_pg_dist_shard_placement];
 	bool isnull[Natts_pg_dist_shard_placement];
 	bool replace[Natts_pg_dist_shard_placement];
+	uint64 shardId = INVALID_SHARD_ID;
+	bool colIsNull = false;
 
 	pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
 	tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
@@ -617,6 +623,13 @@ UpdateShardPlacementState(uint64 placementId, char shardState)
 	simple_heap_update(pgDistShardPlacement, &heapTuple->t_self, heapTuple);
 	CatalogUpdateIndexes(pgDistShardPlacement, heapTuple);
+
+	shardId = DatumGetInt64(heap_getattr(heapTuple,
+										 Anum_pg_dist_shard_placement_shardid,
+										 tupleDescriptor, &colIsNull));
+	Assert(!colIsNull);
+	CitusInvalidateRelcacheByShardId(shardId);
+
 	CommandCounterIncrement();
 
 	systable_endscan(scanDescriptor);
 
diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c
index 5170aabf0..1abf588fd 100644
--- a/src/backend/distributed/utils/metadata_cache.c
+++ b/src/backend/distributed/utils/metadata_cache.c
@@ -31,6 +31,7 @@
 #include "distributed/pg_dist_node.h"
 #include "distributed/pg_dist_partition.h"
"distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" +#include "distributed/pg_dist_shard_placement.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" @@ -119,6 +120,7 @@ static void CachedRelationLookup(const char *relationName, Oid *cachedOid); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate); +PG_FUNCTION_INFO_V1(master_dist_placement_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate); PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate); @@ -1056,6 +1058,68 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS) } +/* + * master_dist_placmeent_cache_invalidate is a trigger function that performs + * relcache invalidations when the contents of pg_dist_shard_placement are + * changed on the SQL level. + * + * NB: We decided there is little point in checking permissions here, there + * are much easier ways to waste CPU than causing cache invalidations. + */ +Datum +master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS) +{ + TriggerData *triggerData = (TriggerData *) fcinfo->context; + HeapTuple newTuple = NULL; + HeapTuple oldTuple = NULL; + Oid oldShardId = InvalidOid; + Oid newShardId = InvalidOid; + + if (!CALLED_AS_TRIGGER(fcinfo)) + { + ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("must be called as trigger"))); + } + + newTuple = triggerData->tg_newtuple; + oldTuple = triggerData->tg_trigtuple; + + /* collect shardid for OLD and NEW tuple */ + if (oldTuple != NULL) + { + Form_pg_dist_shard_placement distPlacement = + (Form_pg_dist_shard_placement) GETSTRUCT(oldTuple); + + oldShardId = distPlacement->shardid; + } + + if (newTuple != NULL) + { + Form_pg_dist_shard_placement distPlacement = + (Form_pg_dist_shard_placement) GETSTRUCT(newTuple); + + newShardId = distPlacement->shardid; + } + + /* + * Invalidate relcache for the relevant relation(s). In theory shardId + * should never change, but it doesn't hurt to be paranoid. + */ + if (oldShardId != InvalidOid && + oldShardId != newShardId) + { + CitusInvalidateRelcacheByShardId(oldShardId); + } + + if (newShardId != InvalidOid) + { + CitusInvalidateRelcacheByShardId(newShardId); + } + + PG_RETURN_DATUM(PointerGetDatum(NULL)); +} + + /* * master_dist_node_cache_invalidate is a trigger function that performs * relcache invalidations when the contents of pg_dist_node are changed @@ -1772,3 +1836,64 @@ CitusInvalidateRelcacheByRelid(Oid relationId) ReleaseSysCache(classTuple); } } + + +/* + * Register a relcache invalidation for the distributed relation associated + * with the shard. + */ +void +CitusInvalidateRelcacheByShardId(int64 shardId) +{ + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + HeapTuple heapTuple = NULL; + Form_pg_dist_shard shardForm = NULL; + Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock); + + /* + * Load shard, to find the associated relation id. Can't use + * LoadShardInterval directly because that'd fail if the shard doesn't + * exist anymore, which we can't have. Also lower overhead is desirable + * here. 
+	 */
+
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
+				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
+
+	scanDescriptor = systable_beginscan(pgDistShard,
+										DistShardShardidIndexId(), true,
+										NULL, scanKeyCount, scanKey);
+
+	heapTuple = systable_getnext(scanDescriptor);
+	if (HeapTupleIsValid(heapTuple))
+	{
+		shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple);
+		CitusInvalidateRelcacheByRelid(shardForm->logicalrelid);
+	}
+	else
+	{
+		/*
+		 * Couldn't find the associated relation. That can primarily happen
+		 * in two cases:
+		 *
+		 * 1) A placement row is inserted before the shard row. That's fine,
+		 *    since we don't need invalidations via placements in that case.
+		 *
+		 * 2) The shard has been deleted, but some placements were
+		 *    unreachable, and the user is manually deleting the rows. Not
+		 *    much point in WARNING or ERRORing in that case either; there's
+		 *    nothing to invalidate.
+		 *
+		 * Hence we just emit a DEBUG5 message.
+		 */
+		ereport(DEBUG5, (errmsg("could not find distributed relation to invalidate for "
+								"shard " INT64_FORMAT, shardId)));
+	}
+
+	systable_endscan(scanDescriptor);
+	heap_close(pgDistShard, NoLock);
+
+	/* bump the command counter, to force the invalidation to take effect */
+	CommandCounterIncrement();
+}
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index 3e64c80c5..a1c946eb7 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -58,6 +58,7 @@ extern ShardInterval * LoadShardInterval(uint64 shardId);
 extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
 extern int GetLocalGroupId(void);
 extern void CitusInvalidateRelcacheByRelid(Oid relationId);
+extern void CitusInvalidateRelcacheByShardId(int64 shardId);
 extern void CitusInvalidateNodeCache(void);
 
 extern bool CitusHasBeenLoaded(void);
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index 8d09e1011..119930e1d 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -54,6 +54,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-11';
 ALTER EXTENSION citus UPDATE TO '6.0-12';
 ALTER EXTENSION citus UPDATE TO '6.0-13';
 ALTER EXTENSION citus UPDATE TO '6.0-14';
+ALTER EXTENSION citus UPDATE TO '6.0-15';
 -- ensure no objects were created outside pg_catalog
 SELECT COUNT(*)
 FROM pg_depend AS pgd,
diff --git a/src/test/regress/expected/multi_prepare_sql.out b/src/test/regress/expected/multi_prepare_sql.out
index 15b4de64d..7ae77b3e9 100644
--- a/src/test/regress/expected/multi_prepare_sql.out
+++ b/src/test/regress/expected/multi_prepare_sql.out
@@ -820,5 +820,95 @@ SELECT * FROM prepare_table ORDER BY key, value;
   0 | 
 (6 rows)
 
+-- verify that placement state updates invalidate prepared statements
+--
+-- We use an immutable function to check for that. The planner will
+-- evaluate it once during planning; during execution it should never
+-- be reached (no rows). That way we'll see a NOTICE when
+-- (re-)planning, but not when executing.
+-- first create helper function
+CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
+\c - - - :worker_1_port
+CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
+\c - - - :worker_2_port
+CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
+\c - - - :master_port
+-- test table
+CREATE TABLE test_table (test_id integer NOT NULL, data text);
+SELECT master_create_distributed_table('test_table', 'test_id', 'hash');
+ master_create_distributed_table 
+---------------------------------
+ 
+(1 row)
+
+SELECT master_create_worker_shards('test_table', 2, 2);
+ master_create_worker_shards 
+-----------------------------
+ 
+(1 row)
+
+-- avoid 9.6+ only context messages
+\set VERBOSITY terse
+-- plain statement, needs planning
+SELECT count(*) FROM test_table HAVING COUNT(*) = immutable_bleat('replanning');
+NOTICE:  replanning
+ count 
+-------
+(0 rows)
+
+-- prepared statement
+PREPARE countsome AS SELECT count(*) FROM test_table HAVING COUNT(*) = immutable_bleat('replanning');
+EXECUTE countsome; -- should indicate planning
+NOTICE:  replanning
+ count 
+-------
+(0 rows)
+
+EXECUTE countsome; -- no replanning
+ count 
+-------
+(0 rows)
+
+-- invalidate half of the placements using SQL, should invalidate via trigger
+UPDATE pg_dist_shard_placement SET shardstate = '3'
+WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass)
+    AND nodeport = :worker_1_port;
+EXECUTE countsome; -- should indicate replanning
+NOTICE:  replanning
+ count 
+-------
+(0 rows)
+
+EXECUTE countsome; -- no replanning
+ count 
+-------
+(0 rows)
+
+-- repair shards, should invalidate via master_metadata_utility.c
+SELECT master_copy_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
+FROM pg_dist_shard_placement
+WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass)
+    AND nodeport = :worker_1_port;
+ master_copy_shard_placement 
+-----------------------------
+ 
+ 
+(2 rows)
+
+EXECUTE countsome; -- should indicate replanning
+NOTICE:  replanning
+ count 
+-------
+(0 rows)
+
+EXECUTE countsome; -- no replanning
+ count 
+-------
+(0 rows)
+
+-- reset
+\set VERBOSITY default
 -- clean-up prepared statements
 DEALLOCATE ALL;
diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql
index fd4309f8d..63369f692 100644
--- a/src/test/regress/sql/multi_extension.sql
+++ b/src/test/regress/sql/multi_extension.sql
@@ -54,6 +54,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-11';
 ALTER EXTENSION citus UPDATE TO '6.0-12';
 ALTER EXTENSION citus UPDATE TO '6.0-13';
 ALTER EXTENSION citus UPDATE TO '6.0-14';
+ALTER EXTENSION citus UPDATE TO '6.0-15';
 
 -- ensure no objects were created outside pg_catalog
 SELECT COUNT(*)
diff --git a/src/test/regress/sql/multi_prepare_sql.sql b/src/test/regress/sql/multi_prepare_sql.sql
index 5c467921c..8618b8f76 100644
--- a/src/test/regress/sql/multi_prepare_sql.sql
+++ b/src/test/regress/sql/multi_prepare_sql.sql
@@ -456,5 +456,56 @@ EXECUTE prepared_non_partition_parameter_delete(62);
 
 -- check after deletes
 SELECT * FROM prepare_table ORDER BY key, value;
+
+-- verify that placement state updates invalidate prepared statements
+--
+-- We use an immutable function to check for that. The planner will
+-- evaluate it once during planning; during execution it should never
+-- be reached (no rows). That way we'll see a NOTICE when
+-- (re-)planning, but not when executing.
+
+-- first create helper function
+CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
+\c - - - :worker_1_port
+CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
+\c - - - :worker_2_port
+CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
+\c - - - :master_port
+
+-- test table
+CREATE TABLE test_table (test_id integer NOT NULL, data text);
+SELECT master_create_distributed_table('test_table', 'test_id', 'hash');
+SELECT master_create_worker_shards('test_table', 2, 2);
+
+-- avoid 9.6+ only context messages
+\set VERBOSITY terse
+
+-- plain statement, needs planning
+SELECT count(*) FROM test_table HAVING COUNT(*) = immutable_bleat('replanning');
+-- prepared statement
+PREPARE countsome AS SELECT count(*) FROM test_table HAVING COUNT(*) = immutable_bleat('replanning');
+EXECUTE countsome; -- should indicate planning
+EXECUTE countsome; -- no replanning
+
+-- invalidate half of the placements using SQL, should invalidate via trigger
+UPDATE pg_dist_shard_placement SET shardstate = '3'
+WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass)
+    AND nodeport = :worker_1_port;
+EXECUTE countsome; -- should indicate replanning
+EXECUTE countsome; -- no replanning
+
+-- repair shards, should invalidate via master_metadata_utility.c
+SELECT master_copy_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
+FROM pg_dist_shard_placement
+WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass)
+    AND nodeport = :worker_1_port;
+EXECUTE countsome; -- should indicate replanning
+EXECUTE countsome; -- no replanning
+
+-- reset
+\set VERBOSITY default
+
 -- clean-up prepared statements
 DEALLOCATE ALL;
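
A note on the mechanism, for reviewers: prepared statements re-plan here
because PostgreSQL's plan cache is itself a consumer of relcache
invalidations. plancache.c registers PlanCacheRelCallback through
CacheRegisterRelcacheCallback(), and every saved plan records the OIDs of
the relations it depends on; once the trigger above registers an
invalidation for the distributed table, matching plans are marked invalid
and get rebuilt on their next EXECUTE. The CommandCounterIncrement() calls
make the queued invalidations take effect within the changing transaction
too, so even the session that edits pg_dist_shard_placement re-plans.

A backend-local metadata cache can subscribe to the same invalidation
stream. The following is a minimal sketch of that consumer side, not the
actual Citus code -- DistTableCacheSlot, InvalidateDistTableCacheEntry, and
RegisterMetadataCacheCallback are illustrative names:

    #include "postgres.h"

    #include "utils/hsearch.h"
    #include "utils/inval.h"

    /* hypothetical backend-local cache, keyed by relation OID */
    typedef struct DistTableCacheSlot
    {
        Oid relationId;     /* hash key */
        bool isValid;       /* cleared when an invalidation arrives */
    } DistTableCacheSlot;

    static HTAB *DistTableCache = NULL;

    /*
     * Relcache invalidation callback: flag the entry for relationId, or
     * all entries when relationId is InvalidOid, so the next lookup
     * rebuilds them from the catalogs.
     */
    static void
    InvalidateDistTableCacheEntry(Datum argument, Oid relationId)
    {
        HASH_SEQ_STATUS status;
        DistTableCacheSlot *slot = NULL;

        if (DistTableCache == NULL)
        {
            return;
        }

        hash_seq_init(&status, DistTableCache);
        while ((slot = (DistTableCacheSlot *) hash_seq_search(&status)) != NULL)
        {
            if (relationId == InvalidOid || slot->relationId == relationId)
            {
                slot->isValid = false;
            }
        }
    }

    /* run once per backend, e.g. when the extension is first used */
    static void
    RegisterMetadataCacheCallback(void)
    {
        CacheRegisterRelcacheCallback(InvalidateDistTableCacheEntry, (Datum) 0);
    }

Note that the callback has to cope with relationId == InvalidOid, which
PostgreSQL delivers when the invalidation queue overflows and everything
must be treated as stale; the sketch handles that by flagging every entry.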