From 35cfa5d7b9f8c935c3dfd8274819864c3dd56499 Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Fri, 13 Aug 2021 15:32:18 +0300 Subject: [PATCH] Introduces CopyFromState_compat macro CopyState struct is divided into parts and one of them is CopyFromState This macro uses the appropriate one for PG versions Relevant PG commit: c532d15dddff14b01fe9ef1d465013cb8ef186df --- .../distributed/commands/local_multi_copy.c | 13 ++++---- src/backend/distributed/commands/multi_copy.c | 30 ++++++++++--------- .../distributed/executor/multi_executor.c | 5 ++-- src/include/distributed/version_compat.h | 2 ++ 4 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index ca532ef70..12b53e90a 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -209,13 +209,12 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat Oid shardOid = GetTableLocalShardOid(relationId, shardId); Relation shard = table_open(shardOid, RowExclusiveLock); ParseState *pState = make_parsestate(NULL); - - /* p_rtable of pState is set so that we can check constraints. */ - pState->p_rtable = CreateRangeTable(shard, ACL_INSERT); - - CopyState cstate = BeginCopyFrom(pState, shard, NULL, false, - ReadFromLocalBufferCallback, - copyStatement->attlist, copyStatement->options); + (void) addRangeTableEntryForRelation(pState, shard, AccessShareLock, + NULL, false, false); + CopyFromState cstate = BeginCopyFrom_compat(pState, shard, NULL, NULL, false, + ReadFromLocalBufferCallback, + copyStatement->attlist, + copyStatement->options); CopyFrom(cstate); EndCopyFrom(cstate); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 98449d0d3..ebc9f3fa6 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -520,13 +520,14 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT } /* initialize copy state to read from COPY data source */ - CopyState copyState = BeginCopyFrom(NULL, - copiedDistributedRelation, - copyStatement->filename, - copyStatement->is_program, - NULL, - copyStatement->attlist, - copyStatement->options); + CopyFromState copyState = BeginCopyFrom_compat(NULL, + copiedDistributedRelation, + NULL, + copyStatement->filename, + copyStatement->is_program, + NULL, + copyStatement->attlist, + copyStatement->options); /* set up callback to identify error line number */ errorCallback.callback = CopyFromErrorCallback; @@ -617,13 +618,14 @@ CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, O (ShardConnections *) palloc0(sizeof(ShardConnections)); /* initialize copy state to read from COPY data source */ - CopyState copyState = BeginCopyFrom(NULL, - distributedRelation, - copyStatement->filename, - copyStatement->is_program, - NULL, - copyStatement->attlist, - copyStatement->options); + CopyFromState copyState = BeginCopyFrom_compat(NULL, + distributedRelation, + NULL, + copyStatement->filename, + copyStatement->is_program, + NULL, + copyStatement->attlist, + copyStatement->options); CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); copyOutState->delim = (char *) delimiterCharacter; diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 121766a02..cdbafacc0 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -410,8 +410,9 @@ ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescript location); copyOptions = lappend(copyOptions, copyOption); - CopyState copyState = BeginCopyFrom(NULL, stubRelation, fileName, false, NULL, - NULL, copyOptions); + CopyFromState copyState = BeginCopyFrom_compat(NULL, stubRelation, NULL, + fileName, false, NULL, + NULL, copyOptions); while (true) { diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index 0694219be..994da5f2d 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -48,6 +48,7 @@ #define VACOPTVALUE_DISABLED_COMPAT VACOPTVALUE_DISABLED #define VACOPTVALUE_ENABLED_COMPAT VACOPTVALUE_ENABLED #define IsReindexWithParam_compat(reindex, param) IsReindexWithParam(reindex, param) +#define CopyFromState_compat CopyFromState #else #define AlterTableStmtObjType(a) ((a)->relkind) #define F_NEXTVAL_COMPAT F_NEXTVAL_OID @@ -69,6 +70,7 @@ ((strcmp(param, "concurrently") == 0) ? ((reindex)->concurrent) : \ ((strcmp(param, "verbose") == 0) ? ((reindex)->options == REINDEXOPT_VERBOSE) : \ false)) +#define CopyFromState_compat CopyState #endif #if PG_VERSION_NUM >= PG_VERSION_13