Replace the sequence-based job ID with a locally generated one.

UniqueJobId() no longer calls nextval on pg_dist_jobid_seq. It now packs the
local group ID, the backend process ID, a secondary flag and a static counter
into one 64-bit value. The new citus.enable_unique_job_ids setting restricts
the ID to the counter for repeatable regression output, and job IDs are
removed from the executor error messages and the planner debug messages.

Review corrections to the added code: the group ID mask is 0xFFFF and the
counter mask is 0x7FFFFF, so that every field matches the bit layout
documented above UniqueJobId() and the counter can no longer overwrite the
secondary flag or the process ID bits.

diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c
index e6b1668c4..911408c96 100644
--- a/src/backend/distributed/executor/multi_real_time_executor.c
+++ b/src/backend/distributed/executor/multi_real_time_executor.c
@@ -225,8 +225,7 @@ MultiRealTimeExecute(Job *job)
 	 */
 	if (taskFailed)
 	{
-		ereport(ERROR, (errmsg("failed to execute job " UINT64_FORMAT, job->jobId),
-						errdetail("Failure due to failed task %u", failedTaskId)));
+		ereport(ERROR, (errmsg("failed to execute task %u", failedTaskId)));
 	}
 	else if (QueryCancelPending)
 	{
diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c
index 44411ae16..307a82f1a 100644
--- a/src/backend/distributed/executor/multi_task_tracker_executor.c
+++ b/src/backend/distributed/executor/multi_task_tracker_executor.c
@@ -150,7 +150,6 @@ MultiTaskTrackerExecute(Job *job)
 	ListCell *taskAndExecutionCell = NULL;
 	uint32 taskTrackerCount = 0;
 	uint32 topLevelTaskCount = 0;
-	uint64 failedJobId = 0;
 	uint32 failedTaskId = 0;
 	bool allTasksCompleted = false;
 	bool taskFailed = false;
@@ -279,7 +278,6 @@ MultiTaskTrackerExecute(Job *job)
 			taskFailed = TaskExecutionFailed(taskExecution);
 			if (taskFailed)
 			{
-				failedJobId = taskExecution->jobId;
 				failedTaskId = taskExecution->taskId;
 				break;
 			}
@@ -336,7 +334,6 @@ MultiTaskTrackerExecute(Job *job)
 			taskTransmitFailed = TaskExecutionFailed(taskExecution);
 			if (taskTransmitFailed)
 			{
-				failedJobId = taskExecution->jobId;
 				failedTaskId = taskExecution->taskId;
 				break;
 			}
@@ -415,13 +412,11 @@ MultiTaskTrackerExecute(Job *job)
 	 */
 	if (taskFailed)
 	{
-		ereport(ERROR, (errmsg("failed to execute job " UINT64_FORMAT, failedJobId),
-						errdetail("Failure due to failed task %u", failedTaskId)));
+		ereport(ERROR, (errmsg("failed to execute task %u", failedTaskId)));
 	}
 	else if (clusterFailed)
 	{
-		ereport(ERROR, (errmsg("failed to execute job " UINT64_FORMAT, job->jobId),
-						errdetail("Too many task tracker failures")));
+		ereport(ERROR, (errmsg("failed to execute task %u", failedTaskId)));
 	}
 	else if (QueryCancelPending)
 	{
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index 47aa1e375..b31d3f766 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -22,6 +22,7 @@
 #include "access/heapam.h"
 #include "access/nbtree.h"
 #include "access/skey.h"
+#include "access/xlog.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
@@ -63,6 +64,7 @@
 
 /* Policy to use when assigning tasks to worker nodes */
 int TaskAssignmentPolicy = TASK_ASSIGNMENT_GREEDY;
+bool EnableUniqueJobIds = true;
 
 
 /*
@@ -1693,41 +1695,61 @@ ChildNodeList(MultiNode *multiNode)
 
 /*
  * UniqueJobId allocates and returns a unique jobId for the job to be executed.
- * This allocation occurs both in shared memory and in write ahead logs; writing
- * to logs avoids the risk of having jobId collisions.
  *
- * Please note that the jobId sequence wraps around after 2^32 integers. This
- * leaves the upper 32-bits to slave nodes and their jobs.
+ * The resulting job ID is built up as:
+ * <16-bit group ID><24-bit process ID><1-bit secondary flag><23-bit local counter>
+ *
+ * When citus.enable_unique_job_ids is off then only the local counter is
+ * included to get repeatable results.
  */
 static uint64
 UniqueJobId(void)
 {
-	text *sequenceName = cstring_to_text(JOBID_SEQUENCE_NAME);
-	Oid sequenceId = ResolveRelationId(sequenceName);
-	Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
-	Datum jobIdDatum = 0;
-	int64 jobId = 0;
-	int64 localizedJobId = 0;
-	int64 localGroupId = GetLocalGroupId();
-	Oid savedUserId = InvalidOid;
-	int savedSecurityContext = 0;
+	static uint32 jobIdCounter = 0;
 
-	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
-	SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
+	uint64 jobId = 0;
+	uint64 jobIdNumber = 0;
+	uint64 processId = 0;
+	uint64 localGroupId = 0;
 
-	/* generate new and unique jobId from sequence */
-	jobIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
-	jobId = DatumGetInt64(jobIdDatum);
+	jobIdCounter++;
+
+	if (EnableUniqueJobIds)
+	{
+		/*
+		 * Add the local group ID (truncated to 16 bits) to the jobId to
+		 * keep concurrent jobs on different groups from conflicting.
+		 */
+		localGroupId = GetLocalGroupId() & 0xFFFF;
+		jobId = jobId | (localGroupId << 48);
+
+		/*
+		 * Add the current process ID to distinguish jobs by this
+		 * backend from jobs started by other backends. Process
+		 * IDs can have at most 24-bits on platforms supported by
+		 * Citus.
+		 */
+		processId = MyProcPid & 0xFFFFFF;
+		jobId = jobId | (processId << 24);
+
+		/*
+		 * Add an extra bit for secondaries to distinguish their
+		 * jobs from primaries.
+		 */
+		if (RecoveryInProgress())
+		{
+			jobId = jobId | (1 << 23);
+		}
+	}
 
 	/*
-	 * Add the local group id information to the jobId to
-	 * prevent concurrent jobs on different groups to conflict.
+	 * Use the remaining 23 bits (mask 0x7FFFFF) to distinguish jobs by the
+	 * same backend; the counter wraps around after 2^23 jobs.
 	 */
-	localizedJobId = jobId | (localGroupId << 32);
+	jobIdNumber = jobIdCounter & 0x7FFFFF;
+	jobId = jobId | jobIdNumber;
 
-	SetUserIdAndSecContext(savedUserId, savedSecurityContext);
-
-	return localizedJobId;
+	return jobId;
 }
 
 
@@ -2144,8 +2166,7 @@ SubquerySqlTaskList(Job *job)
 		sqlTask->dependedTaskList = dataFetchTaskList;
 
 		/* log the query string we generated */
-		ereport(DEBUG4, (errmsg("generated sql query for job " UINT64_FORMAT
-								" and task %d", sqlTask->jobId, sqlTask->taskId),
+		ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
 						 errdetail("query string: \"%s\"", sqlQueryString->data)));
 
 		sqlTask->anchorShardId = AnchorShardId(fragmentCombination, anchorRangeTableId);
@@ -2260,8 +2281,7 @@ SqlTaskList(Job *job)
 		sqlTask->dependedTaskList = dataFetchTaskList;
 
 		/* log the query string we generated */
-		ereport(DEBUG4, (errmsg("generated sql query for job " UINT64_FORMAT
-								" and task %d", sqlTask->jobId, sqlTask->taskId),
+		ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
 						 errdetail("query string: \"%s\"", sqlQueryString->data)));
 
 		sqlTask->anchorShardId = INVALID_SHARD_ID;
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 115f39dcc..5a924bfe8 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -634,6 +634,18 @@ RegisterCitusConfigVariables(void)
 		GUC_NO_SHOW_ALL,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.enable_unique_job_ids",
+		gettext_noop("Enables unique job IDs by prepending the local process ID and "
+					 "group ID. This should usually be enabled, but can be disabled "
+					 "for repeatable output in regression tests."),
+		NULL,
+		&EnableUniqueJobIds,
+		true,
+		PGC_USERSET,
+		GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
 	/* warn about config items in the citus namespace that are not registered above */
 	EmitWarningsOnPlaceholders("citus");
 }
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 0cf340899..96d74561c 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -28,7 +28,6 @@
 
 
 /* Definitions local to the physical planner */
-#define JOBID_SEQUENCE_NAME "pg_dist_jobid_seq"
 #define ARRAY_OUT_FUNC_ID 751
 #define NON_PRUNABLE_JOIN -1
 #define RESERVED_HASHED_COLUMN_ID MaxAttrNumber
@@ -244,6 +243,8 @@ typedef struct OperatorCacheEntry
 
 /* Config variable managed via guc.c */
 extern int TaskAssignmentPolicy;
+extern bool EnableUniqueJobIds;
+
 
 /* Function declarations for building physical plans and constructing queries */
 extern MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree);