Fix materialized view intermediate result filename (#5982)

(cherry picked from commit 268d3fa3a6)
release-10.2-ahmet-backport
Ahmet Gedemenli 2022-06-14 15:07:08 +03:00
parent 042e54c9cf
commit c187ae8e79
5 changed files with 220 additions and 31 deletions

View File

@ -44,7 +44,7 @@
#include "utils/syscache.h" #include "utils/syscache.h"
static bool CreatedResultsDirectory = false; static List *CreatedResultsDirectories = NIL;
/* CopyDestReceiver can be used to stream results into a distributed table */ /* CopyDestReceiver can be used to stream results into a distributed table */
@ -593,26 +593,28 @@ CreateIntermediateResultsDirectory(void)
{ {
char *resultDirectory = IntermediateResultsDirectory(); char *resultDirectory = IntermediateResultsDirectory();
if (!CreatedResultsDirectory) int makeOK = mkdir(resultDirectory, S_IRWXU);
if (makeOK != 0)
{ {
int makeOK = mkdir(resultDirectory, S_IRWXU); if (errno == EEXIST)
if (makeOK != 0)
{ {
if (errno == EEXIST) /* someone else beat us to it, that's ok */
{ return resultDirectory;
/* someone else beat us to it, that's ok */
return resultDirectory;
}
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not create intermediate results directory "
"\"%s\": %m",
resultDirectory)));
} }
CreatedResultsDirectory = true; ereport(ERROR, (errcode_for_file_access(),
errmsg("could not create intermediate results directory "
"\"%s\": %m",
resultDirectory)));
} }
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
CreatedResultsDirectories =
lappend(CreatedResultsDirectories, pstrdup(resultDirectory));
MemoryContextSwitchTo(oldContext);
return resultDirectory; return resultDirectory;
} }
@ -692,13 +694,14 @@ IntermediateResultsDirectory(void)
/* /*
* RemoveIntermediateResultsDirectory removes the intermediate result directory * RemoveIntermediateResultsDirectories removes the intermediate result directories
* for the current distributed transaction, if any was created. * for the current distributed transaction, if any was created.
*/ */
void void
RemoveIntermediateResultsDirectory(void) RemoveIntermediateResultsDirectories(void)
{ {
if (CreatedResultsDirectory) char *directoryElement = NULL;
foreach_ptr(directoryElement, CreatedResultsDirectories)
{ {
/* /*
* The shared directory is renamed before deleting it. Otherwise it * The shared directory is renamed before deleting it. Otherwise it
@ -707,7 +710,7 @@ RemoveIntermediateResultsDirectory(void)
* that's not possible. The current PID is included in the new * that's not possible. The current PID is included in the new
* filename, so there can be no collisions with other backends. * filename, so there can be no collisions with other backends.
*/ */
char *sharedName = IntermediateResultsDirectory(); char *sharedName = directoryElement;
StringInfo privateName = makeStringInfo(); StringInfo privateName = makeStringInfo();
appendStringInfo(privateName, "%s.removed-by-%d", sharedName, MyProcPid); appendStringInfo(privateName, "%s.removed-by-%d", sharedName, MyProcPid);
if (rename(sharedName, privateName->data)) if (rename(sharedName, privateName->data))
@ -727,9 +730,12 @@ RemoveIntermediateResultsDirectory(void)
{ {
PathNameDeleteTemporaryDir(privateName->data); PathNameDeleteTemporaryDir(privateName->data);
} }
CreatedResultsDirectory = false;
} }
/* cleanup */
list_free_deep(CreatedResultsDirectories);
CreatedResultsDirectories = NIL;
} }

View File

@ -320,7 +320,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
/* stop propagating notices from workers, we know the query is failed */ /* stop propagating notices from workers, we know the query is failed */
DisableWorkerMessagePropagation(); DisableWorkerMessagePropagation();
RemoveIntermediateResultsDirectory(); RemoveIntermediateResultsDirectories();
ResetShardPlacementTransactionState(); ResetShardPlacementTransactionState();
@ -408,7 +408,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* existing folders that are associated with distributed transaction * existing folders that are associated with distributed transaction
* ids on the worker nodes. * ids on the worker nodes.
*/ */
RemoveIntermediateResultsDirectory(); RemoveIntermediateResultsDirectories();
UnSetDistributedTransactionId(); UnSetDistributedTransactionId();
break; break;
@ -420,10 +420,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
* If the distributed query involves 2PC, we already removed * If the distributed query involves 2PC, we already removed
* the intermediate result directory on XACT_EVENT_PREPARE. However, * the intermediate result directory on XACT_EVENT_PREPARE. However,
* if not, we should remove it here on the COMMIT. Since * if not, we should remove it here on the COMMIT. Since
* RemoveIntermediateResultsDirectory() is idempotent, we're safe * RemoveIntermediateResultsDirectories() is idempotent, we're safe
* to call it here again even if the transaction involves 2PC. * to call it here again even if the transaction involves 2PC.
*/ */
RemoveIntermediateResultsDirectory(); RemoveIntermediateResultsDirectories();
/* nothing further to do if there's no managed remote xacts */ /* nothing further to do if there's no managed remote xacts */
if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)

View File

@ -57,7 +57,7 @@ extern void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver); extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver);
extern void SendQueryResultViaCopy(const char *resultId); extern void SendQueryResultViaCopy(const char *resultId);
extern void ReceiveQueryResultViaCopy(const char *resultId); extern void ReceiveQueryResultViaCopy(const char *resultId);
extern void RemoveIntermediateResultsDirectory(void); extern void RemoveIntermediateResultsDirectories(void);
extern int64 IntermediateResultSize(const char *resultId); extern int64 IntermediateResultSize(const char *resultId);
extern char * QueryResultFileName(const char *resultId); extern char * QueryResultFileName(const char *resultId);
extern char * CreateIntermediateResultsDirectory(void); extern char * CreateIntermediateResultsDirectory(void);

View File

@ -573,9 +573,111 @@ WARNING: Query could not find the intermediate result file "squares_2", it was
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
-- test refreshing mat views
SET client_min_messages TO ERROR;
SELECT run_command_on_workers($$CREATE USER some_other_user;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE ROLE")
(localhost,57638,t,"CREATE ROLE")
(2 rows)
CREATE USER some_other_user;
SELECT run_command_on_workers($$GRANT ALL ON DATABASE regression TO some_other_user;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,GRANT)
(localhost,57638,t,GRANT)
(2 rows)
GRANT ALL ON DATABASE regression TO some_other_user;
RESET client_min_messages;
\c - some_other_user
CREATE SCHEMA other_schema;
SET search_path TO other_schema;
CREATE TABLE dist_table (a int, b int);
INSERT INTO dist_table(a, b) SELECT n, n+1 FROM generate_series(1, 10) n;
SELECT create_distributed_table('dist_table', 'a');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$other_schema.dist_table$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE MATERIALIZED VIEW mat_view AS
SELECT *
FROM (
SELECT * FROM dist_table
LIMIT 50000
) q;
CREATE MATERIALIZED VIEW mat_view_2 AS
SELECT count(*) FROM (SELECT * FROM dist_table LIMIT 50000) q, (SELECT * FROM dist_table LIMIT 100) r WHERE q.a > r.a;
REFRESH MATERIALIZED VIEW other_schema.mat_view;
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
-- Now connect back as a different user and run REFRESH MATERIALIZED VIEW command,
-- which in turn executes a repartition join query.
\c - postgres
REFRESH MATERIALIZED VIEW other_schema.mat_view;
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
\c - some_other_user
-- test security definer funcs
CREATE FUNCTION security_definer_in_files()
RETURNS BOOLEAN AS $$
DECLARE passed BOOLEAN;
BEGIN
SELECT count(*) > 0 INTO passed
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
WHERE foo.a > bar.a;
RETURN passed;
END;
$$ LANGUAGE plpgsql
SECURITY DEFINER;
SELECT security_definer_in_files();
security_definer_in_files
---------------------------------------------------------------------
f
(1 row)
\c - postgres
SELECT security_definer_in_files();
security_definer_in_files
---------------------------------------------------------------------
f
(1 row)
CREATE FUNCTION security_definer_in_files_2()
RETURNS BOOLEAN AS $$
DECLARE passed BOOLEAN;
BEGIN
SELECT count(*) > 0 INTO passed
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
WHERE foo.a > bar.a;
RETURN passed;
END;
$$ LANGUAGE plpgsql
SECURITY DEFINER;
BEGIN;
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
security_definer_in_files_2 | security_definer_in_files
---------------------------------------------------------------------
f | f
(1 row)
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
security_definer_in_files_2 | security_definer_in_files
---------------------------------------------------------------------
f | f
(1 row)
COMMIT;
-- cleanup
SET client_min_messages TO ERROR;
DROP SCHEMA other_schema CASCADE;
DROP SCHEMA intermediate_results CASCADE; DROP SCHEMA intermediate_results CASCADE;
NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table interesting_squares
drop cascades to type square_type
drop cascades to table stored_squares
drop cascades to table squares

View File

@ -255,4 +255,85 @@ END;
-- results should have been deleted after transaction commit -- results should have been deleted after transaction commit
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
-- test refreshing mat views
SET client_min_messages TO ERROR;
SELECT run_command_on_workers($$CREATE USER some_other_user;$$);
CREATE USER some_other_user;
SELECT run_command_on_workers($$GRANT ALL ON DATABASE regression TO some_other_user;$$);
GRANT ALL ON DATABASE regression TO some_other_user;
RESET client_min_messages;
\c - some_other_user
CREATE SCHEMA other_schema;
SET search_path TO other_schema;
CREATE TABLE dist_table (a int, b int);
INSERT INTO dist_table(a, b) SELECT n, n+1 FROM generate_series(1, 10) n;
SELECT create_distributed_table('dist_table', 'a');
CREATE MATERIALIZED VIEW mat_view AS
SELECT *
FROM (
SELECT * FROM dist_table
LIMIT 50000
) q;
CREATE MATERIALIZED VIEW mat_view_2 AS
SELECT count(*) FROM (SELECT * FROM dist_table LIMIT 50000) q, (SELECT * FROM dist_table LIMIT 100) r WHERE q.a > r.a;
REFRESH MATERIALIZED VIEW other_schema.mat_view;
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
-- Now connect back as a different user and run REFRESH MATERIALIZED VIEW command,
-- which in turn executes a repartition join query.
\c - postgres
REFRESH MATERIALIZED VIEW other_schema.mat_view;
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
\c - some_other_user
-- test security definer funcs
CREATE FUNCTION security_definer_in_files()
RETURNS BOOLEAN AS $$
DECLARE passed BOOLEAN;
BEGIN
SELECT count(*) > 0 INTO passed
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
WHERE foo.a > bar.a;
RETURN passed;
END;
$$ LANGUAGE plpgsql
SECURITY DEFINER;
SELECT security_definer_in_files();
\c - postgres
SELECT security_definer_in_files();
CREATE FUNCTION security_definer_in_files_2()
RETURNS BOOLEAN AS $$
DECLARE passed BOOLEAN;
BEGIN
SELECT count(*) > 0 INTO passed
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
WHERE foo.a > bar.a;
RETURN passed;
END;
$$ LANGUAGE plpgsql
SECURITY DEFINER;
BEGIN;
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
COMMIT;
-- cleanup
SET client_min_messages TO ERROR;
DROP SCHEMA other_schema CASCADE;
DROP SCHEMA intermediate_results CASCADE; DROP SCHEMA intermediate_results CASCADE;