Merge pull request #3531 from citusdata/refactor/utility-local

Refactor some pieces of code before implementing local drop & truncate execution
Onur Tirtir 2020-02-24 18:35:07 +03:00 committed by GitHub
commit 2e096d4eb9
5 changed files with 183 additions and 106 deletions

View File

@@ -37,9 +37,9 @@
 * same connection since it may hold relevant locks or have uncommitted
 * writes. In that case we "assign" the task to a connection by adding
 * it to the task queue of specific connection (in
-* AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task unassigned
-* and add it to the task queue of a worker pool, which means that it
-* can be executed over any connection in the pool.
+* AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task
+* unassigned and add it to the task queue of a worker pool, which means
+* that it can be executed over any connection in the pool.
 *
 * A task may be executed on multiple placements in case of a reference
 * table or a replicated distributed table. Depending on the type of
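The comment above distinguishes tasks that must be "assigned" to a specific connection from tasks left on the worker pool's queue. As a reading aid, here is a minimal standalone C sketch of that decision rule; MockTask and ChooseQueueForTask are hypothetical names used only to illustrate the comment, they are not part of the Citus executor.

```c
#include <stdbool.h>
#include <stdio.h>

/* hypothetical task descriptor: only the field needed for the decision */
typedef struct MockTask
{
	int taskId;
	bool placementModifiedOnConnection;	/* earlier writes/locks on some connection */
} MockTask;

static const char *
ChooseQueueForTask(const MockTask *task)
{
	if (task->placementModifiedOnConnection)
	{
		/* "assigned": must reuse the connection holding locks or uncommitted writes */
		return "task queue of that specific connection";
	}

	/* "unassigned": any connection in the worker pool may execute it */
	return "task queue of the worker pool";
}

int
main(void)
{
	MockTask pinnedTask = { 1, true };
	MockTask freeTask = { 2, false };

	printf("task %d -> %s\n", pinnedTask.taskId, ChooseQueueForTask(&pinnedTask));
	printf("task %d -> %s\n", freeTask.taskId, ChooseQueueForTask(&freeTask));
	return 0;
}
```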
@@ -772,7 +772,7 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
 uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList);
 /*
- * We're deliberately not setting execution->rowsProceessed here. The main reason
+ * We're deliberately not setting execution->rowsProcessed here. The main reason
 * is that modifications to reference tables would end-up setting it both here
 * and in AdaptiveExecutor. Instead, we set executorState here and skip updating it
 * for reference table modifications in AdaptiveExecutor.
@@ -884,7 +884,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
 ParamListInfo paramListInfo = NULL;
 /*
- * The code-paths that rely on this function do not know how execute
+ * The code-paths that rely on this function do not know how to execute
 * commands locally.
 */
 ErrorIfTransactionAccessedPlacementsLocally();

View File

@@ -134,7 +134,8 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
 if (paramListInfo != NULL)
 {
-const char **parameterValues = NULL; /* not used anywhere, so decleare here */
+/* not used anywhere, so declare here */
+const char **parameterValues = NULL;
 ExtractParametersForLocalExecution(paramListInfo, &parameterTypes,
 &parameterValues);
@@ -228,6 +229,25 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
 }
+/*
+ * LogLocalCommand logs commands executed locally on this node. Although we're
+ * talking about local execution, the function relies on citus.log_remote_commands
+ * GUC. This makes sense because the local execution is still on a shard of a
+ * distributed table, meaning it is part of distributed execution.
+ */
+static void
+LogLocalCommand(Task *task)
+{
+if (!(LogRemoteCommands || LogLocalCommands))
+{
+return;
+}
+ereport(NOTICE, (errmsg("executing the command locally: %s",
+ApplyLogRedaction(TaskQueryString(task)))));
+}
 /*
 * ExtractLocalAndRemoteTasks gets a taskList and generates two
 * task lists namely localTaskList and remoteTaskList. The function goes
@@ -236,9 +256,9 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
 * the lists could be NIL depending on the input taskList.
 *
 * One slightly different case is modifications to replicated tables
-* (e.g., reference tables) where a single task ends in two seperate tasks
-* and the local task is added to localTaskList and the remanings to the
-* remoteTaskList.
+* (e.g., reference tables) where a single task ends in two separate tasks
+* and the local task is added to localTaskList and the remaning ones to
+* the remoteTaskList.
 */
 void
 ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
@@ -280,8 +300,6 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
 }
 else
 {
-Task *remoteTask = NULL;
 /*
 * At this point, we're dealing with reference tables or intermediate results
 * where the task has placements on both local and remote nodes. We always
@@ -301,7 +319,7 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
 }
 else
 {
-remoteTask = copyObject(task);
+Task *remoteTask = copyObject(task);
 remoteTask->taskPlacementList = remoteTaskPlacementList;
 *remoteTaskList = lappend(*remoteTaskList, remoteTask);
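The function documented above splits a replicated-table task by placement: the local placement feeds the local task, and the remaining placements go into a copied remote task. The standalone sketch below mirrors that split with mock types; MockPlacement and LOCAL_GROUP_ID are assumptions for illustration, not the Citus structures.

```c
#include <stdio.h>

#define MAX_PLACEMENTS 8

/* hypothetical stand-in for a shard placement; only the group id matters here */
typedef struct MockPlacement
{
	int nodeGroupId;
} MockPlacement;

/* assumption for the sketch: the local node has group id 0 */
static const int LOCAL_GROUP_ID = 0;

int
main(void)
{
	MockPlacement placements[] = { { 0 }, { 1 }, { 2 } };
	int placementCount = 3;

	MockPlacement localPlacements[MAX_PLACEMENTS];
	MockPlacement remotePlacements[MAX_PLACEMENTS];
	int localCount = 0;
	int remoteCount = 0;

	/* the local placement feeds the local task, the rest feed the copied remote task */
	for (int i = 0; i < placementCount; i++)
	{
		if (placements[i].nodeGroupId == LOCAL_GROUP_ID)
		{
			localPlacements[localCount++] = placements[i];
		}
		else
		{
			remotePlacements[remoteCount++] = placements[i];
		}
	}

	printf("local task placements: %d, remote task placements: %d\n",
		   localCount, remoteCount);
	return 0;
}
```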
@@ -353,7 +371,7 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que
 {
 EState *executorState = ScanStateGetExecutorState(scanState);
 ParamListInfo paramListInfo = executorState->es_param_list_info;
-DestReceiver *tupleStoreDestReceiever = CreateDestReceiver(DestTuplestore);
+DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore);
 ScanDirection scanDirection = ForwardScanDirection;
 QueryEnvironment *queryEnv = create_queryEnv();
 int eflags = 0;
@@ -363,14 +381,14 @@
 * Use the tupleStore provided by the scanState because it is shared accross
 * the other task executions and the adaptive executor.
 */
-SetTuplestoreDestReceiverParams(tupleStoreDestReceiever,
+SetTuplestoreDestReceiverParams(tupleStoreDestReceiver,
 scanState->tuplestorestate,
 CurrentMemoryContext, false);
 /* Create a QueryDesc for the query */
 QueryDesc *queryDesc = CreateQueryDesc(taskPlan, queryString,
 GetActiveSnapshot(), InvalidSnapshot,
-tupleStoreDestReceiever, paramListInfo,
+tupleStoreDestReceiver, paramListInfo,
 queryEnv, 0);
 ExecutorStart(queryDesc, eflags);
@@ -430,6 +448,7 @@ ShouldExecuteTasksLocally(List *taskList)
 }
 bool singleTask = (list_length(taskList) == 1);
+
 if (singleTask && TaskAccessesLocalNode((Task *) linitial(taskList)))
 {
 /*
@@ -455,7 +474,7 @@
 {
 /*
 * For multi-task executions, switching to local execution would likely to
-* perform poorly, because we'd lose the parallelizm. Note that the local
+* perform poorly, because we'd lose the parallelism. Note that the local
 * execution is happening one task at a time (e.g., similar to sequential
 * distributed execution).
 */
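The comments in the two hunks above give the rule behind ShouldExecuteTasksLocally: a single task that only touches a local placement is worth running locally, while a multi-task list stays on remote connections so parallelism is preserved. Below is a minimal standalone sketch of that rule; MockTask and ShouldRunLocallySketch are hypothetical and simplify away the other conditions the real function checks.

```c
#include <stdbool.h>
#include <stdio.h>

/* hypothetical task descriptor; the real Task struct carries much more */
typedef struct MockTask
{
	bool accessesLocalNode;
} MockTask;

static bool
ShouldRunLocallySketch(const MockTask *tasks, int taskCount)
{
	bool singleTask = (taskCount == 1);

	if (singleTask && tasks[0].accessesLocalNode)
	{
		/* a single local task avoids the network round trip */
		return true;
	}

	/* running a multi-task list one-by-one locally would lose parallelism */
	return false;
}

int
main(void)
{
	MockTask singleLocal[] = { { true } };
	MockTask multiTask[] = { { true }, { false }, { false } };

	printf("single local task -> %s\n",
		   ShouldRunLocallySketch(singleLocal, 1) ? "local execution" : "remote execution");
	printf("three tasks -> %s\n",
		   ShouldRunLocallySketch(multiTask, 3) ? "local execution" : "remote execution");
	return 0;
}
```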
@@ -515,25 +534,6 @@ ErrorIfTransactionAccessedPlacementsLocally(void)
 }
-/*
- * LogLocalCommand logs commands executed locally on this node. Although we're
- * talking about local execution, the function relies on citus.log_remote_commands GUC.
- * This makes sense because the local execution is still on a shard of a distributed table,
- * meaning it is part of distributed execution.
- */
-static void
-LogLocalCommand(Task *task)
-{
-if (!(LogRemoteCommands || LogLocalCommands))
-{
-return;
-}
-ereport(NOTICE, (errmsg("executing the command locally: %s",
-ApplyLogRedaction(TaskQueryString(task)))));
-}
 /*
 * DisableLocalExecution simply a C interface for
 * setting the following:

View File

@@ -27,6 +27,7 @@
 #include "commands/dbcommands.h"
 #include "distributed/commands/utility_hook.h"
 #include "distributed/connection_management.h"
+#include "distributed/listutils.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_client_executor.h"
@@ -77,6 +78,12 @@ static List * ShardsMatchingDeleteCriteria(Oid relationId, List *shardList,
 Node *deleteCriteria);
 static int DropShards(Oid relationId, char *schemaName, char *relationName,
 List *deletableShardIntervalList);
+static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
+const char *shardRelationName,
+const char *dropShardPlacementCommand);
+static char * CreateDropShardPlacementCommand(const char *schemaName,
+const char *shardRelationName, char
+storageType);
 /* exports for SQL callable functions */
@@ -326,7 +333,7 @@ master_drop_sequences(PG_FUNCTION_ARGS)
 /*
 * CheckTableSchemaNameForDrop errors out if the current user does not
-* have permission to undistribute the given relation, taking into
+* have permission to un-distribute the given relation, taking into
 * account that it may be called from the drop trigger. If the table exists,
 * the function rewrites the given table and schema name.
 */
@@ -359,40 +366,147 @@ static int
 DropShards(Oid relationId, char *schemaName, char *relationName,
 List *deletableShardIntervalList)
 {
-ListCell *shardIntervalCell = NULL;
+Assert(OidIsValid(relationId));
+Assert(schemaName != NULL);
+Assert(relationName != NULL);
 UseCoordinatedTransaction();
-/* At this point we intentionally decided to not use 2PC for reference tables */
+/*
+ * At this point we intentionally decided to not use 2PC for reference
+ * tables
+ */
 if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
 {
 CoordinatedTransactionUse2PC();
 }
-foreach(shardIntervalCell, deletableShardIntervalList)
+ShardInterval *shardInterval = NULL;
+foreach_ptr(shardInterval, deletableShardIntervalList)
 {
-ListCell *shardPlacementCell = NULL;
-ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
 uint64 shardId = shardInterval->shardId;
 char *shardRelationName = pstrdup(relationName);
 Assert(shardInterval->relationId == relationId);
-/* Build shard relation name. */
+/* build shard relation name */
 AppendShardIdToName(&shardRelationName, shardId);
-char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);
 List *shardPlacementList = ShardPlacementList(shardId);
-foreach(shardPlacementCell, shardPlacementList)
+ShardPlacement *shardPlacement = NULL;
+foreach_ptr(shardPlacement, shardPlacementList)
 {
-ShardPlacement *shardPlacement =
-(ShardPlacement *) lfirst(shardPlacementCell);
+uint64 shardPlacementId = shardPlacement->placementId;
+if (shardPlacement->groupId == COORDINATOR_GROUP_ID &&
+IsCoordinator() &&
+DropSchemaOrDBInProgress())
+{
+/*
+ * The active DROP SCHEMA/DATABASE ... CASCADE will drop the
+ * shard, if we try to drop it over another connection, we will
+ * get into a distributed deadlock.
+ */
+}
+else
+{
+char storageType = shardInterval->storageType;
+const char *dropShardPlacementCommand =
+CreateDropShardPlacementCommand(schemaName, shardRelationName,
+storageType);
+/*
+ * Try to open a new connection (or use an existing one) to
+ * connect to target node to drop shard placement over that
+ * remote connection
+ */
+ExecuteDropShardPlacementCommandRemotely(shardPlacement,
+shardRelationName,
+dropShardPlacementCommand);
+}
+DeleteShardPlacementRow(shardPlacementId);
+}
+/*
+ * Now that we deleted all placements of the shard (or their metadata),
+ * delete the shard metadata as well.
+ */
+DeleteShardRow(shardId);
+}
+int droppedShardCount = list_length(deletableShardIntervalList);
+return droppedShardCount;
+}
+/*
+ * ExecuteDropShardPlacementCommandRemotely executes the given DROP shard command
+ * via remote critical connection.
+ */
+static void
+ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
+const char *shardRelationName,
+const char *dropShardPlacementCommand)
+{
+Assert(shardPlacement != NULL);
+Assert(shardRelationName != NULL);
+Assert(dropShardPlacementCommand != NULL);
+uint32 connectionFlags = FOR_DDL;
+MultiConnection *connection = GetPlacementConnection(connectionFlags,
+shardPlacement,
+NULL);
+RemoteTransactionBeginIfNecessary(connection);
+if (PQstatus(connection->pgConn) != CONNECTION_OK)
+{
+uint64 placementId = shardPlacement->placementId;
 char *workerName = shardPlacement->nodeName;
 uint32 workerPort = shardPlacement->nodePort;
-StringInfo workerDropQuery = makeStringInfo();
-uint32 connectionFlags = FOR_DDL;
-char storageType = shardInterval->storageType;
+ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node "
+"\"%s:%u\"", shardRelationName, workerName,
+workerPort),
+errdetail("Marking this shard placement for "
+"deletion")));
+UpdateShardPlacementState(placementId, SHARD_STATE_TO_DELETE);
+return;
+}
+MarkRemoteTransactionCritical(connection);
+ExecuteCriticalRemoteCommand(connection, dropShardPlacementCommand);
+}
+/*
+ * CreateDropShardPlacementCommand function builds the DROP command to drop
+ * the given shard relation by qualifying it with schema name according to
+ * shard relation's storage type.
+ */
+static char *
+CreateDropShardPlacementCommand(const char *schemaName, const char *shardRelationName,
+char storageType)
+{
+Assert(schemaName != NULL);
+Assert(shardRelationName != NULL);
+StringInfo workerDropQuery = makeStringInfo();
+const char *quotedShardName = quote_qualified_identifier(schemaName,
+shardRelationName);
+/* build workerDropQuery according to shard storage type */
 if (storageType == SHARD_STORAGE_TABLE)
 {
 appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND,
@@ -404,54 +518,13 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
 appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND,
 quotedShardName);
 }
-/*
- * The active DROP SCHEMA/DATABASE ... CASCADE will drop the shard, if we
- * try to drop it over another connection, we will get into a distributed
- * deadlock.
- */
-if (shardPlacement->groupId == COORDINATOR_GROUP_ID &&
-IsCoordinator() &&
-DropSchemaOrDBInProgress())
+else
 {
-DeleteShardPlacementRow(shardPlacement->placementId);
-continue;
+/* no other storage type is expected here */
+Assert(false);
 }
-MultiConnection *connection = GetPlacementConnection(connectionFlags,
-shardPlacement,
-NULL);
-RemoteTransactionBeginIfNecessary(connection);
-if (PQstatus(connection->pgConn) != CONNECTION_OK)
-{
-uint64 placementId = shardPlacement->placementId;
-ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node "
-"\"%s:%u\"", shardRelationName, workerName,
-workerPort),
-errdetail("Marking this shard placement for "
-"deletion")));
-UpdateShardPlacementState(placementId, SHARD_STATE_TO_DELETE);
-continue;
-}
-MarkRemoteTransactionCritical(connection);
-ExecuteCriticalRemoteCommand(connection, workerDropQuery->data);
-DeleteShardPlacementRow(shardPlacement->placementId);
-}
-DeleteShardRow(shardId);
-}
-int droppedShardCount = list_length(deletableShardIntervalList);
-return droppedShardCount;
+return workerDropQuery->data;
 }
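CreateDropShardPlacementCommand above picks a DROP template based on the shard's storage type and fills in the schema-qualified shard name. The standalone sketch below shows the shape of the generated SQL; the literal template strings and the storage-type codes are assumptions for illustration (the real values come from DROP_REGULAR_TABLE_COMMAND / DROP_FOREIGN_TABLE_COMMAND and the SHARD_STORAGE_* constants in the Citus headers), and quoting is simplified compared to quote_qualified_identifier().

```c
#include <stdio.h>

/* assumed storage type codes, for illustration only */
#define SHARD_STORAGE_TABLE 't'
#define SHARD_STORAGE_FOREIGN 'f'

/*
 * Sketch of the command construction; the real code builds a StringInfo and
 * takes the command templates from the Citus headers.
 */
static void
BuildDropShardCommand(char *buffer, size_t bufferSize, const char *schemaName,
					  const char *shardRelationName, char storageType)
{
	if (storageType == SHARD_STORAGE_FOREIGN)
	{
		snprintf(buffer, bufferSize, "DROP FOREIGN TABLE IF EXISTS %s.%s CASCADE",
				 schemaName, shardRelationName);
	}
	else
	{
		snprintf(buffer, bufferSize, "DROP TABLE IF EXISTS %s.%s CASCADE",
				 schemaName, shardRelationName);
	}
}

int
main(void)
{
	char command[256];

	BuildDropShardCommand(command, sizeof(command), "public",
						  "orders_102008", SHARD_STORAGE_TABLE);
	printf("%s\n", command);	/* DROP TABLE IF EXISTS public.orders_102008 CASCADE */
	return 0;
}
```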

View File

@@ -27,7 +27,6 @@
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 static List * TruncateTaskList(Oid relationId);
@@ -89,29 +88,36 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
 static List *
 TruncateTaskList(Oid relationId)
 {
-List *shardIntervalList = LoadShardIntervalList(relationId);
-ListCell *shardIntervalCell = NULL;
+/* resulting task list */
 List *taskList = NIL;
+/* enumerate the tasks when putting them to the taskList */
 int taskId = 1;
 Oid schemaId = get_rel_namespace(relationId);
 char *schemaName = get_namespace_name(schemaId);
 char *relationName = get_rel_name(relationId);
+List *shardIntervalList = LoadShardIntervalList(relationId);
 /* lock metadata before getting placement lists */
 LockShardListMetadata(shardIntervalList, ShareLock);
+ListCell *shardIntervalCell = NULL;
 foreach(shardIntervalCell, shardIntervalList)
 {
 ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
 uint64 shardId = shardInterval->shardId;
+char *shardRelationName = pstrdup(relationName);
+/* build shard relation name */
+AppendShardIdToName(&shardRelationName, shardId);
+char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);
 StringInfo shardQueryString = makeStringInfo();
-char *shardName = pstrdup(relationName);
-AppendShardIdToName(&shardName, shardId);
-appendStringInfo(shardQueryString, "TRUNCATE TABLE %s CASCADE",
-quote_qualified_identifier(schemaName, shardName));
+appendStringInfo(shardQueryString, "TRUNCATE TABLE %s CASCADE", quotedShardName);
 Task *task = CitusMakeNode(Task);
 task->jobId = INVALID_JOB_ID;
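The loop above builds one schema-qualified TRUNCATE statement per shard by appending the shard id to the relation name. The standalone sketch below illustrates the resulting string; it assumes AppendShardIdToName() simply suffixes "_<shardId>" (the real helper also handles identifier-length truncation) and skips the quoting done by quote_qualified_identifier().

```c
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	/* example inputs; the shard id 102008 is just illustrative */
	const char *schemaName = "public";
	const char *relationName = "orders";
	uint64_t shardId = 102008;

	/* sketch of AppendShardIdToName(): suffix the shard id onto the relation name */
	char shardRelationName[128];
	snprintf(shardRelationName, sizeof(shardRelationName), "%s_%llu",
			 relationName, (unsigned long long) shardId);

	/* sketch of the per-shard query built by TruncateTaskList() */
	char truncateCommand[256];
	snprintf(truncateCommand, sizeof(truncateCommand),
			 "TRUNCATE TABLE %s.%s CASCADE", schemaName, shardRelationName);

	printf("%s\n", truncateCommand);	/* TRUNCATE TABLE public.orders_102008 CASCADE */
	return 0;
}
```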

View File

@@ -20,16 +20,14 @@ extern bool LogLocalCommands;
 extern bool TransactionAccessedLocalPlacement;
 extern bool TransactionConnectedToLocalGroup;
+/* extern function declarations */
 extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
 extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
 List **localTaskList, List **remoteTaskList);
 extern bool ShouldExecuteTasksLocally(List *taskList);
+extern bool AnyTaskAccessesLocalNode(List *taskList);
+extern bool TaskAccessesLocalNode(Task *task);
 extern void ErrorIfTransactionAccessedPlacementsLocally(void);
-extern void SetTaskQueryAndPlacementList(Task *task, Query *query, List *placementList);
-extern char * TaskQueryString(Task *task);
-extern bool TaskAccessesLocalNode(Task *task);
 extern void DisableLocalExecution(void);
-extern bool AnyTaskAccessesRemoteNode(List *taskList);
-extern bool TaskAccessesLocalNode(Task *task);
 #endif /* LOCAL_EXECUTION_H */