mirror of https://github.com/citusdata/citus.git
Accept invalidations during COPY command to prevent inconsistent data among placements
COPY command is blocked on the shard metadata lock and once it acquires the lock it doesn't check the cache invalidations. In some cases this could lead to data inconsistency among the shard replicas if there exists concurrent shard placement addition.pull/1403/head
parent
3ec502b286
commit
c9df9f37d6
|
@ -72,6 +72,7 @@
|
|||
#include "nodes/makefuncs.h"
|
||||
#include "tsearch/ts_locale.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/inval.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/memutils.h"
|
||||
|
@ -1751,6 +1752,14 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
|||
LockShardListMetadata(shardIntervalList, ShareLock);
|
||||
LockShardListResources(shardIntervalList, ShareLock);
|
||||
|
||||
/*
|
||||
* We might have some concurrent metadata changes. In order to get the changes,
|
||||
* we first need to accept the cache invalidation messages and then access the
|
||||
* cache entry to trigger the cache entry rebuild.
|
||||
*/
|
||||
AcceptInvalidationMessages();
|
||||
cacheEntry = DistributedTableCacheEntry(tableId);
|
||||
|
||||
/* keep the table metadata to avoid looking it up for every tuple */
|
||||
copyDest->tableMetadata = cacheEntry;
|
||||
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-add-second-worker s2-begin s2-copy-to-reference-table s1-commit s2-commit s2-print-content s1-remove-second-worker s1-begin s1-add-second-worker s2-begin s2-copy-to-reference-table s1-commit s2-commit s2-print-content
|
||||
create_reference_table
|
||||
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-add-second-worker:
|
||||
SELECT master_add_node('localhost', 57638);
|
||||
|
||||
master_add_node
|
||||
|
||||
(4,4,localhost,57638,default,f,t)
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-copy-to-reference-table:
|
||||
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-copy-to-reference-table: <... completed>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-content:
|
||||
SELECT run_command_on_placements('test_reference_table', 'select count(*) from %s');
|
||||
|
||||
run_command_on_placements
|
||||
|
||||
(localhost,57637,102197,t,5)
|
||||
(localhost,57638,102197,t,5)
|
||||
step s1-remove-second-worker:
|
||||
SELECT master_remove_node('localhost', 57638);
|
||||
|
||||
master_remove_node
|
||||
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-add-second-worker:
|
||||
SELECT master_add_node('localhost', 57638);
|
||||
|
||||
master_add_node
|
||||
|
||||
(5,5,localhost,57638,default,f,t)
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-copy-to-reference-table:
|
||||
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-copy-to-reference-table: <... completed>
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-content:
|
||||
SELECT run_command_on_placements('test_reference_table', 'select count(*) from %s');
|
||||
|
||||
run_command_on_placements
|
||||
|
||||
(localhost,57637,102197,t,10)
|
||||
(localhost,57638,102197,t,10)
|
||||
master_add_node
|
||||
|
||||
(5,5,localhost,57638,default,f,t)
|
|
@ -2,3 +2,4 @@ test: isolation_cluster_management
|
|||
test: isolation_dml_vs_repair
|
||||
test: isolation_concurrent_dml
|
||||
test: isolation_drop_shards
|
||||
test: isolation_add_node_vs_copy
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
setup
|
||||
{
|
||||
truncate pg_dist_shard_placement;
|
||||
truncate pg_dist_shard;
|
||||
truncate pg_dist_partition;
|
||||
truncate pg_dist_colocation;
|
||||
truncate pg_dist_node;
|
||||
|
||||
SELECT master_add_node('localhost', 57637);
|
||||
|
||||
CREATE TABLE test_reference_table (test_id integer);
|
||||
SELECT create_reference_table('test_reference_table');
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE IF EXISTS test_reference_table CASCADE;
|
||||
|
||||
SELECT master_add_node('localhost', 57637);
|
||||
SELECT master_add_node('localhost', 57638);
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s1-add-second-worker"
|
||||
{
|
||||
SELECT master_add_node('localhost', 57638);
|
||||
}
|
||||
|
||||
step "s1-remove-second-worker"
|
||||
{
|
||||
SELECT master_remove_node('localhost', 57638);
|
||||
}
|
||||
|
||||
step "s1-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s2-copy-to-reference-table"
|
||||
{
|
||||
COPY test_reference_table FROM PROGRAM 'echo "1\n2\n3\n4\n5"';
|
||||
}
|
||||
|
||||
step "s2-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
step "s2-print-content"
|
||||
{
|
||||
SELECT run_command_on_placements('test_reference_table', 'select count(*) from %s');
|
||||
}
|
||||
|
||||
# verify that copy gets the invalidation and re-builts its metadata cache
|
||||
# note that we need to run the same test twice to ensure that metadata is cached
|
||||
# otherwise the test would be useless since the cache would be empty and the
|
||||
# metadata data is gathered from the tables directly
|
||||
permutation "s1-begin" "s1-add-second-worker" "s2-begin" "s2-copy-to-reference-table" "s1-commit" "s2-commit" "s2-print-content" "s1-remove-second-worker" "s1-begin" "s1-add-second-worker" "s2-begin" "s2-copy-to-reference-table" "s1-commit" "s2-commit" "s2-print-content"
|
Loading…
Reference in New Issue