diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 0c55564ea..8f845c096 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -25,6 +25,7 @@ #include "distributed/metadata_cache.h" #include "distributed/hash_helpers.h" #include "distributed/placement_connection.h" +#include "distributed/remote_commands.h" #include "distributed/version_compat.h" #include "mb/pg_wchar.h" #include "utils/hsearch.h" @@ -41,9 +42,13 @@ static uint32 ConnectionHashHash(const void *key, Size keysize); static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); +static void DefaultCitusNoticeProcessor(void *arg, const char *message); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); +static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL; + + /* * Initialize per-backend connection management infrastructure. */ @@ -663,6 +668,8 @@ StartConnectionEstablishment(ConnectionHashKey *key) */ PQsetnonblocking(connection->pgConn, true); + SetCitusNoticeProcessor(connection); + return connection; } @@ -766,3 +773,83 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) } } } + + +/* + * SetCitusNoticeProcessor sets the NoticeProcessor to DefaultCitusNoticeProcessor + */ +void +SetCitusNoticeProcessor(MultiConnection *connection) +{ + PQsetNoticeProcessor(connection->pgConn, DefaultCitusNoticeProcessor, + connection); +} + + +/* + * SetCitusNoticeLevel is used to set the notice level for distributed + * queries. + */ +void +SetCitusNoticeLevel(int level) +{ + CitusNoticeLogLevel = level; +} + + +/* + * UnsetCitusNoticeLevel sets the CitusNoticeLogLevel back to + * its default value. + */ +void +UnsetCitusNoticeLevel() +{ + CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL; +} + + +/* + * DefaultCitusNoticeProcessor is used to redirect worker notices + * from logfile to console. + */ +static void +DefaultCitusNoticeProcessor(void *arg, const char *message) +{ + MultiConnection *connection = (MultiConnection *) arg; + char *nodeName = connection->hostname; + uint32 nodePort = connection->port; + char *chompedMessage = pchomp(message); + char *trimmedMessage = TrimLogLevel(chompedMessage); + char *level = strtok((char *) message, ":"); + + ereport(CitusNoticeLogLevel, (errmsg("%s", trimmedMessage), + errdetail("%s from %s:%d", + level, nodeName, nodePort))); +} + + +/* + * TrimLogLevel makes a copy of the string with the leading log level + * and spaces removed such as + * From: + * INFO: "normal2_102070": scanned 0 of 0 pages... + * To: + * "normal2_102070": scanned 0 of 0 pages... + */ +char * +TrimLogLevel(const char *message) +{ + size_t n; + + n = 0; + while (n < strlen(message) && message[n] != ':') + { + n++; + } + + do { + n++; + } while (n < strlen(message) && message[n] == ' '); + + return pstrdup(message + n); +} diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 49eecdae7..9cddba5c3 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -1092,7 +1092,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn CoordinatedTransactionUse2PC(); } - if (firstTask->taskType == DDL_TASK) + if (firstTask->taskType == DDL_TASK || firstTask->taskType == VACUUM_ANALYZE_TASK) { connectionFlags = FOR_DDL; } @@ -1173,6 +1173,15 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn connection = (MultiConnection *) list_nth(connectionList, placementIndex); + /* + * if the task is a VACUUM or ANALYZE, we set CitusNoticeLogLevel to INFO + * to see the logs in console. + */ + if (task->taskType == VACUUM_ANALYZE_TASK) + { + SetCitusNoticeLevel(INFO); + } + /* * If caller is interested, store query results the first time * through. The output of the query's execution on other shards is @@ -1235,6 +1244,9 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn placementIndex++; } + /* we should set the log level back to its default value since the task is done */ + UnsetCitusNoticeLevel(); + UnclaimAllShardConnections(shardConnectionHash); CHECK_FOR_INTERRUPTS(); diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 607698741..158ffccf8 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -128,8 +128,7 @@ static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement, static List * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, const char *alterObjectSchemaCommand); static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand); -static bool IsSupportedDistributedVacuumStmt(VacuumStmt *vacuumStmt, - List *vacuumRelationIdList); +static bool IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList); static List * VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList); static StringInfo DeparseVacuumStmtPrefix(int vacuumFlags); static char * DeparseVacuumColumnNames(List *columnNameList); @@ -1609,7 +1608,7 @@ static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) { int relationIndex = 0; - bool supportedVacuumStmt = false; + bool distributedVacuumStmt = false; List *vacuumRelationList = ExtractVacuumTargetRels(vacuumStmt); ListCell *vacuumRelationCell = NULL; List *relationIdList = NIL; @@ -1625,8 +1624,8 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) relationIdList = lappend_oid(relationIdList, relationId); } - supportedVacuumStmt = IsSupportedDistributedVacuumStmt(vacuumStmt, relationIdList); - if (!supportedVacuumStmt) + distributedVacuumStmt = IsDistributedVacuumStmt(vacuumStmt, relationIdList); + if (!distributedVacuumStmt) { return; } @@ -1672,11 +1671,10 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand) * the list of tables targeted by the provided statement. * * Returns true if the statement requires distributed execution and returns - * false otherwise; however, this function will raise errors if the provided - * statement needs distributed execution but contains unsupported options. + * false otherwise. */ static bool -IsSupportedDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList) +IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList) { const char *stmtName = (vacuumStmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"; bool distributeStmt = false; @@ -1718,12 +1716,6 @@ IsSupportedDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdL "send targeted %s commands to worker nodes.", stmtName))); } - else if (vacuumStmt->options & VACOPT_VERBOSE) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("the VERBOSE option is currently unsupported in " - "distributed %s commands", stmtName))); - } else { distributeStmt = true; @@ -1784,7 +1776,7 @@ VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList) task = CitusMakeNode(Task); task->jobId = jobId; task->taskId = taskId++; - task->taskType = DDL_TASK; + task->taskType = VACUUM_ANALYZE_TASK; task->queryString = pstrdup(vacuumString->data); task->dependedTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; @@ -1812,7 +1804,8 @@ DeparseVacuumStmtPrefix(int vacuumFlags) VACOPT_ANALYZE | VACOPT_DISABLE_PAGE_SKIPPING | VACOPT_FREEZE | - VACOPT_FULL + VACOPT_FULL | + VACOPT_VERBOSE ); /* determine actual command and block out its bit */ @@ -1825,6 +1818,12 @@ DeparseVacuumStmtPrefix(int vacuumFlags) { appendStringInfoString(vacuumPrefix, "ANALYZE "); vacuumFlags &= ~VACOPT_ANALYZE; + + if (vacuumFlags & VACOPT_VERBOSE) + { + appendStringInfoString(vacuumPrefix, "VERBOSE "); + vacuumFlags &= ~VACOPT_VERBOSE; + } } /* unsupported flags should have already been rejected */ @@ -1859,6 +1858,11 @@ DeparseVacuumStmtPrefix(int vacuumFlags) appendStringInfoString(vacuumPrefix, "FULL,"); } + if (vacuumFlags & VACOPT_VERBOSE) + { + appendStringInfoString(vacuumPrefix, "VERBOSE,"); + } + vacuumPrefix->data[vacuumPrefix->len - 1] = ')'; appendStringInfoChar(vacuumPrefix, ' '); diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c index 90c582d27..3dfdbde53 100644 --- a/src/backend/distributed/transaction/multi_shard_transaction.c +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -81,7 +81,7 @@ OpenTransactionsForAllTasks(List *taskList, int connectionFlags) else { /* can only open connections for DDL and DML commands */ - Assert(task->taskType == DDL_TASK); + Assert(task->taskType == DDL_TASK || VACUUM_ANALYZE_TASK); accessType = PLACEMENT_ACCESS_DDL; } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 1de4dc065..e78563fc6 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -237,6 +237,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) */ SubPlanLevel = 0; UnSetDistributedTransactionId(); + UnsetCitusNoticeLevel(); break; } @@ -352,6 +353,8 @@ CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId, { CoordinatedRemoteTransactionsSavepointRollback(subId); } + + UnsetCitusNoticeLevel(); break; } diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 95b11170b..912aee9af 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -21,6 +21,9 @@ /* maximum (textual) lengths of hostname and port */ #define MAX_NODE_LENGTH 255 /* includes 0 byte */ +/* default notice level */ +#define DEFAULT_CITUS_NOTICE_LEVEL DEBUG1 + /* forward declare, to avoid forcing large headers on everyone */ struct pg_conn; /* target of the PGconn typedef */ struct MemoryContextData; @@ -165,5 +168,11 @@ extern void FinishConnectionEstablishment(MultiConnection *connection); extern void ClaimConnectionExclusively(MultiConnection *connection); extern void UnclaimConnection(MultiConnection *connection); +/* dealing with notice handler */ +extern void SetCitusNoticeProcessor(MultiConnection *connection); +extern void SetCitusNoticeLevel(int level); +extern char * TrimLogLevel(const char *message); +extern void UnsetCitusNoticeLevel(void); + #endif /* CONNECTION_MANAGMENT_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 58ea5f6e2..e7ee201da 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -84,7 +84,8 @@ typedef enum MERGE_FETCH_TASK = 5, MODIFY_TASK = 6, ROUTER_TASK = 7, - DDL_TASK = 8 + DDL_TASK = 8, + VACUUM_ANALYZE_TASK = 9 } TaskType; diff --git a/src/test/regress/expected/multi_utilities.out b/src/test/regress/expected/multi_utilities.out index 75e18764b..c6f8e645d 100644 --- a/src/test/regress/expected/multi_utilities.out +++ b/src/test/regress/expected/multi_utilities.out @@ -469,7 +469,3 @@ SELECT 1 FROM citus_create_restore_point('regression-test'); 1 (1 row) --- TODO: support VERBOSE --- VACUUM VERBOSE dustbunnies; --- VACUUM (FULL, VERBOSE) dustbunnies; --- ANALYZE VERBOSE dustbunnies; diff --git a/src/test/regress/expected/multi_utilities_0.out b/src/test/regress/expected/multi_utilities_0.out index c2a8c0191..97e3e149e 100644 --- a/src/test/regress/expected/multi_utilities_0.out +++ b/src/test/regress/expected/multi_utilities_0.out @@ -472,7 +472,3 @@ SELECT 1 FROM citus_create_restore_point('regression-test'); 1 (1 row) --- TODO: support VERBOSE --- VACUUM VERBOSE dustbunnies; --- VACUUM (FULL, VERBOSE) dustbunnies; --- ANALYZE VERBOSE dustbunnies; diff --git a/src/test/regress/sql/multi_utilities.sql b/src/test/regress/sql/multi_utilities.sql index 2069bd6ca..b70d7a0af 100644 --- a/src/test/regress/sql/multi_utilities.sql +++ b/src/test/regress/sql/multi_utilities.sql @@ -292,8 +292,3 @@ SELECT citus_truncate_trigger(); -- confirm that citus_create_restore_point works SELECT 1 FROM citus_create_restore_point('regression-test'); - --- TODO: support VERBOSE --- VACUUM VERBOSE dustbunnies; --- VACUUM (FULL, VERBOSE) dustbunnies; --- ANALYZE VERBOSE dustbunnies;