Added pub/sub

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-13 13:16:01 +05:30
parent fd46d8011d
commit 51242a21d7
19 changed files with 809 additions and 232 deletions

View File

@ -34,6 +34,7 @@
#include "distributed/metadata_sync.h"
#include "distributed/multi_physical_planner.h"
#include "commands/dbcommands.h"
#include "distributed/shardsplit_logical_replication.h"
/* Function declarations */
static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
@ -1186,47 +1187,10 @@ static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList,
WorkerNode *sourceWorkerNode,
List *destinationWorkerNodesList)
{
StringInfo splitChildrenRows = makeStringInfo();
ShardInterval *sourceShardIntervalToCopy = NULL;
List *splitChildShardIntervalList = NULL;
bool addComma = false;
forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
splitChildShardIntervalList, shardGroupSplitIntervalListList)
{
int64 sourceShardId = sourceShardIntervalToCopy->shardId;
ShardInterval *splitChildShardInterval = NULL;
WorkerNode *destinationWorkerNode = NULL;
forboth_ptr(splitChildShardInterval, splitChildShardIntervalList,
destinationWorkerNode, destinationWorkerNodesList)
{
if (addComma)
{
appendStringInfo(splitChildrenRows, ",");
}
StringInfo minValueString = makeStringInfo();
appendStringInfo(minValueString, "%d", DatumGetInt32(splitChildShardInterval->minValue));
StringInfo maxValueString = makeStringInfo();
appendStringInfo(maxValueString, "%d", DatumGetInt32(splitChildShardInterval->maxValue));
appendStringInfo(splitChildrenRows,
"ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info",
sourceShardId,
splitChildShardInterval->shardId,
quote_literal_cstr(minValueString->data),
quote_literal_cstr(maxValueString->data),
destinationWorkerNode->nodeId);
addComma = true;
}
}
StringInfo splitShardReplicationUDF = makeStringInfo();
appendStringInfo(splitShardReplicationUDF,
"SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])", splitChildrenRows->data);
StringInfo splitShardReplicationUDF = CreateSplitShardReplicationSetupUDF(sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
destinationWorkerNodesList);
int connectionFlags = FORCE_NEW_CONNECTION;
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
@ -1241,7 +1205,6 @@ static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList,
PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(sourceConnection, splitShardReplicationUDF->data, &result);
if (queryResult != RESPONSE_OKAY || !IsResponseOK(result))
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
@ -1250,4 +1213,15 @@ static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList,
PQclear(result);
ForgetResults(sourceConnection);
}
}
/* Get replication slot information */
List * replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result);
List * shardSplitPubSubMetadata = CreateShardSplitPubSubMetadataList(sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
destinationWorkerNodesList,
replicationSlotInfoList);
LogicallReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata);
}
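For reference, the command that CreateSplitShardReplicationSetupUDF builds and that this function then runs on the source worker takes roughly the following shape (the shard ids, split points and node ids below are illustrative, borrowed from the regression test output further down in this diff):
SELECT * FROM worker_split_shard_replication_setup(ARRAY[
    ROW(1, 100, '-2147483648', '-1073741826', 16)::citus.split_shard_info,
    ROW(1, 101, '-1073741825', '2147483647', 18)::citus.split_shard_info]);
-- each ROW is (source shard id, child shard id, min value, max value, destination node id);
-- the rows coming back are handed to ParseReplicationSlotInfoFromResult above.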

View File

@ -19,29 +19,18 @@
#include "distributed/citus_safe_lib.h"
#include "distributed/listutils.h"
#include "distributed/remote_commands.h"
#include "distributed/tuplestore.h"
#include "distributed/shardsplit_logical_replication.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "commands/dbcommands.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(worker_split_shard_replication_setup);
static HTAB *ShardInfoHashMap = NULL;
/* key for NodeShardMappingEntry */
typedef struct NodeShardMappingKey
{
uint32_t nodeId;
Oid tableOwnerId;
} NodeShardMappingKey;
/* Entry for hash map */
typedef struct NodeShardMappingEntry
{
NodeShardMappingKey key;
List *shardSplitInfoList;
} NodeShardMappingEntry;
/* Function declarations */
static void ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
uint64 *sourceShardId,
@ -58,11 +47,8 @@ static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit,
static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
HTAB *shardInfoHashMap);
static void SetupHashMapForShardInfo(void);
static uint32 NodeShardMappingHash(const void *key, Size keysize);
static int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize);
static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor);
static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap);
StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList);
@ -117,7 +103,7 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
}
/* SetupMap */
SetupHashMapForShardInfo();
ShardInfoHashMap = SetupHashMapForShardInfo();
int shardSplitInfoCount = 0;
@ -157,7 +143,9 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
/* store handle in statically allocated shared memory*/
StoreShardSplitSharedMemoryHandle(dsmHandle);
CreatePublishersForSplitChildren(ShardInfoHashMap);
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
ReturnReplicationSlotInfo(ShardInfoHashMap, tupleStore, tupleDescriptor);
PG_RETURN_VOID();
}
@ -169,7 +157,7 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS)
* is 'nodeId' and value is a list of ShardSplitInfo that are placed on
* this particular node.
*/
static void
HTAB *
SetupHashMapForShardInfo()
{
HASHCTL info;
@ -182,7 +170,8 @@ SetupHashMapForShardInfo()
int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION | HASH_COMPARE);
ShardInfoHashMap = hash_create("ShardInfoMap", 128, &info, hashFlags);
HTAB * shardInfoMap = hash_create("ShardInfoMap", 128, &info, hashFlags);
return shardInfoMap;
}
@ -350,7 +339,7 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader,
* NodeShardMappingHash returns a hash value by combining the hashes of the node id
* and the table owner id.
*/
static uint32
uint32
NodeShardMappingHash(const void *key, Size keysize)
{
NodeShardMappingKey *entry = (NodeShardMappingKey *) key;
@ -363,7 +352,7 @@ NodeShardMappingHash(const void *key, Size keysize)
/*
* Comparator function for hash keys
*/
static int
int
NodeShardMappingHashCompare(const void *left, const void *right, Size keysize)
{
NodeShardMappingKey *leftKey = (NodeShardMappingKey *) left;
@ -530,4 +519,30 @@ ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo* shardSplitInfo)
shardName = quote_qualified_identifier(schemaName, shardName);
return shardName;
}
static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMap);
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
Datum values[3];
bool nulls[3];
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
values[0] = Int32GetDatum(entry->key.nodeId);
char * tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false);
values[1] = CStringGetTextDatum(tableOwnerName);
char * slotName = encode_replication_slot(entry->key.nodeId, entry->key.tableOwnerId);
values[2] = CStringGetTextDatum(slotName);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls);
}
}

View File

@ -133,8 +133,7 @@ static void CreateShardMoveSubscriptions(MultiConnection *connection,
static char * escape_param_str(const char *str);
static XLogRecPtr GetRemoteLogPosition(MultiConnection *connection);
static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
static void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
Bitmapset *tableOwnerIds);
static uint64 TotalRelationSizeForSubscription(MultiConnection *connection,
char *command);
static bool RelationSubscriptionsAreReady(MultiConnection *targetConnection,
@ -146,8 +145,7 @@ static void WaitForMiliseconds(long timeout);
static XLogRecPtr GetSubscriptionPosition(MultiConnection *connection,
Bitmapset *tableOwnerIds);
static char * ShardMovePublicationName(Oid ownerId);
static char * ShardMoveSubscriptionName(Oid ownerId);
static char * ShardSplitPublicationName(Oid ownerId, uint32 nodeId);
static char * ShardSubscriptionName(Oid ownerId, char * operationPrefix);
static void AcquireLogicalReplicationLock(void);
static void DropAllShardMoveLeftovers(void);
static void DropAllShardMoveSubscriptions(MultiConnection *connection);
@ -1091,7 +1089,7 @@ DropShardMovePublications(MultiConnection *connection, Bitmapset *tableOwnerIds)
* If replication slot can not be dropped while dropping the subscriber, drop
* it here.
*/
DropShardMoveReplicationSlot(connection, ShardMoveSubscriptionName(ownerId));
DropShardMoveReplicationSlot(connection, ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX));
DropShardMovePublication(connection, ShardMovePublicationName(ownerId));
}
}
@ -1139,7 +1137,7 @@ ShardMovePublicationName(Oid ownerId)
/*
* ShardMoveSubscriptionName returns the name of the subscription for the given
* ShardSubscriptionName returns the name of the subscription for the given
* owner. If we're running the isolation tester the function also appends the
process id to the normal subscription name.
*
@ -1150,15 +1148,15 @@ ShardMovePublicationName(Oid ownerId)
* coordinator is blocked by the blocked replication process.
*/
static char *
ShardMoveSubscriptionName(Oid ownerId)
ShardSubscriptionName(Oid ownerId, char * operationPrefix)
{
if (RunningUnderIsolationTest)
{
return psprintf("%s%i_%i", SHARD_MOVE_SUBSCRIPTION_PREFIX, ownerId, MyProcPid);
return psprintf("%s%i_%i", operationPrefix, ownerId, MyProcPid);
}
else
{
return psprintf("%s%i", SHARD_MOVE_SUBSCRIPTION_PREFIX, ownerId);
return psprintf("%s%i", operationPrefix, ownerId);
}
}
@ -1323,7 +1321,7 @@ DropShardMoveSubscriptions(MultiConnection *connection, Bitmapset *tableOwnerIds
int ownerId = -1;
while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0)
{
DropShardMoveSubscription(connection, ShardMoveSubscriptionName(ownerId));
DropShardMoveSubscription(connection, ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX));
DropShardMoveUser(connection, ShardMoveSubscriptionRole(ownerId));
}
}
@ -1497,7 +1495,7 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
appendStringInfo(createSubscriptionCommand,
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
"WITH (citus_use_authinfo=true, enabled=false)",
quote_identifier(ShardMoveSubscriptionName(ownerId)),
quote_identifier(ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)),
quote_literal_cstr(conninfo->data),
quote_identifier(ShardMovePublicationName(ownerId)));
@ -1506,7 +1504,7 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
pfree(createSubscriptionCommand);
ExecuteCriticalRemoteCommand(connection, psprintf(
"ALTER SUBSCRIPTION %s OWNER TO %s",
ShardMoveSubscriptionName(ownerId),
ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX),
ShardMoveSubscriptionRole(ownerId)
));
@ -1525,7 +1523,7 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName,
ExecuteCriticalRemoteCommand(connection, psprintf(
"ALTER SUBSCRIPTION %s ENABLE",
ShardMoveSubscriptionName(ownerId)
ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)
));
}
}
@ -1631,7 +1629,7 @@ GetRemoteLSN(MultiConnection *connection, char *command)
* on the target node doesn't change within LogicalReplicationErrorTimeout. The
* function also reports its progress in every logicalReplicationProgressReportTimeout.
*/
static void
void
WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
Bitmapset *tableOwnerIds)
{
@ -1848,7 +1846,7 @@ ShardMoveSubscriptionNamesValueList(Bitmapset *tableOwnerIds)
first = false;
}
appendStringInfoString(subscriptionValueList,
quote_literal_cstr(ShardMoveSubscriptionName(ownerId)));
quote_literal_cstr(ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)));
}
appendStringInfoString(subscriptionValueList, ")");
return subscriptionValueList->data;
@ -2063,11 +2061,68 @@ GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds)
"WHERE subname IN %s", subscriptionValueList));
}
/*
* ShardSplitPublicationName returns the name of the publication for the given
* table owner.
*/
static char * ShardSplitPublicationName(Oid ownerId, uint32 nodeId)
/*Refactor this for ShardMove too.*/
void
CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
int sourceNodePort, char *userName, char *databaseName,
char * publicationName,
Oid ownerId)
{
return psprintf("%s%i_%u", SHARD_SPLIT_PUBLICATION_PREFIX, ownerId, nodeId);
StringInfo createSubscriptionCommand = makeStringInfo();
StringInfo conninfo = makeStringInfo();
/*
* The CREATE USER command should not propagate, so we temporarily
* disable DDL propagation.
*/
SendCommandListToWorkerOutsideTransaction(
connection->hostname, connection->port, connection->user,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf(
"CREATE USER %s SUPERUSER IN ROLE %s",
ShardMoveSubscriptionRole(ownerId),
GetUserNameFromId(ownerId, false)
)));
appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' "
"connect_timeout=20",
escape_param_str(sourceNodeName), sourceNodePort,
escape_param_str(userName), escape_param_str(databaseName));
appendStringInfo(createSubscriptionCommand,
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
"WITH (citus_use_authinfo=true, enabled=false)",
quote_identifier(ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX)),
quote_literal_cstr(conninfo->data),
quote_identifier(publicationName));
ExecuteCriticalRemoteCommand(connection, createSubscriptionCommand->data);
pfree(createSubscriptionCommand->data);
pfree(createSubscriptionCommand);
ExecuteCriticalRemoteCommand(connection, psprintf(
"ALTER SUBSCRIPTION %s OWNER TO %s",
ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX),
ShardMoveSubscriptionRole(ownerId)
));
/*
* The ALTER ROLE command should not propagate, so we temporarily
* disable DDL propagation.
*/
SendCommandListToWorkerOutsideTransaction(
connection->hostname, connection->port, connection->user,
list_make2(
"SET LOCAL citus.enable_ddl_propagation TO OFF;",
psprintf(
"ALTER ROLE %s NOSUPERUSER",
ShardMoveSubscriptionRole(ownerId)
)));
ExecuteCriticalRemoteCommand(connection, psprintf(
"ALTER SUBSCRIPTION %s ENABLE",
ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX)
));
}

View File

@ -0,0 +1,380 @@
/*-------------------------------------------------------------------------
*
* shardsplit_logical_replication.c
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_split.h"
#include "distributed/listutils.h"
#include "distributed/shardsplit_logical_replication.h"
#include "distributed/multi_logical_replication.h"
#include "utils/builtins.h"
#include "commands/dbcommands.h"
static HTAB *ShardInfoHashMapForPublications = NULL;
/* function declarations */
static void AddShardEntryInMap(Oid tableOwner, uint32 nodeId, ShardInterval * shardInterval, bool isChildShardInterval);
ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List * shardIdList, List * replicationSlotInfoList);
static void
CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
uint32_t publicationForTargetNodeId, Oid tableOwner);
static void
CreateShardSplitPublications(MultiConnection *sourceConnection, List * shardSplitPubSubMetadataList);
static void
CreateShardSplitSubscriptions(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList, WorkerNode * sourceWorkerNode, char * superUser, char * databaseName);
static void
WaitForShardSplitRelationSubscriptionsBecomeReady(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList);
static char *
ShardSplitPublicationName(uint32_t nodeId, Oid ownerId);
/* Used for debugging. Remove later. */
void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata * shardSplitMetadata);
StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList)
{
StringInfo splitChildrenRows = makeStringInfo();
ShardInterval *sourceShardIntervalToCopy = NULL;
List *splitChildShardIntervalList = NULL;
bool addComma = false;
forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
splitChildShardIntervalList, shardGroupSplitIntervalListList)
{
int64 sourceShardId = sourceShardIntervalToCopy->shardId;
ShardInterval *splitChildShardInterval = NULL;
WorkerNode *destinationWorkerNode = NULL;
forboth_ptr(splitChildShardInterval, splitChildShardIntervalList,
destinationWorkerNode, destinationWorkerNodesList)
{
if (addComma)
{
appendStringInfo(splitChildrenRows, ",");
}
StringInfo minValueString = makeStringInfo();
appendStringInfo(minValueString, "%d", DatumGetInt32(splitChildShardInterval->minValue));
StringInfo maxValueString = makeStringInfo();
appendStringInfo(maxValueString, "%d", DatumGetInt32(splitChildShardInterval->maxValue));
appendStringInfo(splitChildrenRows,
"ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info",
sourceShardId,
splitChildShardInterval->shardId,
quote_literal_cstr(minValueString->data),
quote_literal_cstr(maxValueString->data),
destinationWorkerNode->nodeId);
addComma = true;
}
}
StringInfo splitShardReplicationUDF = makeStringInfo();
appendStringInfo(splitShardReplicationUDF,
"SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])", splitChildrenRows->data);
return splitShardReplicationUDF;
}
List * ParseReplicationSlotInfoFromResult(PGresult * result)
{
int64 rowCount = PQntuples(result);
int64 colCount = PQnfields(result);
List *replicationSlotInfoList = NIL;
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
ReplicationSlotInfo * replicationSlotInfo = (ReplicationSlotInfo *)palloc0(sizeof(ReplicationSlotInfo));
char * targeNodeIdString = PQgetvalue(result, rowIndex, 0);
replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10);
/* we use pstrdup to copy the data into the current memory context */
replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex, 1));
replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex, 2));
replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo);
}
/*TODO(saawasek): size of this should not be NULL */
return replicationSlotInfoList;
}
List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList,
List *replicationSlotInfoList)
{
ShardInfoHashMapForPublications = SetupHashMapForShardInfo();
ShardInterval *sourceShardIntervalToCopy = NULL;
List *splitChildShardIntervalList = NULL;
forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList,
splitChildShardIntervalList, shardGroupSplitIntervalListList)
{
ShardInterval *splitChildShardInterval = NULL;
WorkerNode *destinationWorkerNode = NULL;
forboth_ptr(splitChildShardInterval, splitChildShardIntervalList,
destinationWorkerNode, destinationWorkerNodesList)
{
/* Table owner is the same for both the parent and child shards */
Oid tableOwnerId = TableOwnerOid(sourceShardIntervalToCopy->relationId);
uint32 destinationWorkerNodeId = destinationWorkerNode->nodeId;
/* Add split child shard interval */
AddShardEntryInMap(tableOwnerId, destinationWorkerNodeId, splitChildShardInterval, true /*isChildShardInterval*/);
/* Add parent shard interval if not already added */
AddShardEntryInMap(tableOwnerId, destinationWorkerNodeId, sourceShardIntervalToCopy, false /*isChildShardInterval*/);
}
}
/* Populate pub/sub metadata */
HASH_SEQ_STATUS status;
hash_seq_init(&status, ShardInfoHashMapForPublications);
List * shardSplitPubSubMetadataList = NIL;
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint32_t nodeId = entry->key.nodeId;
uint32_t tableOwnerId = entry->key.tableOwnerId;
ShardSplitPubSubMetadata * shardSplitPubSubMetadata = CreateShardSplitPubSubMetadata(tableOwnerId, nodeId, entry->shardSplitInfoList, replicationSlotInfoList);
shardSplitPubSubMetadataList = lappend(shardSplitPubSubMetadataList, shardSplitPubSubMetadata);
}
return shardSplitPubSubMetadataList;
}
static void AddShardEntryInMap(Oid tableOwnerId, uint32 nodeId, ShardInterval * shardInterval, bool isChildShardInterval)
{
NodeShardMappingKey key;
key.nodeId = nodeId;
key.tableOwnerId = tableOwnerId;
bool found = false;
NodeShardMappingEntry *nodeMappingEntry =
(NodeShardMappingEntry *) hash_search(ShardInfoHashMapForPublications, &key, HASH_ENTER,
&found);
if (!found)
{
nodeMappingEntry->shardSplitInfoList = NIL;
}
if(isChildShardInterval)
{
nodeMappingEntry->shardSplitInfoList =
lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
return;
}
ShardInterval * existingShardInterval = NULL;
foreach_ptr(existingShardInterval, nodeMappingEntry->shardSplitInfoList)
{
if(existingShardInterval->shardId == shardInterval->shardId)
{
/* parent shard interval is already added hence return */
return;
}
}
/* Add parent shard Interval */
nodeMappingEntry->shardSplitInfoList =
lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
}
ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List * shardIntervalList, List * replicationSlotInfoList)
{
ShardSplitPubSubMetadata * shardSplitPubSubMetadata = palloc0(sizeof(ShardSplitPubSubMetadata));
shardSplitPubSubMetadata->shardIntervalListForSubscription = shardIntervalList;
shardSplitPubSubMetadata->tableOwnerId = tableOwnerId;
char * tableOwnerName = GetUserNameFromId(tableOwnerId, false);
ReplicationSlotInfo * replicationSlotInfo = NULL;
foreach_ptr(replicationSlotInfo, replicationSlotInfoList)
{
if(nodeId == replicationSlotInfo->targetNodeId &&
strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0)
{
shardSplitPubSubMetadata->slotInfo = replicationSlotInfo;
break;
}
}
return shardSplitPubSubMetadata;
}
void LogicallReplicateSplitShards(WorkerNode *sourceWorkerNode, List* shardSplitPubSubMetadataList)
{
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlags = FORCE_NEW_CONNECTION;
/* Get source node connection */
MultiConnection *sourceConnection =
GetNodeUserDatabaseConnection(connectionFlags, sourceWorkerNode->workerName, sourceWorkerNode->workerPort,
superUser, databaseName);
ClaimConnectionExclusively(sourceConnection);
List * targetNodeConnectionList = NIL;
ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL;
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{
uint32 targetWorkerNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
WorkerNode * targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false);
MultiConnection *targetConnection =
GetNodeUserDatabaseConnection(connectionFlags, targetWorkerNode->workerName, targetWorkerNode->workerPort,
superUser, databaseName);
ClaimConnectionExclusively(targetConnection);
targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection);
}
/* create publications */
CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList);
CreateShardSplitSubscriptions(targetNodeConnectionList,
shardSplitPubSubMetadataList,
sourceWorkerNode,
superUser,
databaseName);
WaitForShardSplitRelationSubscriptionsBecomeReady(targetNodeConnectionList, shardSplitPubSubMetadataList);
}
void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata * shardSplitMetadata)
{
printf("sameer: ShardSplitPubSbuMetadata\n");
ReplicationSlotInfo * replicationInfo = shardSplitMetadata->slotInfo;
List * shardIntervalList = shardSplitMetadata->shardIntervalListForSubscription;
printf("shardIds: ");
ShardInterval * shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
printf("%ld ", shardInterval->shardId);
}
printf("\nManual Username from OID at source: %s \n", GetUserNameFromId(shardSplitMetadata->tableOwnerId, false));
printf("slotname:%s targetNode:%u tableOwner:%s \n", replicationInfo->slotName, replicationInfo->targetNodeId, replicationInfo->tableOwnerName);
}
static void
CreateShardSplitPublications(MultiConnection *sourceConnection, List *shardSplitPubSubMetadataList)
{
ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL;
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{
uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
Oid tableOwnerId = shardSplitPubSubMetadata->tableOwnerId;
CreateShardSplitPublicationForNode(sourceConnection,
shardSplitPubSubMetadata->shardIntervalListForSubscription,
publicationForNodeId,
tableOwnerId);
}
}
static void
CreateShardSplitSubscriptions(List * targetNodeConnectionList,
List * shardSplitPubSubMetadataList,
WorkerNode * sourceWorkerNode,
char * superUser,
char * databaseName)
{
MultiConnection * targetConnection = NULL;
ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL;
forboth_ptr(targetConnection, targetNodeConnectionList,
shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{
uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
Oid ownerId = shardSplitPubSubMetadata->tableOwnerId;
CreateShardSubscription(targetConnection,
sourceWorkerNode->workerName,
sourceWorkerNode->workerPort,
superUser,
databaseName,
ShardSplitPublicationName(publicationForNodeId, ownerId),
ownerId);
}
}
static void
CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
uint32_t publicationForTargetNodeId, Oid ownerId)
{
StringInfo createPublicationCommand = makeStringInfo();
bool prefixWithComma = false;
appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ",
ShardSplitPublicationName(publicationForTargetNodeId, ownerId));
ShardInterval *shard = NULL;
foreach_ptr(shard, shardList)
{
char *shardName = ConstructQualifiedShardName(shard);
if (prefixWithComma)
{
appendStringInfoString(createPublicationCommand, ",");
}
appendStringInfoString(createPublicationCommand, shardName);
prefixWithComma = true;
}
ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data);
pfree(createPublicationCommand->data);
pfree(createPublicationCommand);
}
static char *
ShardSplitPublicationName(uint32_t nodeId, Oid ownerId)
{
return psprintf("%s%u_%u", SHARD_SPLIT_PUBLICATION_PREFIX, nodeId, ownerId);
}
static void
WaitForShardSplitRelationSubscriptionsBecomeReady(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList)
{
MultiConnection * targetConnection = NULL;
ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL;
forboth_ptr(targetConnection, targetNodeConnectionList,
shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{
Bitmapset *tableOwnerIds = NULL;
tableOwnerIds = bms_add_member(tableOwnerIds, shardSplitPubSubMetadata->tableOwnerId);
WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds);
}
}
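Taken together, for a hypothetical table owner with oid 10 splitting towards target node 18, the commands issued by CreateShardSplitPublicationForNode on the source worker and CreateShardSubscription on the target worker would look roughly like this (shard names and connection parameters are placeholders; the name prefixes come from multi_logical_replication.h):
-- on the source worker (publication name: prefix + node id + owner oid)
CREATE PUBLICATION citus_shard_split_publication_18_10
    FOR TABLE table_to_split_1,table_to_split_101;
-- on the target worker (subscription name: prefix + owner oid); host/port/user/dbname are placeholders
CREATE SUBSCRIPTION citus_shard_split_subscription_10
    CONNECTION 'host=''localhost'' port=57637 user=''postgres'' dbname=''regression'' connect_timeout=20'
    PUBLICATION citus_shard_split_publication_18_10
    WITH (citus_use_authinfo=true, enabled=false);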

View File

@ -5,9 +5,11 @@ CREATE TYPE citus.split_shard_info AS (
shard_max_value text,
node_id integer);
CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text);
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
splitShardInfo citus.split_shard_info[])
RETURNS void
RETURNS setof citus.replication_slot_info
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])

View File

@ -5,12 +5,15 @@ CREATE TYPE citus.split_shard_info AS (
shard_max_value text,
node_id integer);
CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text);
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup(
splitShardInfo citus.split_shard_info[])
RETURNS void
RETURNS setof citus.replication_slot_info
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[])
IS 'Replication setup for splitting a shard';
IS 'Replication setup for splitting a shard';
REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(citus.split_shard_info[]) FROM PUBLIC;
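With the return type changed from void to setof citus.replication_slot_info, a call now streams back one row per (node id, table owner) pair, for example (values are illustrative; the slot name appears to follow the citus_split_<node id>_<owner oid> pattern that the split tests create slots for):
SELECT * FROM worker_split_shard_replication_setup(ARRAY[
    ROW(1, 2, '-2147483648', '-1', 18)::citus.split_shard_info,
    ROW(1, 3, '0', '2147483647', 18)::citus.split_shard_info]);
--  node_id | slot_owner |     slot_name
-- ---------+------------+-------------------
--       18 | postgres   | citus_split_18_10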

View File

@ -26,9 +26,18 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
int sourceNodePort, char *targetNodeName,
int targetNodePort);
extern void
CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
int sourceNodePort, char *userName, char *databaseName,
char * publicationName,
Oid ownerId);
extern void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
Bitmapset *tableOwnerIds);
#define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_"
#define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_"
#define SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX "citus_shard_move_subscription_role_"
#define SHARD_SPLIT_PUBLICATION_PREFIX "citus_shard_split_publication_"
#define SHARD_SPLIT_SUBSCRIPTION_PREFIX "citus_shard_split_subscription_"
#endif /* MULTI_LOGICAL_REPLICATION_H_ */

View File

@ -0,0 +1,57 @@
/*-------------------------------------------------------------------------
*
* shardsplit_logical_replication.h
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef SHARDSPLIT_LOGICAL_REPLICATION_H
#define SHARDSPLIT_LOGICAL_REPLICATION_H
typedef struct ReplicationSlotInfo
{
uint32 targetNodeId;
char * tableOwnerName;
char * slotName;
} ReplicationSlotInfo;
typedef struct ShardSplitPubSubMetadata
{
List * shardIntervalListForSubscription;
Oid tableOwnerId;
ReplicationSlotInfo *slotInfo;
} ShardSplitPubSubMetadata;
/* key for NodeShardMappingEntry */
typedef struct NodeShardMappingKey
{
uint32_t nodeId;
Oid tableOwnerId;
} NodeShardMappingKey;
/* Entry for hash map */
typedef struct NodeShardMappingEntry
{
NodeShardMappingKey key;
List *shardSplitInfoList;
} NodeShardMappingEntry;
extern uint32 NodeShardMappingHash(const void *key, Size keysize);
extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize);
HTAB * SetupHashMapForShardInfo(void);
List * ParseReplicationSlotInfoFromResult(PGresult * result);
extern StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList);
extern List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList,
List *replicationSlotInfoList);
extern void LogicallReplicateSplitShards(WorkerNode *sourceWorkerNode, List* shardSplitPubSubMetadataList);
#endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */

View File

@ -1,25 +1,11 @@
-- Negative test cases for citus_split_shard_by_split_points UDF.
CREATE SCHEMA citus_split_shard_by_split_points_negative;
SET search_path TO citus_split_shard_by_split_points_negative;
SET citus.shard_count TO 4;
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 60761300;
CREATE TABLE range_paritioned_table_to_split(rid bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('range_paritioned_table_to_split', 'rid', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Shards are not created automatically for range distributed table.
SELECT master_create_empty_shard('range_paritioned_table_to_split');
master_create_empty_shard
---------------------------------------------------------------------
60761300
(1 row)
SET citus.next_shard_id TO 49761300;
SET citus.next_shard_id TO 1;
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
-- Shard1 | -2147483648 | -1073741825
-- Shard2 | -1073741824 | -1
-- Shard3 | 0 | 1073741823
@ -30,116 +16,210 @@ SELECT create_distributed_table('table_to_split','id');
(1 row)
SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT * FROM citus_shards;
table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size
---------------------------------------------------------------------
table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9997 | 0
table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 8888 | 0
table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 8887 | 0
table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9995 | 0
table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9992 | 0
table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 57637 | 0
table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9998 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 8888 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 57637 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 8887 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9997 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9995 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9998 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9992 | 0
(14 rows)
SELECT * FROM pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
table_to_split | 1 | t | -2147483648 | 2147483647
table_second | 2 | t | -2147483648 | 2147483647
(2 rows)
SET client_min_messages TO LOG;
SET citus.log_remote_commands TO on;
CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema",
c.relname as "Name",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner"
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind IN ('r','p','')
AND n.nspname <> 'pg_catalog'
AND n.nspname !~ '^pg_toast'
AND n.nspname <> 'information_schema'
AND pg_catalog.pg_table_is_visible(c.oid)
ORDER BY 1,2;
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET search_path TO citus_split_shard_by_split_points_negative;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET search_path TO citus_split_shard_by_split_points_negative;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE OR REPLACE VIEW citus_split_shard_by_split_points_negative.show_catalog ("Schema","Name","Owner") AS SELECT n.nspname AS "Schema",
c.relname AS "Name",
pg_get_userbyid(c.relowner) AS "Owner"
FROM (pg_class c
LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
WHERE ((c.relkind = ANY (ARRAY['r'::"char", 'p'::"char", ''::"char"])) AND (n.nspname <> 'pg_catalog'::name) AND (n.nspname !~ '^pg_toast'::text) AND (n.nspname <> 'information_schema'::name) AND pg_table_is_visible(c.oid))
ORDER BY n.nspname, c.relname;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing CREATE OR REPLACE VIEW citus_split_shard_by_split_points_negative.show_catalog ("Schema","Name","Owner") AS SELECT n.nspname AS "Schema",
c.relname AS "Name",
pg_get_userbyid(c.relowner) AS "Owner"
FROM (pg_class c
LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
WHERE ((c.relkind = ANY (ARRAY['r'::"char", 'p'::"char", ''::"char"])) AND (n.nspname <> 'pg_catalog'::name) AND (n.nspname !~ '^pg_toast'::text) AND (n.nspname <> 'information_schema'::name) AND pg_table_is_visible(c.oid))
ORDER BY n.nspname, c.relname;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('view', ARRAY['citus_split_shard_by_split_points_negative', 'show_catalog']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('view', ARRAY['citus_split_shard_by_split_points_negative', 'show_catalog']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
-- UDF fails for range partitioned tables.
\c - - - :master_port
SET citus.log_remote_commands TO on;
SET citus.next_shard_id TO 100;
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT citus_split_shard_by_split_points(
60761300,
1,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Cannot split shard as operation is only supported for hash distributed tables.
-- UDF fails if number of placement node list does not exceed split points by one.
-- Example: One split point defines two way split (2 worker nodes needed).
SELECT citus_split_shard_by_split_points(
49761300,
-- 2 split points defined making it a 3 way split but we only specify 2 placement lists.
ARRAY['-1073741826', '-107374182'],
ARRAY[:worker_1_node, :worker_2_node]); -- 2 worker nodes.
ERROR: Number of worker node ids should be one greater split points. NodeId count is '2' and SplitPoint count is '2'.
-- UDF fails if split ranges specified are not within the shard id to split.
SELECT citus_split_shard_by_split_points(
49761300, -- Shard range is from (-2147483648, -1073741825)
ARRAY['0'], -- The range we specified is 0 which is not in the range.
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Split point 0 is outside the min/max range(-2147483648, -1073741825) for shard id 49761300.
-- UDF fails if split points are not strictly increasing.
SELECT citus_split_shard_by_split_points(
49761302,
ARRAY['50', '35'],
ARRAY[:worker_1_node, :worker_2_node, :worker_1_node]);
ERROR: Invalid Split Points '50' followed by '35'. All split points should be strictly increasing.
SELECT citus_split_shard_by_split_points(
49761302,
ARRAY['50', '50'],
ARRAY[:worker_1_node, :worker_2_node, :worker_1_node]);
ERROR: Invalid Split Points '50' followed by '50'. All split points should be strictly increasing.
-- UDF fails if nodeIds are < 1 or Invalid.
SELECT citus_split_shard_by_split_points(
49761302,
ARRAY['50'],
ARRAY[0, :worker_2_node]);
ERROR: Invalid Node Id '0'.
SELECT citus_split_shard_by_split_points(
49761302,
ARRAY['50'],
ARRAY[101, 201]);
ERROR: Invalid Node Id '101'.
-- UDF fails if split point specified is equal to the max value in the range.
-- Example: ShardId 81060002 range is from (-2147483648, -1073741825)
-- '-1073741825' as split point is invalid.
-- '-1073741826' is valid and will split to: (-2147483648, -1073741826) and (-1073741825, -1073741825)
SELECT citus_split_shard_by_split_points(
49761300, -- Shard range is from (-2147483648, -1073741825)
ARRAY['-1073741825'], -- Split point equals shard's max value.
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Invalid split point -1073741825, as split points should be inclusive. Please use -1073741826 instead.
-- UDF fails where source shard cannot be split further i.e min and max range is equal.
-- Create a Shard where range cannot be split further
SELECT isolate_tenant_to_new_shard('table_to_split', 1);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
49761305
(1 row)
SELECT citus_split_shard_by_split_points(
49761305,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Cannot split shard id "49761305" as min/max range are equal: ('-1905060026', '-1905060026').
-- Create distributed table with replication factor > 1
SET citus.shard_replication_factor TO 2;
SET citus.next_shard_id TO 51261400;
CREATE TABLE table_to_split_replication_factor_2 (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_to_split_replication_factor_2','id');
create_distributed_table
ARRAY[:worker_1_node, :worker_2_node],
'non_blocking');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (1, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (1, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (2, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (2, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '-1073741826', 16)::citus.split_shard_info,ROW(1, 101, '-1073741825', '2147483647', 18)::citus.split_shard_info,ROW(2, 102, '-2147483648', '-1073741826', 16)::citus.split_shard_info,ROW(2, 103, '-1073741825', '2147483647', 18)::citus.split_shard_info])
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
-- UDF fails for replication factor > 1
SELECT citus_split_shard_by_split_points(
51261400,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Operation split not supported for shard as replication factor '2' is greater than 1.
-- Create distributed table with columnar type.
SET citus.next_shard_id TO 51271400;
CREATE TABLE table_to_split_columnar (id bigserial PRIMARY KEY, value char) USING columnar;
SELECT create_distributed_table('table_to_split_columnar','id');
create_distributed_table
-- On worker2, we want child shard xxxxx and dummy shard xxxxx --
-- on worker1, we want child shard xxxxx and 1 and dummy shard xxxxx --
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
Schema | Name | Owner
---------------------------------------------------------------------
citus_split_shard_by_split_points_negative | table_second | postgres
citus_split_shard_by_split_points_negative | table_second_103 | postgres
citus_split_shard_by_split_points_negative | table_second_2 | postgres
citus_split_shard_by_split_points_negative | table_to_split | postgres
citus_split_shard_by_split_points_negative | table_to_split_1 | postgres
citus_split_shard_by_split_points_negative | table_to_split_101 | postgres
(6 rows)
(1 row)
-- UDF fails for columnar table.
SELECT citus_split_shard_by_split_points(
51271400,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: Cannot split shard as operation is not supported for Columnar tables.
-- Create distributed table which is partitioned.
SET citus.next_shard_id TO 51271900;
CREATE TABLE table_to_split_partitioned(id integer, dt date) PARTITION BY RANGE(dt);
SELECT create_distributed_table('table_to_split_partitioned','id');
create_distributed_table
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
Schema | Name | Owner
---------------------------------------------------------------------
citus_split_shard_by_split_points_negative | table_second | postgres
citus_split_shard_by_split_points_negative | table_second_102 | postgres
citus_split_shard_by_split_points_negative | table_second_103 | postgres
citus_split_shard_by_split_points_negative | table_second_2 | postgres
citus_split_shard_by_split_points_negative | table_to_split | postgres
citus_split_shard_by_split_points_negative | table_to_split_1 | postgres
citus_split_shard_by_split_points_negative | table_to_split_100 | postgres
citus_split_shard_by_split_points_negative | table_to_split_101 | postgres
(8 rows)
(1 row)
SELECT * FROM pg_publication_tables;
pubname | schemaname | tablename
---------------------------------------------------------------------
(0 rows)
-- UDF fails for partitioned table.
SELECT citus_split_shard_by_split_points(
51271900,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node]);
ERROR: cannot split of 'table_to_split_partitioned', because it is a partitioned table
DETAIL: In colocation group of 'table_to_split_partitioned', a partitioned relation exists: 'table_to_split_partitioned'. Citus does not support split of partitioned tables.

View File

@ -64,16 +64,16 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);
SET search_path TO split_shard_replication_setup_schema;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
SELECT worker_split_shard_replication_setup(ARRAY[
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow.
worker_split_shard_replication_setup
count
---------------------------------------------------------------------
2
(1 row)
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset

View File

@ -62,13 +62,13 @@ CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
worker_split_shard_replication_setup
count
---------------------------------------------------------------------
1
(1 row)
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'citus') \gset

View File

@ -10,13 +10,13 @@ SET client_min_messages TO ERROR;
-- Create publication at worker1
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT worker_split_shard_replication_setup(ARRAY[
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
]);
worker_split_shard_replication_setup
count
---------------------------------------------------------------------
1
(1 row)
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset

View File

@ -8,14 +8,14 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);
WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow.
worker_split_shard_replication_setup
count
---------------------------------------------------------------------
2
(1 row)
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset

View File

@ -5,12 +5,15 @@ test: multi_cluster_management
test: multi_test_catalog_views
test: tablespace
# Helpers for foreign key catalogs.
test: foreign_key_to_reference_table
#test: foreign_key_to_reference_table
# Split tests go here.
#test: citus_sameer
test: citus_sameer
#test: split_shard_replication_setup
test: worker_split_copy_test
test: worker_split_binary_copy_test
test: worker_split_text_copy_test
test: citus_split_shard_by_split_points_negative
test: citus_split_shard_by_split_points
#test: split_shard_replication_setup_remote_local
#test: split_shard_replication_setup_local
#test: split_shard_replication_colocated_setup
#test: worker_split_copy_test
#test: worker_split_binary_copy_test
#test: worker_split_text_copy_test
#test: citus_split_shard_by_split_points_negative
#test: citus_split_shard_by_split_points

View File

@ -15,7 +15,6 @@ CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_to_split','id');
SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split');
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
@ -49,7 +48,7 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SELECT citus_split_shard_by_split_points(
1,
ARRAY['-1073741826'],
ARRAY[:worker_1_node, :worker_2_node],
ARRAY[:worker_2_node, :worker_2_node],
'non_blocking');
-- On worker2, we want child shard 2 and dummy shard 1 --
-- on worker1, we want child shard 3 and 1 and dummy shard 2 --

View File

@ -66,7 +66,7 @@ SET search_path TO split_shard_replication_setup_schema;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
SELECT worker_split_shard_replication_setup(ARRAY[
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info,
ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,

View File

@ -66,7 +66,7 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);
-- Create publication at worker1
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);

View File

@ -13,7 +13,7 @@ SET client_min_messages TO ERROR;
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT worker_split_shard_replication_setup(ARRAY[
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info
]);

View File

@ -11,7 +11,7 @@ SET search_path TO split_shard_replication_setup_schema;
-- Create publication at worker1
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
SELECT worker_split_shard_replication_setup(ARRAY[
SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[
ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info,
ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info
]);