Prevent reference tables being dropped when replicating reference tables

pull/3637/head
Hadi Moshayedi 2020-04-03 10:28:18 -07:00
parent 924cd7343a
commit 0758a81287
13 changed files with 308 additions and 198 deletions

View File

@ -26,6 +26,8 @@
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
#include <distributed/metadata_sync.h>
#include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h"
#include <distributed/remote_commands.h>
#include <distributed/remote_commands.h>
#include "nodes/parsenodes.h"
@ -90,6 +92,13 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString)
continue;
}
if (IsReferenceTable(relationId))
{
/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
int colocationId = CreateReferenceTableColocationId();
LockColocationId(colocationId, ExclusiveLock);
}
/* invalidate foreign key cache if the table involved in any foreign key */
if (TableReferenced(relationId) || TableReferencing(relationId))
{

View File

@ -29,6 +29,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
#include "distributed/version_compat.h"
@ -85,6 +86,13 @@ PreprocessDropTableStmt(Node *node, const char *queryString)
continue;
}
if (IsReferenceTable(relationId))
{
/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
int colocationId = CreateReferenceTableColocationId();
LockColocationId(colocationId, ExclusiveLock);
}
/* invalidate foreign key cache if the table involved in any foreign key */
if ((TableReferenced(relationId) || TableReferencing(relationId)))
{
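Both DROP preprocessing hooks above (PreprocessDropSchemaStmt and PreprocessDropTableStmt) now take the reference table colocation lock before the drop proceeds, so a DROP serializes with EnsureReferenceTablesExistOnAllNodes. A rough SQL-level sketch of the interleaving this guards against (the table name is illustrative; the new isolation test at the end of this diff exercises the same ordering):

-- session 1
BEGIN;
DROP TABLE some_reference_table;        -- acquires the colocation lock

-- session 2
SELECT replicate_reference_tables();    -- blocks on the colocation lock

-- session 1
COMMIT;                                 -- session 2 resumes; if no reference
                                        -- tables remain it simply returns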

View File

@ -3591,6 +3591,55 @@ DistTableOidList(void)
}
/*
* ReferenceTableOidList scans pg_dist_partition to build a list of all
* reference tables. It uses a sequential scan; since the function is not
* expected to be called frequently, it is OK not to use an index scan. If it
* ever becomes a performance bottleneck, it can be changed to use an index scan.
*/
List *
ReferenceTableOidList()
{
ScanKeyData scanKey[1];
int scanKeyCount = 0;
List *referenceTableOidList = NIL;
Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
SysScanDesc scanDescriptor = systable_beginscan(pgDistPartition,
InvalidOid, false,
NULL, scanKeyCount, scanKey);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
bool isNull = false;
Datum relationIdDatum = heap_getattr(heapTuple,
Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull);
Oid relationId = DatumGetObjectId(relationIdDatum);
char partitionMethod = heap_getattr(heapTuple,
Anum_pg_dist_partition_partmethod,
tupleDescriptor, &isNull);
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
referenceTableOidList = lappend_oid(referenceTableOidList, relationId);
}
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
heap_close(pgDistPartition, AccessShareLock);
return referenceTableOidList;
}
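The rewritten ReferenceTableOidList scans pg_dist_partition directly rather than going through the metadata cache (the previous version, removed from reference_table_utils.c further down, iterated DistTableOidList and checked each cache entry). As an illustration only, it returns the same OIDs as this catalog query, where 'n' is the partmethod character behind DISTRIBUTE_BY_NONE:

SELECT logicalrelid
FROM pg_dist_partition
WHERE partmethod = 'n';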
/*
* InvalidateNodeRelationCacheCallback destroys the WorkerNodeHash when
* any change happens on pg_dist_node table. It also set WorkerNodeHash to

View File

@ -41,8 +41,8 @@
/* local function forward declarations */
static List * WorkersWithoutReferenceTablePlacement(uint64 shardId);
static void CopyShardPlacementToNewWorkerNode(ShardPlacement *sourceShardPlacement,
WorkerNode *newWorkerNode);
static void CopyShardPlacementToWorkerNode(ShardPlacement *sourceShardPlacement,
WorkerNode *workerNode, const char *userName);
static void ReplicateSingleShardTableToAllNodes(Oid relationId);
static void ReplicateShardToAllNodes(ShardInterval *shardInterval);
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
@ -100,10 +100,25 @@ replicate_reference_tables(PG_FUNCTION_ARGS)
void
EnsureReferenceTablesExistOnAllNodes(void)
{
/*
* TODO: remove this. It is here so we don't have to update regression test
* outputs because of the reference table colocation id changing, especially
* multi_colocation_utils, which requires the test itself to be updated.
*/
if (ReferenceTableOidList() == NIL)
{
return;
}
/* prevent this function from running concurrently with itself */
int colocationId = CreateReferenceTableColocationId();
LockColocationId(colocationId, ExclusiveLock);
List *referenceTableIdList = ReferenceTableOidList();
if (list_length(referenceTableIdList) == 0)
if (referenceTableIdList == NIL)
{
/* no reference tables exist */
UnlockColocationId(colocationId, ExclusiveLock);
return;
}
@ -119,10 +134,6 @@ EnsureReferenceTablesExistOnAllNodes(void)
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
/* prevent this function from running concurrently with itself */
int colocationId = TableColocationId(referenceTableId);
LockColocationId(colocationId, ExclusiveLock);
List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId);
if (list_length(newWorkersList) == 0)
{
@ -168,7 +179,13 @@ EnsureReferenceTablesExistOnAllNodes(void)
WorkerNode *newWorkerNode = NULL;
foreach_ptr(newWorkerNode, newWorkersList)
{
CopyShardPlacementToNewWorkerNode(sourceShardPlacement, newWorkerNode);
/*
* Call master_copy_shard_placement using citus extension owner. Current
* user might not have permissions to do the copy.
*/
const char *userName = CitusExtensionOwnerName();
CopyShardPlacementToWorkerNode(sourceShardPlacement, newWorkerNode,
userName);
}
/*
@ -235,16 +252,16 @@ WorkersWithoutReferenceTablePlacement(uint64 shardId)
/*
* CopyShardPlacementToNewWorkerNode runs master_copy_shard_placement in a
* subtransaction by connecting to localhost.
* CopyShardPlacementToWorkerNode runs master_copy_shard_placement
* using the given username by connecting to localhost.
*/
static void
CopyShardPlacementToNewWorkerNode(ShardPlacement *sourceShardPlacement,
WorkerNode *newWorkerNode)
CopyShardPlacementToWorkerNode(ShardPlacement *sourceShardPlacement,
WorkerNode *workerNode,
const char *userName)
{
int connectionFlags = OUTSIDE_TRANSACTION;
StringInfo queryString = makeStringInfo();
const char *userName = CitusExtensionOwnerName();
MultiConnection *connection = GetNodeUserDatabaseConnection(
connectionFlags, "localhost", PostPortNumber,
@ -256,8 +273,10 @@ CopyShardPlacementToNewWorkerNode(ShardPlacement *sourceShardPlacement,
sourceShardPlacement->shardId,
quote_literal_cstr(sourceShardPlacement->nodeName),
sourceShardPlacement->nodePort,
quote_literal_cstr(newWorkerNode->workerName),
newWorkerNode->workerPort);
quote_literal_cstr(workerNode->workerName),
workerNode->workerPort);
elog(DEBUG3, "%s", queryString->data);
ExecuteCriticalRemoteCommand(connection, queryString->data);
}
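CopyShardPlacementToWorkerNode still connects to localhost and issues master_copy_shard_placement over that connection, but it now runs the command as the given user (the Citus extension owner at the call site above) instead of the current user. Judging from the arguments interpolated into queryString, the generated command looks roughly like the following; the shard id and ports are made up here, and the real format string may append options not visible in this hunk:

SELECT master_copy_shard_placement(102008, 'localhost', 57637, 'localhost', 57638);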
@ -605,34 +624,6 @@ DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId)
}
/*
* ReferenceTableOidList function scans pg_dist_partition to create a list of all
* reference tables. To create the list, it performs sequential scan. Since it is not
* expected that this function will be called frequently, it is OK not to use index scan.
* If this function becomes performance bottleneck, it is possible to modify this function
* to perform index scan.
*/
List *
ReferenceTableOidList()
{
List *referenceTableList = NIL;
List *distTableOidList = DistTableOidList();
Oid relationId = InvalidOid;
foreach_oid(relationId, distTableOidList)
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
referenceTableList = lappend_oid(referenceTableList, relationId);
}
}
return referenceTableList;
}
/* CompareOids is a comparison function for sort shard oids */
int
CompareOids(const void *leftElement, const void *rightElement)

View File

@ -130,6 +130,7 @@ extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid,
objsubid);
extern int32 GetLocalGroupId(void);
extern List * DistTableOidList(void);
extern List * ReferenceTableOidList(void);
extern Oid LookupShardRelation(int64 shardId, bool missing_ok);
extern List * ShardPlacementList(uint64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);

View File

@ -20,7 +20,6 @@ extern bool IsReferenceTable(Oid relationId);
extern void EnsureReferenceTablesExistOnAllNodes(void);
extern uint32 CreateReferenceTableColocationId(void);
extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
extern List * ReferenceTableOidList(void);
extern int CompareOids(const void *leftElement, const void *rightElement);
extern int ReferenceTableReplicationFactor(void);

View File

@ -1122,6 +1122,7 @@ ON CONFLICT(c1, c2, c3, c4, c5, c6)
DO UPDATE SET
cardinality = enriched.cardinality + excluded.cardinality,
sum = enriched.sum + excluded.sum;
DEBUG: rehashing catalog cache id 14 for pg_opclass; 17 tups, 8 buckets at character 224
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: performing repartitioned INSERT ... SELECT

View File

@ -605,3 +605,42 @@ master_remove_node
starting permutation: s1-add-second-worker s2-begin s1-begin s1-drop-reference-table s2-replicate-reference-tables s1-commit s2-commit
create_distributed_table
step s1-add-second-worker:
SELECT 1 FROM master_add_node('localhost', 57638);
?column?
1
step s2-begin:
BEGIN;
step s1-begin:
BEGIN;
step s1-drop-reference-table:
DROP TABLE test_reference_table;
step s2-replicate-reference-tables:
SET client_min_messages TO DEBUG2;
SELECT replicate_reference_tables();
RESET client_min_messages;
<waiting ...>
step s1-commit:
COMMIT;
step s2-replicate-reference-tables: <... completed>
replicate_reference_tables
step s2-commit:
COMMIT;
master_remove_node

View File

@ -480,9 +480,7 @@ step s2-commit:
COMMIT;
step s1-remove-node-2: <... completed>
master_remove_node
error in steps s2-commit s1-remove-node-2: ERROR: you cannot remove the primary node of a node group which has shard placements
step s2-select:
SELECT * FROM dist_table;
@ -491,3 +489,4 @@ x y
master_remove_node

View File

@ -28,11 +28,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
357 356 f
360 359 f
transactionnumberwaitingtransactionnumbers
356
357 356
359
360 359
step s1-abort:
ABORT;
@ -75,14 +75,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
361 360 f
362 360 f
362 361 t
364 363 f
365 363 f
365 364 t
transactionnumberwaitingtransactionnumbers
360
361 360
362 360,361
363
364 363
365 363,364
step s1-abort:
ABORT;

View File

@ -1013,7 +1013,7 @@ master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s2-begin s3-begin s1-add-worker s2-create-schema s3-create-schema2 s2-create-table s3-create-table s1-commit s2-commit s3-commit s2-print-distributed-objects
starting permutation: s1-print-distributed-objects s1-begin s2-begin s3-begin s1-add-worker s2-create-schema s3-create-schema2 s2-create-table s3-create-table s1-commit s3-commit s2-commit s2-print-distributed-objects
?column?
1
@ -1109,10 +1109,10 @@ step s3-create-table: <... completed>
create_distributed_table
step s2-commit:
step s3-commit:
COMMIT;
step s3-commit:
step s2-commit:
COMMIT;
step s2-print-distributed-objects:

View File

@ -16,9 +16,9 @@ setup
// ensure neither node is added for the remainder of the isolation tests
teardown
{
DROP TABLE test_reference_table;
DROP TABLE test_reference_table_2;
DROP TABLE test_table;
DROP TABLE IF EXISTS test_reference_table;
DROP TABLE IF EXISTS test_reference_table_2;
DROP TABLE IF EXISTS test_table;
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
}
@ -39,6 +39,11 @@ step "s1-remove-second-worker"
SELECT master_remove_node('localhost', 57638);
}
step "s1-drop-reference-table"
{
DROP TABLE test_reference_table;
}
step "s1-commit"
{
COMMIT;
@ -58,6 +63,13 @@ step "s2-copy-to-reference-table"
COPY test_reference_table FROM PROGRAM 'echo 1 && echo 2 && echo 3 && echo 4 && echo 5';
}
step "s2-replicate-reference-tables"
{
SET client_min_messages TO DEBUG2;
SELECT replicate_reference_tables();
RESET client_min_messages;
}
step "s2-insert-to-reference-table"
{
INSERT INTO test_reference_table VALUES (6);
@ -137,3 +149,5 @@ permutation "s2-begin" "s2-ddl-on-reference-table" "s1-add-second-worker" "s2-co
permutation "s1-begin" "s1-add-second-worker" "s2-create-reference-table-2" "s1-commit" "s2-print-content-2"
permutation "s2-begin" "s2-create-reference-table-2" "s1-add-second-worker" "s2-commit" "s2-print-content-2"
// verify drop table blocks replicate reference tables
permutation "s1-add-second-worker" "s2-begin" "s1-begin" "s1-drop-reference-table" "s2-replicate-reference-tables" "s1-commit" "s2-commit"

View File

@ -206,7 +206,7 @@ permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-sche
// concurrency tests with multi schema distribution
permutation "s1-print-distributed-objects" "s2-create-schema" "s1-begin" "s2-begin" "s3-begin" "s1-add-worker" "s2-create-table" "s3-use-schema" "s3-create-table" "s1-commit" "s2-commit" "s3-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-add-worker" "s2-create-schema" "s2-begin" "s3-begin" "s3-use-schema" "s2-create-table" "s3-create-table" "s2-commit" "s3-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s3-begin" "s1-add-worker" "s2-create-schema" "s3-create-schema2" "s2-create-table" "s3-create-table" "s1-commit" "s2-commit" "s3-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s3-begin" "s1-add-worker" "s2-create-schema" "s3-create-schema2" "s2-create-table" "s3-create-table" "s1-commit" "s3-commit" "s2-commit" "s2-print-distributed-objects"
// type and schema tests
permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-create-type" "s1-commit" "s2-print-distributed-objects"