From adf73387f48b7135dad55d77cc9308f5b790835a Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Sat, 30 Jul 2022 13:36:10 +0530 Subject: [PATCH] 1. Added an isolation test 2. Added DisableAndDropShardSplitSubscription 3. Added creation of replica identities and validated through isolation test --- .../distributed/operations/shard_split.c | 78 ++- ...worker_split_shard_replication_setup_udf.c | 41 +- .../replication/multi_logical_replication.c | 7 +- .../shardsplit/shardsplit_decoder.c | 17 +- .../shardsplit_logical_replication.c | 31 +- .../11.1-1.sql | 1 + .../latest.sql | 1 + .../distributed/multi_logical_replication.h | 3 + .../shardsplit_logical_replication.h | 2 + ...enterprise_isolation_logicalrep_1_schedule | 1 + .../citus_non_blocking_split_shards.out | 10 +- src/test/regress/expected/citus_sameer.out | 124 ++++ .../isolation_non_blocking_shard_split.out | 647 ++++++++++++++++++ ...plit_shard_replication_colocated_setup.out | 8 +- .../split_shard_replication_setup.out | 4 +- .../split_shard_replication_setup_local.out | 4 +- ...t_shard_replication_setup_remote_local.out | 4 +- .../isolation_non_blocking_shard_split.spec | 131 ++++ src/test/regress/split_schedule | 1 + .../sql/citus_non_blocking_split_shards.sql | 10 +- src/test/regress/sql/citus_sameer.sql | 78 +++ ...plit_shard_replication_colocated_setup.sql | 8 +- .../sql/split_shard_replication_setup.sql | 4 +- .../split_shard_replication_setup_local.sql | 4 +- ...t_shard_replication_setup_remote_local.sql | 4 +- 25 files changed, 1151 insertions(+), 72 deletions(-) create mode 100644 src/test/regress/expected/citus_sameer.out create mode 100644 src/test/regress/expected/isolation_non_blocking_shard_split.out create mode 100644 src/test/regress/spec/isolation_non_blocking_shard_split.spec create mode 100644 src/test/regress/sql/citus_sameer.sql diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 153f29729..8992e81d2 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -69,7 +69,10 @@ static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalLis List *workersForPlacementList); static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList); static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, - List *workersForPlacementList); + List *workersForPlacementList, + bool includeReplicaIdentity); +static void CreateReplicaIdentities(List *shardGroupSplitIntervalListList, + List *workersForPlacementList); static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerNode); static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, @@ -655,7 +658,8 @@ CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode) */ static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, - List *workersForPlacementList) + List *workersForPlacementList, bool + includeReplicaIdentity) { List *shardIntervalList = NIL; List *ddlTaskExecList = NIL; @@ -678,7 +682,7 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, List *ddlCommandList = GetPostLoadTableCreationCommands( shardInterval->relationId, true /* includeIndexes */, - true /* includeReplicaIdentity */); + includeReplicaIdentity); ddlCommandList = WorkerApplyShardDDLCommandList( ddlCommandList, shardInterval->shardId); @@ -725,7 +729,8 @@ CreateAndCopySplitShardsForShardGroup(HTAB 
*mapOfShardToPlacementCreatedByWorkfl /* Create auxiliary structures (indexes, stats, replicaindentities, triggers) */ CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, - workersForPlacementList); + workersForPlacementList, + true /* includeReplicaIdentity*/); } @@ -1298,6 +1303,9 @@ NonBlockingShardSplit(SplitOperation splitOperation, sourceShardToCopyNode, workersForPlacementList); + CreateReplicaIdentities(shardGroupSplitIntervalListList, workersForPlacementList); + + /* 3) Create Publications. */ CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); @@ -1350,6 +1358,9 @@ NonBlockingShardSplit(SplitOperation splitOperation, superUser, databaseName); + /* Used for testing */ + ConflictOnlyWithIsolationTesting(); + /* 9) Wait for subscriptions to be ready */ WaitForShardSplitRelationSubscriptionsBecomeReady( shardSplitSubscribersMetadataList); @@ -1361,7 +1372,8 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* 11) Create Auxilary structures */ CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, - workersForPlacementList); + workersForPlacementList, + false /* includeReplicaIdentity*/); /* 12) Wait for subscribers to catchup till source LSN */ sourcePosition = GetRemoteLogPosition(sourceConnection); @@ -1702,6 +1714,12 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, splitChildShardIntervalList, shardGroupSplitIntervalListList) { int64 sourceShardId = sourceShardIntervalToCopy->shardId; + Oid relationId = sourceShardIntervalToCopy->relationId; + Var *partitionColumn = DistPartitionKey(relationId); + + bool missingOK = false; + char *partitionColumnName = + get_attname(relationId, partitionColumn->varattno, missingOK); ShardInterval *splitChildShardInterval = NULL; WorkerNode *destinationWorkerNode = NULL; @@ -1722,8 +1740,9 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, splitChildShardInterval->maxValue)); appendStringInfo(splitChildrenRows, - "ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info", + "ROW(%lu, %s, %lu, %s, %s, %u)::citus.split_shard_info", sourceShardId, + quote_literal_cstr(partitionColumnName), splitChildShardInterval->shardId, quote_literal_cstr(minValueString->data), quote_literal_cstr(maxValueString->data), @@ -1868,3 +1887,50 @@ TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval) dropShardQuery->data, NULL /* pgResult */); } + + +/*todo(saawasek): Add comments */ +static void +CreateReplicaIdentities(List *shardGroupSplitIntervalListList, + List *workersForPlacementList) +{ + /* + * Create Replica Identities for actual child shards. + */ + List *shardIntervalList = NIL; + foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) + { + ShardInterval *shardInterval = NULL; + WorkerNode *workerPlacementNode = NULL; + + /* + * Iterate on split shard interval list for given shard and create tasks + * for every single split shard in a shard group. 
+ */ + forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, + workersForPlacementList) + { + List *shardList = NIL; + shardList = lappend(shardList, shardInterval); + + CreateReplicaIdentity(shardList, workerPlacementNode->workerName, + workerPlacementNode->workerPort); + } + } + + /*todo: remove the global variable dummy map*/ + HASH_SEQ_STATUS status; + hash_seq_init(&status, DummyShardInfoHashMap); + + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + { + uint32 nodeId = entry->key.nodeId; + WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId, + false /* missingOk */); + + List *dummyShardIntervalList = entry->shardSplitInfoList; + CreateReplicaIdentity(dummyShardIntervalList, shardToBeDroppedNode->workerName, + shardToBeDroppedNode->workerPort); + } +} diff --git a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c index d73a6fb4c..d48153828 100644 --- a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c +++ b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c @@ -12,6 +12,7 @@ #include "miscadmin.h" #include "postmaster/postmaster.h" #include "common/hashfn.h" +#include "distributed/distribution_column.h" #include "distributed/shardinterval_utils.h" #include "distributed/shard_utils.h" #include "distributed/shardsplit_shared_memory.h" @@ -34,12 +35,14 @@ static HTAB *ShardInfoHashMap = NULL; /* Function declarations */ static void ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, uint64 *sourceShardId, + char **partitionColumnName, uint64 *childShardId, int32 *minValue, int32 *maxValue, int32 *nodeId); static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit, + char *partitionColumnName, uint64 desSplitChildShardId, int32 minValue, int32 maxValue, @@ -111,16 +114,19 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) while (array_iterate(shardInfo_iterator, &shardInfoDatum, &isnull)) { uint64 sourceShardId = 0; + char *partitionColumnName = NULL; uint64 childShardId = 0; int32 minValue = 0; int32 maxValue = 0; int32 nodeId = 0; - ParseShardSplitInfoFromDatum(shardInfoDatum, &sourceShardId, &childShardId, + ParseShardSplitInfoFromDatum(shardInfoDatum, &sourceShardId, + &partitionColumnName, &childShardId, &minValue, &maxValue, &nodeId); ShardSplitInfo *shardSplitInfo = CreateShardSplitInfo( sourceShardId, + partitionColumnName, childShardId, minValue, maxValue, @@ -177,6 +183,7 @@ SetupHashMapForShardInfo() * with appropriate OIs' for source and destination relation. * * sourceShardIdToSplit - Existing shardId which has a valid entry in cache and catalogue + * partitionColumnName - Name of column to use for partitioning * desSplitChildShardId - New split child shard which doesn't have an entry in metacache yet. 
* minValue - Minimum hash value for desSplitChildShardId * maxValue - Maximum hash value for desSplitChildShardId @@ -185,6 +192,7 @@ SetupHashMapForShardInfo() */ ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit, + char *partitionColumnName, uint64 desSplitChildShardId, int32 minValue, int32 maxValue, @@ -204,23 +212,6 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, "worker nodes hosting source shards.", sourceShardIdToSplit)); } - CitusTableCacheEntry *cachedTableEntry = GetCitusTableCacheEntry( - shardIntervalToSplit->relationId); - - if (!IsCitusTableTypeCacheEntry(cachedTableEntry, HASH_DISTRIBUTED)) - { - Relation distributedRel = RelationIdGetRelation(cachedTableEntry->relationId); - ereport(ERROR, (errmsg( - "Citus does only support Hash distributed tables to be split."), - errdetail("Table '%s' is not Hash distributed", - RelationGetRelationName(distributedRel)) - )); - RelationClose(distributedRel); - } - - Assert(shardIntervalToSplit->minValueExists); - Assert(shardIntervalToSplit->maxValueExists); - /* Oid of distributed table */ Oid citusTableOid = shardIntervalToSplit->relationId; Oid sourceShardToSplitOid = GetTableLocalShardOid(citusTableOid, @@ -244,7 +235,9 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, } /* determine the partition column in the tuple descriptor */ - Var *partitionColumn = cachedTableEntry->partitionColumn; + Var *partitionColumn = BuildDistributionKeyFromColumnName(sourceShardToSplitOid, + partitionColumnName, + AccessShareLock); if (partitionColumn == NULL) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -370,6 +363,7 @@ NodeShardMappingHashCompare(const void *left, const void *right, Size keysize) static void ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, uint64 *sourceShardId, + char **partitionColumnName, uint64 *childShardId, int32 *minValue, int32 *maxValue, @@ -385,6 +379,15 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, } *sourceShardId = DatumGetUInt64(sourceShardIdDatum); + Datum partitionColumnDatum = GetAttributeByName(dataTuple, "distribution_column", + &isnull); + if (isnull) + { + ereport(ERROR, (errmsg( + "distribution_column for split_shard_info can't be null"))); + } + *partitionColumnName = TextDatumGetCString(partitionColumnDatum); + Datum childShardIdDatum = GetAttributeByName(dataTuple, "child_shard_id", &isnull); if (isnull) { diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 8f150c9ac..5bd41d561 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -88,8 +88,6 @@ static void CreateForeignConstraintsToReferenceTable(List *shardList, MultiConnection *targetConnection); static List * PrepareReplicationSubscriptionList(List *shardList); static Bitmapset * TableOwnerIds(List *shardList); -static void CreateReplicaIdentity(List *shardList, char *nodeName, int32 - nodePort); static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId); static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId, uint64 shardId); @@ -115,7 +113,6 @@ static void CreatePartitioningHierarchy(List *shardList, char *targetNodeName, int targetNodePort); static void CreateColocatedForeignKeys(List *shardList, char *targetNodeName, int targetNodePort); -static void ConflictOnlyWithIsolationTesting(void); static void DropShardMovePublications(MultiConnection *connection, 
Bitmapset *tableOwnerIds); static void DropShardMoveSubscriptions(MultiConnection *connection, @@ -456,7 +453,7 @@ TableOwnerIds(List *shardList) * CreateReplicaIdentity gets a shardList and creates all the replica identities * on the shards in the given node. */ -static void +void CreateReplicaIdentity(List *shardList, char *nodeName, int32 nodePort) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, @@ -1048,7 +1045,7 @@ CreateForeignConstraintsToReferenceTable(List *shardList, * Note that since the cost of calling this function is pretty low, we prefer * to use it in non-assert builds as well not to diverge in the behaviour. */ -static void +extern void ConflictOnlyWithIsolationTesting() { LOCKTAG tag; diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 54042ed57..ab4e97a8e 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -12,6 +12,7 @@ #include "distributed/shardsplit_shared_memory.h" #include "distributed/listutils.h" #include "replication/logical.h" +#include "utils/typcache.h" /* * Dynamically-loaded modules are required to include this macro call to check for @@ -208,24 +209,22 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation, int partitionColumnIndex, Oid distributedTableOid) { - CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableOid); - if (cacheEntry == NULL) - { - ereport(ERROR, errmsg( - "Expected valid Citus Cache entry to be present. But found null")); - } - TupleDesc relationTupleDes = RelationGetDescr(sourceShardRelation); + Form_pg_attribute partitionColumn = TupleDescAttr(relationTupleDes, + partitionColumnIndex); + bool isNull = false; Datum partitionColumnValue = heap_getattr(tuple, partitionColumnIndex + 1, relationTupleDes, &isNull); - FmgrInfo *hashFunction = cacheEntry->hashFunction; + TypeCacheEntry *typeEntry = lookup_type_cache(partitionColumn->atttypid, + TYPECACHE_HASH_PROC_FINFO); /* get hashed value of the distribution value */ - Datum hashedValueDatum = FunctionCall1(hashFunction, partitionColumnValue); + Datum hashedValueDatum = FunctionCall1(&(typeEntry->hash_proc_finfo), + partitionColumnValue); return DatumGetInt32(hashedValueDatum); } diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index 6813fe824..d4cd37368 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -603,7 +603,7 @@ DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection) char *subscriptionName = NULL; foreach_ptr(subscriptionName, subscriptionNameList) { - DropShardSubscription(cleanupConnection, subscriptionName); + DisableAndDropShardSplitSubscription(cleanupConnection, subscriptionName); } } @@ -709,8 +709,9 @@ DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList) uint32 tableOwnerId = subscriberMetadata->tableOwnerId; MultiConnection *targetNodeConnection = subscriberMetadata->targetNodeConnection; - DropShardSubscription(targetNodeConnection, ShardSubscriptionName(tableOwnerId, - SHARD_SPLIT_SUBSCRIPTION_PREFIX)); + DisableAndDropShardSplitSubscription(targetNodeConnection, ShardSubscriptionName( + tableOwnerId, + SHARD_SPLIT_SUBSCRIPTION_PREFIX)); DropShardUser(targetNodeConnection, ShardSubscriptionRole(tableOwnerId, 
SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX)); @@ -718,6 +719,30 @@ DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList) } +/*todo(saawasek): add comments */ +void +DisableAndDropShardSplitSubscription(MultiConnection *connection, char *subscriptionName) +{ + StringInfo alterSubscriptionSlotCommand = makeStringInfo(); + StringInfo alterSubscriptionDisableCommand = makeStringInfo(); + + appendStringInfo(alterSubscriptionDisableCommand, + "ALTER SUBSCRIPTION %s DISABLE", + quote_identifier(subscriptionName)); + ExecuteCriticalRemoteCommand(connection, + alterSubscriptionDisableCommand->data); + + appendStringInfo(alterSubscriptionSlotCommand, + "ALTER SUBSCRIPTION %s SET (slot_name = NONE)", + quote_identifier(subscriptionName)); + ExecuteCriticalRemoteCommand(connection, alterSubscriptionSlotCommand->data); + + ExecuteCriticalRemoteCommand(connection, psprintf( + "DROP SUBSCRIPTION %s", + quote_identifier(subscriptionName))); +} + + /* * CloseShardSplitSubscriberConnections closes connection of subscriber nodes. * 'ShardSplitSubscriberMetadata' holds connection for a subscriber node. The method diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.1-1.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.1-1.sql index aca0e4725..35ca3b898 100644 --- a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.1-1.sql +++ b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.1-1.sql @@ -1,5 +1,6 @@ CREATE TYPE citus.split_shard_info AS ( source_shard_id bigint, + distribution_column text, child_shard_id bigint, shard_min_value text, shard_max_value text, diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql index cfa3d2a67..6e70b827c 100644 --- a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql @@ -1,5 +1,6 @@ CREATE TYPE citus.split_shard_info AS ( source_shard_id bigint, + distribution_column text, child_shard_id bigint, shard_min_value text, shard_max_value text, diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index f91b1f850..c80ff6e6c 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -26,6 +26,9 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, int targetNodePort); +extern void ConflictOnlyWithIsolationTesting(void); +extern void CreateReplicaIdentity(List *shardList, char *nodeName, int32 + nodePort); extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection); extern List * GetQueryResultStringList(MultiConnection *connection, char *query); diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h index 58801401f..1bae3957a 100644 --- a/src/include/distributed/shardsplit_logical_replication.h +++ b/src/include/distributed/shardsplit_logical_replication.h @@ -96,6 +96,8 @@ extern char * DropExistingIfAnyAndCreateTemplateReplicationSlot( ShardInterval *shardIntervalToSplit, MultiConnection * sourceConnection); +extern void DisableAndDropShardSplitSubscription(MultiConnection *connection, + char *subscriptionName); /* Wrapper functions which wait for a 
subscriber to be ready and catchup */ extern void WaitForShardSplitRelationSubscriptionsBecomeReady( diff --git a/src/test/regress/enterprise_isolation_logicalrep_1_schedule b/src/test/regress/enterprise_isolation_logicalrep_1_schedule index 8a6f35af5..0c0a782d7 100644 --- a/src/test/regress/enterprise_isolation_logicalrep_1_schedule +++ b/src/test/regress/enterprise_isolation_logicalrep_1_schedule @@ -7,3 +7,4 @@ test: isolation_cluster_management test: isolation_logical_replication_single_shard_commands test: isolation_logical_replication_multi_shard_commands +test: isolation_non_blocking_shard_split diff --git a/src/test/regress/expected/citus_non_blocking_split_shards.out b/src/test/regress/expected/citus_non_blocking_split_shards.out index a112ed537..d0a0f7351 100644 --- a/src/test/regress/expected/citus_non_blocking_split_shards.out +++ b/src/test/regress/expected/citus_non_blocking_split_shards.out @@ -11,9 +11,9 @@ Here is a high level overview of test plan: 8. Split an already split shard second time on a different schema. */ CREATE SCHEMA "citus_split_test_schema"; -CREATE ROLE test_split_role WITH LOGIN; -GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role; -SET ROLE test_split_role; +CREATE ROLE test_shard_split_role WITH LOGIN; +GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_shard_split_role; +SET ROLE test_shard_split_role; SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981000; SET citus.next_placement_id TO 8610000; @@ -196,7 +196,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- END : Display current state -- BEGIN : Move one shard before we split it. \c - postgres - :master_port -SET ROLE test_split_role; +SET ROLE test_shard_split_role; SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981007; SET citus.defer_drop_after_shard_move TO OFF; @@ -380,7 +380,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- END : Display current state -- BEGIN: Should be able to change/drop constraints \c - postgres - :master_port -SET ROLE test_split_role; +SET ROLE test_shard_split_role; SET search_path TO "citus_split_test_schema"; ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed; ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200; diff --git a/src/test/regress/expected/citus_sameer.out b/src/test/regress/expected/citus_sameer.out new file mode 100644 index 000000000..2cfa1a716 --- /dev/null +++ b/src/test/regress/expected/citus_sameer.out @@ -0,0 +1,124 @@ +/* +Citus Shard Split Test.The test is model similar to 'shard_move_constraints'. +Here is a high level overview of test plan: + 1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table. + 2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors. + 3. Create Foreign key constraints between the two co-located distributed tables. + 4. Load data into the three tables. + 5. Move one of the shards for 'sensors' to test ShardMove -> Split. + 6. Trigger Split on both shards of 'sensors'. This will also split co-located tables. + 7. Move one of the split shard to test Split -> ShardMove. + 8. Split an already split shard second time on a different schema. 
+*/ +CREATE SCHEMA "citus_split_test_schema"; +CREATE ROLE test_split_role WITH LOGIN; +GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role; +SET ROLE test_split_role; +SET search_path TO "citus_split_test_schema"; +SET citus.next_shard_id TO 8981000; +SET citus.next_placement_id TO 8610000; +SET citus.shard_count TO 2; +SET citus.shard_replication_factor TO 1; +-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc. +CREATE TABLE sensors( + measureid integer, + eventdatetime date, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); +SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc. +-- BEGIN : Move one shard before we split it. +\c - postgres - :master_port +SET ROLE test_split_role; +SET search_path TO "citus_split_test_schema"; +SET citus.next_shard_id TO 8981007; +SET citus.defer_drop_after_shard_move TO OFF; +SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- END : Move one shard before we split it. +-- BEGIN : Set node id variables +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- END : Set node id variables +-- BEGIN : Split two shards : One with move and One without move. 
+-- Perform 2 way split +SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port); + table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size +--------------------------------------------------------------------- + sensors | 8981000 | citus_split_test_schema.sensors_8981000 | distributed | 1390009 | localhost | 57638 | 40960 + sensors | 8981001 | citus_split_test_schema.sensors_8981001 | distributed | 1390009 | localhost | 57638 | 40960 +(2 rows) + +SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-1073741824'], + ARRAY[:worker_2_node, :worker_2_node], + 'force_logical'); +WARNING: replication slot "citus_shard_split_template_slot_8981000" does not exist +CONTEXT: while executing command on localhost:xxxxx + citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port); + table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size +--------------------------------------------------------------------- + sensors | 8981001 | citus_split_test_schema.sensors_8981001 | distributed | 1390009 | localhost | 57638 | 40960 + sensors | 8981007 | citus_split_test_schema.sensors_8981007 | distributed | 1390009 | localhost | 57638 | 24576 + sensors | 8981008 | citus_split_test_schema.sensors_8981008 | distributed | 1390009 | localhost | 57638 | 24576 +(3 rows) + +\c - - - :worker_2_port +SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_shard_split_template_slot_8981000 + citus_shard_split_18_20648 +(2 rows) + +\c - - - :master_port +SELECT pg_catalog.citus_split_shard_by_split_points( + 8981001, + ARRAY['536870911', '1610612735'], + ARRAY[:worker_1_node, :worker_1_node, :worker_2_node], + 'force_logical'); +WARNING: replication slot "citus_shard_split_template_slot_8981001" does not exist +CONTEXT: while executing command on localhost:xxxxx + citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port); + table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size +--------------------------------------------------------------------- + citus_split_test_schema.sensors | 102042 | citus_split_test_schema.sensors_102042 | distributed | 1390009 | localhost | 57637 | 8192 + citus_split_test_schema.sensors | 102043 | citus_split_test_schema.sensors_102043 | distributed | 1390009 | localhost | 57637 | 16384 + citus_split_test_schema.sensors | 102044 | citus_split_test_schema.sensors_102044 | distributed | 1390009 | localhost | 57638 | 16384 + citus_split_test_schema.sensors | 8981007 | citus_split_test_schema.sensors_8981007 | distributed | 1390009 | localhost | 57638 | 24576 + citus_split_test_schema.sensors | 8981008 | citus_split_test_schema.sensors_8981008 | distributed | 1390009 | localhost | 57638 | 24576 +(5 rows) + +\c - - - :worker_2_port +SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_shard_split_template_slot_8981001 + citus_shard_split_16_20648 + citus_shard_split_18_20648 +(3 rows) + diff --git a/src/test/regress/expected/isolation_non_blocking_shard_split.out 
b/src/test/regress/expected/isolation_non_blocking_shard_split.out new file mode 100644 index 000000000..df959baea --- /dev/null +++ b/src/test/regress/expected/isolation_non_blocking_shard_split.out @@ -0,0 +1,647 @@ +unused step name: s2-insert-2 +unused step name: s2-select +Parsed test spec with 3 sessions + +starting permutation: s1-load-cache s2-print-cluster s3-acquire-advisory-lock s1-begin s2-begin s1-non-blocking-shard-split s2-insert s2-end s2-print-cluster s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 +(1 row) + +id|value +--------------------------------------------------------------------- +(0 rows) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... 
completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 1 +(2 rows) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + + +starting permutation: s1-load-cache s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-update s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-update: + UPDATE to_split_table SET value = 111 WHERE id = 123456789; + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... 
completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 1 +(2 rows) + + id|value +--------------------------------------------------------------------- +123456789| 111 +(1 row) + + +starting permutation: s1-load-cache s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-delete s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-load-cache: + -- Indirect way to load cache. + TRUNCATE to_split_table; + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-delete: + DELETE FROM to_split_table WHERE id = 123456789; + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... 
completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 0 +(2 rows) + +id|value +--------------------------------------------------------------------- +(0 rows) + + +starting permutation: s2-print-cluster s3-acquire-advisory-lock s1-begin s2-begin s1-non-blocking-shard-split s2-insert s2-end s2-print-cluster s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 0 +(1 row) + +id|value +--------------------------------------------------------------------- +(0 rows) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... 
completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 1 +(2 rows) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + + +starting permutation: s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-update s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-update: + UPDATE to_split_table SET value = 111 WHERE id = 123456789; + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... 
completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 1 +(2 rows) + + id|value +--------------------------------------------------------------------- +123456789| 111 +(1 row) + + +starting permutation: s2-insert s2-print-cluster s3-acquire-advisory-lock s1-begin s1-non-blocking-shard-split s2-delete s3-release-advisory-lock s1-end s2-print-cluster +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s2-insert: + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); + +get_shard_id_for_distribution_column +--------------------------------------------------------------------- + 1500001 +(1 row) + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57637|1500001|t | 1 +(1 row) + + id|value +--------------------------------------------------------------------- +123456789| 1 +(1 row) + +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: + BEGIN; + +s1: WARNING: replication slot "citus_shard_split_template_slot_1500001" does not exist +step s1-non-blocking-shard-split: + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); + +step s2-delete: + DELETE FROM to_split_table WHERE id = 123456789; + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-non-blocking-shard-split: <... 
completed> +citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +step s1-end: + COMMIT; + +step s2-print-cluster: + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; + +nodeport|shardid|success|result +--------------------------------------------------------------------- + 57638|1500002|t | 0 + 57638|1500003|t | 0 +(2 rows) + +id|value +--------------------------------------------------------------------- +(0 rows) + diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index a672587b6..7485f850f 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -65,10 +65,10 @@ SET search_path TO split_shard_replication_setup_schema; CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info, - ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info + ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, + ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info, + ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, + ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. 
count diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index 5521253b8..7590388ca 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -63,8 +63,8 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info + ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out index 66113d603..4dced5752 100644 --- a/src/test/regress/expected/split_shard_replication_setup_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_local.out @@ -11,8 +11,8 @@ SET client_min_messages TO ERROR; CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; -- Worker1 is target for table_to_split_2 and table_to_split_3 SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, - ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info ]); count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out index 56899ab0e..9b1ec403c 100644 --- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out @@ -9,8 +9,8 @@ SET search_path TO split_shard_replication_setup_schema; -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, - ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. 
count diff --git a/src/test/regress/spec/isolation_non_blocking_shard_split.spec b/src/test/regress/spec/isolation_non_blocking_shard_split.spec new file mode 100644 index 000000000..550afe970 --- /dev/null +++ b/src/test/regress/spec/isolation_non_blocking_shard_split.spec @@ -0,0 +1,131 @@ +// we use 15 as the partition key value through out the test +// so setting the corresponding shard here is useful +setup +{ + SET citus.shard_count to 1; + SET citus.shard_replication_factor to 1; + SELECT setval('pg_dist_shardid_seq', 1500000); + + CREATE TABLE to_split_table (id int PRIMARY KEY, value int); + SELECT create_distributed_table('to_split_table', 'id'); +} + +teardown +{ + DROP TABLE to_split_table; +} + + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +// cache all placements +step "s1-load-cache" +{ + -- Indirect way to load cache. + TRUNCATE to_split_table; +} + +step "s1-non-blocking-shard-split" +{ + SELECT pg_catalog.citus_split_shard_by_split_points( + 1500001, + ARRAY['-1073741824'], + ARRAY[2, 2], + 'force_logical'); +} + +step "s1-end" +{ + COMMIT; +} + +session "s2" + +step "s2-begin" +{ + BEGIN; +} + +step "s2-insert" +{ + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (123456789, 1); +} + +step "s2-insert-2" +{ + SELECT get_shard_id_for_distribution_column('to_split_table', 123456789); + INSERT INTO to_split_table VALUES (1234567819, 1); +} + +step "s2-update" +{ + UPDATE to_split_table SET value = 111 WHERE id = 123456789; +} + +step "s2-delete" +{ + DELETE FROM to_split_table WHERE id = 123456789; +} + +step "s2-select" +{ + SELECT count(*) FROM to_split_table WHERE id = 123456789; +} + +step "s2-end" +{ + COMMIT; +} + +step "s2-print-cluster" +{ + -- row count per shard + SELECT + nodeport, shardid, success, result + FROM + run_command_on_placements('to_split_table', 'select count(*) from %s') + ORDER BY + nodeport, shardid; + -- rows + SELECT id, value FROM to_split_table ORDER BY id, value; +} + + +session "s3" + +// this advisory lock with (almost) random values are only used +// for testing purposes. 
For details, check Citus' logical replication +// source code +step "s3-acquire-advisory-lock" +{ + SELECT pg_advisory_lock(44000, 55152); +} + +step "s3-release-advisory-lock" +{ + SELECT pg_advisory_unlock(44000, 55152); +} + +##// nonblocking tests lie below ### + +// move placement first +// the following tests show the non-blocking modifications while shard is being moved +// in fact, the shard move blocks the writes for a very short duration of time +// by using an advisory and allowing the other commands continue to run, we prevent +// the modifications to block on that blocking duration +//permutation "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-insert" "s3-release-advisory-lock" "s1-end" "s1-select" "s1-get-shard-distribution" + + +permutation "s1-load-cache" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" +permutation "s1-load-cache" "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-update" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" +permutation "s1-load-cache" "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-delete" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" + +permutation "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s2-begin" "s1-non-blocking-shard-split" "s2-insert" "s2-end" "s2-print-cluster" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" +permutation "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-update" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" +permutation "s2-insert" "s2-print-cluster" "s3-acquire-advisory-lock" "s1-begin" "s1-non-blocking-shard-split" "s2-delete" "s3-release-advisory-lock" "s1-end" "s2-print-cluster" diff --git a/src/test/regress/split_schedule b/src/test/regress/split_schedule index 67d7e6013..b2773e1d8 100644 --- a/src/test/regress/split_schedule +++ b/src/test/regress/split_schedule @@ -21,3 +21,4 @@ test: citus_split_shard_by_split_points_failure # use citus_split_shard_columnar_partitioned instead. test: citus_split_shard_columnar_partitioned test: citus_non_blocking_split_shards +# test: citus_sameer diff --git a/src/test/regress/sql/citus_non_blocking_split_shards.sql b/src/test/regress/sql/citus_non_blocking_split_shards.sql index 5082ab7a5..d6569debf 100644 --- a/src/test/regress/sql/citus_non_blocking_split_shards.sql +++ b/src/test/regress/sql/citus_non_blocking_split_shards.sql @@ -13,9 +13,9 @@ Here is a high level overview of test plan: CREATE SCHEMA "citus_split_test_schema"; -CREATE ROLE test_split_role WITH LOGIN; -GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role; -SET ROLE test_split_role; +CREATE ROLE test_shard_split_role WITH LOGIN; +GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_shard_split_role; +SET ROLE test_shard_split_role; SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981000; @@ -119,7 +119,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- BEGIN : Move one shard before we split it. 
\c - postgres - :master_port -SET ROLE test_split_role; +SET ROLE test_shard_split_role; SET search_path TO "citus_split_test_schema"; SET citus.next_shard_id TO 8981007; SET citus.defer_drop_after_shard_move TO OFF; @@ -200,7 +200,7 @@ SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, node -- BEGIN: Should be able to change/drop constraints \c - postgres - :master_port -SET ROLE test_split_role; +SET ROLE test_shard_split_role; SET search_path TO "citus_split_test_schema"; ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed; ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200; diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql new file mode 100644 index 000000000..212b771c8 --- /dev/null +++ b/src/test/regress/sql/citus_sameer.sql @@ -0,0 +1,78 @@ +/* +Citus Shard Split Test.The test is model similar to 'shard_move_constraints'. +Here is a high level overview of test plan: + 1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table. + 2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors. + 3. Create Foreign key constraints between the two co-located distributed tables. + 4. Load data into the three tables. + 5. Move one of the shards for 'sensors' to test ShardMove -> Split. + 6. Trigger Split on both shards of 'sensors'. This will also split co-located tables. + 7. Move one of the split shard to test Split -> ShardMove. + 8. Split an already split shard second time on a different schema. +*/ + +CREATE SCHEMA "citus_split_test_schema"; + +CREATE ROLE test_split_role WITH LOGIN; +GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role; +SET ROLE test_split_role; + +SET search_path TO "citus_split_test_schema"; +SET citus.next_shard_id TO 8981000; +SET citus.next_placement_id TO 8610000; +SET citus.shard_count TO 2; +SET citus.shard_replication_factor TO 1; + +-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc. +CREATE TABLE sensors( + measureid integer, + eventdatetime date, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); + +SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none'); +INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc. + + +-- BEGIN : Move one shard before we split it. +\c - postgres - :master_port +SET ROLE test_split_role; +SET search_path TO "citus_split_test_schema"; +SET citus.next_shard_id TO 8981007; +SET citus.defer_drop_after_shard_move TO OFF; + +SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical'); +-- END : Move one shard before we split it. + +-- BEGIN : Set node id variables +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- END : Set node id variables + +-- BEGIN : Split two shards : One with move and One without move. 
+-- Perform 2 way split +SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port); +SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-1073741824'], + ARRAY[:worker_2_node, :worker_2_node], + 'force_logical'); +SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port); + +\c - - - :worker_2_port +SELECT slot_name FROM pg_replication_slots; + +\c - - - :master_port +SELECT pg_catalog.citus_split_shard_by_split_points( + 8981001, + ARRAY['536870911', '1610612735'], + ARRAY[:worker_2_node, :worker_2_node, :worker_2_node], + 'force_logical'); +SELECT * FROM citus_shards WHERE nodeport IN (:worker_1_port, :worker_2_port); + +\c - - - :worker_2_port +SELECT slot_name FROM pg_replication_slots; \ No newline at end of file diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index 3dd627e7c..4733e1cf4 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -67,10 +67,10 @@ CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info, - ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info + ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, + ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info, + ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, + ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index a9a84b86d..176dd6576 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -67,8 +67,8 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, - ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info + ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql index 4bdcb0887..a33b4e27d 100644 --- a/src/test/regress/sql/split_shard_replication_setup_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_local.sql @@ -14,8 +14,8 @@ CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_s -- Worker1 is target for table_to_split_2 and table_to_split_3 SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 2, 
'-2147483648', '-1', :worker_1_node)::citus.split_shard_info, - ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info ]); SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql index bfb7dfed9..d9f8d527c 100644 --- a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql @@ -12,8 +12,8 @@ SET search_path TO split_shard_replication_setup_schema; CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ - ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, - ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info + ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, + ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset
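--

For reference, a minimal sketch of calling the updated
worker_split_shard_replication_setup() UDF now that citus.split_shard_info
carries a distribution_column field. The shard ids, hash ranges, column name
('id'), and the :worker_2_node psql variable simply mirror the regression
tests in this patch rather than introducing new values:

    -- split source shard 1 into child shards 2 and 3, both placed on :worker_2_node;
    -- the second field names the distribution column of the table being split
    SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
        ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
        ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
    ]);

On the worker, CreateShardSplitInfo() resolves that column name through
BuildDistributionKeyFromColumnName(), replacing the earlier lookup of the
partition column from the Citus metadata cache that this patch removes.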