From 29d45bd1b912c14a304235468d72bc8a2e4e9480 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Mon, 28 Oct 2019 14:28:22 +0300 Subject: [PATCH] Do not assign InvalidOid for local execution while extracting parameters (#3131) * do not assign InvalidOid for local execution while extracting parameters * rename functions * rename parameter and replace function --- .../distributed/executor/adaptive_executor.c | 4 +- .../distributed/executor/local_executor.c | 20 +++++++++- .../executor/multi_router_executor.c | 31 ++++++++++++---- .../distributed/multi_router_executor.h | 10 +++-- .../expected/local_shard_execution.out | 37 +++++++++++++++++++ .../regress/sql/local_shard_execution.sql | 34 +++++++++++++++++ 6 files changed, 123 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 79df46344..8c4dbe7b4 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -2925,8 +2925,8 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, /* force evaluation of bound params */ paramListInfo = copyParamList(paramListInfo); - ExtractParametersFromParamListInfo(paramListInfo, ¶meterTypes, - ¶meterValues); + ExtractParametersForRemoteExecution(paramListInfo, ¶meterTypes, + ¶meterValues); querySent = SendRemoteCommandParams(connection, queryString, parameterCount, parameterTypes, parameterValues); } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 588fcc05b..62dad5e9a 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -107,6 +107,10 @@ static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskP static bool TaskAccessesLocalNode(Task *task); static void LogLocalCommand(const char *command); +static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, + Oid **parameterTypes, + const char ***parameterValues); + /* * ExecuteLocalTasks gets a CitusScanState node and list of local tasks. @@ -130,7 +134,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) { const char **parameterValues = NULL; /* not used anywhere, so decleare here */ - ExtractParametersFromParamListInfo(paramListInfo, ¶meterTypes, + ExtractParametersForLocalExecution(paramListInfo, ¶meterTypes, ¶meterValues); numParams = paramListInfo->numParams; @@ -173,6 +177,20 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) } +/* + * ExtractParametersForLocalExecution extracts parameter types and values from + * the given ParamListInfo structure, and fills parameter type and value arrays. + * It does not change the oid of custom types, because the query will be run locally. + */ +static void +ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, + const char ***parameterValues) +{ + ExtractParametersFromParamList(paramListInfo, parameterTypes, + parameterValues, true); +} + + /* * ExtractLocalAndRemoteTasks gets a taskList and generates two * task lists namely localTaskList and remoteTaskList. The function goes diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 2072b9713..5514add64 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -117,7 +117,6 @@ static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connect static bool ConsumeQueryResult(MultiConnection *connection, bool alwaysThrowErrorOnFailure, int64 *rows); - /* * AcquireMetadataLocks acquires metadata locks on each of the anchor * shards in the task list to prevent a shard being modified while it @@ -1725,8 +1724,8 @@ SendQueryInSingleRowMode(MultiConnection *connection, char *query, /* force evaluation of bound params */ paramListInfo = copyParamList(paramListInfo); - ExtractParametersFromParamListInfo(paramListInfo, ¶meterTypes, - ¶meterValues); + ExtractParametersForRemoteExecution(paramListInfo, ¶meterTypes, + ¶meterValues); querySent = SendRemoteCommandParams(connection, query, parameterCount, parameterTypes, parameterValues); @@ -1758,12 +1757,30 @@ SendQueryInSingleRowMode(MultiConnection *connection, char *query, /* - * ExtractParametersFromParamListInfo extracts parameter types and values from + * ExtractParametersForRemoteExecution extracts parameter types and values from * the given ParamListInfo structure, and fills parameter type and value arrays. + * It changes oid of custom types to InvalidOid so that they are the same in workers + * and coordinators. */ void -ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, - const char ***parameterValues) +ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, + const char ***parameterValues) +{ + ExtractParametersFromParamList(paramListInfo, parameterTypes, + parameterValues, false); +} + + +/* + * ExtractParametersFromParamList extracts parameter types and values from + * the given ParamListInfo structure, and fills parameter type and value arrays. + * If useOriginalCustomTypeOids is true, it uses the original oids for custom types. + */ +void +ExtractParametersFromParamList(ParamListInfo paramListInfo, + Oid **parameterTypes, + const char ***parameterValues, bool + useOriginalCustomTypeOids) { int parameterIndex = 0; int parameterCount = paramListInfo->numParams; @@ -1783,7 +1800,7 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT * the master and worker nodes. Therefore, the worker nodes can * infer the correct oid. */ - if (parameterData->ptype >= FirstNormalObjectId) + if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids) { (*parameterTypes)[parameterIndex] = 0; } diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index c0d7e80e7..e747bbfbf 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -62,8 +62,12 @@ extern bool TaskListRequires2PC(List *taskList); extern bool ReadOnlyTask(TaskType taskType); extern List * BuildPlacementSelectList(int32 groupId, List *relationShardList); extern List * BuildPlacementDDLList(int32 groupId, List *relationShardList); -extern void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, - Oid **parameterTypes, - const char ***parameterValues); +extern void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, + Oid **parameterTypes, + const char ***parameterValues); +extern void ExtractParametersFromParamList(ParamListInfo paramListInfo, + Oid **parameterTypes, + const char ***parameterValues, bool + useOriginalCustomTypeOids); #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 172a4cc46..c487b2416 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -1089,6 +1089,43 @@ LOG: executing the command locally: INSERT INTO local_shard_execution.reference (1 row) \c - - - :master_port +-- local execution with custom type +SET citus.replication_model TO "streaming"; +SET citus.shard_replication_factor TO 1; +CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe'); +CREATE TABLE event_responses ( + event_id int, + user_id int, + response invite_resp, + primary key (event_id, user_id) +); +SELECT create_distributed_table('event_responses', 'event_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp) +LANGUAGE plpgsql AS $fn$ +BEGIN + INSERT INTO event_responses VALUES (p_event_id, p_user_id, p_choice) + ON CONFLICT (event_id, user_id) + DO UPDATE SET response = EXCLUDED.response; +END; +$fn$; +SELECT create_distributed_function('register_for_event(int,int,invite_resp)', 'p_event_id', 'event_responses'); + create_distributed_function +----------------------------- + +(1 row) + +-- call 6 times to make sure it works after the 5th time(postgres binds values after the 5th time) +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); SET client_min_messages TO ERROR; SET search_path TO public; DROP SCHEMA local_shard_execution CASCADE; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index e6f7d11ee..7099690cf 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -622,6 +622,40 @@ COMMIT; WITH distributed_local_mixed AS (INSERT INTO reference_table VALUES (1000) RETURNING *) SELECT * FROM distributed_local_mixed; \c - - - :master_port + +-- local execution with custom type +SET citus.replication_model TO "streaming"; +SET citus.shard_replication_factor TO 1; +CREATE TYPE invite_resp AS ENUM ('yes', 'no', 'maybe'); + +CREATE TABLE event_responses ( + event_id int, + user_id int, + response invite_resp, + primary key (event_id, user_id) +); + +SELECT create_distributed_table('event_responses', 'event_id'); + +CREATE OR REPLACE PROCEDURE register_for_event(p_event_id int, p_user_id int, p_choice invite_resp) +LANGUAGE plpgsql AS $fn$ +BEGIN + INSERT INTO event_responses VALUES (p_event_id, p_user_id, p_choice) + ON CONFLICT (event_id, user_id) + DO UPDATE SET response = EXCLUDED.response; +END; +$fn$; + +SELECT create_distributed_function('register_for_event(int,int,invite_resp)', 'p_event_id', 'event_responses'); + +-- call 6 times to make sure it works after the 5th time(postgres binds values after the 5th time) +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); +CALL register_for_event(16, 1, 'yes'); + SET client_min_messages TO ERROR; SET search_path TO public; DROP SCHEMA local_shard_execution CASCADE;