From 38aaf1faba08fb3126e7d8c88e98fbb72a06a5d1 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Wed, 27 May 2020 12:29:59 +0300 Subject: [PATCH] use QueryCompletion struct Postgres introduced QueryCompletion struct. Hence a compat utility is added to finish query completion for older versions and pg >= 13. The commit on Postgres side: 2f9661311b83dc481fc19f6e3bda015392010a40 --- src/backend/distributed/commands/multi_copy.c | 38 +++++++++++-------- .../distributed/commands/utility_hook.c | 9 ++++- src/include/distributed/commands/multi_copy.h | 3 +- .../distributed/commands/utility_hook.h | 9 ++++- src/include/distributed/version_compat.h | 2 + 5 files changed, 41 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index a8af26a8e..19df51152 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -93,6 +93,7 @@ #include "distributed/hash_helpers.h" #include "executor/executor.h" #include "foreign/foreign.h" +#include "tcop/cmdtag.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "nodes/makefuncs.h" @@ -211,8 +212,8 @@ typedef struct ShardConnections /* Local functions forward declarations */ -static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag); -static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId); +static void CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag); +static void CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid relationId); static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, ShardConnections *shardConnections, bool stopOnFailure, @@ -244,7 +245,7 @@ static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray, bool binaryFormat); static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName); -static void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); +static void CitusCopyFrom(CopyStmt *copyStatement, QueryCompletionCompat *completionTag); static HTAB * CreateConnectionStateHash(MemoryContext memoryContext); static HTAB * CreateShardStateHash(MemoryContext memoryContext); static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, @@ -277,7 +278,7 @@ static void UnclaimCopyConnections(List *connectionStateList); static void ShutdownCopyConnectionState(CopyConnectionState *connectionState, CitusCopyDestReceiver *copyDest); static SelectStmt * CitusCopySelect(CopyStmt *copyStatement); -static void CitusCopyTo(CopyStmt *copyStatement, char *completionTag); +static void CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag); static int64 ForwardCopyDataFromConnection(CopyOutState copyOutState, MultiConnection *connection); @@ -313,6 +314,7 @@ static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot, static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver); static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool ContainsLocalPlacement(int64 shardId); +static void CompleteCopyQueryTagCompat(QueryCompletionCompat* completionTag, uint64 processedRowCount); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); static bool ShouldExecuteCopyLocally(bool isIntermediateResult); @@ -329,7 +331,7 @@ PG_FUNCTION_INFO_V1(citus_text_send_as_jsonb); * and the partition method of the distributed table. */ static void -CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) +CitusCopyFrom(CopyStmt *copyStatement, QueryCompletionCompat *completionTag) { UseCoordinatedTransaction(); @@ -385,7 +387,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) * rows. */ static void -CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) +CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag) { Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false); @@ -554,8 +556,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) if (completionTag != NULL) { - SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE, - "COPY " UINT64_FORMAT, processedRowCount); + CompleteCopyQueryTagCompat(completionTag, processedRowCount); } } @@ -565,7 +566,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) * tables where we create new shards into which to copy rows. */ static void -CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) +CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid relationId) { /* allocate column values and nulls arrays */ Relation distributedRelation = table_open(relationId, RowExclusiveLock); @@ -739,11 +740,19 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) if (completionTag != NULL) { - SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE, - "COPY " UINT64_FORMAT, processedRowCount); + CompleteCopyQueryTagCompat(completionTag, processedRowCount); } } +static void CompleteCopyQueryTagCompat(QueryCompletionCompat* completionTag, uint64 processedRowCount) { + #if PG_VERSION_NUM >= PG_VERSION_13 + SetQueryCompletion(completionTag, CMDTAG_COPY, processedRowCount); + #else + SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "COPY " UINT64_FORMAT, processedRowCount); + #endif +} + /* * RemoveOptionFromList removes an option from a list of options in a @@ -2769,7 +2778,7 @@ CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName) * further processing is needed. */ Node * -ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString) +ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, const char *queryString) { /* * Handle special COPY "resultid" FROM STDIN WITH (format result) commands @@ -2917,7 +2926,7 @@ CitusCopySelect(CopyStmt *copyStatement) * table dump. */ static void -CitusCopyTo(CopyStmt *copyStatement, char *completionTag) +CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag) { ListCell *shardIntervalCell = NULL; int64 tuplesSent = 0; @@ -3008,8 +3017,7 @@ CitusCopyTo(CopyStmt *copyStatement, char *completionTag) if (completionTag != NULL) { - SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE, "COPY " UINT64_FORMAT, - tuplesSent); + CompleteCopyQueryTagCompat(completionTag, tuplesSent); } } diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 682c08240..af80c8b31 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -25,6 +25,8 @@ *------------------------------------------------------------------------- */ +#include "distributed/pg_version_constants.h" + #include "postgres.h" #include "miscadmin.h" @@ -88,7 +90,9 @@ static bool IsDropSchemaOrDB(Node *parsetree); */ void CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context, - ParamListInfo params, DestReceiver *dest, char *completionTag) + ParamListInfo params, DestReceiver *dest, + QueryCompletionCompat *completionTag + ) { PlannedStmt *plannedStmt = makeNode(PlannedStmt); plannedStmt->commandType = CMD_UTILITY; @@ -115,7 +119,8 @@ multi_ProcessUtility(PlannedStmt *pstmt, ParamListInfo params, struct QueryEnvironment *queryEnv, DestReceiver *dest, - char *completionTag) + QueryCompletionCompat *completionTag + ) { Node *parsetree = pstmt->utilityStmt; List *ddlJobs = NIL; diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index b76037706..e840b73b6 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -15,6 +15,7 @@ #include "distributed/metadata_utility.h" #include "distributed/metadata_cache.h" +#include "distributed/version_compat.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_coerce.h" @@ -155,7 +156,7 @@ extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); extern void EndRemoteCopy(int64 shardId, List *connectionList); extern List * CreateRangeTable(Relation rel, AclMode requiredAccess); -extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, +extern Node * ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, const char *queryString); extern void CheckCopyPermissions(CopyStmt *copyStatement); extern bool IsCopyResultStmt(CopyStmt *copyStatement); diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 56ea1e48c..c9543d8b1 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -10,6 +10,8 @@ #ifndef MULTI_UTILITY_H #define MULTI_UTILITY_H +#include "distributed/pg_version_constants.h" + #include "postgres.h" #include "utils/relcache.h" @@ -51,10 +53,13 @@ typedef struct DDLJob extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString, ProcessUtilityContext context, ParamListInfo params, struct QueryEnvironment *queryEnv, DestReceiver *dest, - char *completionTag); + QueryCompletionCompat *completionTag + ); extern void CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context, ParamListInfo params, - DestReceiver *dest, char *completionTag); + DestReceiver *dest, + QueryCompletionCompat * completionTag + ); extern void MarkInvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraphForDDL(void); extern List * DDLTaskList(Oid relationId, const char *commandString); diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index 80e67e06b..9c2fbfb91 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -41,6 +41,7 @@ #define varnoold varnosyn #define Set_ptr_value(a,b) a->ptr_value = b #define RangeTableEntryFromNSItem(a) a->p_rte +#define QueryCompletionCompat QueryCompletion #else /* pre PG13 */ #define lnext_compat(l, r) lnext(r) #define list_delete_cell_compat(l,c,p) list_delete_cell(l,c,p) @@ -52,6 +53,7 @@ #define ExplainOnePlanCompat(a,b,c,d,e,f,g,h) ExplainOnePlan(a,b,c,d,e,f,g) #define Set_ptr_value(a,b) a->data.ptr_value = b #define RangeTableEntryFromNSItem(a) a +#define QueryCompletionCompat char #endif #if PG_VERSION_NUM >= PG_VERSION_12