Stop using a sequence to generate unique job IDs

pull/1208/head
Marco Slot 2017-02-06 11:03:06 +01:00
parent be6dfaa596
commit dfd7d86948
5 changed files with 65 additions and 38 deletions

View File

@ -225,8 +225,7 @@ MultiRealTimeExecute(Job *job)
*/
if (taskFailed)
{
ereport(ERROR, (errmsg("failed to execute job " UINT64_FORMAT, job->jobId),
errdetail("Failure due to failed task %u", failedTaskId)));
ereport(ERROR, (errmsg("failed to execute task %u", failedTaskId)));
}
else if (QueryCancelPending)
{

View File

@ -150,7 +150,6 @@ MultiTaskTrackerExecute(Job *job)
ListCell *taskAndExecutionCell = NULL;
uint32 taskTrackerCount = 0;
uint32 topLevelTaskCount = 0;
uint64 failedJobId = 0;
uint32 failedTaskId = 0;
bool allTasksCompleted = false;
bool taskFailed = false;
@ -279,7 +278,6 @@ MultiTaskTrackerExecute(Job *job)
taskFailed = TaskExecutionFailed(taskExecution);
if (taskFailed)
{
failedJobId = taskExecution->jobId;
failedTaskId = taskExecution->taskId;
break;
}
@ -336,7 +334,6 @@ MultiTaskTrackerExecute(Job *job)
taskTransmitFailed = TaskExecutionFailed(taskExecution);
if (taskTransmitFailed)
{
failedJobId = taskExecution->jobId;
failedTaskId = taskExecution->taskId;
break;
}
@ -415,13 +412,11 @@ MultiTaskTrackerExecute(Job *job)
*/
if (taskFailed)
{
ereport(ERROR, (errmsg("failed to execute job " UINT64_FORMAT, failedJobId),
errdetail("Failure due to failed task %u", failedTaskId)));
ereport(ERROR, (errmsg("failed to execute task %u", failedTaskId)));
}
else if (clusterFailed)
{
ereport(ERROR, (errmsg("failed to execute job " UINT64_FORMAT, job->jobId),
errdetail("Too many task tracker failures")));
ereport(ERROR, (errmsg("failed to execute task %u", failedTaskId)));
}
else if (QueryCancelPending)
{

View File

@ -22,6 +22,7 @@
#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/skey.h"
#include "access/xlog.h"
#include "catalog/pg_am.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
@ -63,6 +64,7 @@
/* Policy to use when assigning tasks to worker nodes */
int TaskAssignmentPolicy = TASK_ASSIGNMENT_GREEDY;
bool EnableUniqueJobIds = true;
/*
@ -1693,41 +1695,61 @@ ChildNodeList(MultiNode *multiNode)
/*
* UniqueJobId allocates and returns a unique jobId for the job to be executed.
* This allocation occurs both in shared memory and in write ahead logs; writing
* to logs avoids the risk of having jobId collisions.
*
* Please note that the jobId sequence wraps around after 2^32 integers. This
* leaves the upper 32-bits to slave nodes and their jobs.
* The resulting job ID is built up as:
* <16-bit group ID><24-bit process ID><1-bit secondary flag><23-bit local counter>
*
* When citus.enable_unique_job_ids is off then only the local counter is
* included to get repeatable results.
*/
static uint64
UniqueJobId(void)
{
text *sequenceName = cstring_to_text(JOBID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Datum jobIdDatum = 0;
int64 jobId = 0;
int64 localizedJobId = 0;
int64 localGroupId = GetLocalGroupId();
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
static uint32 jobIdCounter = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
uint64 jobId = 0;
uint64 jobIdNumber = 0;
uint64 processId = 0;
uint64 localGroupId = 0;
/* generate new and unique jobId from sequence */
jobIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
jobId = DatumGetInt64(jobIdDatum);
jobIdCounter++;
if (EnableUniqueJobIds)
{
/*
* Add the local group id information to the jobId to
* prevent concurrent jobs on different groups to conflict.
*/
localizedJobId = jobId | (localGroupId << 32);
localGroupId = GetLocalGroupId() & 0xFF;
jobId = jobId | (localGroupId << 48);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
/*
* Add the current process ID to distinguish jobs by this
* backends from jobs started by other backends. Process
* IDs can have at most 24-bits on platforms supported by
* Citus.
*/
processId = MyProcPid & 0xFFFFFF;
jobId = jobId | (processId << 24);
return localizedJobId;
/*
* Add an extra bit for secondaries to distinguish their
* jobs from primaries.
*/
if (RecoveryInProgress())
{
jobId = jobId | (1 << 23);
}
}
/*
* Use the remaining 23 bits to distinguish jobs by the
* same backend.
*/
jobIdNumber = jobIdCounter & 0x1FFFFFF;
jobId = jobId | jobIdNumber;
return jobId;
}
@ -2144,8 +2166,7 @@ SubquerySqlTaskList(Job *job)
sqlTask->dependedTaskList = dataFetchTaskList;
/* log the query string we generated */
ereport(DEBUG4, (errmsg("generated sql query for job " UINT64_FORMAT
" and task %d", sqlTask->jobId, sqlTask->taskId),
ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
errdetail("query string: \"%s\"", sqlQueryString->data)));
sqlTask->anchorShardId = AnchorShardId(fragmentCombination, anchorRangeTableId);
@ -2260,8 +2281,7 @@ SqlTaskList(Job *job)
sqlTask->dependedTaskList = dataFetchTaskList;
/* log the query string we generated */
ereport(DEBUG4, (errmsg("generated sql query for job " UINT64_FORMAT
" and task %d", sqlTask->jobId, sqlTask->taskId),
ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
errdetail("query string: \"%s\"", sqlQueryString->data)));
sqlTask->anchorShardId = INVALID_SHARD_ID;

View File

@ -634,6 +634,18 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_unique_job_ids",
gettext_noop("Enables unique job IDs by prepending the local process ID and "
"group ID. This should usually be enabled, but can be disabled "
"for repeatable output in regression tests."),
NULL,
&EnableUniqueJobIds,
true,
PGC_USERSET,
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
/* warn about config items in the citus namespace that are not registered above */
EmitWarningsOnPlaceholders("citus");
}

View File

@ -28,7 +28,6 @@
/* Definitions local to the physical planner */
#define JOBID_SEQUENCE_NAME "pg_dist_jobid_seq"
#define ARRAY_OUT_FUNC_ID 751
#define NON_PRUNABLE_JOIN -1
#define RESERVED_HASHED_COLUMN_ID MaxAttrNumber
@ -244,6 +243,8 @@ typedef struct OperatorCacheEntry
/* Config variable managed via guc.c */
extern int TaskAssignmentPolicy;
extern bool EnableUniqueJobIds;
/* Function declarations for building physical plans and constructing queries */
extern MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree);