mirror of https://github.com/citusdata/citus.git
parent
60ee33cfcc
commit
5a711a9176
|
@ -43,11 +43,11 @@ static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit,
|
||||||
int32 maxValue,
|
int32 maxValue,
|
||||||
int32 nodeId);
|
int32 nodeId);
|
||||||
static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
|
static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo);
|
||||||
static void * PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
|
static void PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
|
||||||
HTAB *shardInfoHashMap,
|
HTAB *shardInfoHashMap,
|
||||||
dsm_handle dsmHandle,
|
dsm_handle dsmHandle,
|
||||||
int shardSplitInfoCount);
|
int shardSplitInfoCount);
|
||||||
static void SetupHashMapForShardInfo();
|
static void SetupHashMapForShardInfo(void);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* split_shard_replication_setup UDF creates in-memory data structures
|
* split_shard_replication_setup UDF creates in-memory data structures
|
||||||
|
@ -171,7 +171,7 @@ ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
|
||||||
int32 *nodeId)
|
int32 *nodeId)
|
||||||
{
|
{
|
||||||
Oid elemtypeId = ARR_ELEMTYPE(shardInfoArrayObject);
|
Oid elemtypeId = ARR_ELEMTYPE(shardInfoArrayObject);
|
||||||
int elemtypeLength = 0;
|
int16 elemtypeLength = 0;
|
||||||
bool elemtypeByValue = false;
|
bool elemtypeByValue = false;
|
||||||
char elemtypeAlignment = 0;
|
char elemtypeAlignment = 0;
|
||||||
get_typlenbyvalalign(elemtypeId, &elemtypeLength, &elemtypeByValue,
|
get_typlenbyvalalign(elemtypeId, &elemtypeLength, &elemtypeByValue,
|
||||||
|
@ -296,8 +296,6 @@ ParseShardSplitInfo(ArrayType *shardInfoArrayObject,
|
||||||
}
|
}
|
||||||
|
|
||||||
*nodeId = DatumGetInt32(nodeIdDat);
|
*nodeId = DatumGetInt32(nodeIdDat);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -385,7 +383,7 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit,
|
||||||
* AddShardSplitInfoEntryForNodeInMap function add's ShardSplitInfo entry
|
* AddShardSplitInfoEntryForNodeInMap function add's ShardSplitInfo entry
|
||||||
* to the hash map. The key is nodeId on which the new shard is to be placed.
|
* to the hash map. The key is nodeId on which the new shard is to be placed.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
|
AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
|
||||||
{
|
{
|
||||||
uint32_t keyNodeId = shardSplitInfo->nodeId;
|
uint32_t keyNodeId = shardSplitInfo->nodeId;
|
||||||
|
@ -402,8 +400,6 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
|
||||||
|
|
||||||
nodeMappingEntry->shardSplitInfoList =
|
nodeMappingEntry->shardSplitInfoList =
|
||||||
lappend(nodeMappingEntry->shardSplitInfoList, (ShardSplitInfo *) shardSplitInfo);
|
lappend(nodeMappingEntry->shardSplitInfoList, (ShardSplitInfo *) shardSplitInfo);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -420,7 +416,7 @@ AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo)
|
||||||
*
|
*
|
||||||
* dsmHandle - Shared memory segment handle
|
* dsmHandle - Shared memory segment handle
|
||||||
*/
|
*/
|
||||||
void *
|
static void
|
||||||
PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
|
PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
|
||||||
HTAB *shardInfoHashMap,
|
HTAB *shardInfoHashMap,
|
||||||
dsm_handle dsmHandle,
|
dsm_handle dsmHandle,
|
||||||
|
@ -455,6 +451,4 @@ PopulateShardSplitInfoInSM(ShardSplitInfo *shardSplitInfoArray,
|
||||||
index++;
|
index++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,10 @@ static int32_t GetHashValueForIncomingTuple(Relation sourceShardRelation,
|
||||||
HeapTuple tuple,
|
HeapTuple tuple,
|
||||||
bool *shouldHandleUpdate);
|
bool *shouldHandleUpdate);
|
||||||
|
|
||||||
|
static Oid FindTargetRelationOid(Relation sourceShardRelation,
|
||||||
|
HeapTuple tuple,
|
||||||
|
char *currentSlotName);
|
||||||
|
|
||||||
void
|
void
|
||||||
_PG_output_plugin_init(OutputPluginCallbacks *cb)
|
_PG_output_plugin_init(OutputPluginCallbacks *cb)
|
||||||
{
|
{
|
||||||
|
@ -132,7 +136,7 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
|
||||||
* tuple - changed tuple.
|
* tuple - changed tuple.
|
||||||
* currentSlotName - Name of replication slot that is processing this update.
|
* currentSlotName - Name of replication slot that is processing this update.
|
||||||
*/
|
*/
|
||||||
Oid
|
static Oid
|
||||||
FindTargetRelationOid(Relation sourceShardRelation,
|
FindTargetRelationOid(Relation sourceShardRelation,
|
||||||
HeapTuple tuple,
|
HeapTuple tuple,
|
||||||
char *currentSlotName)
|
char *currentSlotName)
|
||||||
|
@ -185,21 +189,17 @@ FindTargetRelationOid(Relation sourceShardRelation,
|
||||||
bool
|
bool
|
||||||
ShouldCommitBeApplied(Relation sourceShardRelation)
|
ShouldCommitBeApplied(Relation sourceShardRelation)
|
||||||
{
|
{
|
||||||
ShardSplitInfo *shardSplitInfo = NULL;
|
|
||||||
int partitionColumnIndex = -1;
|
|
||||||
Oid distributedTableOid = InvalidOid;
|
|
||||||
|
|
||||||
Oid sourceShardOid = sourceShardRelation->rd_id;
|
Oid sourceShardOid = sourceShardRelation->rd_id;
|
||||||
for (int i = 0; i < shardSplitInfoArraySize; i++)
|
for (int i = 0; i < shardSplitInfoArraySize; i++)
|
||||||
{
|
{
|
||||||
/* skip the commit when destination is equal to the source */
|
/* skip the commit when destination is equal to the source */
|
||||||
shardSplitInfo = &shardSplitInfoArray[i];
|
ShardSplitInfo *shardSplitInfo = &shardSplitInfoArray[i];
|
||||||
if (shardSplitInfo->splitChildShardOid == sourceShardOid)
|
if (shardSplitInfo->splitChildShardOid == sourceShardOid)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ SELECT create_distributed_table('slotName_table','id');
|
||||||
-- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
|
-- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
|
||||||
-- 6. Setup Pub/Sub
|
-- 6. Setup Pub/Sub
|
||||||
-- 7. Insert into table_to_split_1 at source worker1
|
-- 7. Insert into table_to_split_1 at source worker1
|
||||||
-- 8. Expect the results in either table_to_split_2 or table_to_split_2 at worker2
|
-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2
|
||||||
\c - - - :worker_2_port
|
\c - - - :worker_2_port
|
||||||
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
|
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
|
||||||
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
|
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
|
||||||
|
|
|
@ -31,7 +31,7 @@ SELECT create_distributed_table('slotName_table','id');
|
||||||
-- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
|
-- 5. Create Replication slot with 'decoding_plugin_for_shard_split'
|
||||||
-- 6. Setup Pub/Sub
|
-- 6. Setup Pub/Sub
|
||||||
-- 7. Insert into table_to_split_1 at source worker1
|
-- 7. Insert into table_to_split_1 at source worker1
|
||||||
-- 8. Expect the results in either table_to_split_2 or table_to_split_2 at worker2
|
-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2
|
||||||
|
|
||||||
\c - - - :worker_2_port
|
\c - - - :worker_2_port
|
||||||
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
|
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
|
||||||
|
|
Loading…
Reference in New Issue