diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c
index 59fb3118f..51a56b36e 100644
--- a/src/backend/distributed/shardsplit/shardsplit_decoder.c
+++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c
@@ -34,6 +34,10 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
                                  HeapTuple tuple,
                                  char *currentSlotName);
+static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
+                                         TupleDesc sourceTupleDesc,
+                                         TupleDesc targetTupleDesc);
+
 /*
  * Postgres uses 'pgoutput' as default plugin for logical replication.
  * We want to reuse Postgres pgoutput's functionality as much as possible.
@@ -129,6 +133,71 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
     }
     Relation targetRelation = RelationIdGetRelation(targetRelationOid);
+
+    /*
+     * If any columns from the source relation have been dropped, then the tuple needs to
+     * be formatted according to the target relation.
+     */
+    TupleDesc sourceRelationDesc = RelationGetDescr(relation);
+    TupleDesc targetRelationDesc = RelationGetDescr(targetRelation);
+    if (sourceRelationDesc->natts > targetRelationDesc->natts)
+    {
+        switch (change->action)
+        {
+            case REORDER_BUFFER_CHANGE_INSERT:
+            {
+                HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
+                HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
+                    sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
+
+                change->data.tp.newtuple->tuple = *targetRelationNewTuple;
+                break;
+            }
+
+            case REORDER_BUFFER_CHANGE_UPDATE:
+            {
+                HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
+                HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
+                    sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
+
+                change->data.tp.newtuple->tuple = *targetRelationNewTuple;
+
+                /*
+                 * Format the oldtuple according to the target relation. If the column values of
+                 * the replica identity change, then the old tuple is non-null and needs to be
+                 * formatted according to the target relation schema.
+                 */
+                if (change->data.tp.oldtuple != NULL)
+                {
+                    HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
+                    HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
+                        sourceRelationOldTuple,
+                        sourceRelationDesc,
+                        targetRelationDesc);
+
+                    change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
+                }
+                break;
+            }
+
+            case REORDER_BUFFER_CHANGE_DELETE:
+            {
+                HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
+                HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
+                    sourceRelationOldTuple, sourceRelationDesc, targetRelationDesc);
+
+                change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
+                break;
+            }
+
+            /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of a shard split */
+            default:
+                ereport(ERROR, errmsg(
+                            "Unexpected action: %d. Expected action is INSERT/DELETE/UPDATE",
+                            change->action));
+        }
+    }
+
     pgoutputChangeCB(ctx, txn, targetRelation, change);
     RelationClose(targetRelation);
 }
@@ -223,3 +292,51 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
     return DatumGetInt32(hashedValueDatum);
 }
+
+
+/*
+ * GetTupleForTargetSchema returns a tuple with the schema of the target relation.
+ * If some columns within the source relation are dropped, we have to reformat
+ * the tuple to match the schema of the target relation.
+ *
+ * Consider the following scenario:
+ * Session1 : Drop column followed by create_distributed_table_concurrently
+ * Session2 : Concurrent insert workload
+ *
+ * The child shards created by create_distributed_table_concurrently will have fewer columns
+ * than the source shard because some columns were dropped.
+ * The incoming tuple from session2 will have more columns because the writes
+ * happened on the source shard, but the tuple now needs to be applied on a child shard,
+ * so we format it according to the child schema.
+ */
+static HeapTuple
+GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
+                        TupleDesc sourceRelDesc,
+                        TupleDesc targetRelDesc)
+{
+    /* Deform the tuple */
+    Datum *oldValues = (Datum *) palloc0(sourceRelDesc->natts * sizeof(Datum));
+    bool *oldNulls = (bool *) palloc0(sourceRelDesc->natts * sizeof(bool));
+    heap_deform_tuple(sourceRelationTuple, sourceRelDesc, oldValues,
+                      oldNulls);
+
+
+    /* Create new tuple by skipping dropped columns */
+    int nextAttributeIndex = 0;
+    Datum *newValues = (Datum *) palloc0(targetRelDesc->natts * sizeof(Datum));
+    bool *newNulls = (bool *) palloc0(targetRelDesc->natts * sizeof(bool));
+    for (int i = 0; i < sourceRelDesc->natts; i++)
+    {
+        if (TupleDescAttr(sourceRelDesc, i)->attisdropped)
+        {
+            continue;
+        }
+
+        newValues[nextAttributeIndex] = oldValues[i];
+        newNulls[nextAttributeIndex] = oldNulls[i];
+        nextAttributeIndex++;
+    }
+
+    HeapTuple targetRelationTuple = heap_form_tuple(targetRelDesc, newValues, newNulls);
+    return targetRelationTuple;
+}
diff --git a/src/test/regress/enterprise_isolation_logicalrep_1_schedule b/src/test/regress/enterprise_isolation_logicalrep_1_schedule
index 96cc9915e..23ed93739 100644
--- a/src/test/regress/enterprise_isolation_logicalrep_1_schedule
+++ b/src/test/regress/enterprise_isolation_logicalrep_1_schedule
@@ -8,5 +8,6 @@ test: isolation_cluster_management
 test: isolation_logical_replication_single_shard_commands
 test: isolation_logical_replication_multi_shard_commands
 test: isolation_non_blocking_shard_split
+test: isolation_create_distributed_concurrently_after_drop_column
 test: isolation_non_blocking_shard_split_with_index_as_replicaIdentity
 test: isolation_non_blocking_shard_split_fkey
diff --git a/src/test/regress/expected/isolation_create_distributed_concurrently_after_drop_column.out b/src/test/regress/expected/isolation_create_distributed_concurrently_after_drop_column.out
new file mode 100644
index 000000000..1cdc756d1
--- /dev/null
+++ b/src/test/regress/expected/isolation_create_distributed_concurrently_after_drop_column.out
@@ -0,0 +1,667 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
+step s2-print-cluster-1:
+    -- row count per shard
+    SELECT
+        nodeport, shardid, success, result
+    FROM
+        run_command_on_placements('observations_with_pk', 'select count(*) from %s')
+    ORDER BY
+        nodeport, shardid;
+    SELECT *
+    FROM
+        observations_with_pk
+    ORDER BY
+        measurement_id;
+
+nodeport|shardid|success|result
+---------------------------------------------------------------------
+(0 rows)
+
+tenant_id|dummy|measurement_id|payload|observation_time
+---------------------------------------------------------------------
+(0 rows)
+
+step s3-acquire-advisory-lock:
+    SELECT pg_advisory_lock(44000, 55152);
+
+pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s2-begin: + BEGIN; + +step s1-alter-table: + ALTER TABLE observations_with_pk DROP COLUMN dummy; + ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy; + +step s1-set-factor-1: + SET citus.shard_replication_factor TO 1; + SELECT citus_set_coordinator_host('localhost'); + +citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +step s1-create-distributed-table-observations_with_pk-concurrently: + SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id'); + +step s2-insert-observations_with_pk: + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + +step s2-update-observations_with_pk: + UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3; + +step s2-end: + COMMIT; + +step s2-print-cluster-1: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_pk', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_pk + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57636|1500004|t | 4 +(1 row) + +tenant_id|measurement_id|payload |observation_time +--------------------------------------------------------------------- +tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT +tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +(4 rows) + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-create-distributed-table-observations_with_pk-concurrently: <... 
completed> +create_distributed_table_concurrently +--------------------------------------------------------------------- + +(1 row) + +step s2-print-cluster-1: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_pk', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_pk + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500006|t | 4 + 57637|1500008|t | 0 + 57638|1500005|t | 0 + 57638|1500007|t | 0 +(4 rows) + +tenant_id|measurement_id|payload |observation_time +--------------------------------------------------------------------- +tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT +tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +(4 rows) + + +starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-primary-key-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1 +step s2-print-cluster-1: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_pk', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_pk + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- +(0 rows) + +tenant_id|dummy|measurement_id|payload|observation_time +--------------------------------------------------------------------- +(0 rows) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s2-begin: + BEGIN; + +step s1-alter-table: + ALTER TABLE observations_with_pk DROP COLUMN dummy; + ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy; + +step s1-set-factor-1: + SET citus.shard_replication_factor TO 1; + SELECT citus_set_coordinator_host('localhost'); + +citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +step s1-create-distributed-table-observations_with_pk-concurrently: + SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id'); + +step s2-insert-observations_with_pk: + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + +step s2-update-primary-key-observations_with_pk: + UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ; + +step s2-end: + COMMIT; + +step s2-print-cluster-1: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_pk', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_pk + ORDER BY + measurement_id; + 
+nodeport|shardid|success|result +--------------------------------------------------------------------- + 57636|1500009|t | 4 +(1 row) + +tenant_id|measurement_id|payload |observation_time +--------------------------------------------------------------------- +tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +(4 rows) + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-create-distributed-table-observations_with_pk-concurrently: <... completed> +create_distributed_table_concurrently +--------------------------------------------------------------------- + +(1 row) + +step s2-print-cluster-1: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_pk', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_pk + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500011|t | 4 + 57637|1500013|t | 0 + 57638|1500010|t | 0 + 57638|1500012|t | 0 +(4 rows) + +tenant_id|measurement_id|payload |observation_time +--------------------------------------------------------------------- +tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +(4 rows) + + +starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-delete-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1 +step s2-print-cluster-1: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_pk', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_pk + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- +(0 rows) + +tenant_id|dummy|measurement_id|payload|observation_time +--------------------------------------------------------------------- +(0 rows) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s2-begin: + BEGIN; + +step s1-alter-table: + ALTER TABLE observations_with_pk DROP COLUMN dummy; + ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy; + +step s1-set-factor-1: + SET citus.shard_replication_factor TO 1; + SELECT citus_set_coordinator_host('localhost'); + +citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +step s1-create-distributed-table-observations_with_pk-concurrently: + SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id'); + +step s2-insert-observations_with_pk: + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO 
observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + +step s2-update-observations_with_pk: + UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3; + +step s2-delete-observations_with_pk: + DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ; + +step s2-end: + COMMIT; + +step s2-print-cluster-1: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_pk', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_pk + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57636|1500014|t | 3 +(1 row) + +tenant_id|measurement_id|payload |observation_time +--------------------------------------------------------------------- +tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +(3 rows) + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-create-distributed-table-observations_with_pk-concurrently: <... completed> +create_distributed_table_concurrently +--------------------------------------------------------------------- + +(1 row) + +step s2-print-cluster-1: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_pk', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_pk + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500016|t | 3 + 57637|1500018|t | 0 + 57638|1500015|t | 0 + 57638|1500017|t | 0 +(4 rows) + +tenant_id|measurement_id|payload |observation_time +--------------------------------------------------------------------- +tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +(3 rows) + + +starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2 +step s2-print-cluster-2: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_full_replica_identity + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- +(0 rows) + +tenant_id|dummy|measurement_id|payload|observation_time +--------------------------------------------------------------------- +(0 rows) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 
55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s2-begin: + BEGIN; + +step s1-alter-table: + ALTER TABLE observations_with_pk DROP COLUMN dummy; + ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy; + +step s1-set-factor-1: + SET citus.shard_replication_factor TO 1; + SELECT citus_set_coordinator_host('localhost'); + +citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +step s1-create-distributed-table-observations-2-concurrently: + SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id'); + +step s2-insert-observations_with_full_replica_identity: + INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + +step s2-update-observations_with_full_replica_identity: + UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3; + +step s2-end: + COMMIT; + +step s2-print-cluster-2: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_full_replica_identity + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57636|1500019|t | 3 +(1 row) + +tenant_id|measurement_id|payload |observation_time +--------------------------------------------------------------------- +tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT +(3 rows) + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-create-distributed-table-observations-2-concurrently: <... 
completed> +create_distributed_table_concurrently +--------------------------------------------------------------------- + +(1 row) + +step s2-print-cluster-2: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_full_replica_identity + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500021|t | 3 + 57637|1500023|t | 0 + 57638|1500020|t | 0 + 57638|1500022|t | 0 +(4 rows) + +tenant_id|measurement_id|payload |observation_time +--------------------------------------------------------------------- +tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT +(3 rows) + + +starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-delete-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2 +step s2-print-cluster-2: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_full_replica_identity + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- +(0 rows) + +tenant_id|dummy|measurement_id|payload|observation_time +--------------------------------------------------------------------- +(0 rows) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s2-begin: + BEGIN; + +step s1-alter-table: + ALTER TABLE observations_with_pk DROP COLUMN dummy; + ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy; + +step s1-set-factor-1: + SET citus.shard_replication_factor TO 1; + SELECT citus_set_coordinator_host('localhost'); + +citus_set_coordinator_host +--------------------------------------------------------------------- + +(1 row) + +step s1-create-distributed-table-observations-2-concurrently: + SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id'); + +step s2-insert-observations_with_full_replica_identity: + INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3); + +step s2-update-observations_with_full_replica_identity: + UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3; + +step s2-delete-observations_with_full_replica_identity: + DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ; + +step s2-end: + COMMIT; + +step 
s2-print-cluster-2: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_full_replica_identity + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57636|1500024|t | 2 +(1 row) + +tenant_id|measurement_id|payload |observation_time +--------------------------------------------------------------------- +tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +(2 rows) + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-create-distributed-table-observations-2-concurrently: <... completed> +create_distributed_table_concurrently +--------------------------------------------------------------------- + +(1 row) + +step s2-print-cluster-2: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + SELECT * + FROM + observations_with_full_replica_identity + ORDER BY + measurement_id; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500026|t | 2 + 57637|1500028|t | 0 + 57638|1500025|t | 0 + 57638|1500027|t | 0 +(4 rows) + +tenant_id|measurement_id|payload |observation_time +--------------------------------------------------------------------- +tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT +(2 rows) + diff --git a/src/test/regress/spec/isolation_create_distributed_concurrently_after_drop_column.spec b/src/test/regress/spec/isolation_create_distributed_concurrently_after_drop_column.spec new file mode 100644 index 000000000..95fa5e010 --- /dev/null +++ b/src/test/regress/spec/isolation_create_distributed_concurrently_after_drop_column.spec @@ -0,0 +1,176 @@ +#include "isolation_mx_common.include.spec" + +// Test scenario for nonblocking split and concurrent INSERT/UPDATE/DELETE +// session s1 - Executes create_distributed_table_concurrently after dropping a column on tables with replica identities +// session s2 - Does concurrent inserts/update/delete +// session s3 - Holds advisory locks + +setup +{ + SET citus.shard_replication_factor TO 1; + CREATE TABLE observations_with_pk ( + tenant_id text not null, + dummy int, + measurement_id bigserial not null, + payload jsonb not null, + observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP, + PRIMARY KEY (tenant_id, measurement_id) + ); + + CREATE TABLE observations_with_full_replica_identity ( + tenant_id text not null, + dummy int, + measurement_id bigserial not null, + payload jsonb not null, + observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP + ); + ALTER TABLE observations_with_full_replica_identity REPLICA IDENTITY FULL; +} + +teardown +{ + DROP TABLE observations_with_pk; + DROP TABLE observations_with_full_replica_identity; +} + +session "s1" + +step "s1-alter-table" +{ + ALTER TABLE observations_with_pk DROP COLUMN dummy; + ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy; +} + +step "s1-set-factor-1" +{ + SET 
citus.shard_replication_factor TO 1;
+    SELECT citus_set_coordinator_host('localhost');
+}
+
+step "s1-create-distributed-table-observations_with_pk-concurrently"
+{
+    SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
+}
+
+step "s1-create-distributed-table-observations-2-concurrently"
+{
+    SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
+}
+
+session "s2"
+
+step "s2-begin"
+{
+    BEGIN;
+}
+
+step "s2-insert-observations_with_pk"
+{
+    INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
+    INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
+    INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
+    INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
+}
+
+step "s2-insert-observations_with_full_replica_identity"
+{
+    INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
+    INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
+    INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
+}
+
+step "s2-update-observations_with_pk"
+{
+    UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
+}
+
+step "s2-update-primary-key-observations_with_pk"
+{
+    UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ;
+}
+
+step "s2-update-observations_with_full_replica_identity"
+{
+    UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
+}
+
+step "s2-delete-observations_with_pk"
+{
+    DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ;
+}
+
+step "s2-delete-observations_with_full_replica_identity"
+{
+    DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ;
+}
+
+step "s2-end"
+{
+    COMMIT;
+}
+
+step "s2-print-cluster-1"
+{
+    -- row count per shard
+    SELECT
+        nodeport, shardid, success, result
+    FROM
+        run_command_on_placements('observations_with_pk', 'select count(*) from %s')
+    ORDER BY
+        nodeport, shardid;
+
+    SELECT *
+    FROM
+        observations_with_pk
+    ORDER BY
+        measurement_id;
+}
+
+step "s2-print-cluster-2"
+{
+    -- row count per shard
+    SELECT
+        nodeport, shardid, success, result
+    FROM
+        run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
+    ORDER BY
+        nodeport, shardid;
+
+    SELECT *
+    FROM
+        observations_with_full_replica_identity
+    ORDER BY
+        measurement_id;
+}
+
+
+session "s3"
+
+// this advisory lock with (almost) random values is only used
+// for testing purposes. For details, check Citus' logical replication
+// source code
+step "s3-acquire-advisory-lock"
+{
+    SELECT pg_advisory_lock(44000, 55152);
+}
+
+step "s3-release-advisory-lock"
+{
+    SELECT pg_advisory_unlock(44000, 55152);
+}
+
+// Concurrent Insert/Update with create_distributed_table_concurrently (with primary key as replica identity) after dropping a column:
+// s3 holds advisory lock -> s1 starts create_distributed_table_concurrently and waits for advisory lock ->
+// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
+// -> s1 completes create_distributed_table_concurrently -> result is reflected in new shards
+permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
+permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-primary-key-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
+permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-delete-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
+
+
+// Concurrent Insert/Update with create_distributed_table_concurrently (with replica identity full) after dropping a column:
+// s3 holds advisory lock -> s1 starts create_distributed_table_concurrently and waits for advisory lock ->
+// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
+// -> s1 completes create_distributed_table_concurrently -> result is reflected in new shards
+permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"
+permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-delete-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"