Non-Blocking Shard Split

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-10 19:14:41 +05:30
parent 49cada7e9e
commit 0d26c4c8c7
17 changed files with 680 additions and 37 deletions

View File

@ -248,7 +248,6 @@ static void RegisterWorkerNodeCacheCallbacks(void);
static void RegisterLocalGroupIdCacheCallbacks(void);
static void RegisterAuthinfoCacheCallbacks(void);
static void RegisterCitusTableCacheEntryReleaseCallbacks(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
static void ResetCitusTableCacheEntry(CitusTableCacheEntry *cacheEntry);
static void RemoveStaleShardIdCacheEntries(CitusTableCacheEntry *tableEntry);
static void CreateDistTableCache(void);
@ -3942,26 +3941,6 @@ RegisterAuthinfoCacheCallbacks(void)
}
/*
* WorkerNodeHashCode computes the hash code for a worker node from the node's
* host name and port number. Nodes that only differ by their rack locations
* hash to the same value.
*/
static uint32
WorkerNodeHashCode(const void *key, Size keySize)
{
const WorkerNode *worker = (const WorkerNode *) key;
const char *workerName = worker->workerName;
const uint32 *workerPort = &(worker->workerPort);
/* standard hash function outlined in Effective Java, Item 8 */
uint32 result = 17;
result = 37 * result + string_hash(workerName, WORKER_LENGTH);
result = 37 * result + tag_hash(workerPort, sizeof(uint32));
return result;
}
/*
* ResetCitusTableCacheEntry frees any out-of-band memory used by a cache entry,
* but does not free the entry itself.

View File

@ -81,7 +81,10 @@ LookupSplitMode(Oid shardSplitModeOid)
{
shardSplitMode = BLOCKING_SPLIT;
}
else if (strncmp(enumLabel, "non_blocking", NAMEDATALEN) == 0)
{
shardSplitMode = NON_BLOCKING_SPLIT;
}
/* Extend with other modes as we support them */
else
{

View File

@ -10,6 +10,7 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "utils/array.h"
#include "distributed/utils/array_type.h"
@ -32,6 +33,7 @@
#include "distributed/pg_dist_shard.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_physical_planner.h"
#include "commands/dbcommands.h"
/* Function declarations */
static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
@ -42,6 +44,19 @@ static void CreateSplitShardsForShardGroup(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
WorkerNode *sourceWorkerNode,
List *workersForPlacementList);
static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
WorkerNode *sourceWorkerNode,
List *workersForPlacementList);
static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList);
static void CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerNode);
static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
@ -53,6 +68,11 @@ static void BlockingShardSplit(SplitOperation splitOperation,
ShardInterval *shardIntervalToSplit,
List *shardSplitPointsList,
List *workersForPlacementList);
static void NonBlockingShardSplit(SplitOperation splitOperation,
ShardInterval *shardIntervalToSplit,
List *shardSplitPointsList,
List *workersForPlacementList);
static void DoSplitCopy(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
@ -382,8 +402,11 @@ SplitShard(SplitMode splitMode,
}
else
{
/* we only support blocking shard split in this code path for now. */
ereport(ERROR, (errmsg("Invalid split mode value %d.", splitMode)));
NonBlockingShardSplit(
splitOperation,
shardIntervalToSplit,
shardSplitPointsList,
workersForPlacementList);
}
}
@ -750,11 +773,11 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
SHARD_STATE_ACTIVE,
0, /* shard length (zero for HashDistributed Table) */
workerPlacementNode->groupId);
}
if (ShouldSyncTableMetadata(shardInterval->relationId))
{
syncedShardList = lappend(syncedShardList, shardInterval);
if (ShouldSyncTableMetadata(shardInterval->relationId))
{
syncedShardList = lappend(syncedShardList, shardInterval);
}
}
}
@ -918,3 +941,273 @@ TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
}
}
}
/*
* NonBlockingShardSplit splits a given shard (or shard group) in a non-blocking
* fashion, based on specified split points, to a set of destination nodes.
* 'splitOperation' : Customer operation that triggered split.
* 'shardIntervalToSplit' : Source shard interval to be split.
* 'shardSplitPointsList' : Split Points list for the source 'shardInterval'.
* 'workersForPlacementList' : Placement list corresponding to split children.
*/
static void
NonBlockingShardSplit(SplitOperation splitOperation,
ShardInterval *shardIntervalToSplit,
List *shardSplitPointsList,
List *workersForPlacementList)
{
List *sourceColocatedShardIntervalList = ColocatedShardIntervalList(
shardIntervalToSplit);
/* First create shard interval metadata for split children */
List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup(
sourceColocatedShardIntervalList,
shardSplitPointsList);
/* Only single placement allowed (already validated RelationReplicationFactor = 1) */
List *sourcePlacementList = ActiveShardPlacementList(shardIntervalToSplit->shardId);
Assert(sourcePlacementList->length == 1);
ShardPlacement *sourceShardPlacement = (ShardPlacement *) linitial(
sourcePlacementList);
WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
false /* missingOk */);
PG_TRY();
{
/*
* Physically create split children, perform split copy and create auxiliary structures.
* This includes: indexes, replica identity, triggers and statistics.
* Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints).
*/
CreateSplitShardsForShardGroupTwo(
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList);
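/*
* Create dummy shards so that both the source node and every destination node
* has a relation for each shard that participates in the split.
*/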
CreateDummyShardsForShardGroup(
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
sourceShardToCopyNode,
workersForPlacementList);
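/*
* Register the split metadata on the source node; this also creates the
* publications used for logical replication of the split children.
*/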
SplitShardReplicationSetup(
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
sourceShardToCopyNode,
workersForPlacementList);
}
PG_CATCH();
{
/* Do a best effort cleanup of shards created on workers in the above block */
TryDropSplitShardsOnFailure(shardGroupSplitIntervalListList,
workersForPlacementList);
PG_RE_THROW();
}
PG_END_TRY();
}
/* Create ShardGroup split children on a list of corresponding workers. */
static void
CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
/* Iterate on shard interval list for shard group */
List *shardIntervalList = NULL;
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
/* Iterate on split shard interval list and corresponding placement worker */
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{
/* Populate list of commands necessary to create shard interval on destination */
List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
shardInterval->relationId,
false, /* includeSequenceDefaults */
NULL /* auto add columnar options for cstore tables */);
splitShardCreationCommandList = WorkerApplyShardDDLCommandList(
splitShardCreationCommandList,
shardInterval->shardId);
/* Create new split child shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
}
}
}
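/*
* CreateDummyShardsForShardGroup creates placeholder (dummy) shards used by the
* non-blocking split flow:
* Note 1: a dummy copy of every source shard is created on each destination
* node (skipped when the destination node is the source node itself).
* Note 2: a dummy copy of every split child is created on the source node
* (skipped when the child placement is already on the source node).
*/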
static void
CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
WorkerNode *sourceWorkerNode,
List *workersForPlacementList)
{
/*
* Satisfy Constraint 1: Create dummy source shard(s) on all destination nodes.
* If the source node is also a destination, skip dummy shard creation (see Note 1 in the function description).
* We are guaranteed to have a single active placement for the source shard; this is enforced earlier by ErrorIfCannotSplitShardExtended.
*/
/* List 'workersForPlacementList' can have duplicates. We need all unique destination nodes. */
HTAB *workersForPlacementSet = CreateWorkerForPlacementSet(workersForPlacementList);
HASH_SEQ_STATUS status;
hash_seq_init(&status, workersForPlacementSet);
WorkerNode *workerPlacementNode = NULL;
while ((workerPlacementNode = (WorkerNode *) hash_seq_search(&status)) != NULL)
{
if (workerPlacementNode->nodeId == sourceWorkerNode->nodeId)
{
continue;
}
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, sourceColocatedShardIntervalList)
{
/* Populate list of commands necessary to create shard interval on destination */
List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
shardInterval->relationId,
false, /* includeSequenceDefaults */
NULL /* auto add columnar options for cstore tables */);
splitShardCreationCommandList = WorkerApplyShardDDLCommandList(
splitShardCreationCommandList,
shardInterval->shardId);
/* Create new split child shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
}
}
/*
* Satisfy Constraint 2: Create dummy target shards from the shard group on the source node.
* If the target shard was already created on the source node as a placement, skip it (see Note 2 in the function description).
*/
List *shardIntervalList = NULL;
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
{
ShardInterval *shardInterval = NULL;
workerPlacementNode = NULL;
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{
if (workerPlacementNode->nodeId == sourceWorkerNode->nodeId)
{
continue;
}
List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
shardInterval->relationId,
false, /* includeSequenceDefaults */
NULL /* auto add columnar options for cstore tables */);
splitShardCreationCommandList = WorkerApplyShardDDLCommandList(
splitShardCreationCommandList,
shardInterval->shardId);
/* Create new split child shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
}
}
}
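/*
* CreateWorkerForPlacementSet returns a hash set containing the unique worker
* nodes from the given placement list.
*/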
static HTAB *
CreateWorkerForPlacementSet(List *workersForPlacementList)
{
HASHCTL info = { 0 };
info.keysize = sizeof(WorkerNode);
info.hash = WorkerNodeHashCode;
info.match = WorkerNodeCompare;
info.hcxt = CurrentMemoryContext;
/* we don't have a value field as it's a set */
info.entrysize = info.keysize;
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
HTAB *workerForPlacementSet = hash_create("worker placement set", 32, &info,
hashFlags);
WorkerNode *workerForPlacement = NULL;
foreach_ptr(workerForPlacement, workersForPlacementList)
{
void *hashKey = (void *) workerForPlacement;
hash_search(workerForPlacementSet, hashKey, HASH_ENTER, NULL);
}
return workerForPlacementSet;
}
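/*
* SplitShardReplicationSetup invokes worker_split_shard_replication_setup() on
* the source node, passing one ROW(source shard id, child shard id, min value,
* max value, destination node id)::citus.split_shard_info entry per split
* child, so that the source worker can set up the in-memory split metadata
* used for replication.
*/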
static void
SplitShardReplicationSetup(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
WorkerNode *sourceWorkerNode,
List *destinationWorkerNodesList)
{
StringInfo splitChildrenRows = makeStringInfo();
ShardInterval *sourceShardIntervalToCopy = NULL;
List *splitChildShardIntervalList = NULL;
bool addComma = false;
forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
splitChildShardIntervalList, shardGroupSplitIntervalListList)
{
int64 sourceShardId = sourceShardIntervalToCopy->shardId;
ShardInterval *splitChildShardInterval = NULL;
WorkerNode *destinationWorkerNode = NULL;
forboth_ptr(splitChildShardInterval, splitChildShardIntervalList,
destinationWorkerNode, destinationWorkerNodesList)
{
if (addComma)
{
appendStringInfo(splitChildrenRows, ",");
}
StringInfo minValueString = makeStringInfo();
appendStringInfo(minValueString, "%d", DatumGetInt32(splitChildShardInterval->minValue));
StringInfo maxValueString = makeStringInfo();
appendStringInfo(maxValueString, "%d", DatumGetInt32(splitChildShardInterval->maxValue));
appendStringInfo(splitChildrenRows,
"ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info",
sourceShardId,
splitChildShardInterval->shardId,
quote_literal_cstr(minValueString->data),
quote_literal_cstr(maxValueString->data),
destinationWorkerNode->nodeId);
addComma = true;
}
}
StringInfo splitShardReplicationUDF = makeStringInfo();
appendStringInfo(splitShardReplicationUDF,
"SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])", splitChildrenRows->data);
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
sourceWorkerNode->
workerName,
sourceWorkerNode->
workerPort,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
ClaimConnectionExclusively(sourceConnection);
PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(sourceConnection, splitShardReplicationUDF->data, &result);
if (queryResult != RESPONSE_OKAY || !IsResponseOK(result))
{
/* clean up before erroring out, as ereport(ERROR) does not return */
PQclear(result);
ForgetResults(sourceConnection);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("Failed to run worker_split_shard_replication_setup")));
}
}

View File

@ -9,14 +9,19 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/postmaster.h"
#include "common/hashfn.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_utils.h"
#include "distributed/shardsplit_shared_memory.h"
#include "distributed/connection_management.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/listutils.h"
#include "distributed/remote_commands.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "commands/dbcommands.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(worker_split_shard_replication_setup);
@ -59,6 +64,10 @@ static uint32 NodeShardMappingHash(const void *key, Size keysize);
static int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize);
static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap);
static StringInfo GetSoureAndDestinationShardNames(List *shardSplitInfoList);
static char * ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo);
/*
* worker_split_shard_replication_setup UDF creates in-memory data structures
* to store the meta information about the shard undergoing split and new split
@ -148,6 +157,8 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
/* store handle in statically allocated shared memory*/
StoreShardSplitSharedMemoryHandle(dsmHandle);
CreatePublishersForSplitChildren(ShardInfoHashMap);
PG_RETURN_VOID();
}
@ -263,6 +274,8 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
shardSplitInfo->shardMinValue = minValue;
shardSplitInfo->shardMaxValue = maxValue;
shardSplitInfo->nodeId = nodeId;
shardSplitInfo->sourceShardId = sourceShardIdToSplit;
shardSplitInfo->splitChildShardId = desSplitChildShardId;
return shardSplitInfo;
}
@ -417,3 +430,104 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
*nodeId = DatumGetInt32(nodeIdDatum);
}
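/*
* CreatePublishersForSplitChildren creates a publication on the source node for
* every (node id, table owner) pair in the given hash map. Each publication
* covers the split child shards for that pair together with their source shards.
*/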
static void
CreatePublishersForSplitChildren(HTAB *shardInfoHashMap)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMap);
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint32_t nodeId = entry->key.nodeId;
uint32_t tableOwnerId = entry->key.tableOwnerId;
int connectionFlags = FORCE_NEW_CONNECTION;
printf("Sameer getting new connection \n");
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
"localhost",
PostPortNumber,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
StringInfo shardNamesForPublication = GetSoureAndDestinationShardNames(entry->shardSplitInfoList);
StringInfo command = makeStringInfo();
appendStringInfo(command, "CREATE PUBLICATION sameerpub_%u_%u FOR TABLE %s", nodeId, tableOwnerId, shardNamesForPublication->data);
ExecuteCriticalRemoteCommand(sourceConnection, command->data);
printf("Sameer UserName: %s \n", GetUserNameFromId(tableOwnerId, false));
}
}
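/*
* GetSoureAndDestinationShardNames returns a comma separated list of fully
* qualified shard names: all split child shards followed by their
* (deduplicated) source shards, for use in CREATE PUBLICATION ... FOR TABLE.
*/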
static StringInfo
GetSoureAndDestinationShardNames(List *shardSplitInfoList)
{
HASHCTL info;
int flags = HASH_ELEM | HASH_BLOBS | HASH_CONTEXT;
/* initialise the hash table */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(uint64);
info.entrysize = sizeof(uint64);
info.hcxt = CurrentMemoryContext;
HTAB *sourceShardIdSet = hash_create("Source ShardId Set", 128, &info, flags);
/* Get child shard names */
StringInfo allShardNames = makeStringInfo();
bool addComma = false;
ShardSplitInfo *shardSplitInfo = NULL;
foreach_ptr(shardSplitInfo, shardSplitInfoList)
{
/* add source shard id to the hash table to get list of unique source shard ids */
bool found = false;
uint64 sourceShardId = shardSplitInfo->sourceShardId;
hash_search(sourceShardIdSet, &sourceShardId, HASH_ENTER, &found);
if (addComma)
{
appendStringInfo(allShardNames, ",");
}
/* Append fully qualified split child shard name */
char *childShardName = ConstructFullyQualifiedSplitChildShardName(shardSplitInfo);
appendStringInfoString(allShardNames, childShardName);
addComma = true;
}
HASH_SEQ_STATUS status;
hash_seq_init(&status, sourceShardIdSet);
uint64 *sourceShardIdEntry = NULL;
while ((sourceShardIdEntry = hash_seq_search(&status)) != NULL)
{
ShardInterval *sourceShardInterval = LoadShardInterval(*sourceShardIdEntry);
char *sourceShardName = ConstructQualifiedShardName(sourceShardInterval);
if (addComma)
{
appendStringInfo(allShardNames, ",");
}
appendStringInfoString(allShardNames, sourceShardName);
addComma = true;
}
return allShardNames;
}
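/*
* ConstructFullyQualifiedSplitChildShardName returns the schema qualified name
* of the given split child shard.
*/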
static char *
ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo)
{
Oid schemaId = get_rel_namespace(shardSplitInfo->distributedTableOid);
char *schemaName = get_namespace_name(schemaId);
char *tableName = get_rel_name(shardSplitInfo->distributedTableOid);
char *shardName = pstrdup(tableName);
AppendShardIdToName(&shardName, shardSplitInfo->splitChildShardId);
shardName = quote_qualified_identifier(schemaName, shardName);
return shardName;
}

View File

@ -32,6 +32,11 @@
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#if PG_VERSION_NUM < PG_VERSION_13
#include "utils/hashutils.h"
#else
#include "common/hashfn.h"
#endif
/* Config variables managed via guc.c */
@ -361,6 +366,26 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize)
}
/*
* WorkerNodeHashCode computes the hash code for a worker node from the node's
* host name and port number. Nodes that only differ by their rack locations
* hash to the same value.
*/
uint32
WorkerNodeHashCode(const void *key, Size keySize)
{
const WorkerNode *worker = (const WorkerNode *) key;
const char *workerName = worker->workerName;
const uint32 *workerPort = &(worker->workerPort);
/* standard hash function outlined in Effective Java, Item 8 */
uint32 result = 17;
result = 37 * result + string_hash(workerName, WORKER_LENGTH);
result = 37 * result + tag_hash(workerPort, sizeof(uint32));
return result;
}
/*
* NodeNamePortCompare implements the common logic for comparing two nodes
* with their given nodeNames and ports.

View File

@ -147,6 +147,7 @@ static XLogRecPtr GetSubscriptionPosition(MultiConnection *connection,
Bitmapset *tableOwnerIds);
static char * ShardMovePublicationName(Oid ownerId);
static char * ShardMoveSubscriptionName(Oid ownerId);
static char * ShardSplitPublicationName(Oid ownerId, uint32 nodeId);
static void AcquireLogicalReplicationLock(void);
static void DropAllShardMoveLeftovers(void);
static void DropAllShardMoveSubscriptions(MultiConnection *connection);
@ -2061,3 +2062,12 @@ GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds)
"SELECT min(latest_end_lsn) FROM pg_stat_subscription "
"WHERE subname IN %s", subscriptionValueList));
}
/*
* ShardSplitPublicationName returns the name of the split publication for the
* given table owner and node.
*/
static char *
ShardSplitPublicationName(Oid ownerId, uint32 nodeId)
{
return psprintf("%s%i_%u", SHARD_SPLIT_PUBLICATION_PREFIX, ownerId, nodeId);
}

View File

@ -3,7 +3,8 @@ DROP TYPE IF EXISTS citus.split_mode;
-- Three modes to be implemented: blocking, non_blocking and auto.
-- Currently supported modes are blocking and non_blocking; the default is blocking.
CREATE TYPE citus.split_mode AS ENUM (
'blocking'
'blocking',
'non_blocking'
);
CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points(

View File

@ -3,7 +3,8 @@ DROP TYPE IF EXISTS citus.split_mode;
-- Three modes to be implemented: blocking, non_blocking and auto.
-- Currently supported modes are blocking and non_blocking; the default is blocking.
CREATE TYPE citus.split_mode AS ENUM (
'blocking'
'blocking',
'non_blocking'
);
CREATE OR REPLACE FUNCTION pg_catalog.citus_split_shard_by_split_points(

View File

@ -14,4 +14,4 @@ RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[])
IS 'Perform split copy for shard'
IS 'Perform split copy for shard';

View File

@ -14,4 +14,4 @@ RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos citus.split_copy_info[])
IS 'Perform split copy for shard'
IS 'Perform split copy for shard';

View File

@ -29,5 +29,6 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
#define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_"
#define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_"
#define SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX "citus_shard_move_subscription_role_"
#define SHARD_SPLIT_PUBLICATION_PREFIX "citus_shard_split_publication_"
#endif /* MULTI_LOGICAL_REPLICATION_H_ */

View File

@ -15,7 +15,8 @@
/* Split Modes supported by Shard Split API */
typedef enum SplitMode
{
BLOCKING_SPLIT = 0
BLOCKING_SPLIT = 0,
NON_BLOCKING_SPLIT = 1
} SplitMode;
/*
@ -40,6 +41,8 @@ typedef struct ShardSplitInfo
int32 shardMinValue; /* min hash value */
int32 shardMaxValue; /* max hash value */
uint32_t nodeId; /* node where child shard is to be placed */
uint64 sourceShardId; /* parent shardId */
uint64 splitChildShardId; /* child shardId*/
char slotName[NAMEDATALEN]; /* replication slot name belonging to this node */
} ShardSplitInfo;

View File

@ -110,6 +110,7 @@ extern List * PgDistTableMetadataSyncCommandList(void);
/* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);
extern uint32 WorkerNodeHashCode(const void *key, Size keySize);
extern int WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize);
extern int NodeNamePortCompare(const char *workerLhsName, const char *workerRhsName,
int workerLhsPort, int workerRhsPort);

View File

@ -0,0 +1,145 @@
-- Negative test cases for citus_split_shard_by_split_points UDF.
CREATE SCHEMA citus_split_shard_by_split_points_negative;
SET search_path TO citus_split_shard_by_split_points_negative;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 60761300;
CREATE TABLE range_paritioned_table_to_split(rid bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('range_paritioned_table_to_split', 'rid', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Shards are not created automatically for range distributed table.
SELECT master_create_empty_shard('range_paritioned_table_to_split');
master_create_empty_shard
---------------------------------------------------------------------
60761300
(1 row)
SET citus.next_shard_id TO 49761300;
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
-- Shard1 | -2147483648 | -1073741825
-- Shard2 | -1073741824 | -1
-- Shard3 | 0 | 1073741823
-- Shard4 | 1073741824 | 2147483647
SELECT create_distributed_table('table_to_split','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-- UDF fails for range partitioned tables.
SELECT citus_split_shard_by_split_points(
60761300,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Cannot split shard as operation is only supported for hash distributed tables.
-- UDF fails if number of placement node list does not exceed split points by one.
-- Example: One split point defines two way split (2 worker nodes needed).
SELECT citus_split_shard_by_split_points(
49761300,
-- 2 split points defined making it a 3 way split but we only specify 2 placement lists.
ARRAY['-1073741826', '-107374182'],
ARRAY[:worker_1_node, :worker_2_node]); -- 2 worker nodes.
ERROR: Number of worker node ids should be one greater split points. NodeId count is '2' and SplitPoint count is '2'.
-- UDF fails if split ranges specified are not within the shard id to split.
SELECT citus_split_shard_by_split_points(
49761300, -- Shard range is from (-2147483648, -1073741825)
ARRAY['0'], -- The range we specified is 0 which is not in the range.
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Split point 0 is outside the min/max range(-2147483648, -1073741825) for shard id 49761300.
-- UDF fails if split points are not strictly increasing.
SELECT citus_split_shard_by_split_points(
49761302,
ARRAY['50', '35'],
ARRAY[:worker_1_node, :worker_2_node, :worker_1_node]);
ERROR: Invalid Split Points '50' followed by '35'. All split points should be strictly increasing.
SELECT citus_split_shard_by_split_points(
49761302,
ARRAY['50', '50'],
ARRAY[:worker_1_node, :worker_2_node, :worker_1_node]);
ERROR: Invalid Split Points '50' followed by '50'. All split points should be strictly increasing.
-- UDF fails if nodeIds are < 1 or Invalid.
SELECT citus_split_shard_by_split_points(
49761302,
ARRAY['50'],
ARRAY[0, :worker_2_node]);
ERROR: Invalid Node Id '0'.
SELECT citus_split_shard_by_split_points(
49761302,
ARRAY['50'],
ARRAY[101, 201]);
ERROR: Invalid Node Id '101'.
-- UDF fails if split point specified is equal to the max value in the range.
-- Example: ShardId 81060002 range is from (-2147483648, -1073741825)
-- '-1073741825' as split point is invalid.
-- '-1073741826' is valid and will split to: (-2147483648, -1073741826) and (-1073741825, -1073741825)
SELECT citus_split_shard_by_split_points(
49761300, -- Shard range is from (-2147483648, -1073741825)
ARRAY['-1073741825'], -- Split point equals shard's max value.
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Invalid split point -1073741825, as split points should be inclusive. Please use -1073741826 instead.
-- UDF fails where source shard cannot be split further i.e min and max range is equal.
-- Create a Shard where range cannot be split further
SELECT isolate_tenant_to_new_shard('table_to_split', 1);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
49761305
(1 row)
SELECT citus_split_shard_by_split_points(
49761305,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Cannot split shard id "49761305" as min/max range are equal: ('-1905060026', '-1905060026').
-- Create distributed table with replication factor > 1
SET citus.shard_replication_factor TO 2;
SET citus.next_shard_id TO 51261400;
CREATE TABLE table_to_split_replication_factor_2 (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_to_split_replication_factor_2','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- UDF fails for replication factor > 1
SELECT citus_split_shard_by_split_points(
51261400,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Operation split not supported for shard as replication factor '2' is greater than 1.
-- Create distributed table with columnar type.
SET citus.next_shard_id TO 51271400;
CREATE TABLE table_to_split_columnar (id bigserial PRIMARY KEY, value char) USING columnar;
SELECT create_distributed_table('table_to_split_columnar','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- UDF fails for columnar table.
SELECT citus_split_shard_by_split_points(
51271400,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Cannot split shard as operation is not supported for Columnar tables.
-- Create distributed table which is partitioned.
SET citus.next_shard_id TO 51271900;
CREATE TABLE table_to_split_partitioned(id integer, dt date) PARTITION BY RANGE(dt);
SELECT create_distributed_table('table_to_split_partitioned','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- UDF fails for partitioned table.
SELECT citus_split_shard_by_split_points(
51271900,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: cannot split of 'table_to_split_partitioned', because it is a partitioned table
DETAIL: In colocation group of 'table_to_split_partitioned', a partitioned relation exists: 'table_to_split_partitioned'. Citus does not support split of partitioned tables.

View File

@ -5,7 +5,9 @@ test: multi_cluster_management
test: multi_test_catalog_views
test: tablespace
# Split tests go here.
test: worker_split_binary_copy_test
test: worker_split_text_copy_test
test: citus_split_shard_by_split_points_negative
test: citus_split_shard_by_split_points
#test: worker_split_binary_copy_test
#test: worker_split_text_copy_test
#test: citus_split_shard_by_split_points_negative
test: citus_sameer
#test: citus_split_shard_by_split_points
#test: split_shard_replication_setup

View File

@ -0,0 +1,64 @@
-- Test cases for non-blocking citus_split_shard_by_split_points UDF.
CREATE SCHEMA citus_split_shard_by_split_points_negative;
SET search_path TO citus_split_shard_by_split_points_negative;
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 1;
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
-- With citus.shard_count 1, the single shard covers the full hash range (-2147483648, 2147483647).
SELECT create_distributed_table('table_to_split','id');
SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split');
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT * FROM citus_shards;
SELECT * FROM pg_dist_shard;
SET client_min_messages TO LOG;
SET citus.log_remote_commands TO on;
CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema",
c.relname as "Name",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner"
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind IN ('r','p','')
AND n.nspname <> 'pg_catalog'
AND n.nspname !~ '^pg_toast'
AND n.nspname <> 'information_schema'
AND pg_catalog.pg_table_is_visible(c.oid)
ORDER BY 1,2;
-- Perform a two-way non-blocking split of the shard from the coordinator.
\c - - - :master_port
SET citus.log_remote_commands TO on;
SET citus.next_shard_id TO 100;
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT citus_split_shard_by_split_points(
1,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node],
'non_blocking');
-- On worker2, we want child shard 2 and dummy shard 1 --
-- on worker1, we want child shard 3 and 1 and dummy shard 2 --
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
SELECT * FROM pg_publication_tables;

View File

@ -159,3 +159,4 @@ DROP PUBLICATION pub1;
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;