Port master_append_table_to_shard to new connection API

pull/1149/head
Brian Cloutier 2017-01-20 18:49:24 +03:00
parent a3c20e4ea5
commit e3c9483bae
1 changed files with 10 additions and 7 deletions

View File

@ -265,9 +265,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
foreach(shardPlacementCell, shardPlacementList) foreach(shardPlacementCell, shardPlacementList)
{ {
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
char *workerName = shardPlacement->nodeName; MultiConnection *connection = GetPlacementConnection(FOR_DML, shardPlacement,
uint32 workerPort = shardPlacement->nodePort; NULL);
List *queryResultList = NIL; PGresult *queryResult = NULL;
int executeCommand = 0;
StringInfo workerAppendQuery = makeStringInfo(); StringInfo workerAppendQuery = makeStringInfo();
appendStringInfo(workerAppendQuery, WORKER_APPEND_TABLE_TO_SHARD, appendStringInfo(workerAppendQuery, WORKER_APPEND_TABLE_TO_SHARD,
@ -275,10 +276,12 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
quote_literal_cstr(sourceTableName), quote_literal_cstr(sourceTableName),
quote_literal_cstr(sourceNodeName), sourceNodePort); quote_literal_cstr(sourceNodeName), sourceNodePort);
/* inserting data should be performed by the current user */ executeCommand = ExecuteOptionalRemoteCommand(connection, workerAppendQuery->data,
queryResultList = ExecuteRemoteQuery(workerName, workerPort, NULL, &queryResult);
workerAppendQuery); PQclear(queryResult);
if (queryResultList != NIL) ForgetResults(connection);
if (executeCommand == 0)
{ {
succeededPlacementList = lappend(succeededPlacementList, shardPlacement); succeededPlacementList = lappend(succeededPlacementList, shardPlacement);
} }