mirror of https://github.com/citusdata/citus.git
Merge pull request #914 from citusdata/bugfix/minimal-127
Invalidate relcache after pg_dist_shard_placement changes.pull/909/head
commit
837ec67c80
|
@ -8,7 +8,7 @@ EXTENSION = citus
|
||||||
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
||||||
5.2-1 5.2-2 5.2-3 5.2-4 \
|
5.2-1 5.2-2 5.2-3 5.2-4 \
|
||||||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14
|
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -86,6 +86,8 @@ $(EXTENSION)--6.0-13.sql: $(EXTENSION)--6.0-12.sql $(EXTENSION)--6.0-12--6.0-13.
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--6.0-14.sql: $(EXTENSION)--6.0-13.sql $(EXTENSION)--6.0-13--6.0-14.sql
|
$(EXTENSION)--6.0-14.sql: $(EXTENSION)--6.0-13.sql $(EXTENSION)--6.0-13--6.0-14.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--6.0-15.sql: $(EXTENSION)--6.0-14.sql $(EXTENSION)--6.0-14--6.0-15.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
/* citus--6.0-14--6.0-15.sql */
|
||||||
|
|
||||||
|
|
||||||
|
CREATE FUNCTION pg_catalog.master_dist_placement_cache_invalidate()
|
||||||
|
RETURNS trigger
|
||||||
|
LANGUAGE C
|
||||||
|
AS 'MODULE_PATHNAME', $$master_dist_placement_cache_invalidate$$;
|
||||||
|
COMMENT ON FUNCTION master_dist_placement_cache_invalidate()
|
||||||
|
IS 'register relcache invalidation for changed placements';
|
||||||
|
|
||||||
|
CREATE TRIGGER dist_placement_cache_invalidate
|
||||||
|
AFTER INSERT OR UPDATE OR DELETE
|
||||||
|
ON pg_catalog.pg_dist_shard_placement
|
||||||
|
FOR EACH ROW EXECUTE PROCEDURE master_dist_placement_cache_invalidate();
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '6.0-14'
|
default_version = '6.0-15'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -451,6 +451,8 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
|
||||||
simple_heap_insert(pgDistShardPlacement, heapTuple);
|
simple_heap_insert(pgDistShardPlacement, heapTuple);
|
||||||
CatalogUpdateIndexes(pgDistShardPlacement, heapTuple);
|
CatalogUpdateIndexes(pgDistShardPlacement, heapTuple);
|
||||||
|
|
||||||
|
CitusInvalidateRelcacheByShardId(shardId);
|
||||||
|
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
heap_close(pgDistShardPlacement, RowExclusiveLock);
|
heap_close(pgDistShardPlacement, RowExclusiveLock);
|
||||||
}
|
}
|
||||||
|
@ -565,6 +567,8 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
|
||||||
simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self);
|
simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self);
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
|
|
||||||
|
CitusInvalidateRelcacheByShardId(shardId);
|
||||||
|
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
heap_close(pgDistShardPlacement, RowExclusiveLock);
|
heap_close(pgDistShardPlacement, RowExclusiveLock);
|
||||||
|
|
||||||
|
@ -589,6 +593,8 @@ UpdateShardPlacementState(uint64 placementId, char shardState)
|
||||||
Datum values[Natts_pg_dist_shard_placement];
|
Datum values[Natts_pg_dist_shard_placement];
|
||||||
bool isnull[Natts_pg_dist_shard_placement];
|
bool isnull[Natts_pg_dist_shard_placement];
|
||||||
bool replace[Natts_pg_dist_shard_placement];
|
bool replace[Natts_pg_dist_shard_placement];
|
||||||
|
uint64 shardId = INVALID_SHARD_ID;
|
||||||
|
bool colIsNull = false;
|
||||||
|
|
||||||
pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
|
pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
|
||||||
tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
|
tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
|
||||||
|
@ -617,6 +623,13 @@ UpdateShardPlacementState(uint64 placementId, char shardState)
|
||||||
simple_heap_update(pgDistShardPlacement, &heapTuple->t_self, heapTuple);
|
simple_heap_update(pgDistShardPlacement, &heapTuple->t_self, heapTuple);
|
||||||
|
|
||||||
CatalogUpdateIndexes(pgDistShardPlacement, heapTuple);
|
CatalogUpdateIndexes(pgDistShardPlacement, heapTuple);
|
||||||
|
|
||||||
|
shardId = DatumGetInt64(heap_getattr(heapTuple,
|
||||||
|
Anum_pg_dist_shard_placement_shardid,
|
||||||
|
tupleDescriptor, &colIsNull));
|
||||||
|
Assert(!colIsNull);
|
||||||
|
CitusInvalidateRelcacheByShardId(shardId);
|
||||||
|
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
|
|
|
@ -31,6 +31,7 @@
|
||||||
#include "distributed/pg_dist_node.h"
|
#include "distributed/pg_dist_node.h"
|
||||||
#include "distributed/pg_dist_partition.h"
|
#include "distributed/pg_dist_partition.h"
|
||||||
#include "distributed/pg_dist_shard.h"
|
#include "distributed/pg_dist_shard.h"
|
||||||
|
#include "distributed/pg_dist_shard_placement.h"
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
|
@ -119,6 +120,7 @@ static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate);
|
PG_FUNCTION_INFO_V1(master_dist_partition_cache_invalidate);
|
||||||
PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate);
|
PG_FUNCTION_INFO_V1(master_dist_shard_cache_invalidate);
|
||||||
|
PG_FUNCTION_INFO_V1(master_dist_placement_cache_invalidate);
|
||||||
PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
|
PG_FUNCTION_INFO_V1(master_dist_node_cache_invalidate);
|
||||||
PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate);
|
PG_FUNCTION_INFO_V1(master_dist_local_group_cache_invalidate);
|
||||||
|
|
||||||
|
@ -1056,6 +1058,68 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* master_dist_placmeent_cache_invalidate is a trigger function that performs
|
||||||
|
* relcache invalidations when the contents of pg_dist_shard_placement are
|
||||||
|
* changed on the SQL level.
|
||||||
|
*
|
||||||
|
* NB: We decided there is little point in checking permissions here, there
|
||||||
|
* are much easier ways to waste CPU than causing cache invalidations.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
TriggerData *triggerData = (TriggerData *) fcinfo->context;
|
||||||
|
HeapTuple newTuple = NULL;
|
||||||
|
HeapTuple oldTuple = NULL;
|
||||||
|
Oid oldShardId = InvalidOid;
|
||||||
|
Oid newShardId = InvalidOid;
|
||||||
|
|
||||||
|
if (!CALLED_AS_TRIGGER(fcinfo))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
|
||||||
|
errmsg("must be called as trigger")));
|
||||||
|
}
|
||||||
|
|
||||||
|
newTuple = triggerData->tg_newtuple;
|
||||||
|
oldTuple = triggerData->tg_trigtuple;
|
||||||
|
|
||||||
|
/* collect shardid for OLD and NEW tuple */
|
||||||
|
if (oldTuple != NULL)
|
||||||
|
{
|
||||||
|
Form_pg_dist_shard_placement distPlacement =
|
||||||
|
(Form_pg_dist_shard_placement) GETSTRUCT(oldTuple);
|
||||||
|
|
||||||
|
oldShardId = distPlacement->shardid;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newTuple != NULL)
|
||||||
|
{
|
||||||
|
Form_pg_dist_shard_placement distPlacement =
|
||||||
|
(Form_pg_dist_shard_placement) GETSTRUCT(newTuple);
|
||||||
|
|
||||||
|
newShardId = distPlacement->shardid;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Invalidate relcache for the relevant relation(s). In theory shardId
|
||||||
|
* should never change, but it doesn't hurt to be paranoid.
|
||||||
|
*/
|
||||||
|
if (oldShardId != InvalidOid &&
|
||||||
|
oldShardId != newShardId)
|
||||||
|
{
|
||||||
|
CitusInvalidateRelcacheByShardId(oldShardId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newShardId != InvalidOid)
|
||||||
|
{
|
||||||
|
CitusInvalidateRelcacheByShardId(newShardId);
|
||||||
|
}
|
||||||
|
|
||||||
|
PG_RETURN_DATUM(PointerGetDatum(NULL));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* master_dist_node_cache_invalidate is a trigger function that performs
|
* master_dist_node_cache_invalidate is a trigger function that performs
|
||||||
* relcache invalidations when the contents of pg_dist_node are changed
|
* relcache invalidations when the contents of pg_dist_node are changed
|
||||||
|
@ -1772,3 +1836,64 @@ CitusInvalidateRelcacheByRelid(Oid relationId)
|
||||||
ReleaseSysCache(classTuple);
|
ReleaseSysCache(classTuple);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Register a relcache invalidation for the distributed relation associated
|
||||||
|
* with the shard.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CitusInvalidateRelcacheByShardId(int64 shardId)
|
||||||
|
{
|
||||||
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
int scanKeyCount = 1;
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
Form_pg_dist_shard shardForm = NULL;
|
||||||
|
Relation pgDistShard = heap_open(DistShardRelationId(), AccessShareLock);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Load shard, to find the associated relation id. Can't use
|
||||||
|
* LoadShardInterval directly because that'd fail if the shard doesn't
|
||||||
|
* exist anymore, which we can't have. Also lower overhead is desirable
|
||||||
|
* here.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
|
||||||
|
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan(pgDistShard,
|
||||||
|
DistShardShardidIndexId(), true,
|
||||||
|
NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
if (HeapTupleIsValid(heapTuple))
|
||||||
|
{
|
||||||
|
shardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple);
|
||||||
|
CitusInvalidateRelcacheByRelid(shardForm->logicalrelid);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Couldn't find associated relation. That can primarily happen in two cases:
|
||||||
|
*
|
||||||
|
* 1) A placement row is inserted before the shard row. That's fine,
|
||||||
|
* since we don't need invalidations via placements in that case.
|
||||||
|
*
|
||||||
|
* 2) The shard has been deleted, but some placements were
|
||||||
|
* unreachable, and the user is manually deleting the rows. Not
|
||||||
|
* much point in WARNING or ERRORing in that case either, there's
|
||||||
|
* nothing to invalidate.
|
||||||
|
*
|
||||||
|
* Hence we just emit a DEBUG5 message.
|
||||||
|
*/
|
||||||
|
ereport(DEBUG5, (errmsg("could not find distributed relation to invalidate for "
|
||||||
|
"shard "INT64_FORMAT, shardId)));
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
heap_close(pgDistShard, NoLock);
|
||||||
|
|
||||||
|
/* bump command counter, to force invalidation to take effect */
|
||||||
|
CommandCounterIncrement();
|
||||||
|
}
|
||||||
|
|
|
@ -58,6 +58,7 @@ extern ShardInterval * LoadShardInterval(uint64 shardId);
|
||||||
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
|
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
|
||||||
extern int GetLocalGroupId(void);
|
extern int GetLocalGroupId(void);
|
||||||
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
|
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
|
||||||
|
extern void CitusInvalidateRelcacheByShardId(int64 shardId);
|
||||||
extern void CitusInvalidateNodeCache(void);
|
extern void CitusInvalidateNodeCache(void);
|
||||||
|
|
||||||
extern bool CitusHasBeenLoaded(void);
|
extern bool CitusHasBeenLoaded(void);
|
||||||
|
|
|
@ -54,6 +54,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-11';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-12';
|
ALTER EXTENSION citus UPDATE TO '6.0-12';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-13';
|
ALTER EXTENSION citus UPDATE TO '6.0-13';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-14';
|
ALTER EXTENSION citus UPDATE TO '6.0-14';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.0-15';
|
||||||
-- ensure no objects were created outside pg_catalog
|
-- ensure no objects were created outside pg_catalog
|
||||||
SELECT COUNT(*)
|
SELECT COUNT(*)
|
||||||
FROM pg_depend AS pgd,
|
FROM pg_depend AS pgd,
|
||||||
|
|
|
@ -820,5 +820,95 @@ SELECT * FROM prepare_table ORDER BY key, value;
|
||||||
0 |
|
0 |
|
||||||
(6 rows)
|
(6 rows)
|
||||||
|
|
||||||
|
-- verify placement state updates invalidate shard state
|
||||||
|
--
|
||||||
|
-- We use a immutable function to check for that. The planner will
|
||||||
|
-- evaluate it once during planning, during execution it should never
|
||||||
|
-- be reached (no rows). That way we'll see a NOTICE when
|
||||||
|
-- (re-)planning, but not when executing.
|
||||||
|
-- first create helper function
|
||||||
|
CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
|
||||||
|
\c - - - :master_port
|
||||||
|
-- test table
|
||||||
|
CREATE TABLE test_table (test_id integer NOT NULL, data text);
|
||||||
|
SELECT master_create_distributed_table('test_table', 'test_id', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_worker_shards('test_table', 2, 2);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- avoid 9.6+ only context messages
|
||||||
|
\set VERBOSITY terse
|
||||||
|
--plain statement, needs planning
|
||||||
|
SELECT count(*) FROM test_table HAVING COUNT(*) = immutable_bleat('replanning');
|
||||||
|
NOTICE: replanning
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
--prepared statement
|
||||||
|
PREPARE countsome AS SELECT count(*) FROM test_table HAVING COUNT(*) = immutable_bleat('replanning');
|
||||||
|
EXECUTE countsome; -- should indicate planning
|
||||||
|
NOTICE: replanning
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
EXECUTE countsome; -- no replanning
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- invalidate half of the placements using SQL, should invalidate via trigger
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = '3'
|
||||||
|
WHERE shardid IN (
|
||||||
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass)
|
||||||
|
AND nodeport = :worker_1_port;
|
||||||
|
EXECUTE countsome; -- should indicate replanning
|
||||||
|
NOTICE: replanning
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
EXECUTE countsome; -- no replanning
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- repair shards, should invalidate via master_metadata_utility.c
|
||||||
|
SELECT master_copy_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
|
||||||
|
FROM pg_dist_shard_placement
|
||||||
|
WHERE shardid IN (
|
||||||
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass)
|
||||||
|
AND nodeport = :worker_1_port;
|
||||||
|
master_copy_shard_placement
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
EXECUTE countsome; -- should indicate replanning
|
||||||
|
NOTICE: replanning
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
EXECUTE countsome; -- no replanning
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- reset
|
||||||
|
\set VERBOSITY default
|
||||||
-- clean-up prepared statements
|
-- clean-up prepared statements
|
||||||
DEALLOCATE ALL;
|
DEALLOCATE ALL;
|
||||||
|
|
|
@ -54,6 +54,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-11';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-12';
|
ALTER EXTENSION citus UPDATE TO '6.0-12';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-13';
|
ALTER EXTENSION citus UPDATE TO '6.0-13';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-14';
|
ALTER EXTENSION citus UPDATE TO '6.0-14';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.0-15';
|
||||||
|
|
||||||
-- ensure no objects were created outside pg_catalog
|
-- ensure no objects were created outside pg_catalog
|
||||||
SELECT COUNT(*)
|
SELECT COUNT(*)
|
||||||
|
|
|
@ -456,5 +456,56 @@ EXECUTE prepared_non_partition_parameter_delete(62);
|
||||||
-- check after deletes
|
-- check after deletes
|
||||||
SELECT * FROM prepare_table ORDER BY key, value;
|
SELECT * FROM prepare_table ORDER BY key, value;
|
||||||
|
|
||||||
|
|
||||||
|
-- verify placement state updates invalidate shard state
|
||||||
|
--
|
||||||
|
-- We use a immutable function to check for that. The planner will
|
||||||
|
-- evaluate it once during planning, during execution it should never
|
||||||
|
-- be reached (no rows). That way we'll see a NOTICE when
|
||||||
|
-- (re-)planning, but not when executing.
|
||||||
|
|
||||||
|
-- first create helper function
|
||||||
|
CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
CREATE OR REPLACE FUNCTION immutable_bleat(text) RETURNS int LANGUAGE plpgsql IMMUTABLE AS $$BEGIN RAISE NOTICE '%', $1;RETURN 1;END$$;
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
-- test table
|
||||||
|
CREATE TABLE test_table (test_id integer NOT NULL, data text);
|
||||||
|
SELECT master_create_distributed_table('test_table', 'test_id', 'hash');
|
||||||
|
SELECT master_create_worker_shards('test_table', 2, 2);
|
||||||
|
|
||||||
|
-- avoid 9.6+ only context messages
|
||||||
|
\set VERBOSITY terse
|
||||||
|
|
||||||
|
--plain statement, needs planning
|
||||||
|
SELECT count(*) FROM test_table HAVING COUNT(*) = immutable_bleat('replanning');
|
||||||
|
--prepared statement
|
||||||
|
PREPARE countsome AS SELECT count(*) FROM test_table HAVING COUNT(*) = immutable_bleat('replanning');
|
||||||
|
EXECUTE countsome; -- should indicate planning
|
||||||
|
EXECUTE countsome; -- no replanning
|
||||||
|
|
||||||
|
-- invalidate half of the placements using SQL, should invalidate via trigger
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = '3'
|
||||||
|
WHERE shardid IN (
|
||||||
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass)
|
||||||
|
AND nodeport = :worker_1_port;
|
||||||
|
EXECUTE countsome; -- should indicate replanning
|
||||||
|
EXECUTE countsome; -- no replanning
|
||||||
|
|
||||||
|
-- repair shards, should invalidate via master_metadata_utility.c
|
||||||
|
SELECT master_copy_shard_placement(shardid, 'localhost', :worker_2_port, 'localhost', :worker_1_port)
|
||||||
|
FROM pg_dist_shard_placement
|
||||||
|
WHERE shardid IN (
|
||||||
|
SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass)
|
||||||
|
AND nodeport = :worker_1_port;
|
||||||
|
EXECUTE countsome; -- should indicate replanning
|
||||||
|
EXECUTE countsome; -- no replanning
|
||||||
|
|
||||||
|
-- reset
|
||||||
|
\set VERBOSITY default
|
||||||
|
|
||||||
-- clean-up prepared statements
|
-- clean-up prepared statements
|
||||||
DEALLOCATE ALL;
|
DEALLOCATE ALL;
|
||||||
|
|
Loading…
Reference in New Issue