Do not assign InvalidOid for local execution while extracting parameters (#3131)

* do not assign InvalidOid for local execution while extracting parameters

* rename functions

* rename parameter and replace function
pull/3129/head
SaitTalhaNisanci 2019-10-28 14:28:22 +03:00 committed by GitHub
parent dceaddbe4d
commit 29d45bd1b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 123 additions and 13 deletions

View File

@ -2925,8 +2925,8 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
/* force evaluation of bound params */
paramListInfo = copyParamList(paramListInfo);
ExtractParametersFromParamListInfo(paramListInfo, &parameterTypes,
&parameterValues);
ExtractParametersForRemoteExecution(paramListInfo, &parameterTypes,
&parameterValues);
querySent = SendRemoteCommandParams(connection, queryString, parameterCount,
parameterTypes, parameterValues);
}

View File

@ -107,6 +107,10 @@ static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskP
static bool TaskAccessesLocalNode(Task *task);
static void LogLocalCommand(const char *command);
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
/*
* ExecuteLocalTasks gets a CitusScanState node and list of local tasks.
@ -130,7 +134,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
{
const char **parameterValues = NULL; /* not used anywhere, so decleare here */
ExtractParametersFromParamListInfo(paramListInfo, &parameterTypes,
ExtractParametersForLocalExecution(paramListInfo, &parameterTypes,
&parameterValues);
numParams = paramListInfo->numParams;
@ -173,6 +177,20 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
}
/*
* ExtractParametersForLocalExecution extracts parameter types and values from
* the given ParamListInfo structure, and fills parameter type and value arrays.
* It does not change the oid of custom types, because the query will be run locally.
*/
static void
ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes,
const char ***parameterValues)
{
ExtractParametersFromParamList(paramListInfo, parameterTypes,
parameterValues, true);
}
/*
* ExtractLocalAndRemoteTasks gets a taskList and generates two
* task lists namely localTaskList and remoteTaskList. The function goes

View File

@ -117,7 +117,6 @@ static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connect
static bool ConsumeQueryResult(MultiConnection *connection, bool
alwaysThrowErrorOnFailure, int64 *rows);
/*
* AcquireMetadataLocks acquires metadata locks on each of the anchor
* shards in the task list to prevent a shard being modified while it
@ -1725,8 +1724,8 @@ SendQueryInSingleRowMode(MultiConnection *connection, char *query,
/* force evaluation of bound params */
paramListInfo = copyParamList(paramListInfo);
ExtractParametersFromParamListInfo(paramListInfo, &parameterTypes,
&parameterValues);
ExtractParametersForRemoteExecution(paramListInfo, &parameterTypes,
&parameterValues);
querySent = SendRemoteCommandParams(connection, query, parameterCount,
parameterTypes, parameterValues);
@ -1758,12 +1757,30 @@ SendQueryInSingleRowMode(MultiConnection *connection, char *query,
/*
* ExtractParametersFromParamListInfo extracts parameter types and values from
* ExtractParametersForRemoteExecution extracts parameter types and values from
* the given ParamListInfo structure, and fills parameter type and value arrays.
* It changes oid of custom types to InvalidOid so that they are the same in workers
* and coordinators.
*/
void
ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes,
const char ***parameterValues)
ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes,
const char ***parameterValues)
{
ExtractParametersFromParamList(paramListInfo, parameterTypes,
parameterValues, false);
}
/*
* ExtractParametersFromParamList extracts parameter types and values from
* the given ParamListInfo structure, and fills parameter type and value arrays.
* If useOriginalCustomTypeOids is true, it uses the original oids for custom types.
*/
void
ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool
useOriginalCustomTypeOids)
{
int parameterIndex = 0;
int parameterCount = paramListInfo->numParams;
@ -1783,7 +1800,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT
* the master and worker nodes. Therefore, the worker nodes can
* infer the correct oid.
*/
if (parameterData->ptype >= FirstNormalObjectId)
if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids)
{
(*parameterTypes)[parameterIndex] = 0;
}

View File

@ -62,8 +62,12 @@ extern bool TaskListRequires2PC(List *taskList);
extern bool ReadOnlyTask(TaskType taskType);
extern List * BuildPlacementSelectList(int32 groupId, List *relationShardList);
extern List * BuildPlacementDDLList(int32 groupId, List *relationShardList);
extern void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
extern void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
extern void ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues, bool
useOriginalCustomTypeOids);
#endif /* MULTI_ROUTER_EXECUTOR_H_ */

View File

@ -1089,6 +1089,43 @@ LOG: executing the command locally: INSERT INTO local_shard_execution.reference
(1 row)
\c - - - :master_port
-- local execution with custom type
SET citus.replication_model TO "streaming";
SET citus.shard_replication_factor TO 1;
CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe');
CREATE TABLE event_responses (
event_id int,
user_id int,
response invite_resp,
primary key (event_id, user_id)
);
SELECT create_distributed_table('event_responses', 'event_id');
create_distributed_table
--------------------------
(1 row)
CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp)
LANGUAGE plpgsql AS $fn$
BEGIN
INSERT INTO event_responses VALUES (p_event_id, p_user_id, p_choice)
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response;
END;
$fn$;
SELECT create_distributed_function('register_for_event(int,int,invite_resp)', 'p_event_id', 'event_responses');
create_distributed_function
-----------------------------
(1 row)
-- call 6 times to make sure it works after the 5th time(postgres binds values after the 5th time)
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
SET client_min_messages TO ERROR;
SET search_path TO public;
DROP SCHEMA local_shard_execution CASCADE;

View File

@ -622,6 +622,40 @@ COMMIT;
WITH distributed_local_mixed AS (INSERT INTO reference_table VALUES (1000) RETURNING *) SELECT * FROM distributed_local_mixed;
\c - - - :master_port
-- local execution with custom type
SET citus.replication_model TO "streaming";
SET citus.shard_replication_factor TO 1;
CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe');
CREATE TABLE event_responses (
event_id int,
user_id int,
response invite_resp,
primary key (event_id, user_id)
);
SELECT create_distributed_table('event_responses', 'event_id');
CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp)
LANGUAGE plpgsql AS $fn$
BEGIN
INSERT INTO event_responses VALUES (p_event_id, p_user_id, p_choice)
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response;
END;
$fn$;
SELECT create_distributed_function('register_for_event(int,int,invite_resp)', 'p_event_id', 'event_responses');
-- call 6 times to make sure it works after the 5th time(postgres binds values after the 5th time)
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
CALL register_for_event(16, 1, 'yes');
SET client_min_messages TO ERROR;
SET search_path TO public;
DROP SCHEMA local_shard_execution CASCADE;