Locally create objects having a dependency that we cannot distribute

We were already doing so for functions & types believing that
this cannot be the case for other object types.

However, as in #5830, we cannot distribute an object that user
attempts creating in temp schema. Even more, this doesn't only
apply to functions and types but also to many other object types.

So with this commit, we teach preprocess/postprocess functions
(that need to create dependencies on worker nodes) how to skip
trying to distribute such objects.

We also start identifying temp schemas as the objects that we
don't know how to propagate to worker nodes so that we can
simply create objects locally if user attempts creating them
in a temp schema.

There are 36 callers of `EnsureDependenciesExistOnAllNodes` in
the codebase atm and for the most we still need to throw a hard
error (i.e.: not use `DeferErrorIfHasUnsupportedDependency`
beforehand), such as:

i) user explicitly wants to create a distributed object
* CreateCitusLocalTable
* CreateDistributedTable
* master_create_worker_shards
* master_create_empty_shard
* create_distributed_function
* EnsureExtensionFunctionCanBeDistributed

ii) we don't want to skip altering distributed table on worker nodes
* PostprocessIndexStmt
* PostprocessCreateTriggerStmt
* PostprocessCreateStatisticsStmt

iii) object is already distributed / handled by Citus before, so we
aren't okay with not propagating the ALTER command
* PostprocessAlterTableSchemaStmt
* PostprocessAlterCollationOwnerStmt
* PostprocessAlterCollationSchemaStmt
* PostprocessAlterDatabaseOwnerStmt
* PostprocessAlterExtensionSchemaStmt
* PostprocessAlterFunctionOwnerStmt
* PostprocessAlterFunctionSchemaStmt
* PostprocessAlterSequenceOwnerStmt
* PostprocessAlterSequenceSchemaStmt
* PostprocessAlterStatisticsSchemaStmt
* PostprocessAlterStatisticsOwnerStmt
* PostprocessAlterTextSearchConfigurationSchemaStmt
* PostprocessAlterTextSearchDictionarySchemaStmt
* PostprocessAlterTextSearchConfigurationOwnerStmt
* PostprocessAlterTextSearchDictionaryOwnerStmt
* PostprocessAlterTypeSchemaStmt
* PostprocessAlterForeignServerOwnerStmt

iv) we already cannot create those objects in temp schemas, so skipping
for now
* PostprocessCreateExtensionStmt
* PostprocessCreateForeignServerStmt

Also note that there are 3 more callers of
`EnsureDependenciesExistOnAllNodes` in enterprise in addition to those
36 but we don't need to do anything specific about them due to the same
reasoning given in iii).
pull/5836/head
Onur Tirtir 2022-03-21 18:33:53 +03:00
parent 001551d732
commit dc31102630
23 changed files with 229 additions and 16 deletions

View File

@ -19,6 +19,7 @@
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
@ -588,6 +589,14 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString)
ObjectAddress collationAddress =
DefineCollationStmtObjectAddress(node, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(
&collationAddress);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&collationAddress);
/* to prevent recursion with mx we disable ddl propagation */

View File

@ -158,6 +158,8 @@ EnsureDependenciesCanBeDistributed(const ObjectAddress *objectAddress)
if (depError != NULL)
{
/* override error detail as it is not applicable here*/
depError->detail = NULL;
RaiseDeferredError(depError, ERROR);
}
}

View File

@ -34,6 +34,7 @@
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
@ -91,6 +92,14 @@ PostprocessCreateTextSearchConfigurationStmt(Node *node, const char *queryString
EnsureSequentialMode(OBJECT_TSCONFIGURATION);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&address);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
/*
@ -132,6 +141,14 @@ PostprocessCreateTextSearchDictionaryStmt(Node *node, const char *queryString)
EnsureSequentialMode(OBJECT_TSDICTIONARY);
ObjectAddress address = GetObjectAddressFromParseTree((Node *) stmt, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&address);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&address);
QualifyTreeNode(node);

View File

@ -297,6 +297,14 @@ PostprocessCreateEnumStmt(Node *node, const char *queryString)
/* lookup type address of just created type */
ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
DeferredErrorMessage *errMsg = DeferErrorIfHasUnsupportedDependency(&typeAddress);
if (errMsg != NULL)
{
RaiseDeferredError(errMsg, WARNING);
return NIL;
}
EnsureDependenciesExistOnAllNodes(&typeAddress);
return NIL;

View File

@ -600,7 +600,7 @@ SupportedDependencyByCitus(const ObjectAddress *address)
{
case OCLASS_SCHEMA:
{
return true;
return !isTempNamespace(address->objectId);
}
default:
@ -631,11 +631,15 @@ SupportedDependencyByCitus(const ObjectAddress *address)
}
case OCLASS_COLLATION:
case OCLASS_SCHEMA:
{
return true;
}
case OCLASS_SCHEMA:
{
return !isTempNamespace(address->objectId);
}
case OCLASS_PROC:
{
return true;
@ -776,15 +780,16 @@ DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
#endif
/*
* If the given object is a procedure or type, we want to create it locally,
* so provide that information in the error detail.
* We expect callers to interpret the error returned from this function
* as a warning if the object itself is just being created. In that case,
* we expect them to report below error detail as well to indicate that
* object itself will not be propagated but will still be created locally.
*
* Otherwise, callers are expected to throw the error returned from this
* function as a hard one by ignoring the detail part.
*/
if (getObjectClass(objectAddress) == OCLASS_PROC ||
getObjectClass(objectAddress) == OCLASS_TYPE)
{
appendStringInfo(detailInfo, "\"%s\" will be created only locally",
objectDescription);
}
appendStringInfo(detailInfo, "\"%s\" will be created only locally",
objectDescription);
if (SupportedDependencyByCitus(undistributableDependency))
{
@ -800,9 +805,7 @@ DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
objectDescription);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorInfo->data,
strlen(detailInfo->data) == 0 ? NULL : detailInfo->data,
hintInfo->data);
errorInfo->data, detailInfo->data, hintInfo->data);
}
appendStringInfo(errorInfo, "\"%s\" has dependency on unsupported "
@ -810,9 +813,7 @@ DeferErrorIfHasUnsupportedDependency(const ObjectAddress *objectAddress)
dependencyDescription);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorInfo->data,
strlen(detailInfo->data) == 0 ? NULL : detailInfo->data,
NULL);
errorInfo->data, detailInfo->data, NULL);
}

View File

@ -263,3 +263,23 @@ s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_b
# node id in run_command_on_all_nodes warning
s/Error on node with node id [0-9]+/Error on node with node id xxxxx/g
# Temp schema names in error messages regarding dependencies that we cannot distribute
#
# 1) Schema of the depending object in the error message:
#
# e.g.:
# WARNING: "function pg_temp_3.f(bigint)" has dependency on unsupported object "<foo>"
# will be replaced with
# WARNING: "function pg_temp_xxx.f(bigint)" has dependency on unsupported object "<foo>"
s/^(WARNING|ERROR)(: "[a-z\ ]+ )pg_temp_[0-9]+(\..*" has dependency on unsupported object ".*")$/\1\2pg_temp_xxx\3/g
# 2) Schema of the depending object in the error detail:
s/^(DETAIL: "[a-z\ ]+ )pg_temp_[0-9]+(\..*" will be created only locally)$/\1pg_temp_xxx\2/g
# 3) Schema that the object depends in the error message:
# e.g.:
# WARNING: "function func(bigint)" has dependency on unsupported object "schema pg_temp_3"
# will be replaced with
# WARNING: "function func(bigint)" has dependency on unsupported object "schema pg_temp_xxx"
s/^(WARNING|ERROR)(: "[a-z\ ]+ .*" has dependency on unsupported object) "schema pg_temp_[0-9]+"$/\1\2 "schema pg_temp_xxx"/g

View File

@ -179,3 +179,8 @@ HINT: Connect to the coordinator and run it again.
SET citus.enable_ddl_propagation TO off;
DROP SCHEMA collation_creation_on_worker;
SET citus.enable_ddl_propagation TO on;
\c - - - :master_port
-- will skip trying to propagate the collation due to temp schema
CREATE COLLATION pg_temp.temp_collation (provider = icu, locale = 'de-u-co-phonebk');
WARNING: "collation pg_temp_xxx.temp_collation" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "collation pg_temp_xxx.temp_collation" will be created only locally

View File

@ -80,6 +80,33 @@ CREATE FUNCTION add_polygons(polygon, polygon) RETURNS int
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION agg_dummy_func(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state + item;
end;
$$;
SET client_min_messages TO WARNING;
-- will skip trying to propagate the aggregate due to temp schema
CREATE AGGREGATE pg_temp.dummy_agg(int) (
sfunc = agg_dummy_func,
stype = int,
sspace = 8,
finalfunc = agg_dummy_func,
finalfunc_extra,
initcond = '5',
msfunc = agg_dummy_func,
mstype = int,
msspace = 12,
minvfunc = agg_dummy_func,
mfinalfunc = agg_dummy_func,
mfinalfunc_extra,
minitcond = '1',
sortop = ">"
);
WARNING: "function pg_temp_xxx.dummy_agg(integer)" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "function pg_temp_xxx.dummy_agg(integer)" will be created only locally
RESET client_min_messages;
-- Test some combination of functions without ddl propagation
-- This will prevent the workers from having those types created. They are
-- created just-in-time on function distribution

View File

@ -597,6 +597,18 @@ DETAIL: "type default_test_row" will be created only locally
CREATE TABLE table_text_local_def(id int, col_1 default_test_row);
SELECT create_distributed_table('table_text_local_def','id');
ERROR: "table table_text_local_def" has dependency on unsupported object "type text_local_def"
-- will skip trying to propagate the type/enum due to temp schema
CREATE TYPE pg_temp.temp_type AS (int_field int);
WARNING: "type temp_type" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "type temp_type" will be created only locally
CREATE TYPE pg_temp.temp_enum AS ENUM ('one', 'two', 'three');
WARNING: "type temp_enum" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "type temp_enum" will be created only locally
WARNING: cannot PREPARE a transaction that has operated on temporary objects
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection to the remote node localhost:xxxxx failed with the following error: another command is already in progress
ERROR: cannot PREPARE a transaction that has operated on temporary objects
CONTEXT: while executing command on localhost:xxxxx
-- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA type_tests CASCADE;

View File

@ -1279,6 +1279,17 @@ ALTER TABLE table_1_for_circ_dep_3 ADD COLUMN col_2 table_2_for_circ_dep_3;
SELECT create_distributed_table('table_1_for_circ_dep_3','id');
ERROR: Citus can not handle circular dependencies between distributed objects
DETAIL: "table table_1_for_circ_dep_3" circularly depends itself, resolve circular dependency first
-- will skip trying to propagate the function due to temp schema
CREATE FUNCTION pg_temp.temp_func(group_size BIGINT) RETURNS SETOF integer[]
AS $$
SELECT array_agg(s) OVER w
FROM generate_series(1,5) s
WINDOW w AS (ORDER BY s ROWS BETWEEN CURRENT ROW AND GROUP_SIZE FOLLOWING)
$$ LANGUAGE SQL STABLE;
WARNING: "function pg_temp_xxx.temp_func(bigint)" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "function pg_temp_xxx.temp_func(bigint)" will be created only locally
SELECT create_distributed_function('pg_temp.temp_func(BIGINT)');
ERROR: "function pg_temp_xxx.temp_func(bigint)" has dependency on unsupported object "schema pg_temp_xxx"
RESET search_path;
SET client_min_messages TO WARNING;
DROP SCHEMA function_propagation_schema CASCADE;

View File

@ -163,6 +163,13 @@ ALTER TABLE test_table ADD COLUMN id3 bigserial;
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers
ALTER TABLE test_table ADD COLUMN id4 bigserial CHECK (id4 > 0);
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers
CREATE SEQUENCE pg_temp.temp_sequence;
CREATE TABLE table_with_temp_sequence (
dist_key int,
seq_col bigint default nextval('pg_temp.temp_sequence')
);
SELECT create_distributed_table('table_with_temp_sequence', 'dist_key');
ERROR: "table table_with_temp_sequence" has dependency on unsupported object "schema pg_temp_xxx"
DROP TABLE test_table CASCADE;
DROP SEQUENCE test_sequence_0;
DROP SEQUENCE test_sequence_1;

View File

@ -628,5 +628,10 @@ objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
(1 row)
ROLLBACK;
-- Postgres already doesn't allow creating extensions in temp schema but
-- let's have a test for that to track any furher changes in postgres.
DROP EXTENSION isn CASCADE;
CREATE EXTENSION isn WITH SCHEMA pg_temp;
ERROR: schema "pg_temp" does not exist
-- drop the schema and all the objects
DROP SCHEMA "extension'test" CASCADE;

View File

@ -634,5 +634,10 @@ objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
(1 row)
ROLLBACK;
-- Postgres already doesn't allow creating extensions in temp schema but
-- let's have a test for that to track any furher changes in postgres.
DROP EXTENSION isn CASCADE;
CREATE EXTENSION isn WITH SCHEMA pg_temp;
ERROR: schema "pg_temp" does not exist
-- drop the schema and all the objects
DROP SCHEMA "extension'test" CASCADE;

View File

@ -15,6 +15,8 @@ SELECT create_distributed_table('test_stats', 'a');
(1 row)
CREATE STATISTICS pg_temp.s1 (dependencies) ON a, b FROM test_stats;
ERROR: "statistics object s1" has dependency on unsupported object "schema pg_temp_xxx"
CREATE STATISTICS s1 (dependencies) ON a, b FROM test_stats;
-- test for distributing an already existing statistics
CREATE TABLE "test'stats2" (

View File

@ -696,6 +696,18 @@ $$);
t
(1 row)
-- will skip trying to propagate the text search configuration due to temp schema
CREATE TEXT SEARCH CONFIGURATION pg_temp.temp_text_search_config ( parser = default );
WARNING: "text search configuration pg_temp_xxx.temp_text_search_config" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "text search configuration pg_temp_xxx.temp_text_search_config" will be created only locally
-- will skip trying to propagate the text search dictionary due to temp schema
CREATE TEXT SEARCH DICTIONARY pg_temp.temp_text_search_dict (
template = snowball,
language = english,
stopwords = english
);
WARNING: "text search dictionary pg_temp_xxx.temp_text_search_dict" has dependency on unsupported object "schema pg_temp_xxx"
DETAIL: "text search dictionary pg_temp_xxx.temp_text_search_dict" will be created only locally
SET client_min_messages TO 'warning';
DROP SCHEMA text_search, text_search2, "Text Search Requiring Quote's" CASCADE;
DROP ROLE text_search_owner;

View File

@ -109,3 +109,8 @@ CREATE COLLATION collation_creation_on_worker.another_german_phonebook (provider
SET citus.enable_ddl_propagation TO off;
DROP SCHEMA collation_creation_on_worker;
SET citus.enable_ddl_propagation TO on;
\c - - - :master_port
-- will skip trying to propagate the collation due to temp schema
CREATE COLLATION pg_temp.temp_collation (provider = icu, locale = 'de-u-co-phonebk');

View File

@ -61,6 +61,33 @@ CREATE FUNCTION add_polygons(polygon, polygon) RETURNS int
IMMUTABLE
RETURNS NULL ON NULL INPUT;
CREATE FUNCTION agg_dummy_func(state int, item int)
RETURNS int IMMUTABLE LANGUAGE plpgsql AS $$
begin
return state + item;
end;
$$;
SET client_min_messages TO WARNING;
-- will skip trying to propagate the aggregate due to temp schema
CREATE AGGREGATE pg_temp.dummy_agg(int) (
sfunc = agg_dummy_func,
stype = int,
sspace = 8,
finalfunc = agg_dummy_func,
finalfunc_extra,
initcond = '5',
msfunc = agg_dummy_func,
mstype = int,
msspace = 12,
minvfunc = agg_dummy_func,
mfinalfunc = agg_dummy_func,
mfinalfunc_extra,
minitcond = '1',
sortop = ">"
);
RESET client_min_messages;
-- Test some combination of functions without ddl propagation
-- This will prevent the workers from having those types created. They are
-- created just-in-time on function distribution

View File

@ -359,6 +359,10 @@ CREATE TYPE default_test_row AS (f1 text_local_def, f2 int4);
CREATE TABLE table_text_local_def(id int, col_1 default_test_row);
SELECT create_distributed_table('table_text_local_def','id');
-- will skip trying to propagate the type/enum due to temp schema
CREATE TYPE pg_temp.temp_type AS (int_field int);
CREATE TYPE pg_temp.temp_enum AS ENUM ('one', 'two', 'three');
-- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA type_tests CASCADE;

View File

@ -832,6 +832,16 @@ ALTER TABLE table_1_for_circ_dep_3 ADD COLUMN col_2 table_2_for_circ_dep_3;
-- It should error out due to circular dependency
SELECT create_distributed_table('table_1_for_circ_dep_3','id');
-- will skip trying to propagate the function due to temp schema
CREATE FUNCTION pg_temp.temp_func(group_size BIGINT) RETURNS SETOF integer[]
AS $$
SELECT array_agg(s) OVER w
FROM generate_series(1,5) s
WINDOW w AS (ORDER BY s ROWS BETWEEN CURRENT ROW AND GROUP_SIZE FOLLOWING)
$$ LANGUAGE SQL STABLE;
SELECT create_distributed_function('pg_temp.temp_func(BIGINT)');
RESET search_path;
SET client_min_messages TO WARNING;
DROP SCHEMA function_propagation_schema CASCADE;

View File

@ -122,6 +122,13 @@ ALTER TABLE test_table ALTER COLUMN id3 SET DEFAULT nextval('test_sequence_1'),
ALTER TABLE test_table ADD COLUMN id3 bigserial;
ALTER TABLE test_table ADD COLUMN id4 bigserial CHECK (id4 > 0);
CREATE SEQUENCE pg_temp.temp_sequence;
CREATE TABLE table_with_temp_sequence (
dist_key int,
seq_col bigint default nextval('pg_temp.temp_sequence')
);
SELECT create_distributed_table('table_with_temp_sequence', 'dist_key');
DROP TABLE test_table CASCADE;
DROP SEQUENCE test_sequence_0;
DROP SEQUENCE test_sequence_1;

View File

@ -371,5 +371,10 @@ SELECT distribution_argument_index FROM pg_catalog.pg_dist_object WHERE classid
objid = (SELECT oid FROM pg_proc WHERE prosrc = 'cube_a_f8_f8');
ROLLBACK;
-- Postgres already doesn't allow creating extensions in temp schema but
-- let's have a test for that to track any furher changes in postgres.
DROP EXTENSION isn CASCADE;
CREATE EXTENSION isn WITH SCHEMA pg_temp;
-- drop the schema and all the objects
DROP SCHEMA "extension'test" CASCADE;

View File

@ -14,6 +14,8 @@ CREATE TABLE test_stats (
SELECT create_distributed_table('test_stats', 'a');
CREATE STATISTICS pg_temp.s1 (dependencies) ON a, b FROM test_stats;
CREATE STATISTICS s1 (dependencies) ON a, b FROM test_stats;
-- test for distributing an already existing statistics

View File

@ -402,6 +402,16 @@ SELECT COUNT(DISTINCT result)=1 FROM run_command_on_all_nodes($$
WHERE dictname = 'snowball_dict';
$$);
-- will skip trying to propagate the text search configuration due to temp schema
CREATE TEXT SEARCH CONFIGURATION pg_temp.temp_text_search_config ( parser = default );
-- will skip trying to propagate the text search dictionary due to temp schema
CREATE TEXT SEARCH DICTIONARY pg_temp.temp_text_search_dict (
template = snowball,
language = english,
stopwords = english
);
SET client_min_messages TO 'warning';
DROP SCHEMA text_search, text_search2, "Text Search Requiring Quote's" CASCADE;
DROP ROLE text_search_owner;