mirror of https://github.com/citusdata/citus.git
Fix materialized view intermediate result filename (#5982)
parent
d19d876b5f
commit
268d3fa3a6
|
@ -47,7 +47,7 @@
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
|
||||||
static bool CreatedResultsDirectory = false;
|
static List *CreatedResultsDirectories = NIL;
|
||||||
|
|
||||||
|
|
||||||
/* CopyDestReceiver can be used to stream results into a distributed table */
|
/* CopyDestReceiver can be used to stream results into a distributed table */
|
||||||
|
@ -596,26 +596,28 @@ CreateIntermediateResultsDirectory(void)
|
||||||
{
|
{
|
||||||
char *resultDirectory = IntermediateResultsDirectory();
|
char *resultDirectory = IntermediateResultsDirectory();
|
||||||
|
|
||||||
if (!CreatedResultsDirectory)
|
int makeOK = mkdir(resultDirectory, S_IRWXU);
|
||||||
|
if (makeOK != 0)
|
||||||
{
|
{
|
||||||
int makeOK = mkdir(resultDirectory, S_IRWXU);
|
if (errno == EEXIST)
|
||||||
if (makeOK != 0)
|
|
||||||
{
|
{
|
||||||
if (errno == EEXIST)
|
/* someone else beat us to it, that's ok */
|
||||||
{
|
return resultDirectory;
|
||||||
/* someone else beat us to it, that's ok */
|
|
||||||
return resultDirectory;
|
|
||||||
}
|
|
||||||
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not create intermediate results directory "
|
|
||||||
"\"%s\": %m",
|
|
||||||
resultDirectory)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CreatedResultsDirectory = true;
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
errmsg("could not create intermediate results directory "
|
||||||
|
"\"%s\": %m",
|
||||||
|
resultDirectory)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
|
||||||
|
|
||||||
|
CreatedResultsDirectories =
|
||||||
|
lappend(CreatedResultsDirectories, pstrdup(resultDirectory));
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
return resultDirectory;
|
return resultDirectory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -695,13 +697,14 @@ IntermediateResultsDirectory(void)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RemoveIntermediateResultsDirectory removes the intermediate result directory
|
* RemoveIntermediateResultsDirectories removes the intermediate result directory
|
||||||
* for the current distributed transaction, if any was created.
|
* for the current distributed transaction, if any was created.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
RemoveIntermediateResultsDirectory(void)
|
RemoveIntermediateResultsDirectories(void)
|
||||||
{
|
{
|
||||||
if (CreatedResultsDirectory)
|
char *directoryElement = NULL;
|
||||||
|
foreach_ptr(directoryElement, CreatedResultsDirectories)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* The shared directory is renamed before deleting it. Otherwise it
|
* The shared directory is renamed before deleting it. Otherwise it
|
||||||
|
@ -710,7 +713,7 @@ RemoveIntermediateResultsDirectory(void)
|
||||||
* that's not possible. The current PID is included in the new
|
* that's not possible. The current PID is included in the new
|
||||||
* filename, so there can be no collisions with other backends.
|
* filename, so there can be no collisions with other backends.
|
||||||
*/
|
*/
|
||||||
char *sharedName = IntermediateResultsDirectory();
|
char *sharedName = directoryElement;
|
||||||
StringInfo privateName = makeStringInfo();
|
StringInfo privateName = makeStringInfo();
|
||||||
appendStringInfo(privateName, "%s.removed-by-%d", sharedName, MyProcPid);
|
appendStringInfo(privateName, "%s.removed-by-%d", sharedName, MyProcPid);
|
||||||
if (rename(sharedName, privateName->data))
|
if (rename(sharedName, privateName->data))
|
||||||
|
@ -730,9 +733,12 @@ RemoveIntermediateResultsDirectory(void)
|
||||||
{
|
{
|
||||||
PathNameDeleteTemporaryDir(privateName->data);
|
PathNameDeleteTemporaryDir(privateName->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
CreatedResultsDirectory = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* cleanup */
|
||||||
|
list_free_deep(CreatedResultsDirectories);
|
||||||
|
|
||||||
|
CreatedResultsDirectories = NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -338,7 +338,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
/* stop propagating notices from workers, we know the query is failed */
|
/* stop propagating notices from workers, we know the query is failed */
|
||||||
DisableWorkerMessagePropagation();
|
DisableWorkerMessagePropagation();
|
||||||
|
|
||||||
RemoveIntermediateResultsDirectory();
|
RemoveIntermediateResultsDirectories();
|
||||||
|
|
||||||
/* handles both already prepared and open transactions */
|
/* handles both already prepared and open transactions */
|
||||||
if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
|
if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
|
||||||
|
@ -438,7 +438,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
* existing folders that are associated with distributed transaction
|
* existing folders that are associated with distributed transaction
|
||||||
* ids on the worker nodes.
|
* ids on the worker nodes.
|
||||||
*/
|
*/
|
||||||
RemoveIntermediateResultsDirectory();
|
RemoveIntermediateResultsDirectories();
|
||||||
|
|
||||||
UnSetDistributedTransactionId();
|
UnSetDistributedTransactionId();
|
||||||
break;
|
break;
|
||||||
|
@ -450,10 +450,10 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
||||||
* If the distributed query involves 2PC, we already removed
|
* If the distributed query involves 2PC, we already removed
|
||||||
* the intermediate result directory on XACT_EVENT_PREPARE. However,
|
* the intermediate result directory on XACT_EVENT_PREPARE. However,
|
||||||
* if not, we should remove it here on the COMMIT. Since
|
* if not, we should remove it here on the COMMIT. Since
|
||||||
* RemoveIntermediateResultsDirectory() is idempotent, we're safe
|
* RemoveIntermediateResultsDirectories() is idempotent, we're safe
|
||||||
* to call it here again even if the transaction involves 2PC.
|
* to call it here again even if the transaction involves 2PC.
|
||||||
*/
|
*/
|
||||||
RemoveIntermediateResultsDirectory();
|
RemoveIntermediateResultsDirectories();
|
||||||
|
|
||||||
/* nothing further to do if there's no managed remote xacts */
|
/* nothing further to do if there's no managed remote xacts */
|
||||||
if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
|
if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
|
||||||
|
|
|
@ -79,7 +79,7 @@ extern void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
|
||||||
extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver);
|
extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver);
|
||||||
extern void SendQueryResultViaCopy(const char *resultId);
|
extern void SendQueryResultViaCopy(const char *resultId);
|
||||||
extern void ReceiveQueryResultViaCopy(const char *resultId);
|
extern void ReceiveQueryResultViaCopy(const char *resultId);
|
||||||
extern void RemoveIntermediateResultsDirectory(void);
|
extern void RemoveIntermediateResultsDirectories(void);
|
||||||
extern int64 IntermediateResultSize(const char *resultId);
|
extern int64 IntermediateResultSize(const char *resultId);
|
||||||
extern char * QueryResultFileName(const char *resultId);
|
extern char * QueryResultFileName(const char *resultId);
|
||||||
extern char * CreateIntermediateResultsDirectory(void);
|
extern char * CreateIntermediateResultsDirectory(void);
|
||||||
|
|
|
@ -569,9 +569,111 @@ WARNING: Query could not find the intermediate result file "squares_2", it was
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
-- test refreshing mat views
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
SELECT run_command_on_workers($$CREATE USER some_other_user;$$);
|
||||||
|
run_command_on_workers
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(localhost,57637,t,"CREATE ROLE")
|
||||||
|
(localhost,57638,t,"CREATE ROLE")
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
CREATE USER some_other_user;
|
||||||
|
SELECT run_command_on_workers($$GRANT ALL ON DATABASE regression TO some_other_user;$$);
|
||||||
|
run_command_on_workers
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(localhost,57637,t,GRANT)
|
||||||
|
(localhost,57638,t,GRANT)
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
GRANT ALL ON DATABASE regression TO some_other_user;
|
||||||
|
RESET client_min_messages;
|
||||||
|
\c - some_other_user
|
||||||
|
CREATE SCHEMA other_schema;
|
||||||
|
SET search_path TO other_schema;
|
||||||
|
CREATE TABLE dist_table (a int, b int);
|
||||||
|
INSERT INTO dist_table(a, b) SELECT n, n+1 FROM generate_series(1, 10) n;
|
||||||
|
SELECT create_distributed_table('dist_table', 'a');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$other_schema.dist_table$$)
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE MATERIALIZED VIEW mat_view AS
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT * FROM dist_table
|
||||||
|
LIMIT 50000
|
||||||
|
) q;
|
||||||
|
CREATE MATERIALIZED VIEW mat_view_2 AS
|
||||||
|
SELECT count(*) FROM (SELECT * FROM dist_table LIMIT 50000) q, (SELECT * FROM dist_table LIMIT 100) r WHERE q.a > r.a;
|
||||||
|
REFRESH MATERIALIZED VIEW other_schema.mat_view;
|
||||||
|
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
|
||||||
|
-- Now connect back as a different user and run REFRESH MATERIALIZED VIEW command,
|
||||||
|
-- which in turn executes a repartition join query.
|
||||||
|
\c - postgres
|
||||||
|
REFRESH MATERIALIZED VIEW other_schema.mat_view;
|
||||||
|
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
|
||||||
|
\c - some_other_user
|
||||||
|
-- test security definer funcs
|
||||||
|
CREATE FUNCTION security_definer_in_files()
|
||||||
|
RETURNS BOOLEAN AS $$
|
||||||
|
DECLARE passed BOOLEAN;
|
||||||
|
BEGIN
|
||||||
|
SELECT count(*) > 0 INTO passed
|
||||||
|
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
|
||||||
|
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
|
||||||
|
WHERE foo.a > bar.a;
|
||||||
|
|
||||||
|
RETURN passed;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql
|
||||||
|
SECURITY DEFINER;
|
||||||
|
SELECT security_definer_in_files();
|
||||||
|
security_definer_in_files
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - postgres
|
||||||
|
SELECT security_definer_in_files();
|
||||||
|
security_definer_in_files
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE FUNCTION security_definer_in_files_2()
|
||||||
|
RETURNS BOOLEAN AS $$
|
||||||
|
DECLARE passed BOOLEAN;
|
||||||
|
BEGIN
|
||||||
|
SELECT count(*) > 0 INTO passed
|
||||||
|
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
|
||||||
|
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
|
||||||
|
WHERE foo.a > bar.a;
|
||||||
|
|
||||||
|
RETURN passed;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql
|
||||||
|
SECURITY DEFINER;
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
|
||||||
|
security_definer_in_files_2 | security_definer_in_files
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
f | f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
|
||||||
|
security_definer_in_files_2 | security_definer_in_files
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
f | f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- cleanup
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA other_schema CASCADE;
|
||||||
DROP SCHEMA intermediate_results CASCADE;
|
DROP SCHEMA intermediate_results CASCADE;
|
||||||
NOTICE: drop cascades to 4 other objects
|
|
||||||
DETAIL: drop cascades to table interesting_squares
|
|
||||||
drop cascades to type square_type
|
|
||||||
drop cascades to table stored_squares
|
|
||||||
drop cascades to table squares
|
|
||||||
|
|
|
@ -252,4 +252,85 @@ END;
|
||||||
-- results should have been deleted after transaction commit
|
-- results should have been deleted after transaction commit
|
||||||
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
|
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
|
||||||
|
|
||||||
|
-- test refreshing mat views
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
SELECT run_command_on_workers($$CREATE USER some_other_user;$$);
|
||||||
|
CREATE USER some_other_user;
|
||||||
|
SELECT run_command_on_workers($$GRANT ALL ON DATABASE regression TO some_other_user;$$);
|
||||||
|
GRANT ALL ON DATABASE regression TO some_other_user;
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
\c - some_other_user
|
||||||
|
|
||||||
|
CREATE SCHEMA other_schema;
|
||||||
|
SET search_path TO other_schema;
|
||||||
|
|
||||||
|
CREATE TABLE dist_table (a int, b int);
|
||||||
|
INSERT INTO dist_table(a, b) SELECT n, n+1 FROM generate_series(1, 10) n;
|
||||||
|
SELECT create_distributed_table('dist_table', 'a');
|
||||||
|
|
||||||
|
CREATE MATERIALIZED VIEW mat_view AS
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT * FROM dist_table
|
||||||
|
LIMIT 50000
|
||||||
|
) q;
|
||||||
|
|
||||||
|
CREATE MATERIALIZED VIEW mat_view_2 AS
|
||||||
|
SELECT count(*) FROM (SELECT * FROM dist_table LIMIT 50000) q, (SELECT * FROM dist_table LIMIT 100) r WHERE q.a > r.a;
|
||||||
|
|
||||||
|
REFRESH MATERIALIZED VIEW other_schema.mat_view;
|
||||||
|
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
|
||||||
|
|
||||||
|
-- Now connect back as a different user and run REFRESH MATERIALIZED VIEW command,
|
||||||
|
-- which in turn executes a repartition join query.
|
||||||
|
\c - postgres
|
||||||
|
REFRESH MATERIALIZED VIEW other_schema.mat_view;
|
||||||
|
REFRESH MATERIALIZED VIEW other_schema.mat_view_2;
|
||||||
|
|
||||||
|
\c - some_other_user
|
||||||
|
|
||||||
|
-- test security definer funcs
|
||||||
|
CREATE FUNCTION security_definer_in_files()
|
||||||
|
RETURNS BOOLEAN AS $$
|
||||||
|
DECLARE passed BOOLEAN;
|
||||||
|
BEGIN
|
||||||
|
SELECT count(*) > 0 INTO passed
|
||||||
|
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
|
||||||
|
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
|
||||||
|
WHERE foo.a > bar.a;
|
||||||
|
|
||||||
|
RETURN passed;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql
|
||||||
|
SECURITY DEFINER;
|
||||||
|
|
||||||
|
SELECT security_definer_in_files();
|
||||||
|
|
||||||
|
\c - postgres
|
||||||
|
|
||||||
|
SELECT security_definer_in_files();
|
||||||
|
|
||||||
|
CREATE FUNCTION security_definer_in_files_2()
|
||||||
|
RETURNS BOOLEAN AS $$
|
||||||
|
DECLARE passed BOOLEAN;
|
||||||
|
BEGIN
|
||||||
|
SELECT count(*) > 0 INTO passed
|
||||||
|
FROM (SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as foo,
|
||||||
|
(SELECT * FROM other_schema.dist_table ORDER BY a LIMIT 1) as bar
|
||||||
|
WHERE foo.a > bar.a;
|
||||||
|
|
||||||
|
RETURN passed;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql
|
||||||
|
SECURITY DEFINER;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
|
||||||
|
SELECT * FROM security_definer_in_files_2(), security_definer_in_files();
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- cleanup
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA other_schema CASCADE;
|
||||||
DROP SCHEMA intermediate_results CASCADE;
|
DROP SCHEMA intermediate_results CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue