Introduces CopyFromState_compat macro

CopyState struct is divided into parts and one of them is CopyFromState
This macro uses the appropriate one for PG versions

Relevant PG commit:
c532d15dddff14b01fe9ef1d465013cb8ef186df
pull/5209/head
Halil Ozan Akgul 2021-08-13 15:32:18 +03:00 committed by Sait Talha Nisanci
parent 8f34f84ce6
commit 35cfa5d7b9
4 changed files with 27 additions and 23 deletions

View File

@ -209,13 +209,12 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat
Oid shardOid = GetTableLocalShardOid(relationId, shardId); Oid shardOid = GetTableLocalShardOid(relationId, shardId);
Relation shard = table_open(shardOid, RowExclusiveLock); Relation shard = table_open(shardOid, RowExclusiveLock);
ParseState *pState = make_parsestate(NULL); ParseState *pState = make_parsestate(NULL);
(void) addRangeTableEntryForRelation(pState, shard, AccessShareLock,
/* p_rtable of pState is set so that we can check constraints. */ NULL, false, false);
pState->p_rtable = CreateRangeTable(shard, ACL_INSERT); CopyFromState cstate = BeginCopyFrom_compat(pState, shard, NULL, NULL, false,
ReadFromLocalBufferCallback,
CopyState cstate = BeginCopyFrom(pState, shard, NULL, false, copyStatement->attlist,
ReadFromLocalBufferCallback, copyStatement->options);
copyStatement->attlist, copyStatement->options);
CopyFrom(cstate); CopyFrom(cstate);
EndCopyFrom(cstate); EndCopyFrom(cstate);

View File

@ -520,13 +520,14 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
} }
/* initialize copy state to read from COPY data source */ /* initialize copy state to read from COPY data source */
CopyState copyState = BeginCopyFrom(NULL, CopyFromState copyState = BeginCopyFrom_compat(NULL,
copiedDistributedRelation, copiedDistributedRelation,
copyStatement->filename, NULL,
copyStatement->is_program, copyStatement->filename,
NULL, copyStatement->is_program,
copyStatement->attlist, NULL,
copyStatement->options); copyStatement->attlist,
copyStatement->options);
/* set up callback to identify error line number */ /* set up callback to identify error line number */
errorCallback.callback = CopyFromErrorCallback; errorCallback.callback = CopyFromErrorCallback;
@ -617,13 +618,14 @@ CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, O
(ShardConnections *) palloc0(sizeof(ShardConnections)); (ShardConnections *) palloc0(sizeof(ShardConnections));
/* initialize copy state to read from COPY data source */ /* initialize copy state to read from COPY data source */
CopyState copyState = BeginCopyFrom(NULL, CopyFromState copyState = BeginCopyFrom_compat(NULL,
distributedRelation, distributedRelation,
copyStatement->filename, NULL,
copyStatement->is_program, copyStatement->filename,
NULL, copyStatement->is_program,
copyStatement->attlist, NULL,
copyStatement->options); copyStatement->attlist,
copyStatement->options);
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->delim = (char *) delimiterCharacter; copyOutState->delim = (char *) delimiterCharacter;

View File

@ -410,8 +410,9 @@ ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc tupleDescript
location); location);
copyOptions = lappend(copyOptions, copyOption); copyOptions = lappend(copyOptions, copyOption);
CopyState copyState = BeginCopyFrom(NULL, stubRelation, fileName, false, NULL, CopyFromState copyState = BeginCopyFrom_compat(NULL, stubRelation, NULL,
NULL, copyOptions); fileName, false, NULL,
NULL, copyOptions);
while (true) while (true)
{ {

View File

@ -48,6 +48,7 @@
#define VACOPTVALUE_DISABLED_COMPAT VACOPTVALUE_DISABLED #define VACOPTVALUE_DISABLED_COMPAT VACOPTVALUE_DISABLED
#define VACOPTVALUE_ENABLED_COMPAT VACOPTVALUE_ENABLED #define VACOPTVALUE_ENABLED_COMPAT VACOPTVALUE_ENABLED
#define IsReindexWithParam_compat(reindex, param) IsReindexWithParam(reindex, param) #define IsReindexWithParam_compat(reindex, param) IsReindexWithParam(reindex, param)
#define CopyFromState_compat CopyFromState
#else #else
#define AlterTableStmtObjType(a) ((a)->relkind) #define AlterTableStmtObjType(a) ((a)->relkind)
#define F_NEXTVAL_COMPAT F_NEXTVAL_OID #define F_NEXTVAL_COMPAT F_NEXTVAL_OID
@ -69,6 +70,7 @@
((strcmp(param, "concurrently") == 0) ? ((reindex)->concurrent) : \ ((strcmp(param, "concurrently") == 0) ? ((reindex)->concurrent) : \
((strcmp(param, "verbose") == 0) ? ((reindex)->options == REINDEXOPT_VERBOSE) : \ ((strcmp(param, "verbose") == 0) ? ((reindex)->options == REINDEXOPT_VERBOSE) : \
false)) false))
#define CopyFromState_compat CopyState
#endif #endif
#if PG_VERSION_NUM >= PG_VERSION_13 #if PG_VERSION_NUM >= PG_VERSION_13