diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index fda5136e9..d590e29cc 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -248,7 +248,6 @@ static void RegisterWorkerNodeCacheCallbacks(void); static void RegisterLocalGroupIdCacheCallbacks(void); static void RegisterAuthinfoCacheCallbacks(void); static void RegisterCitusTableCacheEntryReleaseCallbacks(void); -static uint32 WorkerNodeHashCode(const void *key, Size keySize); static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry); static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry); static void CreateDistTableCache(void); @@ -3942,26 +3941,6 @@ RegisterAuthinfoCacheCallbacks(void) } -/* - * WorkerNodeHashCode computes the hash code for a worker node from the node's - * host name and port number. Nodes that only differ by their rack locations - * hash to the same value. - */ -static uint32 -WorkerNodeHashCode(const void *key, Size keySize) -{ - const WorkerNode *worker = (const WorkerNode *) key; - const char *workerName = worker->workerName; - const uint32 *workerPort = &(worker->workerPort); - - /* standard hash function outlined in Effective Java, Item 8 */ - uint32 result = 17; - result = 37 * result + string_hash(workerName, WORKER_LENGTH); - result = 37 * result + tag_hash(workerPort, sizeof(uint32)); - return result; -} - - /* * ResetCitusTableCacheEntry frees any out-of-band memory used by a cache entry, * but does not free the entry itself. diff --git a/src/backend/distributed/operations/citus_split_shard_by_split_points.c b/src/backend/distributed/operations/citus_split_shard_by_split_points.c index 848597e38..7674f7a2b 100644 --- a/src/backend/distributed/operations/citus_split_shard_by_split_points.c +++ b/src/backend/distributed/operations/citus_split_shard_by_split_points.c @@ -81,7 +81,10 @@ LookupSplitMode(Oid shardSplitModeOid) { shardSplitMode = BLOCKING_SPLIT; } - + else if (strncmp(enumLabel, "non_blocking", NAMEDATALEN) == 0) + { + shardSplitMode = NON_BLOCKING_SPLIT; + } /* Extend with other modes as we support them */ else { diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 1b779f65b..d8a27e1b4 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -10,6 +10,7 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "nodes/pg_list.h" #include "utils/array.h" #include "distributed/utils/array_type.h" @@ -32,6 +33,7 @@ #include "distributed/pg_dist_shard.h" #include "distributed/metadata_sync.h" #include "distributed/multi_physical_planner.h" +#include "commands/dbcommands.h" /* Function declarations */ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, @@ -42,6 +44,19 @@ static void CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *workersForPlacementList); +static void CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *workersForPlacementList); +static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + WorkerNode *sourceWorkerNode, + List *workersForPlacementList); +static void SplitShardReplicationSetup(List 
*sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + WorkerNode *sourceWorkerNode, + List *workersForPlacementList); +static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList); static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerNode); static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, @@ -53,6 +68,11 @@ static void BlockingShardSplit(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *workersForPlacementList); +static void NonBlockingShardSplit(SplitOperation splitOperation, + ShardInterval *shardIntervalToSplit, + List *shardSplitPointsList, + List *workersForPlacementList); + static void DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, @@ -382,8 +402,11 @@ SplitShard(SplitMode splitMode, } else { - /* we only support blocking shard split in this code path for now. */ - ereport(ERROR, (errmsg("Invalid split mode value %d.", splitMode))); + NonBlockingShardSplit( + splitOperation, + shardIntervalToSplit, + shardSplitPointsList, + workersForPlacementList); } } @@ -750,11 +773,11 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, SHARD_STATE_ACTIVE, 0, /* shard length (zero for HashDistributed Table) */ workerPlacementNode->groupId); - } - if (ShouldSyncTableMetadata(shardInterval->relationId)) - { - syncedShardList = lappend(syncedShardList, shardInterval); + if (ShouldSyncTableMetadata(shardInterval->relationId)) + { + syncedShardList = lappend(syncedShardList, shardInterval); + } } } @@ -918,3 +941,273 @@ TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, } } } + +/* + * SplitShard API to split a given shard (or shard group) in non-blocking fashion + * based on specified split points to a set of destination nodes. + * 'splitOperation' : Customer operation that triggered split. + * 'shardIntervalToSplit' : Source shard interval to be split. + * 'shardSplitPointsList' : Split Points list for the source 'shardInterval'. + * 'workersForPlacementList' : Placement list corresponding to split children. + */ +static void +NonBlockingShardSplit(SplitOperation splitOperation, + ShardInterval *shardIntervalToSplit, + List *shardSplitPointsList, + List *workersForPlacementList) +{ + List *sourceColocatedShardIntervalList = ColocatedShardIntervalList( + shardIntervalToSplit); + + /* First create shard interval metadata for split children */ + List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup( + sourceColocatedShardIntervalList, + shardSplitPointsList); + + /* Only single placement allowed (already validated RelationReplicationFactor = 1) */ + List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId); + Assert(sourcePlacementList->length == 1); + ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial( + sourcePlacementList); + WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId, + false /* missingOk */); + + PG_TRY(); + { + /* + * Physically create split children, perform split copy and create auxiliary structures. + * This includes: indexes, replicaIdentity, triggers and statistics. + * Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints).
+ */ + CreateSplitShardsForShardGroupTwo( + sourceShardToCopyNode, + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + workersForPlacementList); + + CreateDummyShardsForShardGroup( + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + sourceShardToCopyNode, + workersForPlacementList); + + SplitShardReplicationSetup( + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + sourceShardToCopyNode, + workersForPlacementList); + + } + PG_CATCH(); + { + /* Do a best-effort cleanup of shards created on workers in the above block */ + TryDropSplitShardsOnFailure(shardGroupSplitIntervalListList, + workersForPlacementList); + + PG_RE_THROW(); + } + PG_END_TRY(); +} + +/* Create ShardGroup split children on a list of corresponding workers. */ +static void +CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *workersForPlacementList) +{ + /* Iterate on shard interval list for shard group */ + List *shardIntervalList = NULL; + foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) + { + /* Iterate on split shard interval list and corresponding placement worker */ + ShardInterval *shardInterval = NULL; + WorkerNode *workerPlacementNode = NULL; + forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, + workersForPlacementList) + { + /* Populate list of commands necessary to create shard interval on destination */ + List *splitShardCreationCommandList = GetPreLoadTableCreationCommands( + shardInterval->relationId, + false, /* includeSequenceDefaults */ + NULL /* auto add columnar options for cstore tables */); + splitShardCreationCommandList = WorkerApplyShardDDLCommandList( + splitShardCreationCommandList, + shardInterval->shardId); + + /* Create new split child shard on the specified placement list */ + CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode); + } + } +} + +/* + * CreateDummyShardsForShardGroup creates placeholder (dummy) shards for the shard group being split. + * Note 1: a dummy copy of every source shard is created on each unique destination node, except on the source node itself, which already holds the real source shard. + * Note 2: a dummy copy of every split child is created on the source node, except for children placed on the source node, where the real child shard is already created. + */ +static void +CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + WorkerNode *sourceWorkerNode, + List *workersForPlacementList) +{ + /* + * Satisfy Constraint 1: Create dummy source shard(s) on all destination nodes. + * If the source node is also a destination, skip dummy shard creation (see Note 1 from the function description). + * We are guaranteed to have a single active placement for the source shard. This is enforced earlier by ErrorIfCannotSplitShardExtended. + */ + + /* List 'workersForPlacementList' can have duplicates. We need all unique destination nodes.
*/ + HTAB *workersForPlacementSet = CreateWorkerForPlacementSet(workersForPlacementList); + + HASH_SEQ_STATUS status; + hash_seq_init(&status, workersForPlacementSet); + WorkerNode *workerPlacementNode = NULL; + while ((workerPlacementNode = (WorkerNode *) hash_seq_search(&status)) != NULL) + { + if (workerPlacementNode->nodeId == sourceWorkerNode->nodeId) + { + continue; + } + + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, sourceColocatedShardIntervalList) + { + /* Populate list of commands necessary to create shard interval on destination */ + List *splitShardCreationCommandList = GetPreLoadTableCreationCommands( + shardInterval->relationId, + false, /* includeSequenceDefaults */ + NULL /* auto add columnar options for cstore tables */); + splitShardCreationCommandList = WorkerApplyShardDDLCommandList( + splitShardCreationCommandList, + shardInterval->shardId); + + /* Create dummy source shard on the specified destination worker */ + CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode); + } + } + + /* + * Satisfy Constraint 2: Create dummy target shards from shard group on source node. + * If the target shard's placement is the source node, skip it (see Note 2 from the function description). + */ + List *shardIntervalList = NULL; + foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) + { + ShardInterval *shardInterval = NULL; + workerPlacementNode = NULL; + forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, + workersForPlacementList) + { + if (workerPlacementNode->nodeId == sourceWorkerNode->nodeId) + { + continue; + } + + List *splitShardCreationCommandList = GetPreLoadTableCreationCommands( + shardInterval->relationId, + false, /* includeSequenceDefaults */ + NULL /* auto add columnar options for cstore tables */); + splitShardCreationCommandList = WorkerApplyShardDDLCommandList( + splitShardCreationCommandList, + shardInterval->shardId); + + /* Create dummy split child shard on the source worker */ + CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode); + } + } +} + +/* + * CreateWorkerForPlacementSet returns a hash set containing the unique worker nodes present in 'workersForPlacementList'. + */ +static HTAB * +CreateWorkerForPlacementSet(List *workersForPlacementList) +{ + HASHCTL info = { 0 }; + info.keysize = sizeof(WorkerNode); + info.hash = WorkerNodeHashCode; + info.match = WorkerNodeCompare; + info.hcxt = CurrentMemoryContext; + + /* we don't have value field as it's a set */ + info.entrysize = info.keysize; + + uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); + + HTAB *workerForPlacementSet = hash_create("worker placement set", 32, &info, + hashFlags); + + WorkerNode *workerForPlacement = NULL; + foreach_ptr(workerForPlacement, workersForPlacementList) + { + void *hashKey = (void *) workerForPlacement; + hash_search(workerForPlacementSet, hashKey, HASH_ENTER, NULL); + } + + return workerForPlacementSet; +} + + +/* + * SplitShardReplicationSetup runs the worker_split_shard_replication_setup UDF on the source worker node, passing one citus.split_shard_info row per split child, to prepare replication for the split. + */ +static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + WorkerNode *sourceWorkerNode, + List *destinationWorkerNodesList) +{ + StringInfo splitChildrenRows = makeStringInfo(); + + ShardInterval *sourceShardIntervalToCopy = NULL; + List *splitChildShardIntervalList = NULL; + bool addComma = false; + forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, + splitChildShardIntervalList, shardGroupSplitIntervalListList) + { + int64 sourceShardId = sourceShardIntervalToCopy->shardId; + + ShardInterval *splitChildShardInterval = NULL; + WorkerNode *destinationWorkerNode = NULL; + forboth_ptr(splitChildShardInterval,
splitChildShardIntervalList, + destinationWorkerNode, destinationWorkerNodesList) + { + if (addComma) + { + appendStringInfo(splitChildrenRows, ","); + } + + StringInfo minValueString = makeStringInfo(); + appendStringInfo(minValueString, "%d", DatumGetInt32(splitChildShardInterval->minValue)); + + StringInfo maxValueString = makeStringInfo(); + appendStringInfo(maxValueString, "%d", DatumGetInt32(splitChildShardInterval->maxValue)); + + appendStringInfo(splitChildrenRows, + "ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info", + sourceShardId, + splitChildShardInterval->shardId, + quote_literal_cstr(minValueString->data), + quote_literal_cstr(maxValueString->data), + destinationWorkerNode->nodeId); + + addComma = true; + } + } + + StringInfo splitShardReplicationUDF = makeStringInfo(); + appendStringInfo(splitShardReplicationUDF, + "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])", splitChildrenRows->data); + + int connectionFlags = FORCE_NEW_CONNECTION; + MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, + sourceWorkerNode-> + workerName, + sourceWorkerNode-> + workerPort, + CitusExtensionOwnerName(), + get_database_name( + MyDatabaseId)); + ClaimConnectionExclusively(sourceConnection); + + PGresult *result = NULL; + int queryResult = ExecuteOptionalRemoteCommand(sourceConnection, splitShardReplicationUDF->data, &result); + + if (queryResult != RESPONSE_OKAY || !IsResponseOK(result)) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("Failed to run worker_split_shard_replication_setup"))); + + PQclear(result); + ForgetResults(sourceConnection); + } +} \ No newline at end of file diff --git a/src/backend/distributed/operations/split_shard_replication_setup.c b/src/backend/distributed/operations/split_shard_replication_setup.c index 363ca516d..fa59957b5 100644 --- a/src/backend/distributed/operations/split_shard_replication_setup.c +++ b/src/backend/distributed/operations/split_shard_replication_setup.c @@ -9,14 +9,19 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "miscadmin.h" +#include "postmaster/postmaster.h" #include "common/hashfn.h" #include "distributed/shardinterval_utils.h" #include "distributed/shard_utils.h" #include "distributed/shardsplit_shared_memory.h" +#include "distributed/connection_management.h" #include "distributed/citus_safe_lib.h" #include "distributed/listutils.h" +#include "distributed/remote_commands.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "commands/dbcommands.h" /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(worker_split_shard_replication_setup); @@ -59,6 +64,10 @@ static uint32 NodeShardMappingHash(const void *key, Size keysize); static int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize); +static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap); +StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList); +char * ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo* shardSplitInfo); + /* * worker_split_shard_replication_setup UDF creates in-memory data structures * to store the meta information about the shard undergoing split and new split @@ -148,6 +157,8 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) /* store handle in statically allocated shared memory*/ StoreShardSplitSharedMemoryHandle(dsmHandle); + CreatePublishersForSplitChildren(ShardInfoHashMap); + PG_RETURN_VOID(); } @@ -263,6 +274,8 @@ 
CreateShardSplitInfo(uint64 sourceShardIdToSplit, shardSplitInfo->shardMinValue = minValue; shardSplitInfo->shardMaxValue = maxValue; shardSplitInfo->nodeId = nodeId; + shardSplitInfo->sourceShardId = sourceShardIdToSplit; + shardSplitInfo->splitChildShardId = desSplitChildShardId; return shardSplitInfo; } @@ -417,3 +430,104 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, *nodeId = DatumGetInt32(nodeIdDatum); } + +/* + * CreatePublishersForSplitChildren creates, for each (nodeId, tableOwner) entry in the map, a publication on the local (source) node covering the split children and their source shards. + */ +static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap) +{ + + HASH_SEQ_STATUS status; + hash_seq_init(&status, shardInfoHashMap); + + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + { + uint32_t nodeId = entry->key.nodeId; + uint32_t tableOwnerId = entry->key.tableOwnerId; + + int connectionFlags = FORCE_NEW_CONNECTION; + printf("Sameer getting new connection \n"); + MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, + "localhost", + PostPortNumber, + CitusExtensionOwnerName(), + get_database_name( + MyDatabaseId)); + StringInfo shardNamesForPublication = GetSoureAndDestinationShardNames(entry->shardSplitInfoList); + + StringInfo command = makeStringInfo(); + appendStringInfo(command, "CREATE PUBLICATION sameerpub_%u_%u FOR TABLE %s", nodeId, tableOwnerId, shardNamesForPublication->data); + ExecuteCriticalRemoteCommand(sourceConnection, command->data); + printf("Sameer UserName: %s \n", GetUserNameFromId(tableOwnerId, false)); + } +} + +/* + * GetSoureAndDestinationShardNames returns a comma-separated list of fully qualified shard names: all split children first, followed by the (deduplicated) source shards. + */ +StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList) +{ + HASHCTL info; + int flags = HASH_ELEM | HASH_BLOBS | HASH_CONTEXT; + + /* initialise the hash table */ + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(uint64); + info.entrysize = sizeof(uint64); + info.hcxt = CurrentMemoryContext; + + HTAB *sourceShardIdSet = hash_create("Source ShardId Set", 128, &info, flags); + + /* Get child shard names */ + StringInfo allShardNames = makeStringInfo(); + bool addComma = false; + + ShardSplitInfo *shardSplitInfo = NULL; + foreach_ptr(shardSplitInfo, shardSplitInfoList) + { + /* add source shard id to the hash table to get list of unique source shard ids */ + bool found = false; + uint64 sourceShardId = shardSplitInfo->sourceShardId; + hash_search(sourceShardIdSet, &sourceShardId, HASH_ENTER, &found); + + if (addComma) + { + appendStringInfo(allShardNames, ","); + } + + /* Append fully qualified split child shard name */ + char *childShardName = ConstructFullyQualifiedSplitChildShardName(shardSplitInfo); + appendStringInfoString(allShardNames, childShardName); + addComma = true; + } + + + HASH_SEQ_STATUS status; + hash_seq_init(&status, sourceShardIdSet); + uint64 *sourceShardIdEntry = NULL; + while ((sourceShardIdEntry = hash_seq_search(&status)) != NULL) + { + ShardInterval *sourceShardInterval = LoadShardInterval(*sourceShardIdEntry); + char* sourceShardName = ConstructQualifiedShardName(sourceShardInterval); + + if (addComma) + { + appendStringInfo(allShardNames, ","); + } + + appendStringInfoString(allShardNames, sourceShardName); + addComma = true; + } + + return allShardNames; +} + +/* + * ConstructFullyQualifiedSplitChildShardName builds the schema-qualified name of a split child shard from its ShardSplitInfo. + */ +char * +ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo* shardSplitInfo) +{ + Oid schemaId = get_rel_namespace(shardSplitInfo->distributedTableOid); + char *schemaName = get_namespace_name(schemaId); + char *tableName = get_rel_name(shardSplitInfo->distributedTableOid); + + char *shardName = pstrdup(tableName); + AppendShardIdToName(&shardName, shardSplitInfo->splitChildShardId); + shardName =
quote_qualified_identifier(schemaName, shardName); + + return shardName; +} \ No newline at end of file diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 16c0afb54..c4b5c6f00 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -32,6 +32,11 @@ #include "utils/guc.h" #include "utils/hsearch.h" #include "utils/memutils.h" +#if PG_VERSION_NUM < PG_VERSION_13 +#include "utils/hashutils.h" +#else +#include "common/hashfn.h" +#endif /* Config variables managed via guc.c */ @@ -361,6 +366,26 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize) } +/* + * WorkerNodeHashCode computes the hash code for a worker node from the node's + * host name and port number. Nodes that only differ by their rack locations + * hash to the same value. + */ +uint32 +WorkerNodeHashCode(const void *key, Size keySize) +{ + const WorkerNode *worker = (const WorkerNode *) key; + const char *workerName = worker->workerName; + const uint32 *workerPort = &(worker->workerPort); + + /* standard hash function outlined in Effective Java, Item 8 */ + uint32 result = 17; + result = 37 * result + string_hash(workerName, WORKER_LENGTH); + result = 37 * result + tag_hash(workerPort, sizeof(uint32)); + return result; +} + + /* * NodeNamePortCompare implements the common logic for comparing two nodes * with their given nodeNames and ports. diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 2a63ad0bd..ce92cfa75 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -147,6 +147,7 @@ static XLogRecPtr GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds); static char * ShardMovePublicationName(Oid ownerId); static char * ShardMoveSubscriptionName(Oid ownerId); +static char * ShardSplitPublicationName(Oid ownerId, uint32 nodeId); static void AcquireLogicalReplicationLock(void); static void DropAllShardMoveLeftovers(void); static void DropAllShardMoveSubscriptions(MultiConnection *connection); @@ -2061,3 +2062,12 @@ GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds) "SELECT min(latest_end_lsn) FROM pg_stat_subscription " "WHERE subname IN %s", subscriptionValueList)); } + +/* + * ShardSplitPublicationName returns the name of the publication for the given + * table owner. + */ +static char * ShardSplitPublicationName(Oid ownerId, uint32 nodeId) +{ + return psprintf("%s%i_%u", SHARD_SPLIT_PUBLICATION_PREFIX, ownerId, nodeId); +} diff --git a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql index 559769260..cd6701e2d 100644 --- a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql +++ b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/11.0-2.sql @@ -3,7 +3,8 @@ DROP TYPE IF EXISTS citus.split_mode; -- Three modes to be implemented: blocking, non_blocking and auto. -- Currently, the default / only supported mode is blocking. 
CREATE TYPE citus.split_mode AS ENUM ( - 'blocking' + 'blocking', + 'non_blocking' ); CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points( diff --git a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql index 559769260..cd6701e2d 100644 --- a/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_split_shard_by_split_points/latest.sql @@ -3,7 +3,8 @@ DROP TYPE IF EXISTS citus.split_mode; -- Three modes to be implemented: blocking, non_blocking and auto. -- Currently, the default / only supported mode is blocking. CREATE TYPE citus.split_mode AS ENUM ( - 'blocking' + 'blocking', + 'non_blocking' ); CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points( diff --git a/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql b/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql index 7f35d2b4f..b9c5869d2 100644 --- a/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql +++ b/src/backend/distributed/sql/udfs/worker_split_copy/11.0-2.sql @@ -14,4 +14,4 @@ RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_copy$$; COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[]) - IS 'Perform split copy for shard' + IS 'Perform split copy for shard'; diff --git a/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql b/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql index 7f35d2b4f..b9c5869d2 100644 --- a/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_split_copy/latest.sql @@ -14,4 +14,4 @@ RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_copy$$; COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[]) - IS 'Perform split copy for shard' + IS 'Perform split copy for shard'; diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 629179c54..e250072e0 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -29,5 +29,6 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, #define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_" #define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_" #define SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX "citus_shard_move_subscription_role_" +#define SHARD_SPLIT_PUBLICATION_PREFIX "citus_shard_split_publication_" #endif /* MULTI_LOGICAL_REPLICATION_H_ */ diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h index 521a9b2b2..042d6eb20 100644 --- a/src/include/distributed/shard_split.h +++ b/src/include/distributed/shard_split.h @@ -15,7 +15,8 @@ /* Split Modes supported by Shard Split API */ typedef enum SplitMode { - BLOCKING_SPLIT = 0 + BLOCKING_SPLIT = 0, + NON_BLOCKING_SPLIT = 1 } SplitMode; /* @@ -40,6 +41,8 @@ typedef struct ShardSplitInfo int32 shardMinValue; /* min hash value */ int32 shardMaxValue; /* max hash value */ uint32_t nodeId; /* node where child shard is to be placed */ + uint64 sourceShardId; /* parent shardId */ + uint64 splitChildShardId; /* child shardId*/ char slotName[NAMEDATALEN]; /* replication slot name belonging to this node */ } ShardSplitInfo; diff --git 
a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index e861b8a65..e7b1a7969 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -110,6 +110,7 @@ extern List * PgDistTableMetadataSyncCommandList(void); /* Function declarations for worker node utilities */ extern int CompareWorkerNodes(const void *leftElement, const void *rightElement); +extern uint32 WorkerNodeHashCode(const void *key, Size keySize); extern int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize); extern int NodeNamePortCompare(const char *workerLhsName, const char *workerRhsName, int workerLhsPort, int workerRhsPort); diff --git a/src/test/regress/expected/citus_sameer.out b/src/test/regress/expected/citus_sameer.out new file mode 100644 index 000000000..d8b006741 --- /dev/null +++ b/src/test/regress/expected/citus_sameer.out @@ -0,0 +1,145 @@ +-- Negative test cases for citus_split_shard_by_split_points UDF. +CREATE SCHEMA citus_split_shard_by_split_points_negative; +SET search_path TO citus_split_shard_by_split_points_negative; +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 60761300; +CREATE TABLE range_paritioned_table_to_split(rid bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('range_paritioned_table_to_split', 'rid', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Shards are not created automatically for range distributed table. +SELECT master_create_empty_shard('range_paritioned_table_to_split'); + master_create_empty_shard +--------------------------------------------------------------------- + 60761300 +(1 row) + +SET citus.next_shard_id TO 49761300; +CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); +-- Shard1 | -2147483648 | -1073741825 +-- Shard2 | -1073741824 | -1 +-- Shard3 | 0 | 1073741823 +-- Shard4 | 1073741824 | 2147483647 +SELECT create_distributed_table('table_to_split','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- UDF fails for range partitioned tables. +SELECT citus_split_shard_by_split_points( + 60761300, + ARRAY['-1073741826'], + ARRAY[:worker_1_node, :worker_2_node]); +ERROR: Cannot split shard as operation is only supported for hash distributed tables. +-- UDF fails if number of placement node list does not exceed split points by one. +-- Example: One split point defines two way split (2 worker nodes needed). +SELECT citus_split_shard_by_split_points( + 49761300, + -- 2 split points defined making it a 3 way split but we only specify 2 placement lists. + ARRAY['-1073741826', '-107374182'], + ARRAY[:worker_1_node, :worker_2_node]); -- 2 worker nodes. +ERROR: Number of worker node ids should be one greater split points. NodeId count is '2' and SplitPoint count is '2'. +-- UDF fails if split ranges specified are not within the shard id to split. +SELECT citus_split_shard_by_split_points( + 49761300, -- Shard range is from (-2147483648, -1073741825) + ARRAY['0'], -- The range we specified is 0 which is not in the range. + ARRAY[:worker_1_node, :worker_2_node]); +ERROR: Split point 0 is outside the min/max range(-2147483648, -1073741825) for shard id 49761300. 
+-- UDF fails if split points are not strictly increasing. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50', '35'], + ARRAY[:worker_1_node, :worker_2_node, :worker_1_node]); +ERROR: Invalid Split Points '50' followed by '35'. All split points should be strictly increasing. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50', '50'], + ARRAY[:worker_1_node, :worker_2_node, :worker_1_node]); +ERROR: Invalid Split Points '50' followed by '50'. All split points should be strictly increasing. +-- UDF fails if nodeIds are < 1 or Invalid. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[0, :worker_2_node]); +ERROR: Invalid Node Id '0'. +SELECT citus_split_shard_by_split_points( + 49761302, + ARRAY['50'], + ARRAY[101, 201]); +ERROR: Invalid Node Id '101'. +-- UDF fails if split point specified is equal to the max value in the range. +-- Example: ShardId 81060002 range is from (-2147483648, -1073741825) +-- '-1073741825' as split point is invalid. +-- '-1073741826' is valid and will split to: (-2147483648, -1073741826) and (-1073741825, -1073741825) +SELECT citus_split_shard_by_split_points( + 49761300, -- Shard range is from (-2147483648, -1073741825) + ARRAY['-1073741825'], -- Split point equals shard's max value. + ARRAY[:worker_1_node, :worker_2_node]); +ERROR: Invalid split point -1073741825, as split points should be inclusive. Please use -1073741826 instead. +-- UDF fails where source shard cannot be split further i.e min and max range is equal. +-- Create a Shard where range cannot be split further +SELECT isolate_tenant_to_new_shard('table_to_split', 1); + isolate_tenant_to_new_shard +--------------------------------------------------------------------- + 49761305 +(1 row) + +SELECT citus_split_shard_by_split_points( + 49761305, + ARRAY['-1073741826'], + ARRAY[:worker_1_node, :worker_2_node]); +ERROR: Cannot split shard id "49761305" as min/max range are equal: ('-1905060026', '-1905060026'). +-- Create distributed table with replication factor > 1 +SET citus.shard_replication_factor TO 2; +SET citus.next_shard_id TO 51261400; +CREATE TABLE table_to_split_replication_factor_2 (id bigserial PRIMARY KEY, value char); +SELECT create_distributed_table('table_to_split_replication_factor_2','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- UDF fails for replication factor > 1 +SELECT citus_split_shard_by_split_points( + 51261400, + ARRAY['-1073741826'], + ARRAY[:worker_1_node, :worker_2_node]); +ERROR: Operation split not supported for shard as replication factor '2' is greater than 1. +-- Create distributed table with columnar type. +SET citus.next_shard_id TO 51271400; +CREATE TABLE table_to_split_columnar (id bigserial PRIMARY KEY, value char) USING columnar; +SELECT create_distributed_table('table_to_split_columnar','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- UDF fails for columnar table. +SELECT citus_split_shard_by_split_points( + 51271400, + ARRAY['-1073741826'], + ARRAY[:worker_1_node, :worker_2_node]); +ERROR: Cannot split shard as operation is not supported for Columnar tables. +-- Create distributed table which is partitioned. 
+SET citus.next_shard_id TO 51271900; +CREATE TABLE table_to_split_partitioned(id integer, dt date) PARTITION BY RANGE(dt); +SELECT create_distributed_table('table_to_split_partitioned','id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- UDF fails for partitioned table. +SELECT citus_split_shard_by_split_points( + 51271900, + ARRAY['-1073741826'], + ARRAY[:worker_1_node, :worker_2_node]); +ERROR: cannot split of 'table_to_split_partitioned', because it is a partitioned table +DETAIL: In colocation group of 'table_to_split_partitioned', a partitioned relation exists: 'table_to_split_partitioned'. Citus does not support split of partitioned tables. diff --git a/src/test/regress/split_schedule b/src/test/regress/split_schedule index 9d996a892..ff8bd7d94 100644 --- a/src/test/regress/split_schedule +++ b/src/test/regress/split_schedule @@ -5,7 +5,9 @@ test: multi_cluster_management test: multi_test_catalog_views test: tablespace # Split tests go here. -test: worker_split_binary_copy_test -test: worker_split_text_copy_test -test: citus_split_shard_by_split_points_negative -test: citus_split_shard_by_split_points +#test: worker_split_binary_copy_test +#test: worker_split_text_copy_test +#test: citus_split_shard_by_split_points_negative +test: citus_sameer +#test: citus_split_shard_by_split_points +#test: split_shard_replication_setup diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql new file mode 100644 index 000000000..c49b8ad2d --- /dev/null +++ b/src/test/regress/sql/citus_sameer.sql @@ -0,0 +1,64 @@ +-- Negative test cases for citus_split_shard_by_split_points UDF. + +CREATE SCHEMA citus_split_shard_by_split_points_negative; +SET search_path TO citus_split_shard_by_split_points_negative; +SET citus.shard_count TO 1; +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 1; + +CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); +-- Shard1 | -2147483648 | -1073741825 +-- Shard2 | -1073741824 | -1 +-- Shard3 | 0 | 1073741823 +-- Shard4 | 1073741824 | 2147483647 +SELECT create_distributed_table('table_to_split','id'); +SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split'); + + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +SELECT * FROM citus_shards; +SELECT * FROM pg_dist_shard; + +SET client_min_messages TO LOG; +SET citus.log_remote_commands TO on; + +CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema", + c.relname as "Name", + pg_catalog.pg_get_userbyid(c.relowner) as "Owner" +FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace +WHERE c.relkind IN ('r','p','') + AND n.nspname <> 'pg_catalog' + AND n.nspname !~ '^pg_toast' + AND n.nspname <> 'information_schema' + AND pg_catalog.pg_table_is_visible(c.oid) +ORDER BY 1,2; + +-- UDF fails for range partitioned tables. 
+\c - - - :master_port +SET citus.log_remote_commands TO on; +SET citus.next_shard_id TO 100; +SET search_path TO citus_split_shard_by_split_points_negative; + +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset + +SELECT citus_split_shard_by_split_points( + 1, + ARRAY['-1073741826'], + ARRAY[:worker_1_node, :worker_2_node], + 'non_blocking'); +-- On worker2, we want child shard 2 and dummy shard 1 -- +-- on worker1, we want child shard 3 and 1 and dummy shard 2 -- + +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points_negative; +SELECT * FROM show_catalog; + +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points_negative; +SELECT * FROM show_catalog; +SELECT * FROM pg_publication_tables; \ No newline at end of file diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index 814b4b8ae..b8f9ace57 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -159,3 +159,4 @@ DROP PUBLICATION pub1; SET search_path TO split_shard_replication_setup_schema; SET client_min_messages TO ERROR; DROP SUBSCRIPTION sub1; +
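For reviewers, a hedged sketch (not part of the patch) of the kind of command SplitShardReplicationSetup builds and runs on the source worker. The shard ids, hash ranges, and node ids below are made-up examples; the citus.split_shard_info row type and the worker_split_shard_replication_setup UDF are the ones used in this diff.

SELECT * FROM worker_split_shard_replication_setup(ARRAY[
    -- ROW(source shard id, split child shard id, min hash value, max hash value, destination node id)
    ROW(1, 100, '-2147483648', '-1073741826', 16)::citus.split_shard_info,
    ROW(1, 101, '-1073741825', '2147483647', 18)::citus.split_shard_info
]);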