diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 962badf06..49ef3a2ab 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -94,15 +94,6 @@ static CustomExecMethods AdaptiveExecutorCustomExecMethods = { .ExplainCustomScan = CitusExplainScan }; -static CustomExecMethods TaskTrackerCustomExecMethods = { - .CustomName = "TaskTrackerScan", - .BeginCustomScan = CitusBeginScan, - .ExecCustomScan = TaskTrackerExecScan, - .EndCustomScan = CitusEndScan, - .ReScanCustomScan = CitusReScan, - .ExplainCustomScan = CitusExplainScan -}; - static CustomExecMethods CoordinatorInsertSelectCustomExecMethods = { .CustomName = "CoordinatorInsertSelectScan", .BeginCustomScan = CitusBeginScan, @@ -126,7 +117,6 @@ IsCitusCustomState(PlanState *planState) CustomScanState *css = castNode(CustomScanState, planState); if (css->methods == &AdaptiveExecutorCustomExecMethods || - css->methods == &TaskTrackerCustomExecMethods || css->methods == &CoordinatorInsertSelectCustomExecMethods) { return true; diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index 0016e96b8..fca439a32 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -17,35 +17,6 @@ #include "funcapi.h" #include "miscadmin.h" -#include - -#include "access/htup_details.h" -#include "access/xact.h" -#include "catalog/pg_namespace.h" -#include "catalog/namespace.h" -#include "commands/dbcommands.h" -#include "commands/schemacmds.h" -#include "commands/trigger.h" -#include "distributed/metadata_cache.h" -#include "distributed/multi_client_executor.h" -#include "distributed/multi_server_executor.h" -#include "distributed/resource_lock.h" -#include "distributed/task_tracker.h" -#include "distributed/task_tracker_protocol.h" -#include "distributed/worker_protocol.h" -#include "storage/lwlock.h" -#include "storage/pmsignal.h" -#include "utils/builtins.h" -#include "utils/syscache.h" -#include "utils/lsyscache.h" - - -/* Local functions forward declarations */ -static bool TaskTrackerRunning(void); -static void CreateTask(uint64 jobId, uint32 taskId, char *taskCallString); -static void UpdateTask(WorkerTask *workerTask, char *taskCallString); -static void CleanupTask(WorkerTask *workerTask); - /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(task_tracker_assign_task); @@ -62,73 +33,9 @@ PG_FUNCTION_INFO_V1(task_tracker_conninfo_cache_invalidate); Datum task_tracker_assign_task(PG_FUNCTION_ARGS) { - uint64 jobId = PG_GETARG_INT64(0); - uint32 taskId = PG_GETARG_UINT32(1); - text *taskCallStringText = PG_GETARG_TEXT_P(2); + ereport(ERROR, (errmsg("not supposed to get here, did you cheat?"))); - StringInfo jobSchemaName = JobSchemaName(jobId); - - char *taskCallString = text_to_cstring(taskCallStringText); - uint32 taskCallStringLength = strlen(taskCallString); - - - CheckCitusVersion(ERROR); - - /* check that we have a running task tracker on this host */ - bool taskTrackerRunning = TaskTrackerRunning(); - if (!taskTrackerRunning) - { - ereport(ERROR, (errcode(ERRCODE_CANNOT_CONNECT_NOW), - errmsg("the task tracker has been disabled or shut down"))); - } - - /* check that we have enough space in our shared hash for this string */ - if (taskCallStringLength >= MaxTaskStringSize) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("task string length (%d) exceeds maximum assignable " - "size (%d)", taskCallStringLength, MaxTaskStringSize), - errhint("Consider increasing citus.max_task_string_size."))); - } - - /* - * If the schema does not exist, we create it. However, the schema does not - * become visible to other processes until the transaction commits, and we - * therefore do not release the resource lock in this case. Otherwise, the - * schema is already visible, and we immediately release the resource lock. - */ - LockJobResource(jobId, AccessExclusiveLock); - bool schemaExists = JobSchemaExists(jobSchemaName); - if (!schemaExists) - { - /* lock gets automatically released upon return from this function */ - CreateJobSchema(jobSchemaName, NULL); - } - else - { - Oid schemaId = get_namespace_oid(jobSchemaName->data, false); - - EnsureSchemaOwner(schemaId); - - UnlockJobResource(jobId, AccessExclusiveLock); - } - - LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); - - /* check if we already have the task in our shared hash */ - WorkerTask *workerTask = WorkerTasksHashFind(jobId, taskId); - if (workerTask == NULL) - { - CreateTask(jobId, taskId, taskCallString); - } - else - { - UpdateTask(workerTask, taskCallString); - } - - LWLockRelease(&WorkerTasksSharedState->taskHashLock); - - PG_RETURN_VOID(); + PG_RETURN_NULL(); } @@ -136,41 +43,9 @@ task_tracker_assign_task(PG_FUNCTION_ARGS) Datum task_tracker_task_status(PG_FUNCTION_ARGS) { - uint64 jobId = PG_GETARG_INT64(0); - uint32 taskId = PG_GETARG_UINT32(1); + ereport(ERROR, (errmsg("not supposed to get here, did you cheat?"))); - WorkerTask *workerTask = NULL; - uint32 taskStatus = 0; - char *userName = CurrentUserName(); - - CheckCitusVersion(ERROR); - - bool taskTrackerRunning = TaskTrackerRunning(); - - if (taskTrackerRunning) - { - LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED); - - workerTask = WorkerTasksHashFind(jobId, taskId); - if (workerTask == NULL || - (!superuser() && strncmp(userName, workerTask->userName, NAMEDATALEN) != 0)) - { - ereport(ERROR, (errmsg("could not find the worker task"), - errdetail("Task jobId: " UINT64_FORMAT " and taskId: %u", - jobId, taskId))); - } - - taskStatus = (uint32) workerTask->taskStatus; - - LWLockRelease(&WorkerTasksSharedState->taskHashLock); - } - else - { - ereport(ERROR, (errcode(ERRCODE_CANNOT_CONNECT_NOW), - errmsg("the task tracker has been disabled or shut down"))); - } - - PG_RETURN_UINT32(taskStatus); + PG_RETURN_UINT32(0); } @@ -181,62 +56,9 @@ task_tracker_task_status(PG_FUNCTION_ARGS) Datum task_tracker_cleanup_job(PG_FUNCTION_ARGS) { - uint64 jobId = PG_GETARG_INT64(0); + ereport(ERROR, (errmsg("not supposed to get here, did you cheat?"))); - HASH_SEQ_STATUS status; - - CheckCitusVersion(ERROR); - - StringInfo jobSchemaName = JobSchemaName(jobId); - StringInfo jobDirectoryName = JobDirectoryName(jobId); - - /* - * We'll keep this lock for a while, but that's ok because nothing - * else should be happening on this job. - */ - LockJobResource(jobId, AccessExclusiveLock); - - bool schemaExists = JobSchemaExists(jobSchemaName); - if (schemaExists) - { - Oid schemaId = get_namespace_oid(jobSchemaName->data, false); - - EnsureSchemaOwner(schemaId); - } - - /* - * We first clean up any open connections, and remove tasks belonging to - * this job from the shared hash. - */ - LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_EXCLUSIVE); - - hash_seq_init(&status, TaskTrackerTaskHash); - - WorkerTask *currentTask = (WorkerTask *) hash_seq_search(&status); - while (currentTask != NULL) - { - if (currentTask->jobId == jobId) - { - CleanupTask(currentTask); - } - - currentTask = (WorkerTask *) hash_seq_search(&status); - } - - LWLockRelease(&WorkerTasksSharedState->taskHashLock); - - /* - * We then delete the job directory and schema, if they exist. This cleans - * up all intermediate files and tables allocated for the job. Note that the - * schema drop call can block if another process is creating the schema or - * writing to a table within the schema. - */ - CitusRemoveDirectory(jobDirectoryName->data); - - RemoveJobSchema(jobSchemaName); - UnlockJobResource(jobId, AccessExclusiveLock); - - PG_RETURN_VOID(); + PG_RETURN_NULL(); } @@ -250,211 +72,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS) Datum task_tracker_conninfo_cache_invalidate(PG_FUNCTION_ARGS) { - if (!CALLED_AS_TRIGGER(fcinfo)) - { - ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), - errmsg("must be called as trigger"))); - } - - CheckCitusVersion(ERROR); - - /* no-op in community edition */ + ereport(ERROR, (errmsg("not supposed to get here, did you cheat?"))); PG_RETURN_DATUM(PointerGetDatum(NULL)); } - - -/* - * TaskTrackerRunning checks if the task tracker process is running. To do this, - * the function checks if the task tracker is configured to start up, and infers - * from shared memory that the tracker hasn't received a shut down request. - */ -static bool -TaskTrackerRunning(void) -{ - bool taskTrackerRunning = true; - - /* if postmaster shut down, infer task tracker shut down from it */ - bool postmasterAlive = PostmasterIsAlive(); - if (!postmasterAlive) - { - return false; - } - - /* - * When the task tracker receives a termination signal, it inserts a special - * marker task to the shared hash. We need to look up this marker task since - * the postmaster doesn't send a terminate signal to running backends. - */ - LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED); - - WorkerTask *workerTask = WorkerTasksHashFind(RESERVED_JOB_ID, - SHUTDOWN_MARKER_TASK_ID); - if (workerTask != NULL) - { - taskTrackerRunning = false; - } - - LWLockRelease(&WorkerTasksSharedState->taskHashLock); - - return taskTrackerRunning; -} - - -/* - * CreateJobSchema creates a job schema with the given schema name. Note that - * this function ensures that our pg_ prefixed schema names can be created. - * Further note that the created schema does not become visible to other - * processes until the transaction commits. - * - * If schemaOwner is NULL, then current user is used. - */ -void -CreateJobSchema(StringInfo schemaName, char *schemaOwner) -{ - const char *queryString = NULL; - - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; - RoleSpec currentUserRole = { 0 }; - - /* allow schema names that start with pg_ */ - bool oldAllowSystemTableMods = allowSystemTableMods; - allowSystemTableMods = true; - - /* ensure we're allowed to create this schema */ - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - - if (schemaOwner == NULL) - { - schemaOwner = GetUserNameFromId(savedUserId, false); - } - - /* build a CREATE SCHEMA statement */ - currentUserRole.type = T_RoleSpec; - currentUserRole.roletype = ROLESPEC_CSTRING; - currentUserRole.rolename = schemaOwner; - currentUserRole.location = -1; - - CreateSchemaStmt *createSchemaStmt = makeNode(CreateSchemaStmt); - createSchemaStmt->schemaname = schemaName->data; - createSchemaStmt->schemaElts = NIL; - - /* actually create schema with the current user as owner */ - createSchemaStmt->authrole = ¤tUserRole; - CreateSchemaCommand(createSchemaStmt, queryString, -1, -1); - - CommandCounterIncrement(); - - /* and reset environment */ - SetUserIdAndSecContext(savedUserId, savedSecurityContext); - allowSystemTableMods = oldAllowSystemTableMods; -} - - -/* - * CreateTask creates a new task in shared hash, initializes the task, and sets - * the task to assigned state. Note that this function expects the caller to - * hold an exclusive lock over the shared hash. - */ -static void -CreateTask(uint64 jobId, uint32 taskId, char *taskCallString) -{ - const char *databaseName = CurrentDatabaseName(); - char *userName = CurrentUserName(); - - /* increase task priority for cleanup tasks */ - uint32 assignmentTime = (uint32) time(NULL); - if (taskId == JOB_CLEANUP_TASK_ID) - { - assignmentTime = HIGH_PRIORITY_TASK_TIME; - } - - /* enter the worker task into shared hash and initialize the task */ - WorkerTask *workerTask = WorkerTasksHashEnter(jobId, taskId); - workerTask->assignedAt = assignmentTime; - strlcpy(workerTask->taskCallString, taskCallString, MaxTaskStringSize); - - workerTask->taskStatus = TASK_ASSIGNED; - workerTask->connectionId = INVALID_CONNECTION_ID; - workerTask->failureCount = 0; - strlcpy(workerTask->databaseName, databaseName, NAMEDATALEN); - strlcpy(workerTask->userName, userName, NAMEDATALEN); -} - - -/* - * UpdateTask updates the call string text for an already existing task. Note - * that this function expects the caller to hold an exclusive lock over the - * shared hash. - */ -static void -UpdateTask(WorkerTask *workerTask, char *taskCallString) -{ - TaskStatus taskStatus = workerTask->taskStatus; - Assert(taskStatus != TASK_STATUS_INVALID_FIRST); - - /* - * 1. If the task has succeeded or has been canceled, we don't do anything. - * 2. If the task has permanently failed, we update the task call string, - * reset the failure count, and change the task's status to schedulable. - * 3. If the task is in conduit, we update the task call string, and reset - * the failure count. - */ - if (taskStatus == TASK_SUCCEEDED || taskStatus == TASK_CANCEL_REQUESTED || - taskStatus == TASK_CANCELED) - { - /* nothing to do */ - } - else if (taskStatus == TASK_PERMANENTLY_FAILED) - { - strlcpy(workerTask->taskCallString, taskCallString, MaxTaskStringSize); - workerTask->failureCount = 0; - workerTask->taskStatus = TASK_ASSIGNED; - } - else - { - strlcpy(workerTask->taskCallString, taskCallString, MaxTaskStringSize); - workerTask->failureCount = 0; - } -} - - -/* Cleans up connection and shared hash entry associated with the given task. */ -static void -CleanupTask(WorkerTask *workerTask) -{ - void *hashKey = (void *) workerTask; - - /* - * If the connection is still valid, the master node decided to terminate - * the task prematurely. This can happen when the user wants to cancel the - * query, or when a speculatively executed task finishes elsewhere and the - * query completes. - */ - if (workerTask->connectionId != INVALID_CONNECTION_ID) - { - /* - * The task tracker process owns the connections to local backends, and - * we cannot interefere with those connections from another process. We - * therefore ask the task tracker to clean up the connection and to - * remove the task from the shared hash. Note that one of the cleaned up - * tasks will always be the clean-up task itself. - */ - ereport(DEBUG3, (errmsg("requesting cancel for worker task"), - errdetail("Task jobId: " UINT64_FORMAT " and taskId: %u", - workerTask->jobId, workerTask->taskId))); - - workerTask->taskStatus = TASK_CANCEL_REQUESTED; - return; - } - - /* remove task from the shared hash */ - WorkerTask *taskRemoved = hash_search(TaskTrackerTaskHash, hashKey, HASH_REMOVE, - NULL); - if (taskRemoved == NULL) - { - ereport(FATAL, (errmsg("worker task hash corrupted"))); - } -} diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index d80b075d1..7becd825d 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -54,6 +54,8 @@ static void CreateTaskTable(StringInfo schemaName, StringInfo relationName, List *columnNameList, List *columnTypeList); static void CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName, StringInfo sourceDirectoryName, Oid userId); +static void +CreateJobSchema(StringInfo schemaName, char *schemaOwner); /* exports for SQL callable functions */ @@ -87,6 +89,58 @@ worker_create_schema(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * CreateJobSchema creates a job schema with the given schema name. Note that + * this function ensures that our pg_ prefixed schema names can be created. + * Further note that the created schema does not become visible to other + * processes until the transaction commits. + * + * If schemaOwner is NULL, then current user is used. + */ +static void +CreateJobSchema(StringInfo schemaName, char *schemaOwner) +{ + const char *queryString = NULL; + + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + RoleSpec currentUserRole = { 0 }; + + /* allow schema names that start with pg_ */ + bool oldAllowSystemTableMods = allowSystemTableMods; + allowSystemTableMods = true; + + /* ensure we're allowed to create this schema */ + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + if (schemaOwner == NULL) + { + schemaOwner = GetUserNameFromId(savedUserId, false); + } + + /* build a CREATE SCHEMA statement */ + currentUserRole.type = T_RoleSpec; + currentUserRole.roletype = ROLESPEC_CSTRING; + currentUserRole.rolename = schemaOwner; + currentUserRole.location = -1; + + CreateSchemaStmt *createSchemaStmt = makeNode(CreateSchemaStmt); + createSchemaStmt->schemaname = schemaName->data; + createSchemaStmt->schemaElts = NIL; + + /* actually create schema with the current user as owner */ + createSchemaStmt->authrole = ¤tUserRole; + CreateSchemaCommand(createSchemaStmt, queryString, -1, -1); + + CommandCounterIncrement(); + + /* and reset environment */ + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + allowSystemTableMods = oldAllowSystemTableMods; +} + + /* * worker_repartition_cleanup removes the job directory and schema with the given job id . diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index 40cab9a45..f616d2f73 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -187,7 +187,4 @@ extern bool TaskExecutionFailed(TaskExecution *taskExecution); extern void AdjustStateForFailure(TaskExecution *taskExecution); extern int MaxMasterConnectionCount(void); - -extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node); - #endif /* MULTI_SERVER_EXECUTOR_H */ diff --git a/src/include/distributed/task_tracker_protocol.h b/src/include/distributed/task_tracker_protocol.h index 7fd82ea94..d6abefd54 100644 --- a/src/include/distributed/task_tracker_protocol.h +++ b/src/include/distributed/task_tracker_protocol.h @@ -17,8 +17,6 @@ #include "fmgr.h" -extern void CreateJobSchema(StringInfo schemaName, char *schemaOwner); - /* Function declarations for distributed task management */ extern Datum task_tracker_assign_task(PG_FUNCTION_ARGS); extern Datum task_tracker_update_data_fetch_task(PG_FUNCTION_ARGS);