mirror of https://github.com/citusdata/citus.git
Merge pull request #2724 from citusdata/truncate_cleanup
Stop using master_modify_multiple_shards in TRUNCATEpull/2725/head
commit
7a2e3124f7
|
@ -19,12 +19,17 @@
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
|
#include "distributed/multi_router_executor.h"
|
||||||
#include "distributed/pg_dist_partition.h"
|
#include "distributed/pg_dist_partition.h"
|
||||||
|
#include "distributed/resource_lock.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
|
|
||||||
|
static List * TruncateTaskList(Oid relationId);
|
||||||
|
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
PG_FUNCTION_INFO_V1(citus_truncate_trigger);
|
PG_FUNCTION_INFO_V1(citus_truncate_trigger);
|
||||||
|
|
||||||
|
@ -39,9 +44,6 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
|
||||||
TriggerData *triggerData = NULL;
|
TriggerData *triggerData = NULL;
|
||||||
Relation truncatedRelation = NULL;
|
Relation truncatedRelation = NULL;
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
char *relationName = NULL;
|
|
||||||
Oid schemaId = InvalidOid;
|
|
||||||
char *schemaName = NULL;
|
|
||||||
char partitionMethod = 0;
|
char partitionMethod = 0;
|
||||||
|
|
||||||
if (!CALLED_AS_TRIGGER(fcinfo))
|
if (!CALLED_AS_TRIGGER(fcinfo))
|
||||||
|
@ -53,9 +55,6 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
|
||||||
triggerData = (TriggerData *) fcinfo->context;
|
triggerData = (TriggerData *) fcinfo->context;
|
||||||
truncatedRelation = triggerData->tg_relation;
|
truncatedRelation = triggerData->tg_relation;
|
||||||
relationId = RelationGetRelid(truncatedRelation);
|
relationId = RelationGetRelid(truncatedRelation);
|
||||||
relationName = get_rel_name(relationId);
|
|
||||||
schemaId = get_rel_namespace(relationId);
|
|
||||||
schemaName = get_namespace_name(schemaId);
|
|
||||||
partitionMethod = PartitionMethod(relationId);
|
partitionMethod = PartitionMethod(relationId);
|
||||||
|
|
||||||
if (!EnableDDLPropagation)
|
if (!EnableDDLPropagation)
|
||||||
|
@ -65,6 +64,10 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_APPEND)
|
if (partitionMethod == DISTRIBUTE_BY_APPEND)
|
||||||
{
|
{
|
||||||
|
Oid schemaId = get_rel_namespace(relationId);
|
||||||
|
char *schemaName = get_namespace_name(schemaId);
|
||||||
|
char *relationName = get_rel_name(relationId);
|
||||||
|
|
||||||
DirectFunctionCall3(master_drop_all_shards,
|
DirectFunctionCall3(master_drop_all_shards,
|
||||||
ObjectIdGetDatum(relationId),
|
ObjectIdGetDatum(relationId),
|
||||||
CStringGetTextDatum(relationName),
|
CStringGetTextDatum(relationName),
|
||||||
|
@ -72,15 +75,68 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
StringInfo truncateStatement = makeStringInfo();
|
List *taskList = TruncateTaskList(relationId);
|
||||||
char *qualifiedTableName = quote_qualified_identifier(schemaName, relationName);
|
|
||||||
|
|
||||||
appendStringInfo(truncateStatement, "TRUNCATE TABLE %s CASCADE",
|
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
|
||||||
qualifiedTableName);
|
{
|
||||||
|
ExecuteModifyTasksSequentiallyWithoutResults(taskList, CMD_UTILITY);
|
||||||
DirectFunctionCall1(master_modify_multiple_shards,
|
}
|
||||||
CStringGetTextDatum(truncateStatement->data));
|
else
|
||||||
|
{
|
||||||
|
ExecuteModifyTasksWithoutResults(taskList);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PG_RETURN_DATUM(PointerGetDatum(NULL));
|
PG_RETURN_DATUM(PointerGetDatum(NULL));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TruncateTaskList returns a list of tasks to execute a TRUNCATE on a
|
||||||
|
* distributed table. This is handled separately from other DDL commands
|
||||||
|
* because we handle it via the TRUNCATE trigger, which is called whenever
|
||||||
|
* a truncate cascades.
|
||||||
|
*/
|
||||||
|
static List *
|
||||||
|
TruncateTaskList(Oid relationId)
|
||||||
|
{
|
||||||
|
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||||
|
ListCell *shardIntervalCell = NULL;
|
||||||
|
List *taskList = NIL;
|
||||||
|
int taskId = 1;
|
||||||
|
|
||||||
|
Oid schemaId = get_rel_namespace(relationId);
|
||||||
|
char *schemaName = get_namespace_name(schemaId);
|
||||||
|
char *relationName = get_rel_name(relationId);
|
||||||
|
|
||||||
|
/* lock metadata before getting placement lists */
|
||||||
|
LockShardListMetadata(shardIntervalList, ShareLock);
|
||||||
|
|
||||||
|
foreach(shardIntervalCell, shardIntervalList)
|
||||||
|
{
|
||||||
|
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||||
|
uint64 shardId = shardInterval->shardId;
|
||||||
|
StringInfo shardQueryString = makeStringInfo();
|
||||||
|
Task *task = NULL;
|
||||||
|
char *shardName = pstrdup(relationName);
|
||||||
|
|
||||||
|
AppendShardIdToName(&shardName, shardId);
|
||||||
|
|
||||||
|
appendStringInfo(shardQueryString, "TRUNCATE TABLE %s CASCADE",
|
||||||
|
quote_qualified_identifier(schemaName, shardName));
|
||||||
|
|
||||||
|
task = CitusMakeNode(Task);
|
||||||
|
task->jobId = INVALID_JOB_ID;
|
||||||
|
task->taskId = taskId++;
|
||||||
|
task->taskType = DDL_TASK;
|
||||||
|
task->queryString = shardQueryString->data;
|
||||||
|
task->dependedTaskList = NULL;
|
||||||
|
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||||
|
task->anchorShardId = shardId;
|
||||||
|
task->taskPlacementList = FinalizedShardPlacementList(shardId);
|
||||||
|
|
||||||
|
taskList = lappend(taskList, task);
|
||||||
|
}
|
||||||
|
|
||||||
|
return taskList;
|
||||||
|
}
|
||||||
|
|
|
@ -1111,6 +1111,16 @@ SELECT create_distributed_table('"CiTUS.TEEN2"."CAPITAL_TABLE"', 'i');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- truncate tables with weird names
|
||||||
|
INSERT INTO "CiTuS.TeeN"."TeeNTabLE.1!?!" VALUES(1, 1);
|
||||||
|
INSERT INTO "CiTUS.TEEN2"."CAPITAL_TABLE" VALUES(0, 1);
|
||||||
|
TRUNCATE "CiTuS.TeeN"."TeeNTabLE.1!?!", "CiTUS.TEEN2"."CAPITAL_TABLE";
|
||||||
|
SELECT count(*) FROM "CiTUS.TEEN2"."CAPITAL_TABLE";
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- insert into table with weird names
|
-- insert into table with weird names
|
||||||
INSERT INTO "CiTuS.TeeN"."TeeNTabLE.1!?!" VALUES(1, 1),(1, 0),(0, 1),(2, 3),(3, 2),(4, 4);
|
INSERT INTO "CiTuS.TeeN"."TeeNTabLE.1!?!" VALUES(1, 1),(1, 0),(0, 1),(2, 3),(3, 2),(4, 4);
|
||||||
INSERT INTO "CiTUS.TEEN2"."CAPITAL_TABLE" VALUES(0, 1),(1, 0),(2, 1),(4, 3),(3, 2),(4, 4);
|
INSERT INTO "CiTUS.TEEN2"."CAPITAL_TABLE" VALUES(0, 1),(1, 0),(2, 1),(4, 3),(3, 2),(4, 4);
|
||||||
|
|
|
@ -2,6 +2,8 @@
|
||||||
-- MULTI_TRUNCATE
|
-- MULTI_TRUNCATE
|
||||||
--
|
--
|
||||||
SET citus.next_shard_id TO 1210000;
|
SET citus.next_shard_id TO 1210000;
|
||||||
|
CREATE SCHEMA multi_truncate;
|
||||||
|
SET search_path TO multi_truncate;
|
||||||
--
|
--
|
||||||
-- truncate for append distribution
|
-- truncate for append distribution
|
||||||
-- expect all shards to be dropped
|
-- expect all shards to be dropped
|
||||||
|
@ -362,4 +364,5 @@ SELECT * FROM test_local_truncate;
|
||||||
1 | 2
|
1 | 2
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE test_local_truncate;
|
DROP SCHEMA multi_truncate CASCADE;
|
||||||
|
NOTICE: drop cascades to table test_local_truncate
|
||||||
|
|
|
@ -798,6 +798,12 @@ CREATE TABLE "CiTUS.TEEN2"."CAPITAL_TABLE"(i int, j int);
|
||||||
SELECT create_distributed_table('"CiTuS.TeeN"."TeeNTabLE.1!?!"', 'TeNANt_Id');
|
SELECT create_distributed_table('"CiTuS.TeeN"."TeeNTabLE.1!?!"', 'TeNANt_Id');
|
||||||
SELECT create_distributed_table('"CiTUS.TEEN2"."CAPITAL_TABLE"', 'i');
|
SELECT create_distributed_table('"CiTUS.TEEN2"."CAPITAL_TABLE"', 'i');
|
||||||
|
|
||||||
|
-- truncate tables with weird names
|
||||||
|
INSERT INTO "CiTuS.TeeN"."TeeNTabLE.1!?!" VALUES(1, 1);
|
||||||
|
INSERT INTO "CiTUS.TEEN2"."CAPITAL_TABLE" VALUES(0, 1);
|
||||||
|
TRUNCATE "CiTuS.TeeN"."TeeNTabLE.1!?!", "CiTUS.TEEN2"."CAPITAL_TABLE";
|
||||||
|
SELECT count(*) FROM "CiTUS.TEEN2"."CAPITAL_TABLE";
|
||||||
|
|
||||||
-- insert into table with weird names
|
-- insert into table with weird names
|
||||||
INSERT INTO "CiTuS.TeeN"."TeeNTabLE.1!?!" VALUES(1, 1),(1, 0),(0, 1),(2, 3),(3, 2),(4, 4);
|
INSERT INTO "CiTuS.TeeN"."TeeNTabLE.1!?!" VALUES(1, 1),(1, 0),(0, 1),(2, 3),(3, 2),(4, 4);
|
||||||
INSERT INTO "CiTUS.TEEN2"."CAPITAL_TABLE" VALUES(0, 1),(1, 0),(2, 1),(4, 3),(3, 2),(4, 4);
|
INSERT INTO "CiTUS.TEEN2"."CAPITAL_TABLE" VALUES(0, 1),(1, 0),(2, 1),(4, 3),(3, 2),(4, 4);
|
||||||
|
|
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
|
|
||||||
SET citus.next_shard_id TO 1210000;
|
SET citus.next_shard_id TO 1210000;
|
||||||
|
CREATE SCHEMA multi_truncate;
|
||||||
|
SET search_path TO multi_truncate;
|
||||||
|
|
||||||
--
|
--
|
||||||
-- truncate for append distribution
|
-- truncate for append distribution
|
||||||
|
@ -220,4 +222,4 @@ DELETE FROM pg_dist_partition WHERE logicalrelid = 'test_local_truncate'::regcla
|
||||||
-- Ensure local data is not truncated
|
-- Ensure local data is not truncated
|
||||||
SELECT * FROM test_local_truncate;
|
SELECT * FROM test_local_truncate;
|
||||||
|
|
||||||
DROP TABLE test_local_truncate;
|
DROP SCHEMA multi_truncate CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue