Allow generating placement IDs without using the sequence

pull/1750/head
Marco Slot 2017-10-31 18:51:49 +01:00
parent 89eb833375
commit d3b634b301
3 changed files with 47 additions and 4 deletions

View File

@ -69,6 +69,7 @@ int ShardReplicationFactor = 1; /* desired replication factor for shards */
int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */ int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */
int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
int NextShardId = 0; int NextShardId = 0;
int NextPlacementId = 0;
static List * GetTableReplicaIdentityCommand(Oid relationId); static List * GetTableReplicaIdentityCommand(Oid relationId);
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor); static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
@ -294,12 +295,19 @@ GetNextShardId()
{ {
text *sequenceName = NULL; text *sequenceName = NULL;
Oid sequenceId = InvalidOid; Oid sequenceId = InvalidOid;
Datum sequenceIdDatum = NULL; Datum sequenceIdDatum = 0;
Oid savedUserId = InvalidOid; Oid savedUserId = InvalidOid;
int savedSecurityContext = 0; int savedSecurityContext = 0;
Datum shardIdDatum = 0; Datum shardIdDatum = 0;
uint64 shardId = 0; uint64 shardId = 0;
/*
* In regression tests, we would like to generate shard IDs consistently
* even if the tests run in parallel. Instead of the sequence, we can use
* the next_shard_id GUC to specify which shard ID the current session should
* generate next. The GUC is automatically increased by 1 every time a new
* shard ID is generated.
*/
if (NextShardId > 0) if (NextShardId > 0)
{ {
shardId = NextShardId; shardId = NextShardId;
@ -364,14 +372,33 @@ master_get_new_placementid(PG_FUNCTION_ARGS)
uint64 uint64
GetNextPlacementId(void) GetNextPlacementId(void)
{ {
text *sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME); text *sequenceName = NULL;
Oid sequenceId = ResolveRelationId(sequenceName); Oid sequenceId = InvalidOid;
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); Datum sequenceIdDatum = 0;
Oid savedUserId = InvalidOid; Oid savedUserId = InvalidOid;
int savedSecurityContext = 0; int savedSecurityContext = 0;
Datum placementIdDatum = 0; Datum placementIdDatum = 0;
uint64 placementId = 0; uint64 placementId = 0;
/*
* In regression tests, we would like to generate placement IDs consistently
* even if the tests run in parallel. Instead of the sequence, we can use
* the next_placement_id GUC to specify which shard ID the current session
* should generate next. The GUC is automatically increased by 1 every time
* a new placement ID is generated.
*/
if (NextPlacementId > 0)
{
placementId = NextPlacementId;
NextPlacementId += 1;
return placementId;
}
sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME);
sequenceId = ResolveRelationId(sequenceName);
sequenceIdDatum = ObjectIdGetDatum(sequenceId);
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);

View File

@ -828,6 +828,21 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL, GUC_NO_SHOW_ALL,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.next_placement_id",
gettext_noop("Set the next placement ID to use in placement creation."),
gettext_noop("Placement IDs are normally generated using a sequence. If "
"next_placement_id is set to a non-zero value, placement IDs will "
"instead be generated by incrementing from the value of "
"this GUC and this will be reflected in the GUC. This is "
"mainly useful to ensure consistent placement IDs when running "
"tests in parallel."),
&NextPlacementId,
0, 0, INT_MAX,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.max_task_string_size", "citus.max_task_string_size",
gettext_noop("Sets the maximum size (in bytes) of a worker task call string."), gettext_noop("Sets the maximum size (in bytes) of a worker task call string."),

View File

@ -93,6 +93,7 @@ extern int ShardReplicationFactor;
extern int ShardMaxSize; extern int ShardMaxSize;
extern int ShardPlacementPolicy; extern int ShardPlacementPolicy;
extern int NextShardId; extern int NextShardId;
extern int NextPlacementId;
extern bool IsCoordinator(void); extern bool IsCoordinator(void);