use QueryCompletion struct

Postgres introduced QueryCompletion struct. Hence a compat utility is
added to finish query completion for older versions and pg >= 13.

The commit on Postgres side:
2f9661311b83dc481fc19f6e3bda015392010a40
pull/3900/head
Sait Talha Nisanci 2020-05-27 12:29:59 +03:00
parent 9f1ec792b3
commit 38aaf1faba
5 changed files with 41 additions and 20 deletions

View File

@ -93,6 +93,7 @@
#include "distributed/hash_helpers.h"
#include "executor/executor.h"
#include "foreign/foreign.h"
#include "tcop/cmdtag.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "nodes/makefuncs.h"
@ -211,8 +212,8 @@ typedef struct ShardConnections
/* Local functions forward declarations */
static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
static void CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag);
static void CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid relationId);
static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
ShardConnections *shardConnections, bool
stopOnFailure,
@ -244,7 +245,7 @@ static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray,
bool binaryFormat);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName);
static void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
static void CitusCopyFrom(CopyStmt *copyStatement, QueryCompletionCompat *completionTag);
static HTAB * CreateConnectionStateHash(MemoryContext memoryContext);
static HTAB * CreateShardStateHash(MemoryContext memoryContext);
static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
@ -277,7 +278,7 @@ static void UnclaimCopyConnections(List *connectionStateList);
static void ShutdownCopyConnectionState(CopyConnectionState *connectionState,
CitusCopyDestReceiver *copyDest);
static SelectStmt * CitusCopySelect(CopyStmt *copyStatement);
static void CitusCopyTo(CopyStmt *copyStatement, char *completionTag);
static void CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag);
static int64 ForwardCopyDataFromConnection(CopyOutState copyOutState,
MultiConnection *connection);
@ -313,6 +314,7 @@ static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot,
static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver);
static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver);
static bool ContainsLocalPlacement(int64 shardId);
static void CompleteCopyQueryTagCompat(QueryCompletionCompat* completionTag, uint64 processedRowCount);
static void FinishLocalCopy(CitusCopyDestReceiver *copyDest);
static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to);
static bool ShouldExecuteCopyLocally(bool isIntermediateResult);
@ -329,7 +331,7 @@ PG_FUNCTION_INFO_V1(citus_text_send_as_jsonb);
* and the partition method of the distributed table.
*/
static void
CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
CitusCopyFrom(CopyStmt *copyStatement, QueryCompletionCompat *completionTag)
{
UseCoordinatedTransaction();
@ -385,7 +387,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
* rows.
*/
static void
CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag)
{
Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
@ -554,8 +556,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
if (completionTag != NULL)
{
SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processedRowCount);
CompleteCopyQueryTagCompat(completionTag, processedRowCount);
}
}
@ -565,7 +566,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
* tables where we create new shards into which to copy rows.
*/
static void
CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, Oid relationId)
{
/* allocate column values and nulls arrays */
Relation distributedRelation = table_open(relationId, RowExclusiveLock);
@ -739,11 +740,19 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
if (completionTag != NULL)
{
SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processedRowCount);
CompleteCopyQueryTagCompat(completionTag, processedRowCount);
}
}
static void CompleteCopyQueryTagCompat(QueryCompletionCompat* completionTag, uint64 processedRowCount) {
#if PG_VERSION_NUM >= PG_VERSION_13
SetQueryCompletion(completionTag, CMDTAG_COPY, processedRowCount);
#else
SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processedRowCount);
#endif
}
/*
* RemoveOptionFromList removes an option from a list of options in a
@ -2769,7 +2778,7 @@ CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName)
* further processing is needed.
*/
Node *
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString)
ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, const char *queryString)
{
/*
* Handle special COPY "resultid" FROM STDIN WITH (format result) commands
@ -2917,7 +2926,7 @@ CitusCopySelect(CopyStmt *copyStatement)
* table dump.
*/
static void
CitusCopyTo(CopyStmt *copyStatement, char *completionTag)
CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag)
{
ListCell *shardIntervalCell = NULL;
int64 tuplesSent = 0;
@ -3008,8 +3017,7 @@ CitusCopyTo(CopyStmt *copyStatement, char *completionTag)
if (completionTag != NULL)
{
SafeSnprintf(completionTag, COMPLETION_TAG_BUFSIZE, "COPY " UINT64_FORMAT,
tuplesSent);
CompleteCopyQueryTagCompat(completionTag, tuplesSent);
}
}

View File

@ -25,6 +25,8 @@
*-------------------------------------------------------------------------
*/
#include "distributed/pg_version_constants.h"
#include "postgres.h"
#include "miscadmin.h"
@ -88,7 +90,9 @@ static bool IsDropSchemaOrDB(Node *parsetree);
*/
void
CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context,
ParamListInfo params, DestReceiver *dest, char *completionTag)
ParamListInfo params, DestReceiver *dest,
QueryCompletionCompat *completionTag
)
{
PlannedStmt *plannedStmt = makeNode(PlannedStmt);
plannedStmt->commandType = CMD_UTILITY;
@ -115,7 +119,8 @@ multi_ProcessUtility(PlannedStmt *pstmt,
ParamListInfo params,
struct QueryEnvironment *queryEnv,
DestReceiver *dest,
char *completionTag)
QueryCompletionCompat *completionTag
)
{
Node *parsetree = pstmt->utilityStmt;
List *ddlJobs = NIL;

View File

@ -15,6 +15,7 @@
#include "distributed/metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/version_compat.h"
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
#include "parser/parse_coerce.h"
@ -155,7 +156,7 @@ extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
extern void EndRemoteCopy(int64 shardId, List *connectionList);
extern List * CreateRangeTable(Relation rel, AclMode requiredAccess);
extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
extern Node * ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag,
const char *queryString);
extern void CheckCopyPermissions(CopyStmt *copyStatement);
extern bool IsCopyResultStmt(CopyStmt *copyStatement);

View File

@ -10,6 +10,8 @@
#ifndef MULTI_UTILITY_H
#define MULTI_UTILITY_H
#include "distributed/pg_version_constants.h"
#include "postgres.h"
#include "utils/relcache.h"
@ -51,10 +53,13 @@ typedef struct DDLJob
extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
ProcessUtilityContext context, ParamListInfo params,
struct QueryEnvironment *queryEnv, DestReceiver *dest,
char *completionTag);
QueryCompletionCompat *completionTag
);
extern void CitusProcessUtility(Node *node, const char *queryString,
ProcessUtilityContext context, ParamListInfo params,
DestReceiver *dest, char *completionTag);
DestReceiver *dest,
QueryCompletionCompat * completionTag
);
extern void MarkInvalidateForeignKeyGraph(void);
extern void InvalidateForeignKeyGraphForDDL(void);
extern List * DDLTaskList(Oid relationId, const char *commandString);

View File

@ -41,6 +41,7 @@
#define varnoold varnosyn
#define Set_ptr_value(a,b) a->ptr_value = b
#define RangeTableEntryFromNSItem(a) a->p_rte
#define QueryCompletionCompat QueryCompletion
#else /* pre PG13 */
#define lnext_compat(l, r) lnext(r)
#define list_delete_cell_compat(l,c,p) list_delete_cell(l,c,p)
@ -52,6 +53,7 @@
#define ExplainOnePlanCompat(a,b,c,d,e,f,g,h) ExplainOnePlan(a,b,c,d,e,f,g)
#define Set_ptr_value(a,b) a->data.ptr_value = b
#define RangeTableEntryFromNSItem(a) a
#define QueryCompletionCompat char
#endif
#if PG_VERSION_NUM >= PG_VERSION_12