From 089ef359400edf78f9190b5ab3b996b0cad13795 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Wed, 2 Jun 2021 13:10:14 +0300 Subject: [PATCH] Disable dropping and truncating known shards Add test for disabling dropping and truncating known shards --- src/backend/distributed/commands/table.c | 3 ++ src/backend/distributed/commands/truncate.c | 4 ++ .../distributed/executor/local_executor.c | 43 +++++++++++++++++-- src/backend/distributed/shared_library_init.c | 12 ++++++ .../worker/worker_shard_visibility.c | 32 ++++++++++++++ src/include/distributed/local_executor.h | 2 + .../distributed/worker_shard_visibility.h | 2 + src/test/regress/expected/single_node.out | 27 ++++++++++++ .../regress/expected/single_node_truncate.out | 36 ++++++++-------- src/test/regress/pg_regress_multi.pl | 1 + src/test/regress/sql/single_node.sql | 19 ++++++++ 11 files changed, 160 insertions(+), 21 deletions(-) diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index ae25e9394..f4958b4bc 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -36,6 +36,7 @@ #include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/version_compat.h" +#include "distributed/worker_shard_visibility.h" #include "lib/stringinfo.h" #include "nodes/parsenodes.h" #include "parser/parse_expr.h" @@ -125,6 +126,8 @@ PreprocessDropTableStmt(Node *node, const char *queryString, Oid relationId = RangeVarGetRelid(tableRangeVar, AccessShareLock, missingOK); + ErrorIfIllegallyChangingKnownShard(relationId); + /* we're not interested in non-valid, non-distributed relations */ if (relationId == InvalidOid || !IsCitusTable(relationId)) { diff --git a/src/backend/distributed/commands/truncate.c b/src/backend/distributed/commands/truncate.c index 185f788e5..e4d833672 100644 --- a/src/backend/distributed/commands/truncate.c +++ b/src/backend/distributed/commands/truncate.c @@ -32,6 +32,7 @@ #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" #include "distributed/worker_transaction.h" +#include "distributed/worker_shard_visibility.h" #include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -266,6 +267,9 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement) foreach_ptr(rangeVar, relationList) { Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false); + + ErrorIfIllegallyChangingKnownShard(relationId); + char relationKind = get_rel_relkind(relationId); if (IsCitusTable(relationId) && relationKind == RELKIND_FOREIGN_TABLE) diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index d17bfb6f0..a824573ed 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -108,8 +108,15 @@ bool EnableLocalExecution = true; bool LogLocalCommands = false; +int LocalExecutorLevel = 0; + static LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL; +static uint64 ExecuteLocalTaskListInternal(List *taskList, + ParamListInfo paramListInfo, + DistributedPlan *distributedPlan, + TupleDestination *defaultTupleDest, + bool isUtilityCommand); static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList, List **remoteTaskPlacementList); @@ -200,10 +207,8 @@ ExecuteLocalTaskListExtended(List *taskList, TupleDestination *defaultTupleDest, bool isUtilityCommand) { - ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); - int numParams = 0; - Oid *parameterTypes = NULL; uint64 totalRowsProcessed = 0; + ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); /* * Even if we are executing local tasks, we still enable @@ -218,6 +223,38 @@ ExecuteLocalTaskListExtended(List *taskList, */ UseCoordinatedTransaction(); + LocalExecutorLevel++; + PG_TRY(); + { + totalRowsProcessed = ExecuteLocalTaskListInternal(taskList, paramListInfo, + distributedPlan, + defaultTupleDest, + isUtilityCommand); + } + PG_CATCH(); + { + LocalExecutorLevel--; + + PG_RE_THROW(); + } + PG_END_TRY(); + LocalExecutorLevel--; + + return totalRowsProcessed; +} + + +static uint64 +ExecuteLocalTaskListInternal(List *taskList, + ParamListInfo paramListInfo, + DistributedPlan *distributedPlan, + TupleDestination *defaultTupleDest, + bool isUtilityCommand) +{ + uint64 totalRowsProcessed = 0; + int numParams = 0; + Oid *parameterTypes = NULL; + if (paramListInfo != NULL) { /* not used anywhere, so declare here */ diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 3bc921c55..e734b1594 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -691,6 +691,18 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_manual_changes_to_shards", + gettext_noop("Enables dropping and truncating known shards."), + gettext_noop("Set to false by default. If set to true, enables " + "dropping and truncating shards on the coordinator " + "(or the workers with metadata)"), + &EnableManualChangesToShards, + false, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomRealVariable( "citus.distributed_deadlock_detection_factor", gettext_noop("Sets the time to wait before checking for distributed " diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index 9f11ebf6d..53132d1db 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -14,6 +14,7 @@ #include "catalog/pg_class.h" #include "distributed/metadata_cache.h" #include "distributed/coordinator_protocol.h" +#include "distributed/local_executor.h" #include "distributed/worker_protocol.h" #include "distributed/worker_shard_visibility.h" #include "nodes/nodeFuncs.h" @@ -23,6 +24,7 @@ /* Config variable managed via guc.c */ bool OverrideTableVisibility = true; +bool EnableManualChangesToShards = false; static bool ReplaceTableVisibleFunctionWalker(Node *inputNode); @@ -112,10 +114,40 @@ ErrorIfRelationIsAKnownShard(Oid relationId) } const char *relationName = get_rel_name(relationId); + ereport(ERROR, (errmsg("relation \"%s\" is a shard relation ", relationName))); } +/* + * ErrorIfIllegallyChangingKnownShard errors out if the relation with relationId is + * a known shard and manual changes on known shards are disabled. This is + * valid for only non-citus (external) connections. + */ +void +ErrorIfIllegallyChangingKnownShard(Oid relationId) +{ + if (LocalExecutorLevel > 0 || IsCitusInitiatedRemoteBackend() || + EnableManualChangesToShards) + { + return; + } + + /* search the relation in all schemas */ + bool onlySearchPath = true; + if (RelationIsAKnownShard(relationId, onlySearchPath)) + { + const char *relationName = get_rel_name(relationId); + ereport(ERROR, (errmsg("cannot modify \"%s\" because it is a shard of " + "a distributed table", + relationName), + errhint("Use the distributed table or set " + "citus.enable_manual_changes_to_shards to on " + "to modify shards directly"))); + } +} + + /* * RelationIsAKnownShard gets a relationId, check whether it's a shard of * any distributed table. If onlySearchPath is true, then it searches diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index b4c002d9b..7a02be0f6 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -19,6 +19,8 @@ extern bool EnableLocalExecution; extern bool LogLocalCommands; +extern int LocalExecutorLevel; + typedef enum LocalExecutionStatus { LOCAL_EXECUTION_REQUIRED, diff --git a/src/include/distributed/worker_shard_visibility.h b/src/include/distributed/worker_shard_visibility.h index 46f807fd8..85c48a758 100644 --- a/src/include/distributed/worker_shard_visibility.h +++ b/src/include/distributed/worker_shard_visibility.h @@ -14,10 +14,12 @@ #include "nodes/nodes.h" extern bool OverrideTableVisibility; +extern bool EnableManualChangesToShards; extern void ReplaceTableVisibleFunction(Node *inputNode); extern void ErrorIfRelationIsAKnownShard(Oid relationId); +extern void ErrorIfIllegallyChangingKnownShard(Oid relationId); extern bool RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath); diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 81189b007..6bf5af799 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1842,6 +1842,33 @@ NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS co 1001 | 0 (1 row) +-- test disabling drop and truncate for known shards +SET citus.shard_replication_factor TO 1; +CREATE TABLE test_disabling_drop_and_truncate (a int); +SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a'); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) ');SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) ');SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) ');SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) ');SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres') + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET citus.enable_manual_changes_to_shards TO off; +-- these should error out +DROP TABLE test_disabling_drop_and_truncate_102040; +ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table +HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly +TRUNCATE TABLE test_disabling_drop_and_truncate_102040; +ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table +HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly +RESET citus.enable_manual_changes_to_shards ; +-- these should work as expected +TRUNCATE TABLE test_disabling_drop_and_truncate_102040; +DROP TABLE test_disabling_drop_and_truncate_102040; +RESET citus.shard_replication_factor; +DROP TABLE test_disabling_drop_and_truncate; -- lets flush the copy often to make sure everyhing is fine SET citus.local_copy_flush_threshold TO 1; TRUNCATE another_schema_table; diff --git a/src/test/regress/expected/single_node_truncate.out b/src/test/regress/expected/single_node_truncate.out index 245e43774..bf1c99d69 100644 --- a/src/test/regress/expected/single_node_truncate.out +++ b/src/test/regress/expected/single_node_truncate.out @@ -31,9 +31,9 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102041 | t + citus_local_102045 | t ref | t - ref_102040 | t + ref_102044 | t (4 rows) -- verify that this UDF is noop on Citus local tables @@ -47,9 +47,9 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102041 | t + citus_local_102045 | t ref | t - ref_102040 | t + ref_102044 | t (4 rows) -- test that we allow cascading truncates to citus local tables @@ -65,9 +65,9 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102041 | t + citus_local_102045 | t ref | f - ref_102040 | t + ref_102044 | t (4 rows) ROLLBACK; @@ -98,14 +98,14 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102041 | t + citus_local_102045 | t dist | f - dist_102043 | t - dist_102044 | t - dist_102045 | t - dist_102046 | t + dist_102047 | t + dist_102048 | t + dist_102049 | t + dist_102050 | t ref | f - ref_102040 | t + ref_102044 | t (9 rows) ROLLBACK; @@ -121,14 +121,14 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102041 | t + citus_local_102045 | t dist | f - dist_102043 | t - dist_102044 | t - dist_102045 | t - dist_102046 | t + dist_102047 | t + dist_102048 | t + dist_102049 | t + dist_102050 | t ref | t - ref_102040 | t + ref_102044 | t (9 rows) ROLLBACK; diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index c95bf9e2f..09500007b 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -448,6 +448,7 @@ push(@pgOptions, "citus.sort_returning='on'"); push(@pgOptions, "citus.shard_replication_factor=2"); push(@pgOptions, "citus.node_connection_timeout=${connectionTimeout}"); push(@pgOptions, "citus.explain_analyze_sort_method='taskId'"); +push(@pgOptions, "citus.enable_manual_changes_to_shards=on"); # we disable slow start by default to encourage parallelism within tests push(@pgOptions, "citus.executor_slow_start_interval=0ms"); diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 47efdc9f9..228039d34 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -905,6 +905,25 @@ WITH cte_1 AS (INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING key, z) SELECT count(DISTINCT key::text), count(DISTINCT z::text) FROM cte_1; +-- test disabling drop and truncate for known shards +SET citus.shard_replication_factor TO 1; +CREATE TABLE test_disabling_drop_and_truncate (a int); +SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a'); +SET citus.enable_manual_changes_to_shards TO off; + +-- these should error out +DROP TABLE test_disabling_drop_and_truncate_102040; +TRUNCATE TABLE test_disabling_drop_and_truncate_102040; + +RESET citus.enable_manual_changes_to_shards ; + +-- these should work as expected +TRUNCATE TABLE test_disabling_drop_and_truncate_102040; +DROP TABLE test_disabling_drop_and_truncate_102040; + +RESET citus.shard_replication_factor; +DROP TABLE test_disabling_drop_and_truncate; + -- lets flush the copy often to make sure everyhing is fine SET citus.local_copy_flush_threshold TO 1; TRUNCATE another_schema_table;