mirror of https://github.com/citusdata/citus.git
Fix materialized view intermediate result filename (#5982)
(cherry picked from commit 268d3fa3a6
)
release-11.0-ahmet-backport
parent
978d31f330
commit
4345627480
|
@ -45,7 +45,7 @@
|
|||
#include "utils/syscache.h"
|
||||
|
||||
|
||||
static bool CreatedResultsDirectory = false;
|
||||
static List *CreatedResultsDirectories = NIL;
|
||||
|
||||
|
||||
/* CopyDestReceiver can be used to stream results into a distributed table */
|
||||
|
@ -594,26 +594,28 @@ CreateIntermediateResultsDirectory(void)
|
|||
{
|
||||
char *resultDirectory = IntermediateResultsDirectory();
|
||||
|
||||
if (!CreatedResultsDirectory)
|
||||
int makeOK = mkdir(resultDirectory, S_IRWXU);
|
||||
if (makeOK != 0)
|
||||
{
|
||||
int makeOK = mkdir(resultDirectory, S_IRWXU);
|
||||
if (makeOK != 0)
|
||||
if (errno == EEXIST)
|
||||
{
|
||||
if (errno == EEXIST)
|
||||
{
|
||||
/* someone else beat us to it, that's ok */
|
||||
return resultDirectory;
|
||||
}
|
||||
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not create intermediate results directory "
|
||||
"\"%s\": %m",
|
||||
resultDirectory)));
|
||||
/* someone else beat us to it, that's ok */
|
||||
return resultDirectory;
|
||||
}
|
||||
|
||||
CreatedResultsDirectory = true;
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not create intermediate results directory "
|
||||
"\"%s\": %m",
|
||||
resultDirectory)));
|
||||
}
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
|
||||
|
||||
CreatedResultsDirectories =
|
||||
lappend(CreatedResultsDirectories, pstrdup(resultDirectory));
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
return resultDirectory;
|
||||
}
|
||||
|
||||
|
@ -693,13 +695,14 @@ IntermediateResultsDirectory(void)
|
|||
|
||||
|
||||
/*
|
||||
* RemoveIntermediateResultsDirectory removes the intermediate result directory
|
||||
* RemoveIntermediateResultsDirectories removes the intermediate result directory
|
||||
* for the current distributed transaction, if any was created.
|
||||
*/
|
||||
void
|
||||
RemoveIntermediateResultsDirectory(void)
|
||||
RemoveIntermediateResultsDirectories(void)
|
||||
{
|
||||
if (CreatedResultsDirectory)
|
||||
char *directoryElement = NULL;
|
||||
foreach_ptr(directoryElement, CreatedResultsDirectories)
|
||||
{
|
||||
/*
|
||||
* The shared directory is renamed before deleting it. Otherwise it
|
||||
|
@ -708,7 +711,7 @@ RemoveIntermediateResultsDirectory(void)
|
|||
* that's not possible. The current PID is included in the new
|
||||
* filename, so there can be no collisions with other backends.
|
||||
*/
|
||||
char *sharedName = IntermediateResultsDirectory();
|
||||
char *sharedName = directoryElement;
|
||||
StringInfo privateName = makeStringInfo();
|
||||
appendStringInfo(privateName, "%s.removed-by-%d", sharedName, MyProcPid);
|
||||
if (rename(sharedName, privateName->data))
|
||||
|
@ -728,9 +731,12 @@ RemoveIntermediateResultsDirectory(void)
|
|||
{
|
||||
PathNameDeleteTemporaryDir(privateName->data);
|
||||
}
|
||||
|
||||
CreatedResultsDirectory = false;
|
||||
}
|
||||
|
||||
/* cleanup */
|
||||
list_free_deep(CreatedResultsDirectories);
|
||||
|
||||
CreatedResultsDirectories = NIL;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -326,7 +326,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
/* stop propagating notices from workers, we know the query is failed */
|
||||
DisableWorkerMessagePropagation();
|
||||
|
||||
RemoveIntermediateResultsDirectory();
|
||||
RemoveIntermediateResultsDirectories();
|
||||
|
||||
/* handles both already prepared and open transactions */
|
||||
if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
|
||||
|
@ -412,7 +412,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
* existing folders that are associated with distributed transaction
|
||||
* ids on the worker nodes.
|
||||
*/
|
||||
RemoveIntermediateResultsDirectory();
|
||||
RemoveIntermediateResultsDirectories();
|
||||
|
||||
UnSetDistributedTransactionId();
|
||||
break;
|
||||
|
@ -424,10 +424,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
* If the distributed query involves 2PC, we already removed
|
||||
* the intermediate result directory on XACT_EVENT_PREPARE. However,
|
||||
* if not, we should remove it here on the COMMIT. Since
|
||||
* RemoveIntermediateResultsDirectory() is idempotent, we're safe
|
||||
* RemoveIntermediateResultsDirectories() is idempotent, we're safe
|
||||
* to call it here again even if the transaction involves 2PC.
|
||||
*/
|
||||
RemoveIntermediateResultsDirectory();
|
||||
RemoveIntermediateResultsDirectories();
|
||||
|
||||
/* nothing further to do if there's no managed remote xacts */
|
||||
if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
|
||||
|
|
|
@ -79,7 +79,7 @@ extern void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
|
|||
extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver);
|
||||
extern void SendQueryResultViaCopy(const char *resultId);
|
||||
extern void ReceiveQueryResultViaCopy(const char *resultId);
|
||||
extern void RemoveIntermediateResultsDirectory(void);
|
||||
extern void RemoveIntermediateResultsDirectories(void);
|
||||
extern int64 IntermediateResultSize(const char *resultId);
|
||||
extern char * QueryResultFileName(const char *resultId);
|
||||
extern char * CreateIntermediateResultsDirectory(void);
|
||||
|
|
|
@ -569,9 +569,111 @@ WARNING: Query could not find the intermediate result file "squares_2", it was
|
|||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- test refreshing mat views
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT run_command_on_workers($$CREATE USER some_other_user;$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,"CREATE ROLE")
|
||||
(localhost,57638,t,"CREATE ROLE")
|
||||
(2 rows)
|
||||
|
||||
CREATE USER some_other_user;
|
||||
SELECT run_command_on_workers($$GRANT ALL ON DATABASE regression TO some_other_user;$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,GRANT)
|
||||
(localhost,57638,t,GRANT)
|
||||
(2 rows)
|
||||
|
||||
GRANT ALL ON DATABASE regression TO some_other_user;
|
||||
RESET client_min_messages;
|
||||
\c - some_other_user
|
||||
CREATE SCHEMA other_schema;
|
||||
SET search_path TO other_schema;
|
||||
CREATE TABLE dist_table (a int, b int);
|
||||
INSERT INTO dist_table(a, b) SELECT n, n+1 FROM generate_series(1, 10) n;
|
||||
SELECT create_distributed_table('dist_table', 'a');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$other_schema.dist_table$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE MATERIALIZED VIEW mat_view AS
|
||||
SELECT *
|
||||
FROM (
|
||||
SELECT * FROM dist_table
|
||||
LIMIT 50000
|
||||
) q;
|
||||
CREATE MATERIALIZED VIEW mat_view_2 AS
|
||||
SELECT count(*) FROM (SELECT * FROM dist_table LIMIT 50000) q, (SELECT * FROM dist_table LIMIT 100) r WHERE q.a > r.a;
|
||||
REFRESH MATERIALIZED VIEW other_schema.mat_view;
|
||||
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
|
||||
-- Now connect back as a different user and run REFRESH MATERIALIZED VIEW command,
|
||||
-- which in turn executes a repartition join query.
|
||||
\c - postgres
|
||||
REFRESH MATERIALIZED VIEW other_schema.mat_view;
|
||||
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
|
||||
\c - some_other_user
|
||||
-- test security definer funcs
|
||||
CREATE FUNCTION security_definer_in_files()
|
||||
RETURNS BOOLEAN AS $$
|
||||
DECLARE passed BOOLEAN;
|
||||
BEGIN
|
||||
SELECT count(*) > 0 INTO passed
|
||||
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
|
||||
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
|
||||
WHERE foo.a > bar.a;
|
||||
|
||||
RETURN passed;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql
|
||||
SECURITY DEFINER;
|
||||
SELECT security_definer_in_files();
|
||||
security_definer_in_files
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
\c - postgres
|
||||
SELECT security_definer_in_files();
|
||||
security_definer_in_files
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
CREATE FUNCTION security_definer_in_files_2()
|
||||
RETURNS BOOLEAN AS $$
|
||||
DECLARE passed BOOLEAN;
|
||||
BEGIN
|
||||
SELECT count(*) > 0 INTO passed
|
||||
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
|
||||
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
|
||||
WHERE foo.a > bar.a;
|
||||
|
||||
RETURN passed;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql
|
||||
SECURITY DEFINER;
|
||||
BEGIN;
|
||||
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
|
||||
security_definer_in_files_2 | security_definer_in_files
|
||||
---------------------------------------------------------------------
|
||||
f | f
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
|
||||
security_definer_in_files_2 | security_definer_in_files
|
||||
---------------------------------------------------------------------
|
||||
f | f
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- cleanup
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA other_schema CASCADE;
|
||||
DROP SCHEMA intermediate_results CASCADE;
|
||||
NOTICE: drop cascades to 4 other objects
|
||||
DETAIL: drop cascades to table interesting_squares
|
||||
drop cascades to type square_type
|
||||
drop cascades to table stored_squares
|
||||
drop cascades to table squares
|
||||
|
|
|
@ -252,4 +252,85 @@ END;
|
|||
-- results should have been deleted after transaction commit
|
||||
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
|
||||
|
||||
-- test refreshing mat views
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT run_command_on_workers($$CREATE USER some_other_user;$$);
|
||||
CREATE USER some_other_user;
|
||||
SELECT run_command_on_workers($$GRANT ALL ON DATABASE regression TO some_other_user;$$);
|
||||
GRANT ALL ON DATABASE regression TO some_other_user;
|
||||
RESET client_min_messages;
|
||||
|
||||
\c - some_other_user
|
||||
|
||||
CREATE SCHEMA other_schema;
|
||||
SET search_path TO other_schema;
|
||||
|
||||
CREATE TABLE dist_table (a int, b int);
|
||||
INSERT INTO dist_table(a, b) SELECT n, n+1 FROM generate_series(1, 10) n;
|
||||
SELECT create_distributed_table('dist_table', 'a');
|
||||
|
||||
CREATE MATERIALIZED VIEW mat_view AS
|
||||
SELECT *
|
||||
FROM (
|
||||
SELECT * FROM dist_table
|
||||
LIMIT 50000
|
||||
) q;
|
||||
|
||||
CREATE MATERIALIZED VIEW mat_view_2 AS
|
||||
SELECT count(*) FROM (SELECT * FROM dist_table LIMIT 50000) q, (SELECT * FROM dist_table LIMIT 100) r WHERE q.a > r.a;
|
||||
|
||||
REFRESH MATERIALIZED VIEW other_schema.mat_view;
|
||||
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
|
||||
|
||||
-- Now connect back as a different user and run REFRESH MATERIALIZED VIEW command,
|
||||
-- which in turn executes a repartition join query.
|
||||
\c - postgres
|
||||
REFRESH MATERIALIZED VIEW other_schema.mat_view;
|
||||
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
|
||||
|
||||
\c - some_other_user
|
||||
|
||||
-- test security definer funcs
|
||||
CREATE FUNCTION security_definer_in_files()
|
||||
RETURNS BOOLEAN AS $$
|
||||
DECLARE passed BOOLEAN;
|
||||
BEGIN
|
||||
SELECT count(*) > 0 INTO passed
|
||||
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
|
||||
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
|
||||
WHERE foo.a > bar.a;
|
||||
|
||||
RETURN passed;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql
|
||||
SECURITY DEFINER;
|
||||
|
||||
SELECT security_definer_in_files();
|
||||
|
||||
\c - postgres
|
||||
|
||||
SELECT security_definer_in_files();
|
||||
|
||||
CREATE FUNCTION security_definer_in_files_2()
|
||||
RETURNS BOOLEAN AS $$
|
||||
DECLARE passed BOOLEAN;
|
||||
BEGIN
|
||||
SELECT count(*) > 0 INTO passed
|
||||
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
|
||||
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
|
||||
WHERE foo.a > bar.a;
|
||||
|
||||
RETURN passed;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql
|
||||
SECURITY DEFINER;
|
||||
|
||||
BEGIN;
|
||||
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
|
||||
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
|
||||
COMMIT;
|
||||
|
||||
-- cleanup
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA other_schema CASCADE;
|
||||
DROP SCHEMA intermediate_results CASCADE;
|
||||
|
|
Loading…
Reference in New Issue