mirror of https://github.com/citusdata/citus.git
Stop using master_modify_multiple_shards in TRUNCATE
parent
59e54de54d
commit
7fa5d36057
|
@ -19,12 +19,17 @@
|
|||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/rel.h"
|
||||
|
||||
|
||||
static List * TruncateTaskList(Oid relationId);
|
||||
|
||||
|
||||
/* exports for SQL callable functions */
|
||||
PG_FUNCTION_INFO_V1(citus_truncate_trigger);
|
||||
|
||||
|
@ -39,9 +44,6 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
|
|||
TriggerData *triggerData = NULL;
|
||||
Relation truncatedRelation = NULL;
|
||||
Oid relationId = InvalidOid;
|
||||
char *relationName = NULL;
|
||||
Oid schemaId = InvalidOid;
|
||||
char *schemaName = NULL;
|
||||
char partitionMethod = 0;
|
||||
|
||||
if (!CALLED_AS_TRIGGER(fcinfo))
|
||||
|
@ -53,9 +55,6 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
|
|||
triggerData = (TriggerData *) fcinfo->context;
|
||||
truncatedRelation = triggerData->tg_relation;
|
||||
relationId = RelationGetRelid(truncatedRelation);
|
||||
relationName = get_rel_name(relationId);
|
||||
schemaId = get_rel_namespace(relationId);
|
||||
schemaName = get_namespace_name(schemaId);
|
||||
partitionMethod = PartitionMethod(relationId);
|
||||
|
||||
if (!EnableDDLPropagation)
|
||||
|
@ -65,6 +64,10 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
|
|||
|
||||
if (partitionMethod == DISTRIBUTE_BY_APPEND)
|
||||
{
|
||||
Oid schemaId = get_rel_namespace(relationId);
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
char *relationName = get_rel_name(relationId);
|
||||
|
||||
DirectFunctionCall3(master_drop_all_shards,
|
||||
ObjectIdGetDatum(relationId),
|
||||
CStringGetTextDatum(relationName),
|
||||
|
@ -72,15 +75,68 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
|
|||
}
|
||||
else
|
||||
{
|
||||
StringInfo truncateStatement = makeStringInfo();
|
||||
char *qualifiedTableName = quote_qualified_identifier(schemaName, relationName);
|
||||
List *taskList = TruncateTaskList(relationId);
|
||||
|
||||
appendStringInfo(truncateStatement, "TRUNCATE TABLE %s CASCADE",
|
||||
qualifiedTableName);
|
||||
|
||||
DirectFunctionCall1(master_modify_multiple_shards,
|
||||
CStringGetTextDatum(truncateStatement->data));
|
||||
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
|
||||
{
|
||||
ExecuteModifyTasksSequentiallyWithoutResults(taskList, CMD_UTILITY);
|
||||
}
|
||||
else
|
||||
{
|
||||
ExecuteModifyTasksWithoutResults(taskList);
|
||||
}
|
||||
}
|
||||
|
||||
PG_RETURN_DATUM(PointerGetDatum(NULL));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TruncateTaskList returns a list of tasks to execute a TRUNCATE on a
|
||||
* distributed table. This is handled separately from other DDL commands
|
||||
* because we handle it via the TRUNCATE trigger, which is called whenever
|
||||
* a truncate cascades.
|
||||
*/
|
||||
static List *
|
||||
TruncateTaskList(Oid relationId)
|
||||
{
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
ListCell *shardIntervalCell = NULL;
|
||||
List *taskList = NIL;
|
||||
int taskId = 1;
|
||||
|
||||
Oid schemaId = get_rel_namespace(relationId);
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
char *relationName = get_rel_name(relationId);
|
||||
|
||||
/* lock metadata before getting placement lists */
|
||||
LockShardListMetadata(shardIntervalList, ShareLock);
|
||||
|
||||
foreach(shardIntervalCell, shardIntervalList)
|
||||
{
|
||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
StringInfo shardQueryString = makeStringInfo();
|
||||
Task *task = NULL;
|
||||
char *shardName = pstrdup(relationName);
|
||||
|
||||
AppendShardIdToName(&shardName, shardId);
|
||||
|
||||
appendStringInfo(shardQueryString, "TRUNCATE TABLE %s CASCADE",
|
||||
quote_qualified_identifier(schemaName, shardName));
|
||||
|
||||
task = CitusMakeNode(Task);
|
||||
task->jobId = INVALID_JOB_ID;
|
||||
task->taskId = taskId++;
|
||||
task->taskType = DDL_TASK;
|
||||
task->queryString = shardQueryString->data;
|
||||
task->dependedTaskList = NULL;
|
||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||
task->anchorShardId = shardId;
|
||||
task->taskPlacementList = FinalizedShardPlacementList(shardId);
|
||||
|
||||
taskList = lappend(taskList, task);
|
||||
}
|
||||
|
||||
return taskList;
|
||||
}
|
||||
|
|
|
@ -1111,6 +1111,16 @@ SELECT create_distributed_table('"CiTUS.TEEN2"."CAPITAL_TABLE"', 'i');
|
|||
|
||||
(1 row)
|
||||
|
||||
-- truncate tables with weird names
|
||||
INSERT INTO "CiTuS.TeeN"."TeeNTabLE.1!?!" VALUES(1, 1);
|
||||
INSERT INTO "CiTUS.TEEN2"."CAPITAL_TABLE" VALUES(0, 1);
|
||||
TRUNCATE "CiTuS.TeeN"."TeeNTabLE.1!?!", "CiTUS.TEEN2"."CAPITAL_TABLE";
|
||||
SELECT count(*) FROM "CiTUS.TEEN2"."CAPITAL_TABLE";
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- insert into table with weird names
|
||||
INSERT INTO "CiTuS.TeeN"."TeeNTabLE.1!?!" VALUES(1, 1),(1, 0),(0, 1),(2, 3),(3, 2),(4, 4);
|
||||
INSERT INTO "CiTUS.TEEN2"."CAPITAL_TABLE" VALUES(0, 1),(1, 0),(2, 1),(4, 3),(3, 2),(4, 4);
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
-- MULTI_TRUNCATE
|
||||
--
|
||||
SET citus.next_shard_id TO 1210000;
|
||||
CREATE SCHEMA multi_truncate;
|
||||
SET search_path TO multi_truncate;
|
||||
--
|
||||
-- truncate for append distribution
|
||||
-- expect all shards to be dropped
|
||||
|
@ -362,4 +364,5 @@ SELECT * FROM test_local_truncate;
|
|||
1 | 2
|
||||
(1 row)
|
||||
|
||||
DROP TABLE test_local_truncate;
|
||||
DROP SCHEMA multi_truncate CASCADE;
|
||||
NOTICE: drop cascades to table test_local_truncate
|
||||
|
|
|
@ -798,6 +798,12 @@ CREATE TABLE "CiTUS.TEEN2"."CAPITAL_TABLE"(i int, j int);
|
|||
SELECT create_distributed_table('"CiTuS.TeeN"."TeeNTabLE.1!?!"', 'TeNANt_Id');
|
||||
SELECT create_distributed_table('"CiTUS.TEEN2"."CAPITAL_TABLE"', 'i');
|
||||
|
||||
-- truncate tables with weird names
|
||||
INSERT INTO "CiTuS.TeeN"."TeeNTabLE.1!?!" VALUES(1, 1);
|
||||
INSERT INTO "CiTUS.TEEN2"."CAPITAL_TABLE" VALUES(0, 1);
|
||||
TRUNCATE "CiTuS.TeeN"."TeeNTabLE.1!?!", "CiTUS.TEEN2"."CAPITAL_TABLE";
|
||||
SELECT count(*) FROM "CiTUS.TEEN2"."CAPITAL_TABLE";
|
||||
|
||||
-- insert into table with weird names
|
||||
INSERT INTO "CiTuS.TeeN"."TeeNTabLE.1!?!" VALUES(1, 1),(1, 0),(0, 1),(2, 3),(3, 2),(4, 4);
|
||||
INSERT INTO "CiTUS.TEEN2"."CAPITAL_TABLE" VALUES(0, 1),(1, 0),(2, 1),(4, 3),(3, 2),(4, 4);
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
|
||||
SET citus.next_shard_id TO 1210000;
|
||||
CREATE SCHEMA multi_truncate;
|
||||
SET search_path TO multi_truncate;
|
||||
|
||||
--
|
||||
-- truncate for append distribution
|
||||
|
@ -220,4 +222,4 @@ DELETE FROM pg_dist_partition WHERE logicalrelid = 'test_local_truncate'::regcla
|
|||
-- Ensure local data is not truncated
|
||||
SELECT * FROM test_local_truncate;
|
||||
|
||||
DROP TABLE test_local_truncate;
|
||||
DROP SCHEMA multi_truncate CASCADE;
|
||||
|
|
Loading…
Reference in New Issue