mirror of https://github.com/citusdata/citus.git
Introduce code changes to fix Issue #6303 (#6328)

The PR introduces code changes to fix Issue [6303](https://github.com/citusdata/citus/issues/6303): `create_distributed_table_concurrently` following a dropped column creates a buggy situation in the split decoder.

Consider the below scenario:
* Session1: drop a column, then run `create_distributed_table_concurrently`
* Session2: concurrent insert workload

The child shards created by `create_distributed_table_concurrently` have fewer columns than the source shard because some columns were dropped. The incoming tuple from Session2 has more columns, since the writes happened on the source shard, but that tuple now needs to be applied to a child shard. So we need to reformat the existing tuple according to the child schema and skip the dropped column values. The PR fixes this by reformatting the tuple according to the target child schema.

Test:
1) isolation_create_distributed_concurrently_after_drop_column - Repros the issue and verifies the fix.
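For context, a minimal sketch of the reformatting approach, mirroring the `GetTupleForTargetSchema` helper added in this commit (the function name `ReformTupleForTarget` is illustrative and not part of the patch; the decoder's surrounding action handling and error reporting are omitted):

```c
#include "postgres.h"

#include "access/htup_details.h"

/*
 * Illustrative sketch (not the patched function): rebuild a tuple that was
 * materialized with the source shard's descriptor so that it matches the
 * child shard's descriptor, skipping attributes dropped on the source.
 */
static HeapTuple
ReformTupleForTarget(HeapTuple sourceTuple, TupleDesc sourceDesc, TupleDesc targetDesc)
{
	/* explode the incoming tuple using the wider source descriptor */
	Datum *sourceValues = (Datum *) palloc0(sourceDesc->natts * sizeof(Datum));
	bool *sourceNulls = (bool *) palloc0(sourceDesc->natts * sizeof(bool));
	heap_deform_tuple(sourceTuple, sourceDesc, sourceValues, sourceNulls);

	/* copy only the attributes that still exist, preserving their order */
	Datum *targetValues = (Datum *) palloc0(targetDesc->natts * sizeof(Datum));
	bool *targetNulls = (bool *) palloc0(targetDesc->natts * sizeof(bool));
	int targetIndex = 0;

	for (int attrIndex = 0; attrIndex < sourceDesc->natts; attrIndex++)
	{
		/* dropped columns still occupy a slot in the source descriptor */
		if (TupleDescAttr(sourceDesc, attrIndex)->attisdropped)
		{
			continue;
		}

		targetValues[targetIndex] = sourceValues[attrIndex];
		targetNulls[targetIndex] = sourceNulls[attrIndex];
		targetIndex++;
	}

	/* re-materialize the tuple with the narrower child (target) descriptor */
	return heap_form_tuple(targetDesc, targetValues, targetNulls);
}
```

The key observation is that dropped columns still occupy attribute slots in the source shard's tuple descriptor (marked `attisdropped`), so skipping them while copying datums yields a tuple that matches the child shard's narrower descriptor.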
parent 77947da17c
commit acccad9879
@@ -34,6 +34,10 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
                                 HeapTuple tuple,
                                 char *currentSlotName);

static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
                                         TupleDesc sourceTupleDesc,
                                         TupleDesc targetTupleDesc);

/*
 * Postgres uses 'pgoutput' as default plugin for logical replication.
 * We want to reuse Postgres pgoutput's functionality as much as possible.

@@ -129,6 +133,71 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    }

    Relation targetRelation = RelationIdGetRelation(targetRelationOid);

    /*
     * If any columns from the source relation have been dropped, then the tuple
     * needs to be formatted according to the target relation.
     */
    TupleDesc sourceRelationDesc = RelationGetDescr(relation);
    TupleDesc targetRelationDesc = RelationGetDescr(targetRelation);
    if (sourceRelationDesc->natts > targetRelationDesc->natts)
    {
        switch (change->action)
        {
            case REORDER_BUFFER_CHANGE_INSERT:
            {
                HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
                HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
                    sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);

                change->data.tp.newtuple->tuple = *targetRelationNewTuple;
                break;
            }

            case REORDER_BUFFER_CHANGE_UPDATE:
            {
                HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
                HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
                    sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);

                change->data.tp.newtuple->tuple = *targetRelationNewTuple;

                /*
                 * Format oldtuple according to the target relation. If the column
                 * values of the replica identity change, then the old tuple is
                 * non-null and needs to be formatted according to the target
                 * relation schema.
                 */
                if (change->data.tp.oldtuple != NULL)
                {
                    HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
                    HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
                        sourceRelationOldTuple,
                        sourceRelationDesc,
                        targetRelationDesc);

                    change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
                }
                break;
            }

            case REORDER_BUFFER_CHANGE_DELETE:
            {
                HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
                HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
                    sourceRelationOldTuple, sourceRelationDesc, targetRelationDesc);

                change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
                break;
            }

            /* only INSERT/DELETE/UPDATE actions are visible in the replication path of a split shard */
            default:
                ereport(ERROR, errmsg(
                            "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
                            change->action));
        }
    }

    pgoutputChangeCB(ctx, txn, targetRelation, change);
    RelationClose(targetRelation);
}

@@ -223,3 +292,51 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,

    return DatumGetInt32(hashedValueDatum);
}


/*
 * GetTupleForTargetSchema returns a tuple with the schema of the target relation.
 * If some columns within the source relation are dropped, we have to reformat
 * the tuple to match the schema of the target relation.
 *
 * Consider the below scenario:
 * Session1 : Drop column followed by create_distributed_table_concurrently
 * Session2 : Concurrent insert workload
 *
 * The child shards created by create_distributed_table_concurrently will have fewer
 * columns than the source shard because some columns were dropped. The incoming
 * tuple from session2 will have more columns as the writes happened on the source
 * shard. But now the tuple needs to be applied on the child shard, so we need to
 * format it according to the child schema.
 */
static HeapTuple
GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
                        TupleDesc sourceRelDesc,
                        TupleDesc targetRelDesc)
{
    /* deform the source tuple */
    Datum *oldValues = (Datum *) palloc0(sourceRelDesc->natts * sizeof(Datum));
    bool *oldNulls = (bool *) palloc0(sourceRelDesc->natts * sizeof(bool));
    heap_deform_tuple(sourceRelationTuple, sourceRelDesc, oldValues,
                      oldNulls);

    /* create the new tuple by skipping dropped columns */
    int nextAttributeIndex = 0;
    Datum *newValues = (Datum *) palloc0(targetRelDesc->natts * sizeof(Datum));
    bool *newNulls = (bool *) palloc0(targetRelDesc->natts * sizeof(bool));
    for (int i = 0; i < sourceRelDesc->natts; i++)
    {
        if (TupleDescAttr(sourceRelDesc, i)->attisdropped)
        {
            continue;
        }

        newValues[nextAttributeIndex] = oldValues[i];
        newNulls[nextAttributeIndex] = oldNulls[i];
        nextAttributeIndex++;
    }

    HeapTuple targetRelationTuple = heap_form_tuple(targetRelDesc, newValues, newNulls);
    return targetRelationTuple;
}
@@ -8,5 +8,6 @@ test: isolation_cluster_management
test: isolation_logical_replication_single_shard_commands
test: isolation_logical_replication_multi_shard_commands
test: isolation_non_blocking_shard_split
test: isolation_create_distributed_concurrently_after_drop_column
test: isolation_non_blocking_shard_split_with_index_as_replicaIdentity
test: isolation_non_blocking_shard_split_fkey
@ -0,0 +1,667 @@
|
|||
Parsed test spec with 3 sessions
|
||||
|
||||
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
|
||||
step s2-print-cluster-1:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_pk
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
tenant_id|dummy|measurement_id|payload|observation_time
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
step s3-acquire-advisory-lock:
|
||||
SELECT pg_advisory_lock(44000, 55152);
|
||||
|
||||
pg_advisory_lock
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-alter-table:
|
||||
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||
|
||||
step s1-set-factor-1:
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT citus_set_coordinator_host('localhost');
|
||||
|
||||
citus_set_coordinator_host
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-create-distributed-table-observations_with_pk-concurrently:
|
||||
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
|
||||
<waiting ...>
|
||||
step s2-insert-observations_with_pk:
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
|
||||
step s2-update-observations_with_pk:
|
||||
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
|
||||
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster-1:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_pk
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57636|1500004|t | 4
|
||||
(1 row)
|
||||
|
||||
tenant_id|measurement_id|payload |observation_time
|
||||
---------------------------------------------------------------------
|
||||
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
|
||||
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
(4 rows)
|
||||
|
||||
step s3-release-advisory-lock:
|
||||
SELECT pg_advisory_unlock(44000, 55152);
|
||||
|
||||
pg_advisory_unlock
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
|
||||
create_distributed_table_concurrently
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-print-cluster-1:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_pk
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500006|t | 4
|
||||
57637|1500008|t | 0
|
||||
57638|1500005|t | 0
|
||||
57638|1500007|t | 0
|
||||
(4 rows)
|
||||
|
||||
tenant_id|measurement_id|payload |observation_time
|
||||
---------------------------------------------------------------------
|
||||
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
|
||||
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
(4 rows)
|
||||
|
||||
|
||||
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-primary-key-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
|
||||
step s2-print-cluster-1:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_pk
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
tenant_id|dummy|measurement_id|payload|observation_time
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
step s3-acquire-advisory-lock:
|
||||
SELECT pg_advisory_lock(44000, 55152);
|
||||
|
||||
pg_advisory_lock
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-alter-table:
|
||||
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||
|
||||
step s1-set-factor-1:
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT citus_set_coordinator_host('localhost');
|
||||
|
||||
citus_set_coordinator_host
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-create-distributed-table-observations_with_pk-concurrently:
|
||||
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
|
||||
<waiting ...>
|
||||
step s2-insert-observations_with_pk:
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
|
||||
step s2-update-primary-key-observations_with_pk:
|
||||
UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ;
|
||||
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster-1:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_pk
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57636|1500009|t | 4
|
||||
(1 row)
|
||||
|
||||
tenant_id|measurement_id|payload |observation_time
|
||||
---------------------------------------------------------------------
|
||||
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
(4 rows)
|
||||
|
||||
step s3-release-advisory-lock:
|
||||
SELECT pg_advisory_unlock(44000, 55152);
|
||||
|
||||
pg_advisory_unlock
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
|
||||
create_distributed_table_concurrently
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-print-cluster-1:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_pk
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500011|t | 4
|
||||
57637|1500013|t | 0
|
||||
57638|1500010|t | 0
|
||||
57638|1500012|t | 0
|
||||
(4 rows)
|
||||
|
||||
tenant_id|measurement_id|payload |observation_time
|
||||
---------------------------------------------------------------------
|
||||
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
(4 rows)
|
||||
|
||||
|
||||
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-delete-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
|
||||
step s2-print-cluster-1:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_pk
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
tenant_id|dummy|measurement_id|payload|observation_time
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
step s3-acquire-advisory-lock:
|
||||
SELECT pg_advisory_lock(44000, 55152);
|
||||
|
||||
pg_advisory_lock
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-alter-table:
|
||||
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||
|
||||
step s1-set-factor-1:
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT citus_set_coordinator_host('localhost');
|
||||
|
||||
citus_set_coordinator_host
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-create-distributed-table-observations_with_pk-concurrently:
|
||||
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
|
||||
<waiting ...>
|
||||
step s2-insert-observations_with_pk:
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
|
||||
step s2-update-observations_with_pk:
|
||||
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
|
||||
|
||||
step s2-delete-observations_with_pk:
|
||||
DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ;
|
||||
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster-1:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_pk
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57636|1500014|t | 3
|
||||
(1 row)
|
||||
|
||||
tenant_id|measurement_id|payload |observation_time
|
||||
---------------------------------------------------------------------
|
||||
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
(3 rows)
|
||||
|
||||
step s3-release-advisory-lock:
|
||||
SELECT pg_advisory_unlock(44000, 55152);
|
||||
|
||||
pg_advisory_unlock
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
|
||||
create_distributed_table_concurrently
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-print-cluster-1:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_pk
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500016|t | 3
|
||||
57637|1500018|t | 0
|
||||
57638|1500015|t | 0
|
||||
57638|1500017|t | 0
|
||||
(4 rows)
|
||||
|
||||
tenant_id|measurement_id|payload |observation_time
|
||||
---------------------------------------------------------------------
|
||||
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
(3 rows)
|
||||
|
||||
|
||||
starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2
|
||||
step s2-print-cluster-2:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_full_replica_identity
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
tenant_id|dummy|measurement_id|payload|observation_time
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
step s3-acquire-advisory-lock:
|
||||
SELECT pg_advisory_lock(44000, 55152);
|
||||
|
||||
pg_advisory_lock
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-alter-table:
|
||||
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||
|
||||
step s1-set-factor-1:
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT citus_set_coordinator_host('localhost');
|
||||
|
||||
citus_set_coordinator_host
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-create-distributed-table-observations-2-concurrently:
|
||||
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
|
||||
<waiting ...>
|
||||
step s2-insert-observations_with_full_replica_identity:
|
||||
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
|
||||
step s2-update-observations_with_full_replica_identity:
|
||||
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
|
||||
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster-2:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_full_replica_identity
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57636|1500019|t | 3
|
||||
(1 row)
|
||||
|
||||
tenant_id|measurement_id|payload |observation_time
|
||||
---------------------------------------------------------------------
|
||||
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
|
||||
(3 rows)
|
||||
|
||||
step s3-release-advisory-lock:
|
||||
SELECT pg_advisory_unlock(44000, 55152);
|
||||
|
||||
pg_advisory_unlock
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
step s1-create-distributed-table-observations-2-concurrently: <... completed>
|
||||
create_distributed_table_concurrently
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-print-cluster-2:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_full_replica_identity
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500021|t | 3
|
||||
57637|1500023|t | 0
|
||||
57638|1500020|t | 0
|
||||
57638|1500022|t | 0
|
||||
(4 rows)
|
||||
|
||||
tenant_id|measurement_id|payload |observation_time
|
||||
---------------------------------------------------------------------
|
||||
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
|
||||
(3 rows)
|
||||
|
||||
|
||||
starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-delete-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2
|
||||
step s2-print-cluster-2:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_full_replica_identity
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
tenant_id|dummy|measurement_id|payload|observation_time
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
step s3-acquire-advisory-lock:
|
||||
SELECT pg_advisory_lock(44000, 55152);
|
||||
|
||||
pg_advisory_lock
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-alter-table:
|
||||
ALTER TABLE observations_with_pk DROP COLUMN dummy;
|
||||
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
|
||||
|
||||
step s1-set-factor-1:
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SELECT citus_set_coordinator_host('localhost');
|
||||
|
||||
citus_set_coordinator_host
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-create-distributed-table-observations-2-concurrently:
|
||||
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
|
||||
<waiting ...>
|
||||
step s2-insert-observations_with_full_replica_identity:
|
||||
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
|
||||
|
||||
step s2-update-observations_with_full_replica_identity:
|
||||
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
|
||||
|
||||
step s2-delete-observations_with_full_replica_identity:
|
||||
DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ;
|
||||
|
||||
step s2-end:
|
||||
COMMIT;
|
||||
|
||||
step s2-print-cluster-2:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_full_replica_identity
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57636|1500024|t | 2
|
||||
(1 row)
|
||||
|
||||
tenant_id|measurement_id|payload |observation_time
|
||||
---------------------------------------------------------------------
|
||||
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
(2 rows)
|
||||
|
||||
step s3-release-advisory-lock:
|
||||
SELECT pg_advisory_unlock(44000, 55152);
|
||||
|
||||
pg_advisory_unlock
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
step s1-create-distributed-table-observations-2-concurrently: <... completed>
|
||||
create_distributed_table_concurrently
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s2-print-cluster-2:
|
||||
-- row count per shard
|
||||
SELECT
|
||||
nodeport, shardid, success, result
|
||||
FROM
|
||||
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
|
||||
ORDER BY
|
||||
nodeport, shardid;
|
||||
SELECT *
|
||||
FROM
|
||||
observations_with_full_replica_identity
|
||||
ORDER BY
|
||||
measurement_id;
|
||||
|
||||
nodeport|shardid|success|result
|
||||
---------------------------------------------------------------------
|
||||
57637|1500026|t | 2
|
||||
57637|1500028|t | 0
|
||||
57638|1500025|t | 0
|
||||
57638|1500027|t | 0
|
||||
(4 rows)
|
||||
|
||||
tenant_id|measurement_id|payload |observation_time
|
||||
---------------------------------------------------------------------
|
||||
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
|
||||
(2 rows)
|
||||
|
|
@@ -0,0 +1,176 @@
#include "isolation_mx_common.include.spec"

// Test scenario for nonblocking split and concurrent INSERT/UPDATE/DELETE
// session s1 - Executes create_distributed_table_concurrently after dropping a column on tables with replica identities
// session s2 - Runs concurrent inserts/updates/deletes
// session s3 - Holds advisory locks

setup
{
	SET citus.shard_replication_factor TO 1;
	CREATE TABLE observations_with_pk (
		tenant_id text not null,
		dummy int,
		measurement_id bigserial not null,
		payload jsonb not null,
		observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP,
		PRIMARY KEY (tenant_id, measurement_id)
	);

	CREATE TABLE observations_with_full_replica_identity (
		tenant_id text not null,
		dummy int,
		measurement_id bigserial not null,
		payload jsonb not null,
		observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP
	);
	ALTER TABLE observations_with_full_replica_identity REPLICA IDENTITY FULL;
}

teardown
{
	DROP TABLE observations_with_pk;
	DROP TABLE observations_with_full_replica_identity;
}

session "s1"

step "s1-alter-table"
{
	ALTER TABLE observations_with_pk DROP COLUMN dummy;
	ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
}

step "s1-set-factor-1"
{
	SET citus.shard_replication_factor TO 1;
	SELECT citus_set_coordinator_host('localhost');
}

step "s1-create-distributed-table-observations_with_pk-concurrently"
{
	SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
}

step "s1-create-distributed-table-observations-2-concurrently"
{
	SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
}

session "s2"

step "s2-begin"
{
	BEGIN;
}

step "s2-insert-observations_with_pk"
{
	INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
	INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
	INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
	INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
}

step "s2-insert-observations_with_full_replica_identity"
{
	INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
	INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
	INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
}

step "s2-update-observations_with_pk"
{
	UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
}

step "s2-update-primary-key-observations_with_pk"
{
	UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4;
}

step "s2-update-observations_with_full_replica_identity"
{
	UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
}

step "s2-delete-observations_with_pk"
{
	DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3;
}

step "s2-delete-observations_with_full_replica_identity"
{
	DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3;
}

step "s2-end"
{
	COMMIT;
}

step "s2-print-cluster-1"
{
	-- row count per shard
	SELECT
		nodeport, shardid, success, result
	FROM
		run_command_on_placements('observations_with_pk', 'select count(*) from %s')
	ORDER BY
		nodeport, shardid;

	SELECT *
	FROM
		observations_with_pk
	ORDER BY
		measurement_id;
}

step "s2-print-cluster-2"
{
	-- row count per shard
	SELECT
		nodeport, shardid, success, result
	FROM
		run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
	ORDER BY
		nodeport, shardid;

	SELECT *
	FROM
		observations_with_full_replica_identity
	ORDER BY
		measurement_id;
}


session "s3"

// This advisory lock with (almost) random values is only used
// for testing purposes. For details, check Citus' logical replication
// source code.
step "s3-acquire-advisory-lock"
{
	SELECT pg_advisory_lock(44000, 55152);
}

step "s3-release-advisory-lock"
{
	SELECT pg_advisory_unlock(44000, 55152);
}

// Concurrent Insert/Update with create_distributed_table_concurrently (with primary key as replica identity) after dropping a column:
// s3 holds advisory lock -> s1 starts create_distributed_table_concurrently and waits for advisory lock ->
// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
// -> s1 completes create_distributed_table_concurrently -> result is reflected in new shards
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-primary-key-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-delete-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"


// Concurrent Insert/Update with create_distributed_table_concurrently (with replica identity full) after dropping a column:
// s3 holds advisory lock -> s1 starts create_distributed_table_concurrently and waits for advisory lock ->
// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
// -> s1 completes create_distributed_table_concurrently -> result is reflected in new shards
permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"
permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-delete-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"