From a2debe0f0263b9d25757205c5e657ca280db6d9c Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 12 Apr 2022 15:16:58 +0200 Subject: [PATCH 01/18] Do not assign distributed transaction ids for local execution In the past, for all modifications on the local execution, we enabled 2PC (with https://github.com/citusdata/citus/commit/6a7ed7b309caca3688c51d28ca4fdf1f165398fd). This also required us to enable coordinated transactions via https://github.com/citusdata/citus/pull/4831 . However, it does have a very substantial impact on the distributed deadlock detection. The distributed deadlock detection is designed to avoid single-statement transactions because they cannot lead to any actual deadlocks. The implementation is to skip backends to which no distributed transaction is assigned. Now that we include single statement local executions in the lock graphs, we are conflicting with the design of distributed deadlock detection. In general, we should fix it. However, one might think that it is not a big deal: even if the processes show up in the lock graphs, the deadlock detection should not be causing any false positives. That is false, unless https://github.com/citusdata/citus/issues/1803 is fixed. Now that local processes are considered as a single distributed backend, the lock graphs might find: local execution 1 [tx id: 1] -> any local process [tx id: 0] any local process [tx id: 0] -> local execution 2 [tx id: 2] And, decide that there is a distributed deadlock. This commit is: (a) the right thing to do, as local execution should not need any distributed tx id (b) Eliminates performance issues that might come up when deadlock detection does a lot of unnecessary checks (c) After moving local execution after the remote execution via https://github.com/citusdata/citus/pull/4301, the vague requirement for assigning distributed tx ids is already gone. 
--- .../distributed/executor/local_executor.c | 13 - .../transaction/transaction_management.c | 13 +- .../expected/local_shard_execution.out | 760 ++++++++++++++++++ src/test/regress/expected/single_node.out | 9 - src/test/regress/multi_mx_schedule | 4 +- .../regress/sql/local_shard_execution.sql | 286 +++++++ src/test/regress/sql/single_node.sql | 4 - 7 files changed, 1060 insertions(+), 29 deletions(-) diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 7c0426c0a..306b7f3e1 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -207,19 +207,6 @@ ExecuteLocalTaskListExtended(List *taskList, uint64 totalRowsProcessed = 0; ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); - /* - * Even if we are executing local tasks, we still enable - * coordinated transaction. This is because - * (a) we might be in a transaction, and the next commands may - * require coordinated transaction - * (b) we might be executing some tasks locally and the others - * via remote execution - * - * Also, there is no harm enabling coordinated transaction even if - * we only deal with local tasks in the transaction. - */ - UseCoordinatedTransaction(); - LocalExecutorLevel++; PG_TRY(); { diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 0337411cb..5b6c8d116 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -205,8 +205,17 @@ InCoordinatedTransaction(void) void Use2PCForCoordinatedTransaction(void) { - Assert(InCoordinatedTransaction()); - + /* + * If this transaction is also a coordinated + * transaction, use 2PC. Otherwise, this + * state change does nothing. 
+ * + * In other words, when this flag is set, + * we "should" use 2PC when needed (e.g., + * we are in a coordinated transaction and + * the coordinated transaction does a remote + * modification). + */ ShouldCoordinatedTransactionUse2PC = true; } diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 5d29a4e71..98639a953 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -1983,6 +1983,17 @@ SELECT create_distributed_table('event_responses', 'event_id'); (1 row) INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); +CREATE TABLE event_responses_no_pkey ( + event_id int, + user_id int, + response invite_resp +); +SELECT create_distributed_table('event_responses_no_pkey', 'event_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + CREATE OR REPLACE FUNCTION regular_func(p invite_resp) RETURNS int AS $$ DECLARE @@ -2432,6 +2443,755 @@ DEBUG: Creating router plan 17 | 777 | no (2 rows) +-- set back to sane settings +RESET citus.enable_local_execution; +RESET citus.enable_fast_path_router_planner; +-- we'll test some 2PC states +SET citus.enable_metadata_sync TO OFF; +-- coordinated_transaction_should_use_2PC prints the internal +-- state for 2PC decision on Citus. 
However, even if 2PC is decided, +-- we may not necessarily use 2PC over a connection unless it does +-- a modification +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_should_use_2PC$$; +-- make tests consistent +SET citus.max_adaptive_executor_pool_size TO 1; +RESET citus.enable_metadata_sync; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.log_remote_commands TO ON; +-- we use event_id = 2 for local execution and event_id = 1 for reemote execution +--show it here, if anything changes here, all the tests below might be broken +-- we prefer this to avoid excessive logging below +SELECT * FROM event_responses_no_pkey WHERE event_id = 2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 2 +NOTICE: executing the command locally: SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2) + event_id | user_id | response +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM event_responses_no_pkey WHERE event_id = 1; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +NOTICE: issuing SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480004 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 1) + event_id | user_id | response +--------------------------------------------------------------------- +(0 rows) + +RESET citus.log_remote_commands; +RESET citus.log_local_commands; +RESET client_min_messages; +-- single shard local command without transaction block does set the +-- internal state for 2PC, but does not 
require any actual entries +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two local commands without transaction block set the internal 2PC state +-- but does not use remotely +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by another single shard +-- local modification sets the 2PC state, but does not use remotely +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc 
+--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC because multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC even if it is not in an explicit +-- tx block as multiple nodes involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM 
pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC as multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC even if it is not in an explicit +-- tx block +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + 
recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +ERROR: The transaction is not a coordinated transaction +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2) +SELECT count(*) FROM cte_1, cte_2; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc 
+--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a local SELECT followed by a remote SELECT does not require to +-- use actual 2PC +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 13 +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT * FROM event_responses_no_pkey WHERE event_id = 2; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes +(9 rows) + + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + 
count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions 
+--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- multi shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count 
+--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 17 +(1 row) + + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 17 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- multi-shard shard SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 17 +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT 
recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- multi shard SELECT followed by a single shard +-- remote single shard modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 20 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a multi shard +-- SELECT 
does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by remote multi-shard +-- modification uses 2PC as multiple nodes are involved in modifications +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a local SELECT followed by a remote multi-shard UPDATE requires to +-- use actual 2PC as multiple nodes are involved in modifications +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 10 +(1 row) + + UPDATE event_responses_no_pkey SET user_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT 
recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a local SELECT followed by a remote single-shard UPDATE does not require to +-- use actual 2PC. This is because a single node is involved in modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 10 +(1 row) + + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a remote single-shard UPDATE followed by a local single shard SELECT +-- does not require to use actual 2PC. This is because a single node +-- is involved in modification +BEGIN; + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 10 +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + \c - - - :master_port -- verify the local_hostname guc is used for local executions that should connect to the -- local host diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index c854ec48a..5051bbee9 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -2231,15 +2231,6 @@ NOTICE: executing the command locally: UPDATE 
single_node.another_schema_table_ (1 row) ROLLBACK; --- same without transaction block -WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) -SELECT coordinated_transaction_should_use_2PC() FROM cte_1; -NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc FROM cte_1 - coordinated_transaction_should_use_2pc ---------------------------------------------------------------------- - t -(1 row) - -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index ff5cccf11..a88b4ac0d 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -43,7 +43,9 @@ test: coordinator_evaluation_modify test: coordinator_evaluation_select test: multi_mx_call test: multi_mx_function_call_delegation -test: multi_mx_modifications local_shard_execution local_shard_execution_replicated +test: multi_mx_modifications local_shard_execution_replicated +# the following test has to be run sequentially +test: local_shard_execution test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: local_shard_copy test: undistribute_table_cascade_mx diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index b68863a7f..7a0fc8b8a 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -922,6 +922,17 @@ SELECT create_distributed_table('event_responses', 'event_id'); INSERT INTO event_responses VALUES (1, 1, 'yes'), 
(2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); + +CREATE TABLE event_responses_no_pkey ( + event_id int, + user_id int, + response invite_resp +); + +SELECT create_distributed_table('event_responses_no_pkey', 'event_id'); + + + CREATE OR REPLACE FUNCTION regular_func(p invite_resp) RETURNS int AS $$ DECLARE @@ -1114,6 +1125,281 @@ INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no') ON CONFLICT (event_id, user_id) DO UPDATE SET response = EXCLUDED.response RETURNING *; +-- set back to sane settings +RESET citus.enable_local_execution; +RESET citus.enable_fast_path_router_planner; + + +-- we'll test some 2PC states +SET citus.enable_metadata_sync TO OFF; + +-- coordinated_transaction_should_use_2PC prints the internal +-- state for 2PC decision on Citus. However, even if 2PC is decided, +-- we may not necessarily use 2PC over a connection unless it does +-- a modification +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_should_use_2PC$$; + +-- make tests consistent +SET citus.max_adaptive_executor_pool_size TO 1; + +RESET citus.enable_metadata_sync; +SELECT recover_prepared_transactions(); + + +SET citus.log_remote_commands TO ON; + +-- we use event_id = 2 for local execution and event_id = 1 for reemote execution +--show it here, if anything changes here, all the tests below might be broken +-- we prefer this to avoid excessive logging below +SELECT * FROM event_responses_no_pkey WHERE event_id = 2; +SELECT * FROM event_responses_no_pkey WHERE event_id = 1; +RESET citus.log_remote_commands; +RESET citus.log_local_commands; +RESET client_min_messages; + +-- single shard local command without transaction block does set the +-- internal state for 2PC, but does not require any actual entries +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +SELECT count(*) FROM 
pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two local commands without transaction block set the internal 2PC state +-- but does not use remotely +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by another single shard +-- local modification sets the 2PC state, but does not use remotely +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC because multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC even if it is not in an explicit +-- tx block as multiple nodes involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + + +-- single shard remote modification followed by a single shard +-- local modification 
uses 2PC as multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC even if it is not in an explicit +-- tx block +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2) +SELECT count(*) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a local SELECT 
followed by a remote SELECT does not require to +-- use actual 2PC +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT count(*) FROM event_responses_no_pkey; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT * FROM event_responses_no_pkey WHERE event_id = 2; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT 
bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- multi shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + SELECT count(*) FROM event_responses_no_pkey; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- multi-shard shard SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- multi shard SELECT followed by a single shard +-- remote single shard modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') 
RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT count(*) FROM event_responses_no_pkey; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by remote multi-shard +-- modification uses 2PC as multiple nodes are involved in modifications +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a local SELECT followed by a remote multi-shard UPDATE requires to +-- use actual 2PC as multiple nodes are involved in modifications +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + UPDATE event_responses_no_pkey SET user_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a local SELECT followed by a remote single-shard UPDATE does not require to +-- use actual 2PC. 
This is because a single node is involved in modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a remote single-shard UPDATE followed by a local single shard SELECT +-- does not require to use actual 2PC. This is because a single node +-- is involved in modification +BEGIN; + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + \c - - - :master_port -- verify the local_hostname guc is used for local executions that should connect to the diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 74c857d4e..0e1714003 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -1102,10 +1102,6 @@ BEGIN; SELECT coordinated_transaction_should_use_2PC(); ROLLBACK; --- same without transaction block -WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) -SELECT coordinated_transaction_should_use_2PC() FROM cte_1; - -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; From 5fc76611697176293806ac4152517e5604144b37 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 14 Apr 2022 11:58:31 +0200 Subject: [PATCH 02/18] Do not set coordinator's metadatasynced column to false After a disable_node --- .../distributed/metadata/node_metadata.c | 21 ++++++++++++------- .../multi_remove_node_reference_table.out | 19 +++++++++++++++++ .../sql/multi_remove_node_reference_table.sql | 8 ++++++- 3 files changed, 39 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c 
b/src/backend/distributed/metadata/node_metadata.c index c66b0d3e2..f74305012 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -116,7 +116,7 @@ static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static bool NodeIsLocal(WorkerNode *worker); static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); -static bool UnsetMetadataSyncedForAll(void); +static bool UnsetMetadataSyncedForAllWorkers(void); static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, int columnIndex, Datum value); @@ -535,7 +535,7 @@ citus_disable_node(PG_FUNCTION_ARGS) * metadata at this point. Instead, we defer that to citus_activate_node() * where we expect all nodes up and running. */ - if (UnsetMetadataSyncedForAll()) + if (UnsetMetadataSyncedForAllWorkers()) { TriggerNodeMetadataSyncOnCommit(); } @@ -1319,7 +1319,7 @@ citus_update_node(PG_FUNCTION_ARGS) * early, but that's fine, since this will start a retry loop with * 5 second intervals until sync is complete. */ - if (UnsetMetadataSyncedForAll()) + if (UnsetMetadataSyncedForAllWorkers()) { TriggerNodeMetadataSyncOnCommit(); } @@ -2646,15 +2646,15 @@ DatumToString(Datum datum, Oid dataType) /* - * UnsetMetadataSyncedForAll sets the metadatasynced column of all metadata - * nodes to false. It returns true if it updated at least a node. + * UnsetMetadataSyncedForAllWorkers sets the metadatasynced column of all metadata + * worker nodes to false. It returns true if it updated at least a node. 
*/ static bool -UnsetMetadataSyncedForAll(void) +UnsetMetadataSyncedForAllWorkers(void) { bool updatedAtLeastOne = false; - ScanKeyData scanKey[2]; - int scanKeyCount = 2; + ScanKeyData scanKey[3]; + int scanKeyCount = 3; bool indexOK = false; /* @@ -2669,6 +2669,11 @@ UnsetMetadataSyncedForAll(void) ScanKeyInit(&scanKey[1], Anum_pg_dist_node_metadatasynced, BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true)); + /* coordinator always has the up to date metadata */ + ScanKeyInit(&scanKey[2], Anum_pg_dist_node_groupid, + BTGreaterStrategyNumber, F_INT4GT, + Int32GetDatum(COORDINATOR_GROUP_ID)); + CatalogIndexState indstate = CatalogOpenIndexes(relation); SysScanDesc scanDescriptor = systable_beginscan(relation, diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out index b2d38a196..c39b20735 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -978,6 +978,12 @@ ORDER BY shardid ASC; (0 rows) \c - - - :master_port +SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + SELECT citus_disable_node('localhost', :worker_2_port); citus_disable_node --------------------------------------------------------------------- @@ -997,6 +1003,19 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; 1 (1 row) +-- never mark coordinator metadatasynced = false +SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport = :master_port; + hasmetadata | metadatasynced +--------------------------------------------------------------------- + t | t +(1 row) + +SELECT 1 FROM citus_remove_node('localhost', :master_port); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + SELECT shardid, shardstate, shardlength, nodename, nodeport FROM diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql index 310002b74..37c5a0cb2 100644 --- a/src/test/regress/sql/multi_remove_node_reference_table.sql +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -580,13 +580,19 @@ WHERE ORDER BY shardid ASC; \c - - - :master_port - +SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port); SELECT citus_disable_node('localhost', :worker_2_port); SELECT public.wait_until_metadata_sync(); -- status after citus_disable_node_and_wait SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; +-- never mark coordinator metadatasynced = false +SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport = :master_port; + +SELECT 1 FROM citus_remove_node('localhost', :master_port); + + SELECT shardid, shardstate, shardlength, nodename, nodeport FROM From 518fb0873e5ef2b8e16a6ed2b5f60171cf90b181 Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Thu, 28 Apr 2022 16:52:02 +0300 Subject: [PATCH 03/18] Introduce one new alternative text output to fix flakiness (#5913) Here is a flaky test output that is quite hard to fix: ```diff diff -dU10 -w /home/circleci/project/src/test/regress/expected/isolation_master_update_node_1.out /home/circleci/project/src/test/regress/results/isolation_master_update_node.out --- /home/circleci/project/src/test/regress/expected/isolation_master_update_node_1.out.modified 2022-03-21 19:03:54.237042562 +0000 +++ /home/circleci/project/src/test/regress/results/isolation_master_update_node.out.modified 2022-03-21 19:03:54.257043084 +0000 @@ -49,18 +49,20 @@ step s2-update-node-1-force: <... 
completed> master_update_node ------------------ (1 row) step s2-abort: ABORT; step s1-abort: ABORT; FATAL: terminating connection due to administrator command -SSL connection has been closed unexpectedly +server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. ``` I could not come up with a solution that would decrease the flakiness in the test outputs. We already have 3 output files for the same test and now I introduced a 4th one. I can also add complex regular expressions that span multiple lines, and normalize these error messages. Feel free to suggest a normalized error message in a comment here. ## Current alternative file contents `isolation_master_update_node.out` ``` step s1-abort: ABORT; FATAL: terminating connection due to administrator command FATAL: terminating connection due to administrator command SSL connection has been closed unexpectedly ``` `isolation_master_update_node_0.out` ``` step s1-abort: ABORT; WARNING: this step had a leftover error message FATAL: terminating connection due to administrator command server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. ``` `isolation_master_update_node_1.out` ``` step s1-abort: ABORT; FATAL: terminating connection due to administrator command SSL connection has been closed unexpectedly ``` new file: `isolation_master_update_node_2.out` ``` step s1-abort: ABORT; FATAL: terminating connection due to administrator command server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. 
``` --- .../isolation_master_update_node_2.out | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 src/test/regress/expected/isolation_master_update_node_2.out diff --git a/src/test/regress/expected/isolation_master_update_node_2.out b/src/test/regress/expected/isolation_master_update_node_2.out new file mode 100644 index 000000000..46e0d23d5 --- /dev/null +++ b/src/test/regress/expected/isolation_master_update_node_2.out @@ -0,0 +1,68 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-insert s2-begin s2-update-node-1 s1-abort s2-abort +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: BEGIN; +step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100); +step s2-begin: BEGIN; +step s2-update-node-1: + -- update a specific node by address + SELECT master_update_node(nodeid, 'localhost', nodeport + 10) + FROM pg_dist_node + WHERE nodename = 'localhost' + AND nodeport = 57637; + +step s1-abort: ABORT; +step s2-update-node-1: <... completed> +master_update_node +--------------------------------------------------------------------- + +(1 row) + +step s2-abort: ABORT; +master_remove_node +--------------------------------------------------------------------- + + +(2 rows) + + +starting permutation: s1-begin s1-insert s2-begin s2-update-node-1-force s2-abort s1-abort +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: BEGIN; +step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100); +step s2-begin: BEGIN; +step s2-update-node-1-force: + -- update a specific node by address (force) + SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100) + FROM pg_dist_node + WHERE nodename = 'localhost' + AND nodeport = 57637; + +step s2-update-node-1-force: <... 
completed> +master_update_node +--------------------------------------------------------------------- + +(1 row) + +step s2-abort: ABORT; +step s1-abort: ABORT; +FATAL: terminating connection due to administrator command +server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. + +master_remove_node +--------------------------------------------------------------------- + + +(2 rows) + From f944722c6a51274510abdb23bb9f05137450a0b3 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Sat, 9 Apr 2022 10:38:30 -0700 Subject: [PATCH 04/18] PG15: Use RelationGetSmgr() instead of RelationOpenSmgr(). Handle PG commit f10f0ae420. --- src/backend/columnar/columnar_debug.c | 2 -- src/backend/columnar/columnar_metadata.c | 5 ++--- src/backend/columnar/columnar_storage.c | 14 ++++++-------- src/backend/columnar/columnar_tableam.c | 22 +++++++--------------- src/backend/distributed/test/fake_am.c | 8 +++----- src/include/pg_version_compat.h | 19 +++++++++++++++++++ 6 files changed, 37 insertions(+), 33 deletions(-) diff --git a/src/backend/columnar/columnar_debug.c b/src/backend/columnar/columnar_debug.c index 220d259fe..e6b19f768 100644 --- a/src/backend/columnar/columnar_debug.c +++ b/src/backend/columnar/columnar_debug.c @@ -115,8 +115,6 @@ columnar_storage_info(PG_FUNCTION_ARGS) RelationGetRelationName(rel)))); } - RelationOpenSmgr(rel); - Datum values[STORAGE_INFO_NATTS] = { 0 }; bool nulls[STORAGE_INFO_NATTS] = { 0 }; diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index 62d64861f..ccea52b6d 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -1738,11 +1738,10 @@ ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade) return; } - RelationOpenSmgr(rel); - BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); if 
(nblocks < 2) { - ColumnarStorageInit(rel->rd_smgr, ColumnarMetadataNewStorageId()); + ColumnarStorageInit(RelationGetSmgr(rel), ColumnarMetadataNewStorageId()); return; } diff --git a/src/backend/columnar/columnar_storage.c b/src/backend/columnar/columnar_storage.c index 71fc75ccb..9712e7160 100644 --- a/src/backend/columnar/columnar_storage.c +++ b/src/backend/columnar/columnar_storage.c @@ -44,6 +44,8 @@ #include "storage/bufmgr.h" #include "storage/lmgr.h" +#include "pg_version_compat.h" + #include "columnar/columnar.h" #include "columnar/columnar_storage.h" @@ -354,8 +356,7 @@ ColumnarStorageGetReservedOffset(Relation rel, bool force) bool ColumnarStorageIsCurrent(Relation rel) { - RelationOpenSmgr(rel); - BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); if (nblocks < 2) { @@ -439,8 +440,7 @@ ColumnarStorageReserveData(Relation rel, uint64 amount) PhysicalAddr final = LogicalToPhysical(nextReservation - 1); /* extend with new pages */ - RelationOpenSmgr(rel); - BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); while (nblocks <= final.blockno) { @@ -547,8 +547,7 @@ ColumnarStorageTruncate(Relation rel, uint64 newDataReservation) rel->rd_id, newDataReservation); } - RelationOpenSmgr(rel); - BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber old_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); if (old_rel_pages == 0) { /* nothing to do */ @@ -627,8 +626,7 @@ ColumnarOverwriteMetapage(Relation relation, ColumnarMetapage columnarMetapage) static ColumnarMetapage ColumnarMetapageRead(Relation rel, bool force) { - RelationOpenSmgr(rel); - BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); if (nblocks == 0) { /* diff --git a/src/backend/columnar/columnar_tableam.c 
b/src/backend/columnar/columnar_tableam.c index b6179ac8c..e64b7613e 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -913,8 +913,7 @@ columnar_relation_nontransactional_truncate(Relation rel) RelationTruncate(rel, 0); uint64 storageId = ColumnarMetadataNewStorageId(); - RelationOpenSmgr(rel); - ColumnarStorageInit(rel->rd_smgr, storageId); + ColumnarStorageInit(RelationGetSmgr(rel), storageId); } @@ -1136,8 +1135,7 @@ LogRelationStats(Relation rel, int elevel) totalStripeLength += stripe->dataLength; } - RelationOpenSmgr(rel); - uint64 relPages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + uint64 relPages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); RelationCloseSmgr(rel); Datum storageId = DirectFunctionCall1(columnar_relation_storageid, @@ -1239,8 +1237,7 @@ TruncateColumnar(Relation rel, int elevel) uint64 newDataReservation = Max(GetHighestUsedAddress(rel->rd_node) + 1, ColumnarFirstLogicalOffset); - RelationOpenSmgr(rel); - BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber old_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); if (!ColumnarStorageTruncate(rel, newDataReservation)) { @@ -1248,8 +1245,7 @@ TruncateColumnar(Relation rel, int elevel) return; } - RelationOpenSmgr(rel); - BlockNumber new_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber new_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); /* * We can release the exclusive lock as soon as we have truncated. 
@@ -1784,20 +1780,17 @@ columnar_relation_size(Relation rel, ForkNumber forkNumber) uint64 nblocks = 0; - /* Open it at the smgr level if not already done */ - RelationOpenSmgr(rel); - /* InvalidForkNumber indicates returning the size for all forks */ if (forkNumber == InvalidForkNumber) { for (int i = 0; i < MAX_FORKNUM; i++) { - nblocks += smgrnblocks(rel->rd_smgr, i); + nblocks += smgrnblocks(RelationGetSmgr(rel), i); } } else { - nblocks = smgrnblocks(rel->rd_smgr, forkNumber); + nblocks = smgrnblocks(RelationGetSmgr(rel), forkNumber); } return nblocks * BLCKSZ; @@ -1819,8 +1812,7 @@ columnar_estimate_rel_size(Relation rel, int32 *attr_widths, double *allvisfrac) { CheckCitusColumnarVersion(ERROR); - RelationOpenSmgr(rel); - *pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + *pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); *tuples = ColumnarTableRowCount(rel); /* diff --git a/src/backend/distributed/test/fake_am.c b/src/backend/distributed/test/fake_am.c index ce7784510..63d5a69fe 100644 --- a/src/backend/distributed/test/fake_am.c +++ b/src/backend/distributed/test/fake_am.c @@ -20,6 +20,7 @@ #include "postgres.h" #include "distributed/pg_version_constants.h" +#include "pg_version_compat.h" #include "access/amapi.h" @@ -446,20 +447,17 @@ fake_relation_size(Relation rel, ForkNumber forkNumber) uint64 nblocks = 0; - /* Open it at the smgr level if not already done */ - RelationOpenSmgr(rel); - /* InvalidForkNumber indicates returning the size for all forks */ if (forkNumber == InvalidForkNumber) { for (int i = 0; i < MAX_FORKNUM; i++) { - nblocks += smgrnblocks(rel->rd_smgr, i); + nblocks += smgrnblocks(RelationGetSmgr(rel), i); } } else { - nblocks = smgrnblocks(rel->rd_smgr, forkNumber); + nblocks = smgrnblocks(RelationGetSmgr(rel), forkNumber); } return nblocks * BLCKSZ; diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index ad7a8bbb0..e298138d9 100644 --- a/src/include/pg_version_compat.h +++ 
b/src/include/pg_version_compat.h @@ -13,6 +13,25 @@ #include "distributed/pg_version_constants.h" +#if PG_VERSION_NUM >= PG_VERSION_15 +#else + +#include "storage/smgr.h" +#include "utils/rel.h" + +static inline SMgrRelation +RelationGetSmgr(Relation rel) +{ + if (unlikely(rel->rd_smgr == NULL)) + { + smgrsetowner(&(rel->rd_smgr), smgropen(rel->rd_node, rel->rd_backend)); + } + return rel->rd_smgr; +} + + +#endif + #if PG_VERSION_NUM >= PG_VERSION_14 #define AlterTableStmtObjType_compat(a) ((a)->objtype) #define getObjectTypeDescription_compat(a, b) getObjectTypeDescription(a, b) From ac952b2cc2f7870d1c55c47b230d4f10b3060681 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Sat, 9 Apr 2022 10:49:44 -0700 Subject: [PATCH 05/18] PG15: Handle extra argument to ExecARDeleteTriggers. Account for PG commit ba9a7e3921. Introduce ExecARDeleteTriggers_compat. --- src/backend/columnar/columnar_metadata.c | 2 +- src/include/columnar/columnar_version_compat.h | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index ccea52b6d..8b9b9efc6 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -1433,7 +1433,7 @@ DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple) simple_heap_delete(state->rel, tid); /* execute AFTER ROW DELETE Triggers to enforce constraints */ - ExecARDeleteTriggers(estate, resultRelInfo, tid, NULL, NULL); + ExecARDeleteTriggers_compat(estate, resultRelInfo, tid, NULL, NULL, false); } diff --git a/src/include/columnar/columnar_version_compat.h b/src/include/columnar/columnar_version_compat.h index 45b8a0e55..611b40d15 100644 --- a/src/include/columnar/columnar_version_compat.h +++ b/src/include/columnar/columnar_version_compat.h @@ -14,6 +14,14 @@ #include "distributed/pg_version_constants.h" +#if PG_VERSION_NUM >= PG_VERSION_15 +#define ExecARDeleteTriggers_compat(a, b, c, d, e, f) \ + 
ExecARDeleteTriggers(a, b, c, d, e, f) +#else +#define ExecARDeleteTriggers_compat(a, b, c, d, e, f) \ + ExecARDeleteTriggers(a, b, c, d, e) +#endif + #if PG_VERSION_NUM >= PG_VERSION_14 #define ColumnarProcessUtility_compat(a, b, c, d, e, f, g, h) \ ColumnarProcessUtility(a, b, c, d, e, f, g, h) From 1c1ef7ab8d989af88bf3adba9d01c50321049142 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Sat, 9 Apr 2022 10:54:15 -0700 Subject: [PATCH 06/18] PG15: Handle extra argument to RelationCreateStorage. Account for PG commit 9c08aea6a309. Introduce RelationCreateStorage_compat. --- src/backend/columnar/columnar_tableam.c | 2 +- src/backend/distributed/test/fake_am.c | 2 +- src/include/pg_version_compat.h | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index e64b7613e..9967a265e 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -881,7 +881,7 @@ columnar_relation_set_new_filenode(Relation rel, *freezeXid = RecentXmin; *minmulti = GetOldestMultiXactId(); - SMgrRelation srel = RelationCreateStorage(*newrnode, persistence); + SMgrRelation srel = RelationCreateStorage_compat(*newrnode, persistence, true); ColumnarStorageInit(srel, ColumnarMetadataNewStorageId()); InitColumnarOptions(rel->rd_id); diff --git a/src/backend/distributed/test/fake_am.c b/src/backend/distributed/test/fake_am.c index 63d5a69fe..5a8ede316 100644 --- a/src/backend/distributed/test/fake_am.c +++ b/src/backend/distributed/test/fake_am.c @@ -326,7 +326,7 @@ fake_relation_set_new_filenode(Relation rel, */ *minmulti = GetOldestMultiXactId(); - SMgrRelation srel = RelationCreateStorage(*newrnode, persistence); + SMgrRelation srel = RelationCreateStorage_compat(*newrnode, persistence, true); /* * If required, set up an init fork for an unlogged table so that it can diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index 
e298138d9..dc21acf5f 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -14,11 +14,15 @@ #include "distributed/pg_version_constants.h" #if PG_VERSION_NUM >= PG_VERSION_15 +#define ProcessCompletedNotifies() +#define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b, c) #else #include "storage/smgr.h" #include "utils/rel.h" +#define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b) + static inline SMgrRelation RelationGetSmgr(Relation rel) { From 9915fe8a1a8a80c54054c1e9ad94510c850b0fa9 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Sat, 9 Apr 2022 10:57:26 -0700 Subject: [PATCH 07/18] PG15: Handle different ways to get publication actions. Account for PG commit 52e4f0cd47. --- src/backend/columnar/columnar_tableam.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 9967a265e..a16210fb7 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -2292,18 +2292,30 @@ detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isnull) static void ColumnarCheckLogicalReplication(Relation rel) { + bool pubActionInsert = false; + if (!is_publishable_relation(rel)) { return; } +#if PG_VERSION_NUM >= PG_VERSION_15 + { + PublicationDesc pubdesc; + + RelationBuildPublicationDesc(rel, &pubdesc); + pubActionInsert = pubdesc.pubactions.pubinsert; + } +#else if (rel->rd_pubactions == NULL) { GetRelationPublicationActions(rel); Assert(rel->rd_pubactions != NULL); } + pubActionInsert = rel->rd_pubactions->pubinsert; +#endif - if (rel->rd_pubactions->pubinsert) + if (pubActionInsert) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg( From 70c915a0f27ade36e0239e621d5569d5e2021cae Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Sat, 9 Apr 2022 11:01:10 -0700 Subject: [PATCH 08/18] PG15: Handle data type changes in pg_collation. 
Account for PG commit 54637508f8. --- src/backend/distributed/commands/collation.c | 25 ++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index c284404ce..930d7c770 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -60,12 +60,30 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati Form_pg_collation collationForm = (Form_pg_collation) GETSTRUCT(heapTuple); char collprovider = collationForm->collprovider; - const char *collcollate = NameStr(collationForm->collcollate); - const char *collctype = NameStr(collationForm->collctype); Oid collnamespace = collationForm->collnamespace; const char *collname = NameStr(collationForm->collname); bool collisdeterministic = collationForm->collisdeterministic; +#if PG_VERSION_NUM >= PG_VERSION_15 + bool isnull; + Datum datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collcollate, + &isnull); + Assert(!isnull); + char *collcollate = TextDatumGetCString(datum); + datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collctype, &isnull); + Assert(!isnull); + char *collctype = TextDatumGetCString(datum); +#else + + /* + * In versions before 15, collcollate and collctype were type "name". Use + * pstrdup() to match the interface of 15 so that we consistently free the + * result later. 
+ */ + char *collcollate = pstrdup(NameStr(collationForm->collcollate)); + char *collctype = pstrdup(NameStr(collationForm->collctype)); +#endif + if (collowner != NULL) { *collowner = collationForm->collowner; @@ -103,6 +121,9 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati quote_literal_cstr(collctype)); } + pfree(collcollate); + pfree(collctype); + if (!collisdeterministic) { appendStringInfoString(&collationNameDef, ", deterministic = false"); From 26f5e20580b087e517350d4dede6d8fb77090a91 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Sat, 9 Apr 2022 21:34:39 -0700 Subject: [PATCH 09/18] PG15: update integer parsing APIs. Account for PG commits 3c6f8c011f and cfc7191dfe. --- src/backend/distributed/commands/function.c | 2 +- .../distributed/executor/adaptive_executor.c | 3 +-- .../distributed/operations/shard_rebalancer.c | 5 ++--- .../distributed/operations/stage_protocol.c | 2 +- .../distributed/test/shard_rebalancer.c | 1 - .../distributed/transaction/lock_graph.c | 2 +- .../transaction/remote_transaction.c | 2 +- .../worker/worker_data_fetch_protocol.c | 2 +- src/include/pg_version_compat.h | 18 ++++++++++++++++++ 9 files changed, 26 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 879aa4770..51141c4d5 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -490,7 +490,7 @@ GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName, distributionArgumentName++; /* throws error if the input is not an integer */ - distributionArgumentIndex = pg_atoi(distributionArgumentName, 4, 0); + distributionArgumentIndex = pg_strtoint32(distributionArgumentName); if (distributionArgumentIndex < 1 || distributionArgumentIndex > numberOfArgs) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 
2b32916ee..347bd4d35 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -171,7 +171,6 @@ #include "storage/fd.h" #include "storage/latch.h" #include "utils/builtins.h" -#include "utils/int8.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" @@ -4513,7 +4512,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) /* if there are multiple replicas, make sure to consider only one */ if (storeRows && *currentAffectedTupleString != '\0') { - scanint8(currentAffectedTupleString, false, ¤tAffectedTupleCount); + currentAffectedTupleCount = pg_strtoint64(currentAffectedTupleString); Assert(currentAffectedTupleCount >= 0); execution->rowsProcessed += currentAffectedTupleCount; } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 16ee50c52..ecb0d6673 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -54,7 +54,6 @@ #include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" -#include "utils/int8.h" #include "utils/json.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -1396,9 +1395,9 @@ GetShardStatistics(MultiConnection *connection, HTAB *shardIds) for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) { char *shardIdString = PQgetvalue(result, rowIndex, 0); - uint64 shardId = pg_strtouint64(shardIdString, NULL, 10); + uint64 shardId = strtou64(shardIdString, NULL, 10); char *sizeString = PQgetvalue(result, rowIndex, 1); - uint64 totalSize = pg_strtouint64(sizeString, NULL, 10); + uint64 totalSize = strtou64(sizeString, NULL, 10); ShardStatistics *statistics = hash_search(shardStatistics, &shardId, HASH_ENTER, NULL); diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index d6e9c0f2a..8f77205cb 100644 
--- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -923,7 +923,7 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam } errno = 0; - uint64 tableSize = pg_strtouint64(tableSizeString, &tableSizeStringEnd, 0); + uint64 tableSize = strtou64(tableSizeString, &tableSizeStringEnd, 0); if (errno != 0 || (*tableSizeStringEnd) != '\0') { PQclear(queryResult); diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c index ea770cb6e..f3640f415 100644 --- a/src/backend/distributed/test/shard_rebalancer.c +++ b/src/backend/distributed/test/shard_rebalancer.c @@ -28,7 +28,6 @@ #include "funcapi.h" #include "miscadmin.h" #include "utils/builtins.h" -#include "utils/int8.h" #include "utils/json.h" #include "utils/lsyscache.h" #include "utils/memutils.h" diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 62b5e4e04..e672dafd8 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -309,7 +309,7 @@ ParseIntField(PGresult *result, int rowIndex, int colIndex) char *resultString = PQgetvalue(result, rowIndex, colIndex); - return pg_strtouint64(resultString, NULL, 10); + return strtou64(resultString, NULL, 10); } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 2859ec4c9..55a560575 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -1408,7 +1408,7 @@ ParsePreparedTransactionName(char *preparedTransactionName, /* step ahead of the current '_' character */ ++currentCharPointer; - *transactionNumber = pg_strtouint64(currentCharPointer, NULL, 10); + *transactionNumber = strtou64(currentCharPointer, NULL, 10); if ((*transactionNumber == 0 && 
errno != 0) || (*transactionNumber == ULLONG_MAX && errno == ERANGE)) { diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 5e78c19ce..cbc7af89a 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -387,7 +387,7 @@ ExtractShardIdFromTableName(const char *tableName, bool missingOk) shardIdString++; errno = 0; - uint64 shardId = pg_strtouint64(shardIdString, &shardIdStringEnd, 0); + uint64 shardId = strtou64(shardIdString, &shardIdStringEnd, 0); if (errno != 0 || (*shardIdStringEnd != '\0')) { diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index dc21acf5f..fd7767a2b 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -19,10 +19,28 @@ #else #include "storage/smgr.h" +#include "utils/int8.h" #include "utils/rel.h" + +#ifdef HAVE_LONG_INT_64 +#define strtoi64(str, endptr, base) ((int64) strtol(str, endptr, base)) +#define strtou64(str, endptr, base) ((uint64) strtoul(str, endptr, base)) +#else +#define strtoi64(str, endptr, base) ((int64) strtoll(str, endptr, base)) +#define strtou64(str, endptr, base) ((uint64) strtoull(str, endptr, base)) +#endif #define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b) +static inline int64 +pg_strtoint64(char *s) +{ + int64 result; + (void) scanint8(s, false, &result); + return result; +} + + static inline SMgrRelation RelationGetSmgr(Relation rel) { From 3799f957429f7ecc539c0cbd0c2835ff0eb66631 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Sat, 9 Apr 2022 11:01:49 -0700 Subject: [PATCH 10/18] PG15: Value -> String, Integer, Float. Handle PG commit 639a86e36a. 
--- src/backend/distributed/commands/collation.c | 4 ++- src/backend/distributed/commands/database.c | 2 +- src/backend/distributed/commands/extension.c | 6 ++--- .../distributed/commands/foreign_server.c | 4 +-- src/backend/distributed/commands/function.c | 6 ++--- src/backend/distributed/commands/multi_copy.c | 2 +- src/backend/distributed/commands/role.c | 17 +++++++++++- src/backend/distributed/commands/schema.c | 4 +-- src/backend/distributed/commands/statistics.c | 4 +-- src/backend/distributed/commands/trigger.c | 20 +++++++------- src/backend/distributed/commands/type.c | 2 +- src/backend/distributed/commands/vacuum.c | 2 +- .../deparser/deparse_database_stmts.c | 4 ++- .../deparser/deparse_foreign_server_stmts.c | 2 +- .../deparser/deparse_function_stmts.c | 12 ++++----- .../deparser/deparse_schema_stmts.c | 2 +- .../deparser/deparse_statistics_stmts.c | 6 ++--- .../deparser/deparse_text_search.c | 2 +- .../distributed/metadata/metadata_cache.c | 6 +++-- src/backend/distributed/planner/cte_inline.c | 3 ++- .../distributed/planner/distributed_planner.c | 2 +- .../planner/multi_physical_planner.c | 2 +- .../planner/multi_router_planner.c | 8 +++--- .../distributed/planner/recursive_planning.c | 2 +- .../distributed/relay/relay_event_utility.c | 12 ++++----- .../test/shared_connection_counters.c | 26 +++++++++++++++++-- src/include/pg_version_compat.h | 2 ++ 27 files changed, 105 insertions(+), 59 deletions(-) diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index 930d7c770..bc642bed1 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -10,6 +10,8 @@ */ #include "postgres.h" +#include "pg_version_compat.h" + #include "access/htup_details.h" #include "access/xact.h" #include "catalog/pg_collation.h" @@ -521,7 +523,7 @@ GenerateBackupNameForCollationCollision(const ObjectAddress *address) return NULL; } Form_pg_collation collationForm = 
(Form_pg_collation) GETSTRUCT(collationTuple); - Value *namespace = makeString(get_namespace_name(collationForm->collnamespace)); + String *namespace = makeString(get_namespace_name(collationForm->collnamespace)); ReleaseSysCache(collationTuple); while (true) diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 59902b038..76d74119e 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -115,7 +115,7 @@ AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok) AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); Assert(stmt->objectType == OBJECT_DATABASE); - Oid databaseOid = get_database_oid(strVal((Value *) stmt->object), missing_ok); + Oid databaseOid = get_database_oid(strVal((String *) stmt->object), missing_ok); ObjectAddress address = { 0 }; ObjectAddressSet(address, DatabaseRelationId, databaseOid); diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index c1cf06039..e61d62054 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -295,7 +295,7 @@ FilterDistributedExtensions(List *extensionObjectList) { List *extensionNameList = NIL; - Value *objectName = NULL; + String *objectName = NULL; foreach_ptr(objectName, extensionObjectList) { const char *extensionName = strVal(objectName); @@ -334,7 +334,7 @@ ExtensionNameListToObjectAddressList(List *extensionObjectList) { List *extensionObjectAddressList = NIL; - Value *objectName; + String *objectName; foreach_ptr(objectName, extensionObjectList) { /* @@ -671,7 +671,7 @@ IsDropCitusExtensionStmt(Node *parseTree) } /* now that we have a DropStmt, check if citus extension is among the objects to dropped */ - Value *objectName; + String *objectName; foreach_ptr(objectName, dropStmt->objects) { const char *extensionName = strVal(objectName); diff --git 
a/src/backend/distributed/commands/foreign_server.c b/src/backend/distributed/commands/foreign_server.c index 0777814df..76acf7f66 100644 --- a/src/backend/distributed/commands/foreign_server.c +++ b/src/backend/distributed/commands/foreign_server.c @@ -190,7 +190,7 @@ PreprocessDropForeignServerStmt(Node *node, const char *queryString, Assert(list_length(stmt->objects) == 1); - Value *serverValue = linitial(stmt->objects); + String *serverValue = linitial(stmt->objects); ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false); /* unmark distributed server */ @@ -362,7 +362,7 @@ RecreateForeignServerStmt(Oid serverId) static bool NameListHasDistributedServer(List *serverNames) { - Value *serverValue = NULL; + String *serverValue = NULL; foreach_ptr(serverValue, serverNames) { ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false); diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 51141c4d5..cda28d375 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -1893,7 +1893,7 @@ AlterFunctionSchemaStmtObjectAddress(Node *node, bool missing_ok) */ /* the name of the function is the last in the list of names */ - Value *funcNameStr = lfirst(list_tail(names)); + String *funcNameStr = lfirst(list_tail(names)); List *newNames = list_make2(makeString(stmt->newschema), funcNameStr); /* @@ -1938,8 +1938,8 @@ GenerateBackupNameForProcCollision(const ObjectAddress *address) char *newName = palloc0(NAMEDATALEN); char suffix[NAMEDATALEN] = { 0 }; int count = 0; - Value *namespace = makeString(get_namespace_name(get_func_namespace( - address->objectId))); + String *namespace = makeString(get_namespace_name(get_func_namespace( + address->objectId))); char *baseName = get_func_name(address->objectId); int baseLength = strlen(baseName); Oid *argtypes = NULL; diff --git a/src/backend/distributed/commands/multi_copy.c 
b/src/backend/distributed/commands/multi_copy.c index d2d7d9b23..6036ec00a 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2009,7 +2009,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, foreach(columnNameCell, columnNameList) { char *columnName = (char *) lfirst(columnNameCell); - Value *columnNameValue = makeString(columnName); + String *columnNameValue = makeString(columnName); attributeList = lappend(attributeList, columnNameValue); } diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index 608dc0060..af0a3a856 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -150,7 +150,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString) if (encryptedPassword != NULL) { - Value *encryptedPasswordValue = makeString((char *) encryptedPassword); + String *encryptedPasswordValue = makeString((char *) encryptedPassword); option->arg = (Node *) encryptedPasswordValue; } else @@ -741,8 +741,13 @@ makeStringConst(char *str, int location) { A_Const *n = makeNode(A_Const); +#if PG_VERSION_NUM >= PG_VERSION_15 + n->val.sval.type = T_String; + n->val.sval.sval = str; +#else n->val.type = T_String; n->val.val.str = str; +#endif n->location = location; return (Node *) n; @@ -759,8 +764,13 @@ makeIntConst(int val, int location) { A_Const *n = makeNode(A_Const); +#if PG_VERSION_NUM >= PG_VERSION_15 + n->val.ival.type = T_Integer; + n->val.ival.ival = val; +#else n->val.type = T_Integer; n->val.val.ival = val; +#endif n->location = location; return (Node *) n; @@ -777,8 +787,13 @@ makeFloatConst(char *str, int location) { A_Const *n = makeNode(A_Const); +#if PG_VERSION_NUM >= PG_VERSION_15 + n->val.fval.type = T_Float; + n->val.fval.fval = str; +#else n->val.type = T_Float; n->val.val.str = str; +#endif n->location = location; return (Node *) n; diff --git a/src/backend/distributed/commands/schema.c 
b/src/backend/distributed/commands/schema.c index cdee81349..32aac0106 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -107,7 +107,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString, EnsureSequentialMode(OBJECT_SCHEMA); - Value *schemaVal = NULL; + String *schemaVal = NULL; foreach_ptr(schemaVal, distributedSchemas) { if (SchemaHasDistributedTableWithFKey(strVal(schemaVal))) @@ -288,7 +288,7 @@ FilterDistributedSchemas(List *schemas) { List *distributedSchemas = NIL; - Value *schemaValue = NULL; + String *schemaValue = NULL; foreach_ptr(schemaValue, schemas) { const char *schemaName = strVal(schemaValue); diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 79a758c10..984d63969 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -295,7 +295,7 @@ PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString) AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); Assert(stmt->objectType == OBJECT_STATISTIC_EXT); - Value *statName = llast((List *) stmt->object); + String *statName = llast((List *) stmt->object); Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema), statName), false); Oid relationId = GetRelIdByStatsOid(statsOid); @@ -328,7 +328,7 @@ AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk) AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); ObjectAddress address = { 0 }; - Value *statName = llast((List *) stmt->object); + String *statName = llast((List *) stmt->object); Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema), statName), missingOk); ObjectAddressSet(address, StatisticExtRelationId, statsOid); diff --git a/src/backend/distributed/commands/trigger.c b/src/backend/distributed/commands/trigger.c index a277cb372..2e3c107fa 100644 --- 
a/src/backend/distributed/commands/trigger.c +++ b/src/backend/distributed/commands/trigger.c @@ -44,8 +44,8 @@ /* local function forward declarations */ static bool IsCreateCitusTruncateTriggerStmt(CreateTrigStmt *createTriggerStmt); -static Value * GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt * - alterTriggerDependsStmt); +static String * GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt * + alterTriggerDependsStmt); static void ErrorIfUnsupportedDropTriggerCommand(DropStmt *dropTriggerStmt); static RangeVar * GetDropTriggerStmtRelation(DropStmt *dropTriggerStmt); static void ExtractDropStmtTriggerAndRelationName(DropStmt *dropTriggerStmt, @@ -416,7 +416,7 @@ PreprocessAlterTriggerDependsStmt(Node *node, const char *queryString, * workers */ - Value *triggerNameValue = + String *triggerNameValue = GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); ereport(ERROR, (errmsg( "Triggers \"%s\" on distributed tables and local tables added to metadata " @@ -454,7 +454,7 @@ PostprocessAlterTriggerDependsStmt(Node *node, const char *queryString) EnsureCoordinator(); ErrorOutForTriggerIfNotSupported(relationId); - Value *triggerNameValue = + String *triggerNameValue = GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); return CitusCreateTriggerCommandDDLJob(relationId, strVal(triggerNameValue), queryString); @@ -476,7 +476,7 @@ AlterTriggerDependsEventExtendNames(AlterObjectDependsStmt *alterTriggerDependsS char **relationName = &(relation->relname); AppendShardIdToName(relationName, shardId); - Value *triggerNameValue = + String *triggerNameValue = GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); AppendShardIdToName(&strVal(triggerNameValue), shardId); @@ -489,7 +489,7 @@ AlterTriggerDependsEventExtendNames(AlterObjectDependsStmt *alterTriggerDependsS * GetAlterTriggerDependsTriggerName returns Value object for the trigger * name that given AlterObjectDependsStmt is executed for. 
*/ -static Value * +static String * GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *alterTriggerDependsStmt) { List *triggerObjectNameList = (List *) alterTriggerDependsStmt->object; @@ -503,7 +503,7 @@ GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *alterTriggerDepen * be the name of the trigger in either before or after standard process * utility. */ - Value *triggerNameValue = llast(triggerObjectNameList); + String *triggerNameValue = llast(triggerObjectNameList); return triggerNameValue; } @@ -642,12 +642,12 @@ DropTriggerEventExtendNames(DropStmt *dropTriggerStmt, char *schemaName, uint64 ExtractDropStmtTriggerAndRelationName(dropTriggerStmt, &triggerName, &relationName); AppendShardIdToName(&triggerName, shardId); - Value *triggerNameValue = makeString(triggerName); + String *triggerNameValue = makeString(triggerName); AppendShardIdToName(&relationName, shardId); - Value *relationNameValue = makeString(relationName); + String *relationNameValue = makeString(relationName); - Value *schemaNameValue = makeString(pstrdup(schemaName)); + String *schemaNameValue = makeString(pstrdup(schemaName)); List *shardTriggerNameList = list_make3(schemaNameValue, relationNameValue, triggerNameValue); diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 4973aafd0..e80f5b14e 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -878,7 +878,7 @@ AlterTypeSchemaStmtObjectAddress(Node *node, bool missing_ok) */ /* typename is the last in the list of names */ - Value *typeNameStr = lfirst(list_tail(names)); + String *typeNameStr = lfirst(list_tail(names)); /* * we don't error here either, as the error would be not a good user facing diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index 7f1e04f76..9b1e0bfb3 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c 
@@ -432,7 +432,7 @@ DeparseVacuumColumnNames(List *columnNameList) appendStringInfoString(columnNames, " ("); - Value *columnName = NULL; + String *columnName = NULL; foreach_ptr(columnName, columnNameList) { appendStringInfo(columnNames, "%s,", strVal(columnName)); diff --git a/src/backend/distributed/deparser/deparse_database_stmts.c b/src/backend/distributed/deparser/deparse_database_stmts.c index 0ebc69238..b72787993 100644 --- a/src/backend/distributed/deparser/deparse_database_stmts.c +++ b/src/backend/distributed/deparser/deparse_database_stmts.c @@ -11,6 +11,8 @@ #include "postgres.h" +#include "pg_version_compat.h" + #include "catalog/namespace.h" #include "lib/stringinfo.h" #include "nodes/parsenodes.h" @@ -44,6 +46,6 @@ AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) appendStringInfo(buf, "ALTER DATABASE %s OWNER TO %s;", - quote_identifier(strVal((Value *) stmt->object)), + quote_identifier(strVal((String *) stmt->object)), RoleSpecString(stmt->newowner, true)); } diff --git a/src/backend/distributed/deparser/deparse_foreign_server_stmts.c b/src/backend/distributed/deparser/deparse_foreign_server_stmts.c index 62c5f98c8..805f24f90 100644 --- a/src/backend/distributed/deparser/deparse_foreign_server_stmts.c +++ b/src/backend/distributed/deparser/deparse_foreign_server_stmts.c @@ -223,7 +223,7 @@ AppendDropForeignServerStmt(StringInfo buf, DropStmt *stmt) static void AppendServerNames(StringInfo buf, DropStmt *stmt) { - Value *serverValue = NULL; + String *serverValue = NULL; foreach_ptr(serverValue, stmt->objects) { const char *serverString = quote_identifier(strVal(serverValue)); diff --git a/src/backend/distributed/deparser/deparse_function_stmts.c b/src/backend/distributed/deparser/deparse_function_stmts.c index d58faabfb..93bb65b4d 100644 --- a/src/backend/distributed/deparser/deparse_function_stmts.c +++ b/src/backend/distributed/deparser/deparse_function_stmts.c @@ -396,18 +396,18 @@ AppendVarSetValue(StringInfo buf, 
VariableSetStmt *setStmt) appendStringInfo(buf, " SET %s =", quote_identifier(setStmt->name)); } - Value value = varArgConst->val; - switch (value.type) + Node *value = (Node *) &varArgConst->val; + switch (value->type) { case T_Integer: { - appendStringInfo(buf, " %d", intVal(&value)); + appendStringInfo(buf, " %d", intVal(value)); break; } case T_Float: { - appendStringInfo(buf, " %s", strVal(&value)); + appendStringInfo(buf, " %s", strVal(value)); break; } @@ -428,7 +428,7 @@ AppendVarSetValue(StringInfo buf, VariableSetStmt *setStmt) Datum interval = DirectFunctionCall3(interval_in, - CStringGetDatum(strVal(&value)), + CStringGetDatum(strVal(value)), ObjectIdGetDatum(InvalidOid), Int32GetDatum(typmod)); @@ -440,7 +440,7 @@ AppendVarSetValue(StringInfo buf, VariableSetStmt *setStmt) else { appendStringInfo(buf, " %s", quote_literal_cstr(strVal( - &value))); + value))); } break; } diff --git a/src/backend/distributed/deparser/deparse_schema_stmts.c b/src/backend/distributed/deparser/deparse_schema_stmts.c index ebc76d5e8..21ea16fbe 100644 --- a/src/backend/distributed/deparser/deparse_schema_stmts.c +++ b/src/backend/distributed/deparser/deparse_schema_stmts.c @@ -126,7 +126,7 @@ AppendDropSchemaStmt(StringInfo buf, DropStmt *stmt) appendStringInfoString(buf, "IF EXISTS "); } - Value *schemaValue = NULL; + String *schemaValue = NULL; foreach_ptr(schemaValue, stmt->objects) { const char *schemaString = quote_identifier(strVal(schemaValue)); diff --git a/src/backend/distributed/deparser/deparse_statistics_stmts.c b/src/backend/distributed/deparser/deparse_statistics_stmts.c index fb1e67977..90828cc67 100644 --- a/src/backend/distributed/deparser/deparse_statistics_stmts.c +++ b/src/backend/distributed/deparser/deparse_statistics_stmts.c @@ -200,10 +200,10 @@ AppendAlterStatisticsOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) static void AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt) { - Value *schemaNameVal = (Value *) linitial(stmt->defnames); + 
String *schemaNameVal = (String *) linitial(stmt->defnames); const char *schemaName = quote_identifier(strVal(schemaNameVal)); - Value *statNameVal = (Value *) lsecond(stmt->defnames); + String *statNameVal = (String *) lsecond(stmt->defnames); const char *statName = quote_identifier(strVal(statNameVal)); appendStringInfo(buf, "%s.%s", schemaName, statName); @@ -220,7 +220,7 @@ AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt) appendStringInfoString(buf, " ("); - Value *statType = NULL; + String *statType = NULL; foreach_ptr(statType, stmt->stat_types) { appendStringInfoString(buf, strVal(statType)); diff --git a/src/backend/distributed/deparser/deparse_text_search.c b/src/backend/distributed/deparser/deparse_text_search.c index 43d162678..e0c750d0d 100644 --- a/src/backend/distributed/deparser/deparse_text_search.c +++ b/src/backend/distributed/deparser/deparse_text_search.c @@ -464,7 +464,7 @@ DeparseTextSearchDictionaryCommentStmt(Node *node) static void AppendStringInfoTokentypeList(StringInfo buf, List *tokentypes) { - Value *tokentype = NULL; + String *tokentype = NULL; bool first = true; foreach_ptr(tokentype, tokentypes) { diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 63c2f8695..af80df0c4 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -7,7 +7,9 @@ *------------------------------------------------------------------------- */ +#include "postgres.h" #include "distributed/pg_version_constants.h" +#include "pg_version_compat.h" #include "stdint.h" #include "postgres.h" @@ -2864,8 +2866,8 @@ CurrentUserName(void) Oid LookupTypeOid(char *schemaNameSting, char *typeNameString) { - Value *schemaName = makeString(schemaNameSting); - Value *typeName = makeString(typeNameString); + String *schemaName = makeString(schemaNameSting); + String *typeName = makeString(typeNameString); List *qualifiedName = 
list_make2(schemaName, typeName); TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName); diff --git a/src/backend/distributed/planner/cte_inline.c b/src/backend/distributed/planner/cte_inline.c index 4a3ba156f..2356ebf48 100644 --- a/src/backend/distributed/planner/cte_inline.c +++ b/src/backend/distributed/planner/cte_inline.c @@ -12,6 +12,7 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "pg_version_compat.h" #include "distributed/pg_version_constants.h" #include "distributed/cte_inline.h" @@ -309,7 +310,7 @@ inline_cte_walker(Node *node, inline_cte_walker_context *context) */ if (columnAliasCount >= columnIndex) { - Value *columnAlias = (Value *) list_nth(columnAliasList, columnIndex - 1); + String *columnAlias = (String *) list_nth(columnAliasList, columnIndex - 1); Assert(IsA(columnAlias, String)); TargetEntry *targetEntry = list_nth(rte->subquery->targetList, columnIndex - 1); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 6e053cecd..fca03eef8 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1353,7 +1353,7 @@ FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan) TargetEntry *targetEntry = NULL; foreach_ptr(targetEntry, customScan->scan.plan.targetlist) { - Value *columnName = makeString(targetEntry->resname); + String *columnName = makeString(targetEntry->resname); columnNameList = lappend(columnNameList, columnName); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a26bf158d..b8d87c4b7 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -798,7 +798,7 @@ DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId) 
appendStringInfo(columnName, UINT64_FORMAT "_", generatingJobId); appendStringInfo(columnName, "%u", columnIndex); - Value *columnValue = makeString(columnName->data); + String *columnValue = makeString(columnName->data); columnNameList = lappend(columnNameList, columnValue); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 017b46149..7c57a77f2 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -151,7 +151,7 @@ static Job * RouterJob(Query *originalQuery, static bool RelationPrunesToMultipleShards(List *relationShardList); static void NormalizeMultiRowInsertTargetList(Query *query); static void AppendNextDummyColReference(Alias *expendedReferenceNames); -static Value * MakeDummyColumnString(int dummyColumnId); +static String * MakeDummyColumnString(int dummyColumnId); static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); static List * GroupInsertValuesByShardId(List *insertValuesList); static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); @@ -3249,7 +3249,7 @@ AppendNextDummyColReference(Alias *expendedReferenceNames) { int existingColReferences = list_length(expendedReferenceNames->colnames); int nextColReferenceId = existingColReferences + 1; - Value *missingColumnString = MakeDummyColumnString(nextColReferenceId); + String *missingColumnString = MakeDummyColumnString(nextColReferenceId); expendedReferenceNames->colnames = lappend(expendedReferenceNames->colnames, missingColumnString); } @@ -3259,12 +3259,12 @@ AppendNextDummyColReference(Alias *expendedReferenceNames) * MakeDummyColumnString returns a String (Value) object by appending given * integer to end of the "column" string. 
*/ -static Value * +static String * MakeDummyColumnString(int dummyColumnId) { StringInfo dummyColumnStringInfo = makeStringInfo(); appendStringInfo(dummyColumnStringInfo, "column%d", dummyColumnId); - Value *dummyColumnString = makeString(dummyColumnStringInfo->data); + String *dummyColumnString = makeString(dummyColumnStringInfo->data); return dummyColumnString; } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 9138b1b80..e84c821fa 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1952,7 +1952,7 @@ BuildReadIntermediateResultsQuery(List *targetEntryList, List *columnAliasList, */ if (columnAliasCount >= columnNumber) { - Value *columnAlias = (Value *) list_nth(columnAliasList, columnNumber - 1); + String *columnAlias = (String *) list_nth(columnAliasList, columnNumber - 1); Assert(IsA(columnAlias, String)); newTargetEntry->resname = strVal(columnAlias); } diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 7388ff383..8f4821bc1 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -326,8 +326,8 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX || objectType == OBJECT_FOREIGN_TABLE || objectType == OBJECT_FOREIGN_SERVER) { - Value *relationSchemaNameValue = NULL; - Value *relationNameValue = NULL; + String *relationSchemaNameValue = NULL; + String *relationNameValue = NULL; uint32 dropCount = list_length(dropStmt->objects); if (dropCount > 1) @@ -381,11 +381,11 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) /* prefix with schema name if it is not added already */ if (relationSchemaNameValue == NULL) { - Value *schemaNameValue = 
makeString(pstrdup(schemaName)); + String *schemaNameValue = makeString(pstrdup(schemaName)); relationNameList = lcons(schemaNameValue, relationNameList); } - char **relationName = &(relationNameValue->val.str); + char **relationName = &(strVal(relationNameValue)); AppendShardIdToName(relationName, shardId); } else if (objectType == OBJECT_POLICY) @@ -750,10 +750,10 @@ UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId) * extend the penultimate element with the shardId. */ int colrefFieldCount = list_length(columnRef->fields); - Value *relnameValue = list_nth(columnRef->fields, colrefFieldCount - 2); + String *relnameValue = list_nth(columnRef->fields, colrefFieldCount - 2); Assert(IsA(relnameValue, String)); - AppendShardIdToName(&relnameValue->val.str, *shardId); + AppendShardIdToName(&strVal(relnameValue), *shardId); } /* might be more than one ColumnRef to visit */ diff --git a/src/backend/distributed/test/shared_connection_counters.c b/src/backend/distributed/test/shared_connection_counters.c index e95a2ccbb..641cfd314 100644 --- a/src/backend/distributed/test/shared_connection_counters.c +++ b/src/backend/distributed/test/shared_connection_counters.c @@ -37,6 +37,29 @@ wake_up_connection_pool_waiters(PG_FUNCTION_ARGS) } +/* + * makeIntConst creates a Const Node that stores a given integer + * + * copied from backend/parser/gram.c + */ +static Node * +makeIntConst(int val, int location) +{ + A_Const *n = makeNode(A_Const); + +#if PG_VERSION_NUM >= PG_VERSION_15 + n->val.ival.type = T_Integer; + n->val.ival.ival = val; +#else + n->val.type = T_Integer; + n->val.val.ival = val; +#endif + n->location = location; + + return (Node *) n; +} + + /* * set_max_shared_pool_size is a SQL * interface for setting MaxSharedPoolSize. 
We use this function in isolation @@ -49,9 +72,8 @@ set_max_shared_pool_size(PG_FUNCTION_ARGS) AlterSystemStmt *alterSystemStmt = palloc0(sizeof(AlterSystemStmt)); - A_Const *aConstValue = makeNode(A_Const); + A_Const *aConstValue = castNode(A_Const, makeIntConst(value, 0)); - aConstValue->val = *makeInteger(value); alterSystemStmt->setstmt = makeNode(VariableSetStmt); alterSystemStmt->setstmt->name = "citus.max_shared_pool_size"; alterSystemStmt->setstmt->is_local = false; diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index fd7767a2b..4f10eadf6 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -18,10 +18,12 @@ #define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b, c) #else +#include "nodes/value.h" #include "storage/smgr.h" #include "utils/int8.h" #include "utils/rel.h" +typedef Value String; #ifdef HAVE_LONG_INT_64 #define strtoi64(str, endptr, base) ((int64) strtol(str, endptr, base)) From bd455f42e3f9b3ffa917b41d4f1f859b6508e96b Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Sun, 10 Apr 2022 13:56:22 -0700 Subject: [PATCH 11/18] PG15: handle change to SeqScan structure. Account for PG commit 2226b4189b. The one site dependent on it can do just as well with a Scan instead of a SeqScan. 
--- src/backend/distributed/planner/fast_path_router_planner.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 8a2d87fe7..5d02be07c 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -102,15 +102,15 @@ PlannedStmt * GeneratePlaceHolderPlannedStmt(Query *parse) { PlannedStmt *result = makeNode(PlannedStmt); - SeqScan *seqScanNode = makeNode(SeqScan); - Plan *plan = &seqScanNode->plan; + Scan *scanNode = makeNode(Scan); + Plan *plan = &scanNode->plan; Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; AssertArg(FastPathRouterQuery(parse, &distKey)); /* there is only a single relation rte */ - seqScanNode->scanrelid = 1; + scanNode->scanrelid = 1; plan->targetlist = copyObject(FetchStatementTargetList((Node *) parse)); From 033f9cfff74765bef5da166f6f9af20948fb3bc4 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Mon, 11 Apr 2022 11:08:57 -0700 Subject: [PATCH 12/18] PG15: update copied pg_get_object_address() code. Account for PG commits 5a2832465fd8 and a0ffa885e478. 
--- ..._12_13_14.c => pg_get_object_address_13_14_15.c} | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) rename src/backend/distributed/metadata/{pg_get_object_address_12_13_14.c => pg_get_object_address_13_14_15.c} (98%) diff --git a/src/backend/distributed/metadata/pg_get_object_address_12_13_14.c b/src/backend/distributed/metadata/pg_get_object_address_13_14_15.c similarity index 98% rename from src/backend/distributed/metadata/pg_get_object_address_12_13_14.c rename to src/backend/distributed/metadata/pg_get_object_address_13_14_15.c index 26248f025..339ca5c18 100644 --- a/src/backend/distributed/metadata/pg_get_object_address_12_13_14.c +++ b/src/backend/distributed/metadata/pg_get_object_address_13_14_15.c @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * pg_get_object_address_12_13_14.c + * pg_get_object_address_13_14_15.c * * Copied functions from Postgres pg_get_object_address with acl/owner check. * Since we need to use intermediate data types Relation and Node from @@ -40,11 +40,6 @@ static void ErrorIfCurrentUserCanNotDistributeObject(ObjectType type, Relation *relation); static List * textarray_to_strvaluelist(ArrayType *arr); -/* It is defined on PG >= 13 versions by default */ -#if PG_VERSION_NUM < PG_VERSION_13 - #define TYPALIGN_INT 'i' -#endif - /* * PgGetObjectAddress gets the object address. This function is mostly copied from * pg_get_object_address of the PG code. 
We need to copy that function to use @@ -283,6 +278,9 @@ PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr) case OBJECT_FDW: case OBJECT_FOREIGN_SERVER: case OBJECT_LANGUAGE: +#if PG_VERSION_NUM >= PG_VERSION_15 + case OBJECT_PARAMETER_ACL: +#endif case OBJECT_PUBLICATION: case OBJECT_ROLE: case OBJECT_SCHEMA: @@ -320,6 +318,9 @@ PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr) break; } +#if PG_VERSION_NUM >= PG_VERSION_15 + case OBJECT_PUBLICATION_NAMESPACE: +#endif case OBJECT_USER_MAPPING: { objnode = (Node *) list_make2(linitial(name), linitial(args)); From 33ee4877d4f0fbe35b1f5793d4f105e3ab7a62d1 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Mon, 11 Apr 2022 11:39:55 -0700 Subject: [PATCH 13/18] PG15: rename pgstat_initstats() -> pgstat_init_relation(). From PG commits bff258a273 and be902e2651. --- src/backend/distributed/utils/multi_partitioning_utils.c | 2 +- src/include/pg_version_compat.h | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index a7477e5e5..e11bc5419 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -917,7 +917,7 @@ try_relation_open_nolock(Oid relationId) return NULL; } - pgstat_initstats(relation); + pgstat_init_relation(relation); return relation; } diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index 4f10eadf6..5ccc8a9f6 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -33,6 +33,7 @@ typedef Value String; #define strtou64(str, endptr, base) ((uint64) strtoull(str, endptr, base)) #endif #define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b) +#define pgstat_init_relation(r) pgstat_initstats(r) static inline int64 pg_strtoint64(char *s) From b6a5617ea88bc7ddbc28d644cdca3cc6168554a6 Mon Sep 17 
00:00:00 2001 From: Jeff Davis Date: Mon, 11 Apr 2022 12:02:24 -0700 Subject: [PATCH 14/18] PG15: handle pg_analyze_and_rewrite_* renaming. From PG commit 791b1b71da. --- src/backend/distributed/executor/multi_executor.c | 3 ++- src/backend/distributed/test/deparse_shard_query.c | 6 +++--- src/backend/distributed/test/distribution_metadata.c | 6 +++--- src/include/pg_version_compat.h | 2 ++ 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index f03e96b7c..5fd0f5b08 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -617,7 +617,8 @@ RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, Oid *paramOids, i numParams) { List *queryTreeList = - pg_analyze_and_rewrite(rawStmt, queryString, paramOids, numParams, NULL); + pg_analyze_and_rewrite_fixedparams(rawStmt, queryString, paramOids, numParams, + NULL); if (list_length(queryTreeList) != 1) { diff --git a/src/backend/distributed/test/deparse_shard_query.c b/src/backend/distributed/test/deparse_shard_query.c index 1961ad52d..a6196146f 100644 --- a/src/backend/distributed/test/deparse_shard_query.c +++ b/src/backend/distributed/test/deparse_shard_query.c @@ -49,9 +49,9 @@ deparse_shard_query_test(PG_FUNCTION_ARGS) Node *parsetree = NULL; foreach_ptr(parsetree, parseTreeList) { - List *queryTreeList = pg_analyze_and_rewrite((RawStmt *) parsetree, - queryStringChar, - NULL, 0, NULL); + List *queryTreeList = pg_analyze_and_rewrite_fixedparams((RawStmt *) parsetree, + queryStringChar, + NULL, 0, NULL); Query *query = NULL; foreach_ptr(query, queryTreeList) diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c index f9afd3b68..6d769ef27 100644 --- a/src/backend/distributed/test/distribution_metadata.c +++ b/src/backend/distributed/test/distribution_metadata.c @@ -259,9 
+259,9 @@ relation_count_in_query(PG_FUNCTION_ARGS) Node *parsetree = NULL; foreach_ptr(parsetree, parseTreeList) { - List *queryTreeList = pg_analyze_and_rewrite((RawStmt *) parsetree, - queryStringChar, - NULL, 0, NULL); + List *queryTreeList = pg_analyze_and_rewrite_fixedparams((RawStmt *) parsetree, + queryStringChar, + NULL, 0, NULL); Query *query = NULL; foreach_ptr(query, queryTreeList) diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index 5ccc8a9f6..db51fcf69 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -34,6 +34,8 @@ typedef Value String; #endif #define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b) #define pgstat_init_relation(r) pgstat_initstats(r) +#define pg_analyze_and_rewrite_fixedparams(a, b, c, d, e) pg_analyze_and_rewrite(a, b, c, \ + d, e) static inline int64 pg_strtoint64(char *s) From 3e1180de7800ba09dbc87509bec6680ceca1d35a Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Mon, 11 Apr 2022 12:14:28 -0700 Subject: [PATCH 15/18] PG15: handle extra argument to parse_analyze_varparams(). From PG commit 25751f54b8. 
--- src/backend/distributed/planner/multi_explain.c | 4 ++-- src/include/pg_version_compat.h | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 4c6370e5a..a807085af 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -1062,8 +1062,8 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) } /* resolve OIDs of unknown (user-defined) types */ - Query *analyzedQuery = parse_analyze_varparams(parseTree, queryString, - ¶mTypes, &numParams); + Query *analyzedQuery = parse_analyze_varparams_compat(parseTree, queryString, + ¶mTypes, &numParams, NULL); #if PG_VERSION_NUM >= PG_VERSION_14 diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index db51fcf69..2f076cf07 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -16,6 +16,8 @@ #if PG_VERSION_NUM >= PG_VERSION_15 #define ProcessCompletedNotifies() #define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b, c) +#define parse_analyze_varparams_compat(a, b, c, d, e) parse_analyze_varparams(a, b, c, d, \ + e) #else #include "nodes/value.h" @@ -33,6 +35,7 @@ typedef Value String; #define strtou64(str, endptr, base) ((uint64) strtoull(str, endptr, base)) #endif #define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b) +#define parse_analyze_varparams_compat(a, b, c, d, e) parse_analyze_varparams(a, b, c, d) #define pgstat_init_relation(r) pgstat_initstats(r) #define pg_analyze_and_rewrite_fixedparams(a, b, c, d, e) pg_analyze_and_rewrite(a, b, c, \ d, e) From ceb593c9da336a5b61a8288a1f2920914e1d669e Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 2 May 2022 22:41:19 +0200 Subject: [PATCH 16/18] Convert citus.hide_shards_from_app_name_prefixes to citus.show_shards_for_app_name_prefixes --- src/backend/distributed/shared_library_init.c | 57 
++++++++++--------- .../distributed/sql/citus--11.0-1--11.0-2.sql | 3 +- .../sql/downgrades/citus--11.0-2--11.0-1.sql | 3 +- .../citus_shard_indexes_on_worker/11.0-2.sql | 39 +++++++++++++ .../citus_shard_indexes_on_worker/latest.sql | 2 +- .../udfs/citus_shards_on_worker/11.0-2.sql | 34 +++++++++++ .../udfs/citus_shards_on_worker/latest.sql | 2 +- .../worker/worker_shard_visibility.c | 23 ++++---- .../distributed/worker_shard_visibility.h | 2 +- .../expected/multi_mx_hide_shard_names.out | 12 ++-- src/test/regress/pg_regress_multi.pl | 2 +- .../regress/sql/multi_mx_hide_shard_names.sql | 12 ++-- 12 files changed, 134 insertions(+), 57 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/11.0-2.sql create mode 100644 src/backend/distributed/sql/udfs/citus_shards_on_worker/11.0-2.sql diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 00b541968..fddd1b49d 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -159,9 +159,9 @@ static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra, static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source); static bool WarnIfReplicationModelIsSet(int *newval, void **extra, GucSource source); static bool NoticeIfSubqueryPushdownEnabled(bool *newval, void **extra, GucSource source); -static bool HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, - GucSource source); -static void HideShardsFromAppNamePrefixesAssignHook(const char *newval, void *extra); +static bool ShowShardsForAppNamePrefixesCheckHook(char **newval, void **extra, + GucSource source); +static void ShowShardsForAppNamePrefixesAssignHook(const char *newval, void *extra); static void ApplicationNameAssignHook(const char *newval, void *extra); static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source); static void 
NodeConninfoGucAssignHook(const char *newval, void *extra); @@ -1174,24 +1174,6 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); - DefineCustomStringVariable( - "citus.hide_shards_from_app_name_prefixes", - gettext_noop("If application_name starts with one of these values, hide shards"), - gettext_noop("Citus places distributed tables and shards in the same schema. " - "That can cause confusion when inspecting the list of tables on " - "a node with shards. This GUC can be used to hide the shards from " - "pg_class for certain applications based on the application_name " - "of the connection. The default is *, which hides shards from all " - "applications. This behaviour can be overridden using the " - "citus.override_table_visibility setting"), - &HideShardsFromAppNamePrefixes, - "*", - PGC_USERSET, - GUC_STANDARD, - HideShardsFromAppNamePrefixesCheckHook, - HideShardsFromAppNamePrefixesAssignHook, - NULL); - DefineCustomIntVariable( "citus.isolation_test_session_process_id", NULL, @@ -1716,6 +1698,25 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomStringVariable( + "citus.show_shards_for_app_name_prefixes", + gettext_noop("If application_name starts with one of these values, show shards"), + gettext_noop("Citus places distributed tables and shards in the same schema. " + "That can cause confusion when inspecting the list of tables on " + "a node with shards. By default the shards are hidden from " + "pg_class. This GUC can be used to show the shards to certain " + "applications based on the application_name of the connection. " + "The default is empty string, which hides shards from all " + "applications. 
This behaviour can be overridden using the " + "citus.override_table_visibility setting"), + &ShowShardsForAppNamePrefixes, + "", + PGC_USERSET, + GUC_STANDARD, + ShowShardsForAppNamePrefixesCheckHook, + ShowShardsForAppNamePrefixesAssignHook, + NULL); + DefineCustomBoolVariable( "citus.sort_returning", gettext_noop("Sorts the RETURNING clause to get consistent test output"), @@ -1985,12 +1986,12 @@ WarnIfReplicationModelIsSet(int *newval, void **extra, GucSource source) /* - * HideShardsFromAppNamePrefixesCheckHook ensures that the - * citus.hide_shards_from_app_name_prefixes holds a valid list of application_name + * ShowShardsForAppNamePrefixesCheckHook ensures that the + * citus.show_shards_for_app_name_prefixes holds a valid list of application_name * values. */ static bool -HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, GucSource source) +ShowShardsForAppNamePrefixesCheckHook(char **newval, void **extra, GucSource source) { List *prefixList = NIL; @@ -2020,7 +2021,7 @@ HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, GucSource so if (strcmp(prefixAscii, appNamePrefix) != 0) { - GUC_check_errdetail("prefix %s in citus.hide_shards_from_app_name_prefixes " + GUC_check_errdetail("prefix %s in citus.show_shards_for_app_name_prefixes " "contains non-ascii characters", appNamePrefix); return false; } @@ -2031,12 +2032,12 @@ HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, GucSource so /* - * HideShardsFromAppNamePrefixesAssignHook ensures changes to - * citus.hide_shards_from_app_name_prefixes are reflected in the decision + * ShowShardsForAppNamePrefixesAssignHook ensures changes to + * citus.show_shards_for_app_name_prefixes are reflected in the decision * whether or not to show shards. 
*/ static void -HideShardsFromAppNamePrefixesAssignHook(const char *newval, void *extra) +ShowShardsForAppNamePrefixesAssignHook(const char *newval, void *extra) { ResetHideShardsDecision(); } diff --git a/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql b/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql index 7f39b5980..54fdd2f44 100644 --- a/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql +++ b/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql @@ -1 +1,2 @@ --- bump version to 11.0-2 +#include "udfs/citus_shards_on_worker/11.0-2.sql" +#include "udfs/citus_shard_indexes_on_worker/11.0-2.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql b/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql index 163dca315..fca765ba7 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql @@ -1 +1,2 @@ --- bump down version to 11.0-1 +#include "../udfs/citus_shards_on_worker/11.0-1.sql" +#include "../udfs/citus_shard_indexes_on_worker/11.0-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/11.0-2.sql b/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/11.0-2.sql new file mode 100644 index 000000000..fd4684b18 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/11.0-2.sql @@ -0,0 +1,39 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_indexes_on_worker( + OUT schema_name name, + OUT index_name name, + OUT table_type text, + OUT owner_name name, + OUT shard_name name) + RETURNS SETOF record + LANGUAGE plpgsql + SET citus.show_shards_for_app_name_prefixes = '*' + AS $$ +BEGIN + -- this is the query that \di produces, except pg_table_is_visible + -- is replaced with pg_catalog.relation_is_a_known_shard(c.oid) + RETURN QUERY + SELECT n.nspname as "Schema", + c.relname as "Name", + CASE c.relkind WHEN 'r' THEN 'table' WHEN 'v' THEN 'view' WHEN 'm' THEN 
'materialized view' WHEN 'i' THEN 'index' WHEN 'S' THEN 'sequence' WHEN 's' THEN 'special' WHEN 'f' THEN 'foreign table' WHEN 'p' THEN 'table' END as "Type", + pg_catalog.pg_get_userbyid(c.relowner) as "Owner", + c2.relname as "Table" + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + LEFT JOIN pg_catalog.pg_index i ON i.indexrelid = c.oid + LEFT JOIN pg_catalog.pg_class c2 ON i.indrelid = c2.oid + WHERE c.relkind IN ('i','') + AND n.nspname <> 'pg_catalog' + AND n.nspname <> 'information_schema' + AND n.nspname !~ '^pg_toast' + AND pg_catalog.relation_is_a_known_shard(c.oid) + ORDER BY 1,2; +END; +$$; + +CREATE OR REPLACE VIEW pg_catalog.citus_shard_indexes_on_worker AS + SELECT schema_name as "Schema", + index_name as "Name", + table_type as "Type", + owner_name as "Owner", + shard_name as "Table" + FROM pg_catalog.citus_shard_indexes_on_worker() s; diff --git a/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/latest.sql b/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/latest.sql index d98cdafe5..fd4684b18 100644 --- a/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/latest.sql @@ -6,7 +6,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_indexes_on_worker( OUT shard_name name) RETURNS SETOF record LANGUAGE plpgsql - SET citus.hide_shards_from_app_name_prefixes = '' + SET citus.show_shards_for_app_name_prefixes = '*' AS $$ BEGIN -- this is the query that \di produces, except pg_table_is_visible diff --git a/src/backend/distributed/sql/udfs/citus_shards_on_worker/11.0-2.sql b/src/backend/distributed/sql/udfs/citus_shards_on_worker/11.0-2.sql new file mode 100644 index 000000000..dbb7498e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_shards_on_worker/11.0-2.sql @@ -0,0 +1,34 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_shards_on_worker( + OUT schema_name name, + OUT shard_name 
name, + OUT table_type text, + OUT owner_name name) + RETURNS SETOF record + LANGUAGE plpgsql + SET citus.show_shards_for_app_name_prefixes = '*' + AS $$ +BEGIN + -- this is the query that \d produces, except pg_table_is_visible + -- is replaced with pg_catalog.relation_is_a_known_shard(c.oid) + RETURN QUERY + SELECT n.nspname as "Schema", + c.relname as "Name", + CASE c.relkind WHEN 'r' THEN 'table' WHEN 'v' THEN 'view' WHEN 'm' THEN 'materialized view' WHEN 'i' THEN 'index' WHEN 'S' THEN 'sequence' WHEN 's' THEN 'special' WHEN 'f' THEN 'foreign table' WHEN 'p' THEN 'table' END as "Type", + pg_catalog.pg_get_userbyid(c.relowner) as "Owner" + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE c.relkind IN ('r','p','v','m','S','f','') + AND n.nspname <> 'pg_catalog' + AND n.nspname <> 'information_schema' + AND n.nspname !~ '^pg_toast' + AND pg_catalog.relation_is_a_known_shard(c.oid) + ORDER BY 1,2; +END; +$$; + +CREATE OR REPLACE VIEW pg_catalog.citus_shards_on_worker AS + SELECT schema_name as "Schema", + shard_name as "Name", + table_type as "Type", + owner_name as "Owner" + FROM pg_catalog.citus_shards_on_worker() s; diff --git a/src/backend/distributed/sql/udfs/citus_shards_on_worker/latest.sql b/src/backend/distributed/sql/udfs/citus_shards_on_worker/latest.sql index 895c92ae8..dbb7498e8 100644 --- a/src/backend/distributed/sql/udfs/citus_shards_on_worker/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_shards_on_worker/latest.sql @@ -5,7 +5,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_shards_on_worker( OUT owner_name name) RETURNS SETOF record LANGUAGE plpgsql - SET citus.hide_shards_from_app_name_prefixes = '' + SET citus.show_shards_for_app_name_prefixes = '*' AS $$ BEGIN -- this is the query that \d produces, except pg_table_is_visible diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index da9c87a22..e482a955c 100644 
--- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -40,8 +40,8 @@ typedef enum HideShardsMode bool OverrideTableVisibility = true; bool EnableManualChangesToShards = false; -/* hide shards when the application_name starts with one of: */ -char *HideShardsFromAppNamePrefixes = "*"; +/* show shards when the application_name starts with one of: */ +char *ShowShardsForAppNamePrefixes = ""; /* cache of whether or not to hide shards */ static HideShardsMode HideShards = CHECK_APPLICATION_NAME; @@ -271,8 +271,8 @@ RelationIsAKnownShard(Oid shardRelationId) /* * HideShardsFromSomeApplications transforms queries to pg_class to - * filter out known shards if the application_name matches any of - * the prefixes in citus.hide_shards_from_app_name_prefixes. + * filter out known shards if the application_name does not match any of + * the prefixes in citus.show_shards_for_app_name_prefix. */ void HideShardsFromSomeApplications(Query *query) @@ -294,7 +294,7 @@ HideShardsFromSomeApplications(Query *query) * ShouldHideShards returns whether we should hide shards in the current * session. It only checks the application_name once and then uses a * cached response unless either the application_name or - * citus.hide_shards_from_app_name_prefixes changes. + * citus.show_shards_for_app_name_prefix changes. 
*/ static bool ShouldHideShards(void) @@ -367,32 +367,33 @@ ShouldHideShardsInternal(void) List *prefixList = NIL; /* SplitGUCList scribbles on the input */ - char *splitCopy = pstrdup(HideShardsFromAppNamePrefixes); + char *splitCopy = pstrdup(ShowShardsForAppNamePrefixes); if (!SplitGUCList(splitCopy, ',', &prefixList)) { /* invalid GUC value, ignore */ - return false; + return true; } char *appNamePrefix = NULL; foreach_ptr(appNamePrefix, prefixList) { - /* always hide shards when one of the prefixes is * */ + /* never hide shards when one of the prefixes is * */ if (strcmp(appNamePrefix, "*") == 0) { - return true; + return false; } /* compare only the first first characters */ int prefixLength = strlen(appNamePrefix); if (strncmp(application_name, appNamePrefix, prefixLength) == 0) { - return true; + return false; } } - return false; + /* default behaviour: hide shards */ + return true; } diff --git a/src/include/distributed/worker_shard_visibility.h b/src/include/distributed/worker_shard_visibility.h index 957992fed..7eea5fbf7 100644 --- a/src/include/distributed/worker_shard_visibility.h +++ b/src/include/distributed/worker_shard_visibility.h @@ -15,7 +15,7 @@ extern bool OverrideTableVisibility; extern bool EnableManualChangesToShards; -extern char *HideShardsFromAppNamePrefixes; +extern char *ShowShardsForAppNamePrefixes; extern void HideShardsFromSomeApplications(Query *query); diff --git a/src/test/regress/expected/multi_mx_hide_shard_names.out b/src/test/regress/expected/multi_mx_hide_shard_names.out index c0265b282..60003caa2 100644 --- a/src/test/regress/expected/multi_mx_hide_shard_names.out +++ b/src/test/regress/expected/multi_mx_hide_shard_names.out @@ -114,7 +114,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name (2 rows) -- changing application_name reveals the shards -SET application_name TO ''; +SET application_name TO 'pg_regress'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 
'mx_hide_shard_names'::regnamespace ORDER BY relname; relname --------------------------------------------------------------------- @@ -137,7 +137,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- changing application_name in transaction reveals the shards BEGIN; -SET LOCAL application_name TO ''; +SET LOCAL application_name TO 'pg_regress'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; relname --------------------------------------------------------------------- @@ -160,7 +160,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- now with session-level GUC, but ROLLBACK; BEGIN; -SET application_name TO ''; +SET application_name TO 'pg_regress'; ROLLBACK; -- shards are hidden again after GUCs are reset SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; @@ -173,7 +173,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- we should hide correctly based on application_name with savepoints BEGIN; SAVEPOINT s1; -SET application_name TO ''; +SET application_name TO 'pg_regress'; -- changing application_name reveals the shards SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; relname @@ -196,9 +196,9 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name (2 rows) ROLLBACK; --- changing citus.hide_shards_from_app_name_prefixes reveals the shards +-- changing citus.show_shards_for_app_name_prefix reveals the shards BEGIN; -SET LOCAL citus.hide_shards_from_app_name_prefixes TO 'notpsql'; +SET LOCAL citus.show_shards_for_app_name_prefixes TO 'psql'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; relname --------------------------------------------------------------------- diff 
--git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 9b03b88d8..b3147b06b 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -466,7 +466,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'"); push(@pgOptions, "citus.enable_manual_changes_to_shards=on"); # Some tests look at shards in pg_class, make sure we can usually see them: -push(@pgOptions, "citus.hide_shards_from_app_name_prefixes='psql,pg_dump'"); +push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); # we disable slow start by default to encourage parallelism within tests push(@pgOptions, "citus.executor_slow_start_interval=0ms"); diff --git a/src/test/regress/sql/multi_mx_hide_shard_names.sql b/src/test/regress/sql/multi_mx_hide_shard_names.sql index b56329150..558a699a6 100644 --- a/src/test/regress/sql/multi_mx_hide_shard_names.sql +++ b/src/test/regress/sql/multi_mx_hide_shard_names.sql @@ -67,7 +67,7 @@ SELECT * FROM citus_shard_indexes_on_worker WHERE "Schema" = 'mx_hide_shard_name SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; -- changing application_name reveals the shards -SET application_name TO ''; +SET application_name TO 'pg_regress'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; RESET application_name; @@ -76,7 +76,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- changing application_name in transaction reveals the shards BEGIN; -SET LOCAL application_name TO ''; +SET LOCAL application_name TO 'pg_regress'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; ROLLBACK; @@ -85,7 +85,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- now with session-level GUC, but ROLLBACK; BEGIN; -SET application_name TO ''; 
+SET application_name TO 'pg_regress'; ROLLBACK; -- shards are hidden again after GUCs are reset @@ -94,7 +94,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- we should hide correctly based on application_name with savepoints BEGIN; SAVEPOINT s1; -SET application_name TO ''; +SET application_name TO 'pg_regress'; -- changing application_name reveals the shards SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; ROLLBACK TO SAVEPOINT s1; @@ -102,9 +102,9 @@ ROLLBACK TO SAVEPOINT s1; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; ROLLBACK; --- changing citus.hide_shards_from_app_name_prefixes reveals the shards +-- changing citus.show_shards_for_app_name_prefix reveals the shards BEGIN; -SET LOCAL citus.hide_shards_from_app_name_prefixes TO 'notpsql'; +SET LOCAL citus.show_shards_for_app_name_prefixes TO 'psql'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; ROLLBACK; From f193e16a018bcb2bebb16bcb4d1593ff13910f00 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 4 May 2022 16:44:06 +0200 Subject: [PATCH 17/18] Refrain reading the metadata cache for all tables during upgrade First, it is not needed. Second, in the past we had issues regarding this: https://github.com/citusdata/citus/pull/4344 When I create 10k tables, ~120K shards, this saves 40Mb of memory during ALTER EXTENSION citus UPDATE. 
Before the change: MetadataCacheMemoryContext: 41943040 ~ 40MB After the change: MetadataCacheMemoryContext: 8192 --- src/backend/distributed/commands/extension.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index e61d62054..e4b720b4c 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -513,7 +513,8 @@ MarkExistingObjectDependenciesDistributedIfSupported() ObjectAddress tableAddress = { 0 }; ObjectAddressSet(tableAddress, RelationRelationId, citusTableId); - if (ShouldSyncTableMetadata(citusTableId)) + /* refrain reading the metadata cache for all tables */ + if (ShouldSyncTableMetadataViaCatalog(citusTableId)) { /* we need to pass pointer allocated in the heap */ ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress)); From 06a94d167e81ce5545439830eddbca928f534d39 Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Thu, 28 Apr 2022 18:04:42 +0300 Subject: [PATCH 18/18] Use object address instead of relation id on DDLJob to decide on syncing metadata --- src/backend/distributed/commands/index.c | 8 +++++--- src/backend/distributed/commands/rename.c | 2 +- src/backend/distributed/commands/statistics.c | 12 +++++------ src/backend/distributed/commands/table.c | 4 ++-- src/backend/distributed/commands/trigger.c | 2 +- .../distributed/commands/utility_hook.c | 20 +++++++++++-------- .../distributed/metadata/metadata_sync.c | 16 +++++++++++++++ .../distributed/planner/distributed_planner.c | 2 +- .../distributed/commands/utility_hook.h | 4 ++-- src/include/distributed/metadata_sync.h | 2 ++ 10 files changed, 48 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 5ff984f66..3e25483b0 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -464,7 +464,8 @@ 
GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createInd { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = CreateIndexStmtGetRelationId(createIndexStatement); + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, + CreateIndexStmtGetRelationId(createIndexStatement)); ddlJob->startNewTransaction = createIndexStatement->concurrent; ddlJob->metadataSyncCommand = createIndexCommand; ddlJob->taskList = CreateIndexTaskList(createIndexStatement); @@ -598,7 +599,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand, } DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = IsReindexWithParam_compat(reindexStatement, "concurrently"); ddlJob->metadataSyncCommand = reindexCommand; @@ -695,7 +696,8 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand, MarkInvalidateForeignKeyGraph(); } - ddlJob->targetRelationId = distributedRelationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, + distributedRelationId); /* * We do not want DROP INDEX CONCURRENTLY to commit locally before diff --git a/src/backend/distributed/commands/rename.c b/src/backend/distributed/commands/rename.c index d777c420b..3ece05a0a 100644 --- a/src/backend/distributed/commands/rename.c +++ b/src/backend/distributed/commands/rename.c @@ -127,7 +127,7 @@ PreprocessRenameStmt(Node *node, const char *renameCommand, } DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = tableRelationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, tableRelationId); ddlJob->metadataSyncCommand = renameCommand; ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand); diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 984d63969..6f8e6df54 100644 --- 
a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -92,7 +92,7 @@ PreprocessCreateStatisticsStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -197,7 +197,7 @@ PreprocessDropStatisticsStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -236,7 +236,7 @@ PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -274,7 +274,7 @@ PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -376,7 +376,7 @@ PreprocessAlterStatisticsStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; 
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -416,7 +416,7 @@ PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 220a4d049..832df667c 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -1102,7 +1102,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, /* fill them here as it is possible to use them in some conditional blocks below */ DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = leftRelationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, leftRelationId); const char *sqlForTaskList = alterTableCommand; if (deparseAT) @@ -1779,7 +1779,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); QualifyTreeNode((Node *) stmt); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->metadataSyncCommand = DeparseTreeNode((Node *) stmt); ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand); return list_make1(ddlJob); diff --git a/src/backend/distributed/commands/trigger.c b/src/backend/distributed/commands/trigger.c index 2e3c107fa..94f4f4cef 100644 --- a/src/backend/distributed/commands/trigger.c +++ b/src/backend/distributed/commands/trigger.c @@ -712,7 +712,7 @@ CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName, const char *queryString) { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + 
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->metadataSyncCommand = queryString; if (!triggerName) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 9c93f0737..fba205cfc 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -1044,16 +1044,20 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) EnsureCoordinator(); - Oid targetRelationId = ddlJob->targetRelationId; + ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress; - if (OidIsValid(targetRelationId)) + if (OidIsValid(targetObjectAddress.classId)) { /* - * Only for ddlJobs that are targetting a relation (table) we want to sync - * its metadata and verify some properties around the table. + * Only for ddlJobs that are targetting an object we want to sync + * its metadata. */ - shouldSyncMetadata = ShouldSyncTableMetadata(targetRelationId); - EnsurePartitionTableNotReplicated(targetRelationId); + shouldSyncMetadata = ShouldSyncUserCommandForObject(targetObjectAddress); + + if (targetObjectAddress.classId == RelationRelationId) + { + EnsurePartitionTableNotReplicated(targetObjectAddress.objectId); + } } bool localExecutionSupported = true; @@ -1304,7 +1308,7 @@ CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command) } DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->metadataSyncCommand = GetTableDDLCommand(command); ddlJob->taskList = taskList; @@ -1555,7 +1559,7 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands) } DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = InvalidOid; + ddlJob->targetObjectAddress = InvalidObjectAddress; ddlJob->metadataSyncCommand = NULL; ddlJob->taskList = list_make1(task); diff --git a/src/backend/distributed/metadata/metadata_sync.c 
b/src/backend/distributed/metadata/metadata_sync.c index 4b62afc3b..ee9634617 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -425,6 +425,22 @@ ClusterHasKnownMetadataWorkers() } +/* + * ShouldSyncUserCommandForObject checks if the user command should be synced to the + * worker nodes for the given object. + */ +bool +ShouldSyncUserCommandForObject(ObjectAddress objectAddress) +{ + if (objectAddress.classId == RelationRelationId) + { + return ShouldSyncTableMetadata(objectAddress.objectId); + } + + return false; +} + + /* * ShouldSyncTableMetadata checks if the metadata of a distributed table should be * propagated to metadata workers, i.e. the table is a hash distributed table or diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index fca03eef8..39f6c0b63 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1065,7 +1065,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi /* - * EnsurePartitionTableNotReplicated errors out if the infput relation is + * EnsurePartitionTableNotReplicated errors out if the input relation is * a partition table and the table has a replication factor greater than * one. * diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 615a7c6d2..246d413d9 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -50,13 +50,13 @@ extern bool InDelegatedProcedureCall; /* * A DDLJob encapsulates the remote tasks and commands needed to process all or - * part of a distributed DDL command. It hold the distributed relation's oid, + * part of a distributed DDL command. 
It hold the target object's address, * the original DDL command string (for MX DDL propagation), and a task list of * DDL_TASK-type Tasks to be executed. */ typedef struct DDLJob { - Oid targetRelationId; /* oid of the target distributed relation */ + ObjectAddress targetObjectAddress; /* target distributed object address */ /* * Whether to commit and start a new transaction before sending commands diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e67726bfc..babecd210 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -13,6 +13,7 @@ #define METADATA_SYNC_H +#include "distributed/commands/utility_hook.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" #include "nodes/pg_list.h" @@ -34,6 +35,7 @@ extern void SyncCitusTableMetadata(Oid relationId); extern void EnsureSequentialModeMetadataOperations(void); extern bool ClusterHasKnownMetadataWorkers(void); extern char * LocalGroupIdUpdateCommand(int32 groupId); +extern bool ShouldSyncUserCommandForObject(ObjectAddress objectAddress); extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId); extern List * NodeMetadataCreateCommands(void);