Merge branch 'main' into reuse-connections-for-logical-ref-fkeys-niupre

reuse-connections-for-logical-ref-fkeys-niupre
Nitish Upreti 2022-09-15 13:11:36 -07:00
commit dce396fc3d
9 changed files with 1032 additions and 12 deletions

View File

@@ -1604,6 +1604,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
}
else if (ShouldSyncTableMetadata(sourceId))
{
char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName);
/*
* We are converting a citus local table to a distributed/reference table,
* so we should prevent dropping the sequence on the table. Otherwise, we'd
@@ -1612,8 +1614,8 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.worker_drop_sequence_dependency('%s');",
quote_qualified_identifier(schemaName, sourceName));
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
quote_literal_cstr(qualifiedTableName));
SendCommandToWorkersWithMetadata(command->data);
}
@@ -1903,11 +1905,17 @@ CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequ
char *sourceSchemaName, char *sourceName,
char *targetSchemaName, char *targetName)
{
char *qualifiedSchemaName = quote_qualified_identifier(sequenceSchemaName,
sequenceName);
char *qualifiedSourceName = quote_qualified_identifier(sourceSchemaName, sourceName);
char *qualifiedTargetName = quote_qualified_identifier(targetSchemaName, targetName);
StringInfo query = makeStringInfo();
appendStringInfo(query, "SELECT worker_change_sequence_dependency('%s', '%s', '%s')",
quote_qualified_identifier(sequenceSchemaName, sequenceName),
quote_qualified_identifier(sourceSchemaName, sourceName),
quote_qualified_identifier(targetSchemaName, targetName));
appendStringInfo(query, "SELECT worker_change_sequence_dependency(%s, %s, %s)",
quote_literal_cstr(qualifiedSchemaName),
quote_literal_cstr(qualifiedSourceName),
quote_literal_cstr(qualifiedTargetName));
return query->data;
}
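Both hunks in this file switch from embedding names inside hand-written quotes ('%s') to %s combined with quote_literal_cstr(), which returns a properly escaped SQL string literal, so a table or sequence name containing a single quote can no longer break the generated command. As a rough standalone illustration (QuoteLiteralSimple is a hypothetical helper, not Citus or PostgreSQL code; the real quote_literal_cstr() also handles backslashes and allocates with palloc):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * QuoteLiteralSimple wraps a C string in single quotes and doubles any
 * embedded quotes, so the value can be spliced into a SQL command as a
 * string literal. This sketch only covers the quote-doubling part of what
 * quote_literal_cstr() does.
 */
static char *
QuoteLiteralSimple(const char *value)
{
	size_t length = strlen(value);

	/* worst case: every input character is a quote, plus two wrapping quotes and NUL */
	char *result = malloc(2 * length + 3);
	char *out = result;

	*out++ = '\'';
	for (const char *in = value; *in != '\0'; in++)
	{
		if (*in == '\'')
		{
			*out++ = '\'';	/* double embedded single quotes */
		}
		*out++ = *in;
	}
	*out++ = '\'';
	*out = '\0';

	return result;
}

int
main(void)
{
	/* a name containing a quote would break a naive '...%s...' format string */
	char *literal = QuoteLiteralSimple("nation.o'brien_table");
	printf("SELECT pg_catalog.worker_drop_sequence_dependency(%s);\n", literal);
	free(literal);
	return 0;
}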

View File

@@ -528,6 +528,14 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
colocatedTableId = ColocatedTableId(colocationId);
}
List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
if (workerNodeList == NIL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("no worker nodes are available for placing shards"),
errhint("Add more worker nodes.")));
}
List *workersForPlacementList;
List *shardSplitPointsList;
@@ -555,7 +563,6 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
/*
* Place shards in a round-robin fashion across all data nodes.
*/
List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);
}
@@ -856,6 +863,8 @@ WorkerNodesForShardList(List *shardList)
static List *
RoundRobinWorkerNodeList(List *workerNodeList, int listLength)
{
Assert(workerNodeList != NIL);
List *nodeIdList = NIL;
for (int idx = 0; idx < listLength; idx++)
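The NIL check added to CreateDistributedTableConcurrently, together with the Assert here, ensures RoundRobinWorkerNodeList never receives an empty node list; the round-robin index has to wrap around the list length, which is meaningless for zero nodes. A minimal standalone sketch of the same idea, with the empty-list guard up front (AssignShardsRoundRobin and its parameters are illustrative names, not the actual Citus helpers):

#include <stdio.h>
#include <stdlib.h>

/*
 * Assign shardCount shards to nodes in round-robin order. The empty-list
 * guard mirrors the ereport(ERROR, ...) added at the call site; without it
 * the modulo over the node count would be undefined.
 */
static void
AssignShardsRoundRobin(const int *nodePorts, int nodeCount, int shardCount)
{
	if (nodeCount == 0)
	{
		fprintf(stderr, "no worker nodes are available for placing shards\n");
		exit(EXIT_FAILURE);
	}

	for (int shardIndex = 0; shardIndex < shardCount; shardIndex++)
	{
		int nodeIndex = shardIndex % nodeCount;	/* wrap around the node list */
		printf("shard %d -> node with port %d\n", shardIndex, nodePorts[nodeIndex]);
	}
}

int
main(void)
{
	int nodePorts[] = { 57637, 57638 };
	AssignShardsRoundRobin(nodePorts, 2, 5);
	return 0;
}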

View File

@@ -670,10 +670,15 @@ CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *work
AppendShardIdToName(&shardName, shardInterval->shardId);
StringInfo checkShardExistsQuery = makeStringInfo();
/*
* We pass schemaName and shardName through quote_literal_cstr rather than
* quote_identifier, since they are compared as string values here, not
* used as identifiers.
*/
appendStringInfo(checkShardExistsQuery,
"SELECT EXISTS (SELECT FROM pg_catalog.pg_tables WHERE schemaname = '%s' AND tablename = '%s');",
schemaName,
shardName);
"SELECT EXISTS (SELECT FROM pg_catalog.pg_tables WHERE schemaname = %s AND tablename = %s);",
quote_literal_cstr(schemaName),
quote_literal_cstr(shardName));
int connectionFlags = 0;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
@@ -691,11 +696,13 @@ CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *work
ReportResultError(connection, result, ERROR);
}
char *checkExists = PQgetvalue(result, 0, 0);
char *existsString = PQgetvalue(result, 0, 0);
bool tableExists = strcmp(existsString, "t") == 0;
PQclear(result);
ForgetResults(connection);
return strcmp(checkExists, "t") == 0;
return tableExists;
}
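Beyond renaming the variable, this rewrite fixes an ordering problem: PQgetvalue() returns a pointer into storage owned by the PGresult, so the old code's strcmp() after PQclear() read memory that libpq had already freed. The new code evaluates the comparison into tableExists before clearing the result. A minimal standalone libpq sketch of the same check, assuming a plain PGconn with a placeholder connection string instead of Citus' MultiConnection, and using a parameterized query so no manual quoting is needed (build with: cc check_exists.c -lpq):

#include <stdio.h>
#include <string.h>
#include <stdbool.h>
#include <libpq-fe.h>

/* Returns true when schemaName.shardName exists according to pg_tables. */
static bool
ShardTableExists(PGconn *conn, const char *schemaName, const char *shardName)
{
	const char *paramValues[2] = { schemaName, shardName };
	PGresult *result = PQexecParams(conn,
									"SELECT EXISTS (SELECT FROM pg_catalog.pg_tables "
									"WHERE schemaname = $1 AND tablename = $2);",
									2, NULL, paramValues, NULL, NULL, 0);

	if (PQresultStatus(result) != PGRES_TUPLES_OK)
	{
		fprintf(stderr, "existence check failed: %s", PQerrorMessage(conn));
		PQclear(result);
		return false;
	}

	/* read the value before PQclear(): it points into the PGresult */
	bool tableExists = strcmp(PQgetvalue(result, 0, 0), "t") == 0;
	PQclear(result);
	return tableExists;
}

int
main(void)
{
	/* placeholder connection string; adjust for your environment */
	PGconn *conn = PQconnectdb("dbname=postgres");
	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		PQfinish(conn);
		return 1;
	}

	/* hypothetical shard name, for illustration only */
	printf("exists: %d\n", ShardTableExists(conn, "public", "observations_with_pk_1500004"));

	PQfinish(conn);
	return 0;
}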

View File

@@ -34,6 +34,10 @@ static Oid FindTargetRelationOid(Relation sourceShardRelation,
HeapTuple tuple,
char *currentSlotName);
static HeapTuple GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
TupleDesc sourceTupleDesc,
TupleDesc targetTupleDesc);
/*
* Postgres uses 'pgoutput' as the default plugin for logical replication.
* We want to reuse Postgres pgoutput's functionality as much as possible.
@@ -129,6 +133,71 @@ split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
Relation targetRelation = RelationIdGetRelation(targetRelationOid);
/*
* If any columns of the source relation have been dropped, the tuple needs to
* be reformatted to match the target relation's schema.
*/
TupleDesc sourceRelationDesc = RelationGetDescr(relation);
TupleDesc targetRelationDesc = RelationGetDescr(targetRelation);
if (sourceRelationDesc->natts > targetRelationDesc->natts)
{
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
{
HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
change->data.tp.newtuple->tuple = *targetRelationNewTuple;
break;
}
case REORDER_BUFFER_CHANGE_UPDATE:
{
HeapTuple sourceRelationNewTuple = &(change->data.tp.newtuple->tuple);
HeapTuple targetRelationNewTuple = GetTupleForTargetSchema(
sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc);
change->data.tp.newtuple->tuple = *targetRelationNewTuple;
/*
* Reformat the old tuple according to the target relation as well. When the
* replica identity column values change, the old tuple is non-NULL and also
* needs to match the target relation's schema.
*/
if (change->data.tp.oldtuple != NULL)
{
HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
sourceRelationOldTuple,
sourceRelationDesc,
targetRelationDesc);
change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
}
break;
}
case REORDER_BUFFER_CHANGE_DELETE:
{
HeapTuple sourceRelationOldTuple = &(change->data.tp.oldtuple->tuple);
HeapTuple targetRelationOldTuple = GetTupleForTargetSchema(
sourceRelationOldTuple, sourceRelationDesc, targetRelationDesc);
change->data.tp.oldtuple->tuple = *targetRelationOldTuple;
break;
}
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of a shard split */
default:
ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action));
}
}
pgoutputChangeCB(ctx, txn, targetRelation, change);
RelationClose(targetRelation);
}
@@ -223,3 +292,51 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
return DatumGetInt32(hashedValueDatum);
}
/*
* GetTupleForTargetSchema returns a tuple with the schema of the target relation.
* If some columns of the source relation have been dropped, the tuple has to be
* reformatted to match the schema of the target relation.
*
* Consider the below scenario:
* Session1 : Drop column followed by create_distributed_table_concurrently
* Session2 : Concurrent insert workload
*
* The child shards created by create_distributed_table_concurrently have fewer
* columns than the source shard because some columns were dropped.
* The incoming tuple from Session2 carries the extra columns, since the writes
* happened on the source shard; before it can be applied to a child shard, it
* has to be reformatted to match the child schema.
*/
static HeapTuple
GetTupleForTargetSchema(HeapTuple sourceRelationTuple,
TupleDesc sourceRelDesc,
TupleDesc targetRelDesc)
{
/* Deform the tuple */
Datum *oldValues = (Datum *) palloc0(sourceRelDesc->natts * sizeof(Datum));
bool *oldNulls = (bool *) palloc0(sourceRelDesc->natts * sizeof(bool));
heap_deform_tuple(sourceRelationTuple, sourceRelDesc, oldValues,
oldNulls);
/* Create new tuple by skipping dropped columns */
int nextAttributeIndex = 0;
Datum *newValues = (Datum *) palloc0(targetRelDesc->natts * sizeof(Datum));
bool *newNulls = (bool *) palloc0(targetRelDesc->natts * sizeof(bool));
for (int i = 0; i < sourceRelDesc->natts; i++)
{
if (TupleDescAttr(sourceRelDesc, i)->attisdropped)
{
continue;
}
newValues[nextAttributeIndex] = oldValues[i];
newNulls[nextAttributeIndex] = oldNulls[i];
nextAttributeIndex++;
}
HeapTuple targetRelationTuple = heap_form_tuple(targetRelDesc, newValues, newNulls);
return targetRelationTuple;
}
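The deform/form pair above is the PostgreSQL-level mechanism; the remapping itself amounts to packing the non-dropped source values in order so they line up with the target descriptor. A standalone sketch of that mapping for the scenario exercised in this PR, where the dropped dummy column sits between surviving columns (plain strings and a bool flag stand in for Datum arrays and TupleDesc):

#include <stdbool.h>
#include <stdio.h>

int
main(void)
{
	/* source shard schema: (tenant_id, dummy [dropped], measurement_id) */
	const char *sourceValues[] = { "tenant_id", "<dropped>", "42" };
	bool attIsDropped[] = { false, true, false };
	int sourceNatts = 3;

	/* the child shard only has the surviving columns */
	const char *targetValues[3];
	int nextAttributeIndex = 0;

	for (int i = 0; i < sourceNatts; i++)
	{
		if (attIsDropped[i])
		{
			continue;	/* dropped columns do not exist in the child shard */
		}
		targetValues[nextAttributeIndex] = sourceValues[i];
		nextAttributeIndex++;
	}

	for (int i = 0; i < nextAttributeIndex; i++)
	{
		printf("target attribute %d = %s\n", i, targetValues[i]);
	}

	return 0;
}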

View File

@@ -8,5 +8,6 @@ test: isolation_cluster_management
test: isolation_logical_replication_single_shard_commands
test: isolation_logical_replication_multi_shard_commands
test: isolation_non_blocking_shard_split
test: isolation_create_distributed_concurrently_after_drop_column
test: isolation_non_blocking_shard_split_with_index_as_replicaIdentity
test: isolation_non_blocking_shard_split_fkey

View File

@@ -57,6 +57,35 @@ ERROR: cannot colocate tables nocolo and test
DETAIL: Distribution column types don't match for nocolo and test.
select create_distributed_table_concurrently('test','key', colocate_with := 'noexists');
ERROR: relation "noexists" does not exist
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
select create_distributed_table_concurrently('test','key');
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY
DETAIL: UPDATE and DELETE commands on the relation will error out during create_distributed_table_concurrently unless there is a REPLICA IDENTITY or PRIMARY KEY. INSERT commands will still work.
ERROR: no worker nodes are available for placing shards
HINT: Add more worker nodes.
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
citus_set_node_property
---------------------------------------------------------------------
(1 row)
-- use colocate_with "default"
select create_distributed_table_concurrently('test','key', shard_count := 11);
NOTICE: relation test does not have a REPLICA IDENTITY or PRIMARY KEY

View File

@@ -0,0 +1,667 @@
Parsed test spec with 3 sessions
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently:
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
<waiting ...>
step s2-insert-observations_with_pk:
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_pk:
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-end:
COMMIT;
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500004|t | 4
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500006|t | 4
57637|1500008|t | 0
57638|1500005|t | 0
57638|1500007|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-primary-key-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently:
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
<waiting ...>
step s2-insert-observations_with_pk:
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-primary-key-observations_with_pk:
UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ;
step s2-end:
COMMIT;
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500009|t | 4
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500011|t | 4
57637|1500013|t | 0
57638|1500010|t | 0
57638|1500012|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 100|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(4 rows)
starting permutation: s2-print-cluster-1 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations_with_pk-concurrently s2-insert-observations_with_pk s2-update-observations_with_pk s2-delete-observations_with_pk s2-end s2-print-cluster-1 s3-release-advisory-lock s2-print-cluster-1
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently:
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
<waiting ...>
step s2-insert-observations_with_pk:
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_pk:
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-delete-observations_with_pk:
DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ;
step s2-end:
COMMIT;
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500014|t | 3
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(3 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations_with_pk-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-1:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500016|t | 3
57637|1500018|t | 0
57638|1500015|t | 0
57638|1500017|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 4|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(3 rows)
starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations-2-concurrently:
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
<waiting ...>
step s2-insert-observations_with_full_replica_identity:
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_full_replica_identity:
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-end:
COMMIT;
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500019|t | 3
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
(3 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations-2-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500021|t | 3
57637|1500023|t | 0
57638|1500020|t | 0
57638|1500022|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 3|{"name": 29.3}|Mon Mar 11 02:00:00 2019 PDT
(3 rows)
starting permutation: s2-print-cluster-2 s3-acquire-advisory-lock s2-begin s1-alter-table s1-set-factor-1 s1-create-distributed-table-observations-2-concurrently s2-insert-observations_with_full_replica_identity s2-update-observations_with_full_replica_identity s2-delete-observations_with_full_replica_identity s2-end s2-print-cluster-2 s3-release-advisory-lock s2-print-cluster-2
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
(0 rows)
tenant_id|dummy|measurement_id|payload|observation_time
---------------------------------------------------------------------
(0 rows)
step s3-acquire-advisory-lock:
SELECT pg_advisory_lock(44000, 55152);
pg_advisory_lock
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s1-alter-table:
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
step s1-set-factor-1:
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
citus_set_coordinator_host
---------------------------------------------------------------------
(1 row)
step s1-create-distributed-table-observations-2-concurrently:
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
<waiting ...>
step s2-insert-observations_with_full_replica_identity:
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
step s2-update-observations_with_full_replica_identity:
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
step s2-delete-observations_with_full_replica_identity:
DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ;
step s2-end:
COMMIT;
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57636|1500024|t | 2
(1 row)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(2 rows)
step s3-release-advisory-lock:
SELECT pg_advisory_unlock(44000, 55152);
pg_advisory_unlock
---------------------------------------------------------------------
t
(1 row)
step s1-create-distributed-table-observations-2-concurrently: <... completed>
create_distributed_table_concurrently
---------------------------------------------------------------------
(1 row)
step s2-print-cluster-2:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500026|t | 2
57637|1500028|t | 0
57638|1500025|t | 0
57638|1500027|t | 0
(4 rows)
tenant_id|measurement_id|payload |observation_time
---------------------------------------------------------------------
tenant_id| 1|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
tenant_id| 2|{"name": 29.3}|Sun Mar 11 03:00:00 2018 PDT
(2 rows)

View File

@@ -0,0 +1,176 @@
#include "isolation_mx_common.include.spec"
// Test scenario for nonblocking split and concurrent INSERT/UPDATE/DELETE
// session s1 - Executes create_distributed_table_concurrently after dropping a column on tables with replica identities
// session s2 - Runs concurrent inserts/updates/deletes
// session s3 - Holds advisory locks
setup
{
SET citus.shard_replication_factor TO 1;
CREATE TABLE observations_with_pk (
tenant_id text not null,
dummy int,
measurement_id bigserial not null,
payload jsonb not null,
observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP,
PRIMARY KEY (tenant_id, measurement_id)
);
CREATE TABLE observations_with_full_replica_identity (
tenant_id text not null,
dummy int,
measurement_id bigserial not null,
payload jsonb not null,
observation_time timestamptz not null default '03/11/2018 02:00:00'::TIMESTAMP
);
ALTER TABLE observations_with_full_replica_identity REPLICA IDENTITY FULL;
}
teardown
{
DROP TABLE observations_with_pk;
DROP TABLE observations_with_full_replica_identity;
}
session "s1"
step "s1-alter-table"
{
ALTER TABLE observations_with_pk DROP COLUMN dummy;
ALTER TABLE observations_with_full_replica_identity DROP COLUMN dummy;
}
step "s1-set-factor-1"
{
SET citus.shard_replication_factor TO 1;
SELECT citus_set_coordinator_host('localhost');
}
step "s1-create-distributed-table-observations_with_pk-concurrently"
{
SELECT create_distributed_table_concurrently('observations_with_pk','tenant_id');
}
step "s1-create-distributed-table-observations-2-concurrently"
{
SELECT create_distributed_table_concurrently('observations_with_full_replica_identity','tenant_id');
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-insert-observations_with_pk"
{
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_pk(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
}
step "s2-insert-observations_with_full_replica_identity"
{
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
INSERT INTO observations_with_full_replica_identity(tenant_id, payload) SELECT 'tenant_id', jsonb_build_object('name', 29.3);
}
step "s2-update-observations_with_pk"
{
UPDATE observations_with_pk set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
}
step "s2-update-primary-key-observations_with_pk"
{
UPDATE observations_with_pk set measurement_id=100 where tenant_id = 'tenant_id' and measurement_id = 4 ;
}
step "s2-update-observations_with_full_replica_identity"
{
UPDATE observations_with_full_replica_identity set observation_time='03/11/2019 02:00:00'::TIMESTAMP where tenant_id = 'tenant_id' and measurement_id = 3;
}
step "s2-delete-observations_with_pk"
{
DELETE FROM observations_with_pk where tenant_id = 'tenant_id' and measurement_id = 3 ;
}
step "s2-delete-observations_with_full_replica_identity"
{
DELETE FROM observations_with_full_replica_identity where tenant_id = 'tenant_id' and measurement_id = 3 ;
}
step "s2-end"
{
COMMIT;
}
step "s2-print-cluster-1"
{
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_pk', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_pk
ORDER BY
measurement_id;
}
step "s2-print-cluster-2"
{
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('observations_with_full_replica_identity', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
SELECT *
FROM
observations_with_full_replica_identity
ORDER BY
measurement_id;
}
session "s3"
// This advisory lock with (almost) random values is only used
// for testing purposes. For details, check Citus' logical replication
// source code
step "s3-acquire-advisory-lock"
{
SELECT pg_advisory_lock(44000, 55152);
}
step "s3-release-advisory-lock"
{
SELECT pg_advisory_unlock(44000, 55152);
}
// Concurrent Insert/Update with create_distributed_table_concurrently (with primary key as replica identity) after dropping a column:
// s3 holds the advisory lock -> s1 starts create_distributed_table_concurrently and waits for the advisory lock ->
// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
// -> s1 completes create_distributed_table_concurrently -> the result is reflected in the new shards
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-primary-key-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
permutation "s2-print-cluster-1" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations_with_pk-concurrently" "s2-insert-observations_with_pk" "s2-update-observations_with_pk" "s2-delete-observations_with_pk" "s2-end" "s2-print-cluster-1" "s3-release-advisory-lock" "s2-print-cluster-1"
// Concurrent Insert/Update with create_distributed_table_concurrently (with replica identity full) after dropping a column:
// s3 holds the advisory lock -> s1 starts create_distributed_table_concurrently and waits for the advisory lock ->
// s2 concurrently inserts/deletes/updates rows -> s3 releases the advisory lock
// -> s1 completes create_distributed_table_concurrently -> the result is reflected in the new shards
permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"
permutation "s2-print-cluster-2" "s3-acquire-advisory-lock" "s2-begin" "s1-alter-table" "s1-set-factor-1" "s1-create-distributed-table-observations-2-concurrently" "s2-insert-observations_with_full_replica_identity" "s2-update-observations_with_full_replica_identity" "s2-delete-observations_with_full_replica_identity" "s2-end" "s2-print-cluster-2" "s3-release-advisory-lock" "s2-print-cluster-2"

View File

@@ -38,6 +38,12 @@ select create_distributed_table_concurrently('nocolo','x');
select create_distributed_table_concurrently('test','key', colocate_with := 'nocolo');
select create_distributed_table_concurrently('test','key', colocate_with := 'noexists');
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', false);
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', false);
select create_distributed_table_concurrently('test','key');
select citus_set_node_property('localhost', :worker_1_port, 'shouldhaveshards', true);
select citus_set_node_property('localhost', :worker_2_port, 'shouldhaveshards', true);
-- use colocate_with "default"
select create_distributed_table_concurrently('test','key', shard_count := 11);