Disable dropping and truncating known shards

Add test for disabling dropping and truncating known shards
pull/5020/head
Ahmet Gedemenli 2021-06-02 13:10:14 +03:00 committed by Marco Slot
parent c113cb3198
commit 089ef35940
11 changed files with 160 additions and 21 deletions

View File

@ -36,6 +36,7 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
#include "distributed/version_compat.h"
#include "distributed/worker_shard_visibility.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "parser/parse_expr.h"
@ -125,6 +126,8 @@ PreprocessDropTableStmt(Node *node, const char *queryString,
Oid relationId = RangeVarGetRelid(tableRangeVar, AccessShareLock, missingOK);
ErrorIfIllegallyChangingKnownShard(relationId);
/* we're not interested in non-valid, non-distributed relations */
if (relationId == InvalidOid || !IsCitusTable(relationId))
{

View File

@ -32,6 +32,7 @@
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_transaction.h"
#include "distributed/worker_shard_visibility.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@ -266,6 +267,9 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
foreach_ptr(rangeVar, relationList)
{
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
ErrorIfIllegallyChangingKnownShard(relationId);
char relationKind = get_rel_relkind(relationId);
if (IsCitusTable(relationId) &&
relationKind == RELKIND_FOREIGN_TABLE)

View File

@ -108,8 +108,15 @@
bool EnableLocalExecution = true;
bool LogLocalCommands = false;
int LocalExecutorLevel = 0;
static LocalExecutionStatus CurrentLocalExecutionStatus = LOCAL_EXECUTION_OPTIONAL;
static uint64 ExecuteLocalTaskListInternal(List *taskList,
ParamListInfo paramListInfo,
DistributedPlan *distributedPlan,
TupleDestination *defaultTupleDest,
bool isUtilityCommand);
static void SplitLocalAndRemotePlacements(List *taskPlacementList,
List **localTaskPlacementList,
List **remoteTaskPlacementList);
@ -200,10 +207,8 @@ ExecuteLocalTaskListExtended(List *taskList,
TupleDestination *defaultTupleDest,
bool isUtilityCommand)
{
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
int numParams = 0;
Oid *parameterTypes = NULL;
uint64 totalRowsProcessed = 0;
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
/*
* Even if we are executing local tasks, we still enable
@ -218,6 +223,38 @@ ExecuteLocalTaskListExtended(List *taskList,
*/
UseCoordinatedTransaction();
LocalExecutorLevel++;
PG_TRY();
{
totalRowsProcessed = ExecuteLocalTaskListInternal(taskList, paramListInfo,
distributedPlan,
defaultTupleDest,
isUtilityCommand);
}
PG_CATCH();
{
LocalExecutorLevel--;
PG_RE_THROW();
}
PG_END_TRY();
LocalExecutorLevel--;
return totalRowsProcessed;
}
static uint64
ExecuteLocalTaskListInternal(List *taskList,
ParamListInfo paramListInfo,
DistributedPlan *distributedPlan,
TupleDestination *defaultTupleDest,
bool isUtilityCommand)
{
uint64 totalRowsProcessed = 0;
int numParams = 0;
Oid *parameterTypes = NULL;
if (paramListInfo != NULL)
{
/* not used anywhere, so declare here */

View File

@ -691,6 +691,18 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_manual_changes_to_shards",
gettext_noop("Enables dropping and truncating known shards."),
gettext_noop("Set to false by default. If set to true, enables "
"dropping and truncating shards on the coordinator "
"(or the workers with metadata)"),
&EnableManualChangesToShards,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomRealVariable(
"citus.distributed_deadlock_detection_factor",
gettext_noop("Sets the time to wait before checking for distributed "

View File

@ -14,6 +14,7 @@
#include "catalog/pg_class.h"
#include "distributed/metadata_cache.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/local_executor.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_shard_visibility.h"
#include "nodes/nodeFuncs.h"
@ -23,6 +24,7 @@
/* Config variable managed via guc.c */
bool OverrideTableVisibility = true;
bool EnableManualChangesToShards = false;
static bool ReplaceTableVisibleFunctionWalker(Node *inputNode);
@ -112,10 +114,40 @@ ErrorIfRelationIsAKnownShard(Oid relationId)
}
const char *relationName = get_rel_name(relationId);
ereport(ERROR, (errmsg("relation \"%s\" is a shard relation ", relationName)));
}
/*
* ErrorIfIllegallyChangingKnownShard errors out if the relation with relationId is
* a known shard and manual changes on known shards are disabled. This is
* valid for only non-citus (external) connections.
*/
void
ErrorIfIllegallyChangingKnownShard(Oid relationId)
{
if (LocalExecutorLevel > 0 || IsCitusInitiatedRemoteBackend() ||
EnableManualChangesToShards)
{
return;
}
/* search the relation in all schemas */
bool onlySearchPath = true;
if (RelationIsAKnownShard(relationId, onlySearchPath))
{
const char *relationName = get_rel_name(relationId);
ereport(ERROR, (errmsg("cannot modify \"%s\" because it is a shard of "
"a distributed table",
relationName),
errhint("Use the distributed table or set "
"citus.enable_manual_changes_to_shards to on "
"to modify shards directly")));
}
}
/*
* RelationIsAKnownShard gets a relationId, check whether it's a shard of
* any distributed table. If onlySearchPath is true, then it searches

View File

@ -19,6 +19,8 @@
extern bool EnableLocalExecution;
extern bool LogLocalCommands;
extern int LocalExecutorLevel;
typedef enum LocalExecutionStatus
{
LOCAL_EXECUTION_REQUIRED,

View File

@ -14,10 +14,12 @@
#include "nodes/nodes.h"
extern bool OverrideTableVisibility;
extern bool EnableManualChangesToShards;
extern void ReplaceTableVisibleFunction(Node *inputNode);
extern void ErrorIfRelationIsAKnownShard(Oid relationId);
extern void ErrorIfIllegallyChangingKnownShard(Oid relationId);
extern bool RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath);

View File

@ -1842,6 +1842,33 @@ NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS co
1001 | 0
(1 row)
-- test disabling drop and truncate for known shards
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_disabling_drop_and_truncate (a int);
SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a');
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) ');SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) ');SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) ');SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) ');SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
create_distributed_table
---------------------------------------------------------------------
(1 row)
SET citus.enable_manual_changes_to_shards TO off;
-- these should error out
DROP TABLE test_disabling_drop_and_truncate_102040;
ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table
HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly
TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table
HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly
RESET citus.enable_manual_changes_to_shards ;
-- these should work as expected
TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
DROP TABLE test_disabling_drop_and_truncate_102040;
RESET citus.shard_replication_factor;
DROP TABLE test_disabling_drop_and_truncate;
-- lets flush the copy often to make sure everyhing is fine
SET citus.local_copy_flush_threshold TO 1;
TRUNCATE another_schema_table;

View File

@ -31,9 +31,9 @@ SELECT * FROM table_sizes;
name | has_data
---------------------------------------------------------------------
citus_local | f
citus_local_102041 | t
citus_local_102045 | t
ref | t
ref_102040 | t
ref_102044 | t
(4 rows)
-- verify that this UDF is noop on Citus local tables
@ -47,9 +47,9 @@ SELECT * FROM table_sizes;
name | has_data
---------------------------------------------------------------------
citus_local | f
citus_local_102041 | t
citus_local_102045 | t
ref | t
ref_102040 | t
ref_102044 | t
(4 rows)
-- test that we allow cascading truncates to citus local tables
@ -65,9 +65,9 @@ SELECT * FROM table_sizes;
name | has_data
---------------------------------------------------------------------
citus_local | f
citus_local_102041 | t
citus_local_102045 | t
ref | f
ref_102040 | t
ref_102044 | t
(4 rows)
ROLLBACK;
@ -98,14 +98,14 @@ SELECT * FROM table_sizes;
name | has_data
---------------------------------------------------------------------
citus_local | f
citus_local_102041 | t
citus_local_102045 | t
dist | f
dist_102043 | t
dist_102044 | t
dist_102045 | t
dist_102046 | t
dist_102047 | t
dist_102048 | t
dist_102049 | t
dist_102050 | t
ref | f
ref_102040 | t
ref_102044 | t
(9 rows)
ROLLBACK;
@ -121,14 +121,14 @@ SELECT * FROM table_sizes;
name | has_data
---------------------------------------------------------------------
citus_local | f
citus_local_102041 | t
citus_local_102045 | t
dist | f
dist_102043 | t
dist_102044 | t
dist_102045 | t
dist_102046 | t
dist_102047 | t
dist_102048 | t
dist_102049 | t
dist_102050 | t
ref | t
ref_102040 | t
ref_102044 | t
(9 rows)
ROLLBACK;

View File

@ -448,6 +448,7 @@ push(@pgOptions, "citus.sort_returning='on'");
push(@pgOptions, "citus.shard_replication_factor=2");
push(@pgOptions, "citus.node_connection_timeout=${connectionTimeout}");
push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
# we disable slow start by default to encourage parallelism within tests
push(@pgOptions, "citus.executor_slow_start_interval=0ms");

View File

@ -905,6 +905,25 @@ WITH cte_1 AS
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING key, z)
SELECT count(DISTINCT key::text), count(DISTINCT z::text) FROM cte_1;
-- test disabling drop and truncate for known shards
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_disabling_drop_and_truncate (a int);
SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a');
SET citus.enable_manual_changes_to_shards TO off;
-- these should error out
DROP TABLE test_disabling_drop_and_truncate_102040;
TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
RESET citus.enable_manual_changes_to_shards ;
-- these should work as expected
TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
DROP TABLE test_disabling_drop_and_truncate_102040;
RESET citus.shard_replication_factor;
DROP TABLE test_disabling_drop_and_truncate;
-- lets flush the copy often to make sure everyhing is fine
SET citus.local_copy_flush_threshold TO 1;
TRUNCATE another_schema_table;