Merge pull request #5836 from citusdata/fix/cannot-distribute-tmp-schemas

pull/5840/head
Onur Tirtir 2022-03-22 15:30:31 +03:00 committed by GitHub
commit 11f246785e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 241 additions and 33 deletions

View File

@ -19,6 +19,7 @@
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
@ -588,6 +589,14 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString)
ObjectAddress collationAddress =
DefineCollationStmtObjectAddress(node, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(
&collationAddress);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&collationAddress);
/* to prevent recursion with mx we disable ddl propagation */

View File

@ -158,6 +158,8 @@ EnsureDependenciesCanBeDistributed(const ObjectAddress *objectAddress)
if (depError != NULL)
{
/* override error detail as it is not applicable here*/
depError->detail = NULL;
RaiseDeferredError(depError, ERROR);
}
}

View File

@ -34,6 +34,7 @@
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
@ -91,6 +92,14 @@ PostprocessCreateTextSearchConfigurationStmt(Node *node, const char *queryString
EnsureSequentialMode(OBJECT_TSCONFIGURATION);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&address);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
/*
@ -132,6 +141,14 @@ PostprocessCreateTextSearchDictionaryStmt(Node *node, const char *queryString)
EnsureSequentialMode(OBJECT_TSDICTIONARY);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&address);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
QualifyTreeNode(node);

View File

@ -260,22 +260,7 @@ PreprocessCreateEnumStmt(Node *node, const char *queryString,
/* enforce fully qualified typeName for correct deparsing and lookup */
QualifyTreeNode(node);
/* reconstruct creation statement in a portable fashion */
const char *createEnumStmtSql = DeparseCreateEnumStmt(node);
createEnumStmtSql = WrapCreateOrReplace(createEnumStmtSql);
/*
* when we allow propagation within a transaction block we should make sure to only
* allow this in sequential mode
*/
EnsureSequentialMode(OBJECT_TYPE);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) createEnumStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
return NIL;
}
@ -297,9 +282,32 @@ PostprocessCreateEnumStmt(Node *node, const char *queryString)
/* lookup type address of just created type */
ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&typeAddress);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
/*
* when we allow propagation within a transaction block we should make sure to only
* allow this in sequential mode
*/
EnsureSequentialMode(OBJECT_TYPE);
EnsureDependenciesExistOnAllNodes(&typeAddress);
return NIL;
/* reconstruct creation statement in a portable fashion */
const char *createEnumStmtSql = DeparseCreateEnumStmt(node);
createEnumStmtSql = WrapCreateOrReplace(createEnumStmtSql);
/* to prevent recursion with mx we disable ddl propagation */
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
(void *) createEnumStmtSql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

View File

@ -600,7 +600,7 @@ SupportedDependencyByCitus(const ObjectAddress *address)
{
case OCLASS_SCHEMA:
{
return true;
return !isTempNamespace(address->objectId);
}
default:
@ -631,11 +631,15 @@ SupportedDependencyByCitus(const ObjectAddress *address)
}
case OCLASS_COLLATION:
case OCLASS_SCHEMA:
{
return true;
}
case OCLASS_SCHEMA:
{
return !isTempNamespace(address->objectId);
}
case OCLASS_PROC:
{
return true;
@ -776,15 +780,16 @@ DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
#endif
/*
* If the given object is a procedure or type, we want to create it locally,
* so provide that information in the error detail.
* We expect callers to interpret the error returned from this function
* as a warning if the object itself is just being created. In that case,
* we expect them to report below error detail as well to indicate that
* object itself will not be propagated but will still be created locally.
*
* Otherwise, callers are expected to throw the error returned from this
* function as a hard one by ignoring the detail part.
*/
if (getObjectClass(objectAddress) == OCLASS_PROC ||
getObjectClass(objectAddress) == OCLASS_TYPE)
{
appendStringInfo(detailInfo, "\"%s\" will be created only locally",
objectDescription);
}
appendStringInfo(detailInfo, "\"%s\" will be created only locally",
objectDescription);
if (SupportedDependencyByCitus(undistributableDependency))
{
@ -800,9 +805,7 @@ DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
objectDescription);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorInfo->data,
strlen(detailInfo->data) == 0 ? NULL : detailInfo->data,
hintInfo->data);
errorInfo->data, detailInfo->data, hintInfo->data);
}
appendStringInfo(errorInfo, "\"%s\" has dependency on unsupported "
@ -810,9 +813,7 @@ DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
dependencyDescription);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorInfo->data,
strlen(detailInfo->data) == 0 ? NULL : detailInfo->data,
NULL);
errorInfo->data, detailInfo->data, NULL);
}

View File

@ -263,3 +263,23 @@ s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_b
# node id in run_command_on_all_nodes warning
s/Error on node with node id [0-9]+/Error on node with node id xxxxx/g
# Temp schema names in error messages regarding dependencies that we cannot distribute
#
# 1) Schema of the depending object in the error message:
#
# e.g.:
# WARNING: "function pg_temp_3.f(bigint)" has dependency on unsupported object "<foo>"
# will be replaced with
# WARNING: "function pg_temp_xxx.f(bigint)" has dependency on unsupported object "<foo>"
s/^(WARNING|ERROR)(: "[a-z\ ]+ )pg_temp_[0-9]+(\..*" has dependency on unsupported object ".*")$/\1\2pg_temp_xxx\3/g
# 2) Schema of the depending object in the error detail:
s/^(DETAIL: "[a-z\ ]+ )pg_temp_[0-9]+(\..*" will be created only locally)$/\1pg_temp_xxx\2/g
# 3) Schema that the object depends in the error message:
# e.g.:
# WARNING: "function func(bigint)" has dependency on unsupported object "schema pg_temp_3"
# will be replaced with
# WARNING: "function func(bigint)" has dependency on unsupported object "schema pg_temp_xxx"
s/^(WARNING|ERROR)(: "[a-z\ ]+ .*" has dependency on unsupported object) "schema pg_temp_[0-9]+"$/\1\2 "schema pg_temp_xxx"/g

View File

@ -179,3 +179,8 @@ HINT: Connect to the coordinator and run it again.
SET citus.enable_ddl_propagation TO off;
DROP SCHEMA collation_creation_on_worker;
SET citus.enable_ddl_propagation TO on;
\c - - - :master_port
-- will skip trying to propagate the collation due to temp schema
CREATE COLLATION pg_temp.temp_collation (provider = icu, locale = 'de-u-co-phonebk');
WARNING: "collation pg_temp_xxx.temp_collation" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "collation pg_temp_xxx.temp_collation" will be created only locally

View File

@ -80,6 +80,33 @@ CREATE FUNCTION add_polygons(polygon, polygon) RETURNS int
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION agg_dummy_func(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state + item;
end;
$$;
SET client_min_messages TO WARNING;
-- will skip trying to propagate the aggregate due to temp schema
CREATE AGGREGATE pg_temp.dummy_agg(int) (
sfunc = agg_dummy_func,
stype = int,
sspace = 8,
finalfunc = agg_dummy_func,
finalfunc_extra,
initcond = '5',
msfunc = agg_dummy_func,
mstype = int,
msspace = 12,
minvfunc = agg_dummy_func,
mfinalfunc = agg_dummy_func,
mfinalfunc_extra,
minitcond = '1',
sortop = ">"
);
WARNING: "function pg_temp_xxx.dummy_agg(integer)" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "function pg_temp_xxx.dummy_agg(integer)" will be created only locally
RESET client_min_messages;
-- Test some combination of functions without ddl propagation
-- This will prevent the workers from having those types created. They are
-- created just-in-time on function distribution

View File

@ -597,6 +597,13 @@ DETAIL: "type default_test_row" will be created only locally
CREATE TABLE table_text_local_def(id int, col_1 default_test_row);
SELECT create_distributed_table('table_text_local_def','id');
ERROR: "table table_text_local_def" has dependency on unsupported object "type text_local_def"
-- will skip trying to propagate the type/enum due to temp schema
CREATE TYPE pg_temp.temp_type AS (int_field int);
WARNING: "type temp_type" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "type temp_type" will be created only locally
CREATE TYPE pg_temp.temp_enum AS ENUM ('one', 'two', 'three');
WARNING: "type temp_enum" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "type temp_enum" will be created only locally
-- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA type_tests CASCADE;

View File

@ -1279,6 +1279,17 @@ ALTER TABLE table_1_for_circ_dep_3 ADD COLUMN col_2 table_2_for_circ_dep_3;
SELECT create_distributed_table('table_1_for_circ_dep_3','id');
ERROR: Citus can not handle circular dependencies between distributed objects
DETAIL: "table table_1_for_circ_dep_3" circularly depends itself, resolve circular dependency first
-- will skip trying to propagate the function due to temp schema
CREATE FUNCTION pg_temp.temp_func(group_size BIGINT) RETURNS SETOF integer[]
AS $$
SELECT array_agg(s) OVER w
FROM generate_series(1,5) s
WINDOW w AS (ORDER BY s ROWS BETWEEN CURRENT ROW AND GROUP_SIZE FOLLOWING)
$$ LANGUAGE SQL STABLE;
WARNING: "function pg_temp_xxx.temp_func(bigint)" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "function pg_temp_xxx.temp_func(bigint)" will be created only locally
SELECT create_distributed_function('pg_temp.temp_func(BIGINT)');
ERROR: "function pg_temp_xxx.temp_func(bigint)" has dependency on unsupported object "schema pg_temp_xxx"
RESET search_path;
SET client_min_messages TO WARNING;
DROP SCHEMA function_propagation_schema CASCADE;

View File

@ -163,6 +163,13 @@ ALTER TABLE test_table ADD COLUMN id3 bigserial;
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers
ALTER TABLE test_table ADD COLUMN id4 bigserial CHECK (id4 > 0);
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers
CREATE SEQUENCE pg_temp.temp_sequence;
CREATE TABLE table_with_temp_sequence (
dist_key int,
seq_col bigint default nextval('pg_temp.temp_sequence')
);
SELECT create_distributed_table('table_with_temp_sequence', 'dist_key');
ERROR: "table table_with_temp_sequence" has dependency on unsupported object "schema pg_temp_xxx"
DROP TABLE test_table CASCADE;
DROP SEQUENCE test_sequence_0;
DROP SEQUENCE test_sequence_1;

View File

@ -628,5 +628,10 @@ objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
(1 row)
ROLLBACK;
-- Postgres already doesn't allow creating extensions in temp schema but
-- let's have a test for that to track any furher changes in postgres.
DROP EXTENSION isn CASCADE;
CREATE EXTENSION isn WITH SCHEMA pg_temp;
ERROR: schema "pg_temp" does not exist
-- drop the schema and all the objects
DROP SCHEMA "extension'test" CASCADE;

View File

@ -634,5 +634,10 @@ objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
(1 row)
ROLLBACK;
-- Postgres already doesn't allow creating extensions in temp schema but
-- let's have a test for that to track any furher changes in postgres.
DROP EXTENSION isn CASCADE;
CREATE EXTENSION isn WITH SCHEMA pg_temp;
ERROR: schema "pg_temp" does not exist
-- drop the schema and all the objects
DROP SCHEMA "extension'test" CASCADE;

View File

@ -15,6 +15,8 @@ SELECT create_distributed_table('test_stats', 'a');
(1 row)
CREATE STATISTICS pg_temp.s1 (dependencies) ON a, b FROM test_stats;
ERROR: "statistics object s1" has dependency on unsupported object "schema pg_temp_xxx"
CREATE STATISTICS s1 (dependencies) ON a, b FROM test_stats;
-- test for distributing an already existing statistics
CREATE TABLE "test'stats2" (

View File

@ -696,6 +696,18 @@ $$);
t
(1 row)
-- will skip trying to propagate the text search configuration due to temp schema
CREATE TEXT SEARCH CONFIGURATION pg_temp.temp_text_search_config ( parser = default );
WARNING: "text search configuration pg_temp_xxx.temp_text_search_config" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "text search configuration pg_temp_xxx.temp_text_search_config" will be created only locally
-- will skip trying to propagate the text search dictionary due to temp schema
CREATE TEXT SEARCH DICTIONARY pg_temp.temp_text_search_dict (
template = snowball,
language = english,
stopwords = english
);
WARNING: "text search dictionary pg_temp_xxx.temp_text_search_dict" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "text search dictionary pg_temp_xxx.temp_text_search_dict" will be created only locally
SET client_min_messages TO 'warning';
DROP SCHEMA text_search, text_search2, "Text Search Requiring Quote's" CASCADE;
DROP ROLE text_search_owner;

View File

@ -109,3 +109,8 @@ CREATE COLLATION collation_creation_on_worker.another_german_phonebook (provider
SET citus.enable_ddl_propagation TO off;
DROP SCHEMA collation_creation_on_worker;
SET citus.enable_ddl_propagation TO on;
\c - - - :master_port
-- will skip trying to propagate the collation due to temp schema
CREATE COLLATION pg_temp.temp_collation (provider = icu, locale = 'de-u-co-phonebk');

View File

@ -61,6 +61,33 @@ CREATE FUNCTION add_polygons(polygon, polygon) RETURNS int
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION agg_dummy_func(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state + item;
end;
$$;
SET client_min_messages TO WARNING;
-- will skip trying to propagate the aggregate due to temp schema
CREATE AGGREGATE pg_temp.dummy_agg(int) (
sfunc = agg_dummy_func,
stype = int,
sspace = 8,
finalfunc = agg_dummy_func,
finalfunc_extra,
initcond = '5',
msfunc = agg_dummy_func,
mstype = int,
msspace = 12,
minvfunc = agg_dummy_func,
mfinalfunc = agg_dummy_func,
mfinalfunc_extra,
minitcond = '1',
sortop = ">"
);
RESET client_min_messages;
-- Test some combination of functions without ddl propagation
-- This will prevent the workers from having those types created. They are
-- created just-in-time on function distribution

View File

@ -359,6 +359,10 @@ CREATE TYPE default_test_row AS (f1 text_local_def, f2 int4);
CREATE TABLE table_text_local_def(id int, col_1 default_test_row);
SELECT create_distributed_table('table_text_local_def','id');
-- will skip trying to propagate the type/enum due to temp schema
CREATE TYPE pg_temp.temp_type AS (int_field int);
CREATE TYPE pg_temp.temp_enum AS ENUM ('one', 'two', 'three');
-- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA type_tests CASCADE;

View File

@ -832,6 +832,16 @@ ALTER TABLE table_1_for_circ_dep_3 ADD COLUMN col_2 table_2_for_circ_dep_3;
-- It should error out due to circular dependency
SELECT create_distributed_table('table_1_for_circ_dep_3','id');
-- will skip trying to propagate the function due to temp schema
CREATE FUNCTION pg_temp.temp_func(group_size BIGINT) RETURNS SETOF integer[]
AS $$
SELECT array_agg(s) OVER w
FROM generate_series(1,5) s
WINDOW w AS (ORDER BY s ROWS BETWEEN CURRENT ROW AND GROUP_SIZE FOLLOWING)
$$ LANGUAGE SQL STABLE;
SELECT create_distributed_function('pg_temp.temp_func(BIGINT)');
RESET search_path;
SET client_min_messages TO WARNING;
DROP SCHEMA function_propagation_schema CASCADE;

View File

@ -122,6 +122,13 @@ ALTER TABLE test_table ALTER COLUMN id3 SET DEFAULT nextval('test_sequence_1'),
ALTER TABLE test_table ADD COLUMN id3 bigserial;
ALTER TABLE test_table ADD COLUMN id4 bigserial CHECK (id4 > 0);
CREATE SEQUENCE pg_temp.temp_sequence;
CREATE TABLE table_with_temp_sequence (
dist_key int,
seq_col bigint default nextval('pg_temp.temp_sequence')
);
SELECT create_distributed_table('table_with_temp_sequence', 'dist_key');
DROP TABLE test_table CASCADE;
DROP SEQUENCE test_sequence_0;
DROP SEQUENCE test_sequence_1;

View File

@ -371,5 +371,10 @@ SELECT distribution_argument_index FROM pg_catalog.pg_dist_object WHERE classid
objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
ROLLBACK;
-- Postgres already doesn't allow creating extensions in temp schema but
-- let's have a test for that to track any furher changes in postgres.
DROP EXTENSION isn CASCADE;
CREATE EXTENSION isn WITH SCHEMA pg_temp;
-- drop the schema and all the objects
DROP SCHEMA "extension'test" CASCADE;

View File

@ -14,6 +14,8 @@ CREATE TABLE test_stats (
SELECT create_distributed_table('test_stats', 'a');
CREATE STATISTICS pg_temp.s1 (dependencies) ON a, b FROM test_stats;
CREATE STATISTICS s1 (dependencies) ON a, b FROM test_stats;
-- test for distributing an already existing statistics

View File

@ -402,6 +402,16 @@ SELECT COUNT(DISTINCT result)=1 FROM run_command_on_all_nodes($$
WHERE dictname = 'snowball_dict';
$$);
-- will skip trying to propagate the text search configuration due to temp schema
CREATE TEXT SEARCH CONFIGURATION pg_temp.temp_text_search_config ( parser = default );
-- will skip trying to propagate the text search dictionary due to temp schema
CREATE TEXT SEARCH DICTIONARY pg_temp.temp_text_search_dict (
template = snowball,
language = english,
stopwords = english
);
SET client_min_messages TO 'warning';
DROP SCHEMA text_search, text_search2, "Text Search Requiring Quote's" CASCADE;
DROP ROLE text_search_owner;