From 43456274800e5212a0be4630490fe2ac3108ecec Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Tue, 14 Jun 2022 15:07:08 +0300 Subject: [PATCH] Fix materialized view intermediate result filename (#5982) (cherry picked from commit 268d3fa3a6ecc8a4e83122da070d0c1fc7ba6993) --- .../executor/intermediate_results.c | 48 ++++---- .../transaction/transaction_management.c | 8 +- .../distributed/intermediate_results.h | 2 +- .../regress/expected/intermediate_results.out | 112 +++++++++++++++++- src/test/regress/sql/intermediate_results.sql | 81 +++++++++++++ 5 files changed, 220 insertions(+), 31 deletions(-) diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 20c95fe06..96cd1db74 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -45,7 +45,7 @@ #include "utils/syscache.h" -static bool CreatedResultsDirectory = false; +static List *CreatedResultsDirectories = NIL; /* CopyDestReceiver can be used to stream results into a distributed table */ @@ -594,26 +594,28 @@ CreateIntermediateResultsDirectory(void) { char *resultDirectory = IntermediateResultsDirectory(); - if (!CreatedResultsDirectory) + int makeOK = mkdir(resultDirectory, S_IRWXU); + if (makeOK != 0) { - int makeOK = mkdir(resultDirectory, S_IRWXU); - if (makeOK != 0) + if (errno == EEXIST) { - if (errno == EEXIST) - { - /* someone else beat us to it, that's ok */ - return resultDirectory; - } - - ereport(ERROR, (errcode_for_file_access(), - errmsg("could not create intermediate results directory " - "\"%s\": %m", - resultDirectory))); + /* someone else beat us to it, that's ok */ + return resultDirectory; } - CreatedResultsDirectory = true; + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not create intermediate results directory " + "\"%s\": %m", + resultDirectory))); } + MemoryContext oldContext = 
MemoryContextSwitchTo(TopTransactionContext); + + CreatedResultsDirectories = + lappend(CreatedResultsDirectories, pstrdup(resultDirectory)); + + MemoryContextSwitchTo(oldContext); + return resultDirectory; } @@ -693,13 +695,14 @@ IntermediateResultsDirectory(void) /* - * RemoveIntermediateResultsDirectory removes the intermediate result directory + * RemoveIntermediateResultsDirectories removes the intermediate result directories * for the current distributed transaction, if any was created. */ void -RemoveIntermediateResultsDirectory(void) +RemoveIntermediateResultsDirectories(void) { - if (CreatedResultsDirectory) + char *directoryElement = NULL; + foreach_ptr(directoryElement, CreatedResultsDirectories) { /* * The shared directory is renamed before deleting it. Otherwise it @@ -708,7 +711,7 @@ RemoveIntermediateResultsDirectory(void) * that's not possible. The current PID is included in the new * filename, so there can be no collisions with other backends. */ - char *sharedName = IntermediateResultsDirectory(); + char *sharedName = directoryElement; StringInfo privateName = makeStringInfo(); appendStringInfo(privateName, "%s.removed-by-%d", sharedName, MyProcPid); if (rename(sharedName, privateName->data)) @@ -728,9 +731,12 @@ RemoveIntermediateResultsDirectory(void) { PathNameDeleteTemporaryDir(privateName->data); } - - CreatedResultsDirectory = false; } + + /* cleanup */ + list_free_deep(CreatedResultsDirectories); + + CreatedResultsDirectories = NIL; } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index f4472ba95..3f55534e5 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -326,7 +326,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) /* stop propagating notices from workers, we know the query is failed */ DisableWorkerMessagePropagation(); - 
RemoveIntermediateResultsDirectory(); + RemoveIntermediateResultsDirectories(); /* handles both already prepared and open transactions */ if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE) @@ -412,7 +412,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * existing folders that are associated with distributed transaction * ids on the worker nodes. */ - RemoveIntermediateResultsDirectory(); + RemoveIntermediateResultsDirectories(); UnSetDistributedTransactionId(); break; @@ -424,10 +424,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) * If the distributed query involves 2PC, we already removed * the intermediate result directory on XACT_EVENT_PREPARE. However, * if not, we should remove it here on the COMMIT. Since - * RemoveIntermediateResultsDirectory() is idempotent, we're safe + * RemoveIntermediateResultsDirectories() is idempotent, we're safe * to call it here again even if the transaction involves 2PC. */ - RemoveIntermediateResultsDirectory(); + RemoveIntermediateResultsDirectories(); /* nothing further to do if there's no managed remote xacts */ if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index e40eadba9..791ebdbe7 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -79,7 +79,7 @@ extern void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat); extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver); extern void SendQueryResultViaCopy(const char *resultId); extern void ReceiveQueryResultViaCopy(const char *resultId); -extern void RemoveIntermediateResultsDirectory(void); +extern void RemoveIntermediateResultsDirectories(void); extern int64 IntermediateResultSize(const char *resultId); extern char * QueryResultFileName(const char *resultId); extern char * CreateIntermediateResultsDirectory(void); diff --git 
a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index 0bc522ea0..01f079f8f 100644 --- a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -569,9 +569,111 @@ WARNING: Query could not find the intermediate result file "squares_2", it was --------------------------------------------------------------------- (0 rows) +-- test refreshing mat views +SET client_min_messages TO ERROR; +SELECT run_command_on_workers($$CREATE USER some_other_user;$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"CREATE ROLE") + (localhost,57638,t,"CREATE ROLE") +(2 rows) + +CREATE USER some_other_user; +SELECT run_command_on_workers($$GRANT ALL ON DATABASE regression TO some_other_user;$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,GRANT) + (localhost,57638,t,GRANT) +(2 rows) + +GRANT ALL ON DATABASE regression TO some_other_user; +RESET client_min_messages; +\c - some_other_user +CREATE SCHEMA other_schema; +SET search_path TO other_schema; +CREATE TABLE dist_table (a int, b int); +INSERT INTO dist_table(a, b) SELECT n, n+1 FROM generate_series(1, 10) n; +SELECT create_distributed_table('dist_table', 'a'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$other_schema.dist_table$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE MATERIALIZED VIEW mat_view AS +SELECT * +FROM ( + SELECT * FROM dist_table + LIMIT 50000 +) q; +CREATE MATERIALIZED VIEW mat_view_2 AS + SELECT count(*) FROM (SELECT * FROM dist_table LIMIT 50000) q, (SELECT * FROM dist_table LIMIT 100) r WHERE q.a > r.a; +REFRESH MATERIALIZED VIEW other_schema.mat_view; +REFRESH MATERIALIZED VIEW other_schema.mat_view_2; +-- Now connect back as a different user and run REFRESH MATERIALIZED VIEW command, +-- which in turn executes a repartition join query. +\c - postgres +REFRESH MATERIALIZED VIEW other_schema.mat_view; +REFRESH MATERIALIZED VIEW other_schema.mat_view_2; +\c - some_other_user +-- test security definer funcs +CREATE FUNCTION security_definer_in_files() +RETURNS BOOLEAN AS $$ +DECLARE passed BOOLEAN; +BEGIN + SELECT count(*) > 0 INTO passed + FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo, + (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar + WHERE foo.a > bar.a; + + RETURN passed; +END; +$$ LANGUAGE plpgsql + SECURITY DEFINER; +SELECT security_definer_in_files(); + security_definer_in_files +--------------------------------------------------------------------- + f +(1 row) + +\c - postgres +SELECT security_definer_in_files(); + security_definer_in_files +--------------------------------------------------------------------- + f +(1 row) + +CREATE FUNCTION security_definer_in_files_2() +RETURNS BOOLEAN AS $$ +DECLARE passed BOOLEAN; +BEGIN + SELECT count(*) > 0 INTO passed + FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo, + (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar + WHERE foo.a > bar.a; + + RETURN passed; +END; +$$ LANGUAGE plpgsql + SECURITY DEFINER; +BEGIN; + SELECT * FROM 
security_definer_in_files_2(), security_definer_in_files(); + security_definer_in_files_2 | security_definer_in_files +--------------------------------------------------------------------- + f | f +(1 row) + + SELECT * FROM security_definer_in_files_2(), security_definer_in_files(); + security_definer_in_files_2 | security_definer_in_files +--------------------------------------------------------------------- + f | f +(1 row) + +COMMIT; +-- cleanup +SET client_min_messages TO ERROR; +DROP SCHEMA other_schema CASCADE; DROP SCHEMA intermediate_results CASCADE; -NOTICE: drop cascades to 4 other objects -DETAIL: drop cascades to table interesting_squares -drop cascades to type square_type -drop cascades to table stored_squares -drop cascades to table squares diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index 67e80d1ee..a2b0cf686 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -252,4 +252,85 @@ END; -- results should have been deleted after transaction commit SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); +-- test refreshing mat views +SET client_min_messages TO ERROR; +SELECT run_command_on_workers($$CREATE USER some_other_user;$$); +CREATE USER some_other_user; +SELECT run_command_on_workers($$GRANT ALL ON DATABASE regression TO some_other_user;$$); +GRANT ALL ON DATABASE regression TO some_other_user; +RESET client_min_messages; + +\c - some_other_user + +CREATE SCHEMA other_schema; +SET search_path TO other_schema; + +CREATE TABLE dist_table (a int, b int); +INSERT INTO dist_table(a, b) SELECT n, n+1 FROM generate_series(1, 10) n; +SELECT create_distributed_table('dist_table', 'a'); + +CREATE MATERIALIZED VIEW mat_view AS +SELECT * +FROM ( + SELECT * FROM dist_table + LIMIT 50000 +) q; + +CREATE MATERIALIZED VIEW mat_view_2 AS + SELECT count(*) FROM (SELECT * FROM 
dist_table LIMIT 50000) q, (SELECT * FROM dist_table LIMIT 100) r WHERE q.a > r.a; + +REFRESH MATERIALIZED VIEW other_schema.mat_view; +REFRESH MATERIALIZED VIEW other_schema.mat_view_2; + +-- Now connect back as a different user and run REFRESH MATERIALIZED VIEW command, +-- which in turn executes a repartition join query. +\c - postgres +REFRESH MATERIALIZED VIEW other_schema.mat_view; +REFRESH MATERIALIZED VIEW other_schema.mat_view_2; + +\c - some_other_user + +-- test security definer funcs +CREATE FUNCTION security_definer_in_files() +RETURNS BOOLEAN AS $$ +DECLARE passed BOOLEAN; +BEGIN + SELECT count(*) > 0 INTO passed + FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo, + (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar + WHERE foo.a > bar.a; + + RETURN passed; +END; +$$ LANGUAGE plpgsql + SECURITY DEFINER; + +SELECT security_definer_in_files(); + +\c - postgres + +SELECT security_definer_in_files(); + +CREATE FUNCTION security_definer_in_files_2() +RETURNS BOOLEAN AS $$ +DECLARE passed BOOLEAN; +BEGIN + SELECT count(*) > 0 INTO passed + FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo, + (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar + WHERE foo.a > bar.a; + + RETURN passed; +END; +$$ LANGUAGE plpgsql + SECURITY DEFINER; + +BEGIN; + SELECT * FROM security_definer_in_files_2(), security_definer_in_files(); + SELECT * FROM security_definer_in_files_2(), security_definer_in_files(); +COMMIT; + +-- cleanup +SET client_min_messages TO ERROR; +DROP SCHEMA other_schema CASCADE; DROP SCHEMA intermediate_results CASCADE;