mirror of https://github.com/citusdata/citus.git
Add local function GetNextShardId()
parent
40bdafa8d1
commit
8334d853c0
|
@ -271,8 +271,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
|
||||||
{
|
{
|
||||||
ShardInterval *sourceShardInterval = (ShardInterval *) lfirst(sourceShardCell);
|
ShardInterval *sourceShardInterval = (ShardInterval *) lfirst(sourceShardCell);
|
||||||
uint64 sourceShardId = sourceShardInterval->shardId;
|
uint64 sourceShardId = sourceShardInterval->shardId;
|
||||||
|
List *sourceShardPlacementList = ShardPlacementList(sourceShardId);
|
||||||
uint64 newShardId = GetNextShardId();
|
uint64 newShardId = GetNextShardId();
|
||||||
ListCell *sourceShardPlacementCell = NULL;
|
|
||||||
|
|
||||||
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
|
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
|
||||||
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
|
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
|
||||||
|
|
|
@ -231,12 +231,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* master_get_new_shardid allocates and returns a unique shardId for the shard
|
* master_get_new_shardid is a user facing wrapper function around GetNextShardId()
|
||||||
* to be created. This allocation occurs both in shared memory and in write
|
* which allocates and returns a unique shardId for the shard to be created.
|
||||||
* ahead logs; writing to logs avoids the risk of having shardId collisions.
|
|
||||||
*
|
|
||||||
* Please note that the caller is still responsible for finalizing shard data
|
|
||||||
* and the shardId with the master node.
|
|
||||||
*
|
*
|
||||||
* NB: This can be called by any user; for now we have decided that that's
|
* NB: This can be called by any user; for now we have decided that that's
|
||||||
* ok. We might want to restrict this to users part of a specific role or such
|
* ok. We might want to restrict this to users part of a specific role or such
|
||||||
|
@ -244,6 +240,24 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
|
||||||
*/
|
*/
|
||||||
Datum
|
Datum
|
||||||
master_get_new_shardid(PG_FUNCTION_ARGS)
|
master_get_new_shardid(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
uint64 shardId = GetNextShardId();
|
||||||
|
Datum shardIdDatum = Int64GetDatum(shardId);
|
||||||
|
|
||||||
|
PG_RETURN_DATUM(shardIdDatum);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetNextShardId allocates and returns a unique shardId for the shard to be
|
||||||
|
* created. This allocation occurs both in shared memory and in write ahead
|
||||||
|
* logs; writing to logs avoids the risk of having shardId collisions.
|
||||||
|
*
|
||||||
|
* Please note that the caller is still responsible for finalizing shard data
|
||||||
|
* and the shardId with the master node.
|
||||||
|
*/
|
||||||
|
uint64
|
||||||
|
GetNextShardId()
|
||||||
{
|
{
|
||||||
text *sequenceName = cstring_to_text(SHARDID_SEQUENCE_NAME);
|
text *sequenceName = cstring_to_text(SHARDID_SEQUENCE_NAME);
|
||||||
Oid sequenceId = ResolveRelationId(sequenceName);
|
Oid sequenceId = ResolveRelationId(sequenceName);
|
||||||
|
@ -251,6 +265,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
|
||||||
Oid savedUserId = InvalidOid;
|
Oid savedUserId = InvalidOid;
|
||||||
int savedSecurityContext = 0;
|
int savedSecurityContext = 0;
|
||||||
Datum shardIdDatum = 0;
|
Datum shardIdDatum = 0;
|
||||||
|
uint64 shardId = 0;
|
||||||
|
|
||||||
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||||
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||||
|
@ -260,7 +275,9 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||||
|
|
||||||
PG_RETURN_DATUM(shardIdDatum);
|
shardId = DatumGetInt64(shardIdDatum);
|
||||||
|
|
||||||
|
return shardId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -64,8 +64,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
||||||
text *relationNameText = PG_GETARG_TEXT_P(0);
|
text *relationNameText = PG_GETARG_TEXT_P(0);
|
||||||
char *relationName = text_to_cstring(relationNameText);
|
char *relationName = text_to_cstring(relationNameText);
|
||||||
List *workerNodeList = WorkerNodeList();
|
List *workerNodeList = WorkerNodeList();
|
||||||
Datum shardIdDatum = 0;
|
uint64 shardId = INVALID_SHARD_ID;
|
||||||
int64 shardId = INVALID_SHARD_ID;
|
|
||||||
List *ddlEventList = NULL;
|
List *ddlEventList = NULL;
|
||||||
uint32 attemptableNodeCount = 0;
|
uint32 attemptableNodeCount = 0;
|
||||||
uint32 liveNodeCount = 0;
|
uint32 liveNodeCount = 0;
|
||||||
|
@ -114,8 +113,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* generate new and unique shardId from sequence */
|
/* generate new and unique shardId from sequence */
|
||||||
shardIdDatum = master_get_new_shardid(NULL);
|
shardId = GetNextShardId();
|
||||||
shardId = DatumGetInt64(shardIdDatum);
|
|
||||||
|
|
||||||
/* get table DDL commands to replay on the worker node */
|
/* get table DDL commands to replay on the worker node */
|
||||||
ddlEventList = GetTableDDLEvents(relationId);
|
ddlEventList = GetTableDDLEvents(relationId);
|
||||||
|
|
|
@ -224,8 +224,7 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
|
||||||
Oid distributedTableId = PG_GETARG_OID(0);
|
Oid distributedTableId = PG_GETARG_OID(0);
|
||||||
StringInfo minInfo = makeStringInfo();
|
StringInfo minInfo = makeStringInfo();
|
||||||
StringInfo maxInfo = makeStringInfo();
|
StringInfo maxInfo = makeStringInfo();
|
||||||
Datum newShardIdDatum = master_get_new_shardid(NULL);
|
uint64 newShardId = GetNextShardId();
|
||||||
int64 newShardId = DatumGetInt64(newShardIdDatum);
|
|
||||||
text *maxInfoText = NULL;
|
text *maxInfoText = NULL;
|
||||||
text *minInfoText = NULL;
|
text *minInfoText = NULL;
|
||||||
|
|
||||||
|
|
|
@ -90,6 +90,7 @@ extern int ShardPlacementPolicy;
|
||||||
|
|
||||||
/* Function declarations local to the distributed module */
|
/* Function declarations local to the distributed module */
|
||||||
extern bool CStoreTable(Oid relationId);
|
extern bool CStoreTable(Oid relationId);
|
||||||
|
extern uint64 GetNextShardId(void);
|
||||||
extern Oid ResolveRelationId(text *relationName);
|
extern Oid ResolveRelationId(text *relationName);
|
||||||
extern List * GetTableDDLEvents(Oid relationId);
|
extern List * GetTableDDLEvents(Oid relationId);
|
||||||
extern char ShardStorageType(Oid relationId);
|
extern char ShardStorageType(Oid relationId);
|
||||||
|
|
Loading…
Reference in New Issue