mirror of https://github.com/citusdata/citus.git
Merge pull request #721 from citusdata/feature_truncate
Add truncate support for distributed tables cr: @jasonmp85pull/776/head
commit
33552c2f13
|
@ -7,7 +7,7 @@ MODULE_big = citus
|
|||
EXTENSION = citus
|
||||
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
||||
5.2-1
|
||||
5.2-1 5.2-2
|
||||
|
||||
# All citus--*.sql files in the source directory
|
||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||
|
@ -52,6 +52,8 @@ $(EXTENSION)--5.1-8.sql: $(EXTENSION)--5.1-7.sql $(EXTENSION)--5.1-7--5.1-8.sql
|
|||
cat $^ > $@
|
||||
$(EXTENSION)--5.2-1.sql: $(EXTENSION)--5.1-8.sql $(EXTENSION)--5.1-8--5.2-1.sql
|
||||
cat $^ > $@
|
||||
$(EXTENSION)--5.2-2.sql: $(EXTENSION)--5.2-1.sql $(EXTENSION)--5.2-1--5.2-2.sql
|
||||
cat $^ > $@
|
||||
|
||||
NO_PGXS = 1
|
||||
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_truncate_trigger()
|
||||
RETURNS trigger
|
||||
LANGUAGE plpgsql
|
||||
SET search_path = 'pg_catalog'
|
||||
AS $cdbtt$
|
||||
DECLARE
|
||||
partitionType char;
|
||||
commandText text;
|
||||
BEGIN
|
||||
SELECT partmethod INTO partitionType
|
||||
FROM pg_dist_partition WHERE logicalrelid = TG_RELID;
|
||||
IF NOT FOUND THEN
|
||||
RETURN NEW;
|
||||
END IF;
|
||||
|
||||
IF (partitionType = 'a') THEN
|
||||
PERFORM master_drop_all_shards(TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME);
|
||||
ELSE
|
||||
SELECT format('TRUNCATE TABLE %I.%I CASCADE', TG_TABLE_SCHEMA, TG_TABLE_NAME)
|
||||
INTO commandText;
|
||||
PERFORM master_modify_multiple_shards(commandText);
|
||||
END IF;
|
||||
|
||||
RETURN NEW;
|
||||
END;
|
||||
$cdbtt$;
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '5.2-1'
|
||||
default_version = '5.2-2'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -23,8 +23,10 @@
|
|||
#include "catalog/pg_enum.h"
|
||||
#include "catalog/pg_extension.h"
|
||||
#include "catalog/pg_opclass.h"
|
||||
#include "catalog/pg_trigger.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "commands/extension.h"
|
||||
#include "commands/trigger.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
|
@ -36,6 +38,7 @@
|
|||
#include "parser/parse_expr.h"
|
||||
#include "parser/parse_node.h"
|
||||
#include "parser/parse_relation.h"
|
||||
#include "parser/parser.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
@ -51,7 +54,7 @@ static void RecordDistributedRelationDependencies(Oid distributedRelationId,
|
|||
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
|
||||
int16 supportFunctionNumber);
|
||||
static bool LocalTableEmpty(Oid tableId);
|
||||
|
||||
static void CreateTruncateTrigger(Oid relationId);
|
||||
|
||||
/* exports for SQL callable functions */
|
||||
PG_FUNCTION_INFO_V1(master_create_distributed_table);
|
||||
|
@ -297,6 +300,15 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
|
|||
heap_close(pgDistPartition, NoLock);
|
||||
relation_close(distributedRelation, NoLock);
|
||||
|
||||
/*
|
||||
* PostgreSQL supports truncate trigger for regular relations only.
|
||||
* Truncate on foreign tables is not supported.
|
||||
*/
|
||||
if (relationKind == RELKIND_RELATION)
|
||||
{
|
||||
CreateTruncateTrigger(distributedRelationId);
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
@ -475,3 +487,33 @@ LocalTableEmpty(Oid tableId)
|
|||
|
||||
return localTableEmpty;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateTruncateTrigger creates a truncate trigger on table identified by relationId
|
||||
* and assigns citus_truncate_trigger() as handler.
|
||||
*/
|
||||
static void
|
||||
CreateTruncateTrigger(Oid relationId)
|
||||
{
|
||||
CreateTrigStmt *trigger = NULL;
|
||||
StringInfo triggerName = makeStringInfo();
|
||||
bool internal = true;
|
||||
|
||||
appendStringInfo(triggerName, "truncate_trigger");
|
||||
|
||||
trigger = makeNode(CreateTrigStmt);
|
||||
trigger->trigname = triggerName->data;
|
||||
trigger->relation = NULL;
|
||||
trigger->funcname = SystemFuncName("citus_truncate_trigger");
|
||||
trigger->args = NIL;
|
||||
trigger->row = false;
|
||||
trigger->timing = TRIGGER_TYPE_BEFORE;
|
||||
trigger->events = TRIGGER_TYPE_TRUNCATE;
|
||||
trigger->columns = NIL;
|
||||
trigger->whenClause = NULL;
|
||||
trigger->isconstraint = false;
|
||||
|
||||
CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid,
|
||||
internal);
|
||||
}
|
||||
|
|
|
@ -110,6 +110,7 @@ static void ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement);
|
|||
static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement);
|
||||
static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
|
||||
static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
|
||||
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
|
||||
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
|
||||
static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
|
||||
|
||||
|
@ -201,6 +202,11 @@ multi_ProcessUtility(Node *parsetree,
|
|||
ErrorIfDistributedAlterSeqOwnedBy((AlterSeqStmt *) parsetree);
|
||||
}
|
||||
|
||||
if (IsA(parsetree, TruncateStmt))
|
||||
{
|
||||
ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree);
|
||||
}
|
||||
|
||||
/* ddl commands are propagated to workers only if EnableDDLPropagation is set */
|
||||
if (EnableDDLPropagation)
|
||||
{
|
||||
|
@ -1044,6 +1050,33 @@ ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfUnsupportedTruncateStmt errors out if the command attempts to
|
||||
* truncate a distributed foreign table.
|
||||
*/
|
||||
static void
|
||||
ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
|
||||
{
|
||||
List *relationList = truncateStatement->relations;
|
||||
ListCell *relationCell = NULL;
|
||||
foreach(relationCell, relationList)
|
||||
{
|
||||
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
|
||||
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, true);
|
||||
char relationKind = get_rel_relkind(relationId);
|
||||
if (IsDistributedTable(relationId) &&
|
||||
relationKind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("truncating distributed foreign tables is "
|
||||
"currently unsupported"),
|
||||
errhint("Use master_drop_all_shards to remove "
|
||||
"foreign table's shards.")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER
|
||||
* SEQUENCE command, extracting the first OWNED BY option it encounters. The
|
||||
|
|
|
@ -104,10 +104,35 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
|
|||
relationId = RangeVarGetRelid(updateStatement->relation, NoLock, failOK);
|
||||
EnsureTablePermissions(relationId, ACL_UPDATE);
|
||||
}
|
||||
else if (IsA(queryTreeNode, TruncateStmt))
|
||||
{
|
||||
TruncateStmt *truncateStatement = (TruncateStmt *) queryTreeNode;
|
||||
List *relationList = truncateStatement->relations;
|
||||
RangeVar *rangeVar = NULL;
|
||||
|
||||
if (list_length(relationList) != 1)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("master_modify_multiple_shards() can truncate only "
|
||||
"one table")));
|
||||
}
|
||||
|
||||
rangeVar = (RangeVar *) linitial(relationList);
|
||||
relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
|
||||
if (rangeVar->schemaname == NULL)
|
||||
{
|
||||
Oid schemaOid = get_rel_namespace(relationId);
|
||||
char *schemaName = get_namespace_name(schemaOid);
|
||||
rangeVar->schemaname = schemaName;
|
||||
}
|
||||
|
||||
EnsureTablePermissions(relationId, ACL_TRUNCATE);
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errmsg("query \"%s\" is not a delete nor update statement",
|
||||
queryString)));
|
||||
ereport(ERROR, (errmsg("query \"%s\" is not a delete, update, or truncate "
|
||||
"statement", queryString)));
|
||||
}
|
||||
|
||||
CheckDistributedTable(relationId);
|
||||
|
@ -115,7 +140,10 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
|
|||
queryTreeList = pg_analyze_and_rewrite(queryTreeNode, queryString, NULL, 0);
|
||||
modifyQuery = (Query *) linitial(queryTreeList);
|
||||
|
||||
ErrorIfModifyQueryNotSupported(modifyQuery);
|
||||
if (modifyQuery->commandType != CMD_UTILITY)
|
||||
{
|
||||
ErrorIfModifyQueryNotSupported(modifyQuery);
|
||||
}
|
||||
|
||||
/* reject queries with a returning list */
|
||||
if (list_length(modifyQuery->returningList) > 0)
|
||||
|
@ -282,8 +310,17 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
|
|||
}
|
||||
|
||||
placementAffectedTupleString = PQcmdTuples(result);
|
||||
placementAffectedTupleCount = pg_atoi(placementAffectedTupleString,
|
||||
sizeof(int32), 0);
|
||||
|
||||
/* returned tuple count is empty for utility commands, use 0 as affected count */
|
||||
if (*placementAffectedTupleString == '\0')
|
||||
{
|
||||
placementAffectedTupleCount = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
placementAffectedTupleCount = pg_atoi(placementAffectedTupleString,
|
||||
sizeof(int32), 0);
|
||||
}
|
||||
|
||||
if ((shardAffectedTupleCount == -1) ||
|
||||
(shardAffectedTupleCount == placementAffectedTupleCount))
|
||||
|
|
|
@ -3398,6 +3398,42 @@ get_utility_query_def(Query *query, deparse_context *context)
|
|||
simple_quote_literal(buf, stmt->payload);
|
||||
}
|
||||
}
|
||||
else if (query->utilityStmt && IsA(query->utilityStmt, TruncateStmt))
|
||||
{
|
||||
TruncateStmt *stmt = (TruncateStmt *) query->utilityStmt;
|
||||
List *relationList = stmt->relations;
|
||||
ListCell *relationCell = NULL;
|
||||
|
||||
appendContextKeyword(context, "",
|
||||
0, PRETTYINDENT_STD, 1);
|
||||
|
||||
appendStringInfo(buf, "TRUNCATE TABLE");
|
||||
|
||||
foreach(relationCell, relationList)
|
||||
{
|
||||
RangeVar *relationVar = (RangeVar *) lfirst(relationCell);
|
||||
Oid relationId = RangeVarGetRelid(relationVar, NoLock, false);
|
||||
char *relationName = generate_relation_or_shard_name(relationId,
|
||||
context->distrelid,
|
||||
context->shardid, NIL);
|
||||
appendStringInfo(buf, " %s", relationName);
|
||||
|
||||
if (lnext(relationCell) != NULL)
|
||||
{
|
||||
appendStringInfo(buf, ",");
|
||||
}
|
||||
}
|
||||
|
||||
if (stmt->restart_seqs)
|
||||
{
|
||||
appendStringInfo(buf, " RESTART IDENTITY");
|
||||
}
|
||||
|
||||
if (stmt->behavior == DROP_CASCADE)
|
||||
{
|
||||
appendStringInfo(buf, " CASCADE");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Currently only NOTIFY utility commands can appear in rules */
|
||||
|
|
|
@ -1534,7 +1534,7 @@ DROP MATERIALIZED VIEW mv_articles_hash;
|
|||
DEBUG: drop auto-cascades to type mv_articles_hash
|
||||
DEBUG: drop auto-cascades to type mv_articles_hash[]
|
||||
DEBUG: drop auto-cascades to rule _RETURN on materialized view mv_articles_hash
|
||||
DEBUG: EventTriggerInvoke 16727
|
||||
DEBUG: EventTriggerInvoke 16729
|
||||
CREATE MATERIALIZED VIEW mv_articles_hash_error AS
|
||||
SELECT * FROM articles_hash WHERE author_id in (1,2);
|
||||
NOTICE: cannot use shard pruning with ANY/ALL (array expression)
|
||||
|
|
|
@ -0,0 +1,293 @@
|
|||
--
|
||||
-- MULTI_TRUNCATE
|
||||
--
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1210000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1210000;
|
||||
--
|
||||
-- truncate for append distribution
|
||||
-- expect all shards to be dropped
|
||||
--
|
||||
CREATE TABLE test_truncate_append(a int);
|
||||
SELECT master_create_distributed_table('test_truncate_append', 'a', 'append');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- verify no error is thrown when no shards are present
|
||||
TRUNCATE TABLE test_truncate_append;
|
||||
SELECT master_create_empty_shard('test_truncate_append') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||
WHERE shardid = :new_shard_id;
|
||||
SELECT count(*) FROM test_truncate_append;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_truncate_append values (1);
|
||||
SELECT count(*) FROM test_truncate_append;
|
||||
count
|
||||
-------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
-- create some more shards
|
||||
SELECT master_create_empty_shard('test_truncate_append');
|
||||
master_create_empty_shard
|
||||
---------------------------
|
||||
1210001
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_empty_shard('test_truncate_append');
|
||||
master_create_empty_shard
|
||||
---------------------------
|
||||
1210002
|
||||
(1 row)
|
||||
|
||||
-- verify 3 shards are presents
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
|
||||
shardid
|
||||
---------
|
||||
1210000
|
||||
1210001
|
||||
1210002
|
||||
(3 rows)
|
||||
|
||||
TRUNCATE TABLE test_truncate_append;
|
||||
-- verify data is truncated from the table
|
||||
SELECT count(*) FROM test_truncate_append;
|
||||
count
|
||||
-------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- verify no shard exists anymore
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
|
||||
shardid
|
||||
---------
|
||||
(0 rows)
|
||||
|
||||
-- command can not be run inside transaction
|
||||
BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT;
|
||||
ERROR: DROP distributed table cannot run inside a transaction block
|
||||
CONTEXT: SQL statement "SELECT master_drop_all_shards(TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME)"
|
||||
PL/pgSQL function citus_truncate_trigger() line 13 at PERFORM
|
||||
DROP TABLE test_truncate_append;
|
||||
--
|
||||
-- truncate for range distribution
|
||||
-- expect shard to be present, data to be truncated
|
||||
--
|
||||
CREATE TABLE test_truncate_range(a int);
|
||||
SELECT master_create_distributed_table('test_truncate_range', 'a', 'range');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- verify no error is thrown when no shards are present
|
||||
TRUNCATE TABLE test_truncate_range;
|
||||
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||
WHERE shardid = :new_shard_id;
|
||||
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1500
|
||||
WHERE shardid = :new_shard_id;
|
||||
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2500
|
||||
WHERE shardid = :new_shard_id;
|
||||
SELECT count(*) FROM test_truncate_range;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_truncate_range values (1);
|
||||
INSERT INTO test_truncate_range values (1001);
|
||||
INSERT INTO test_truncate_range values (2000);
|
||||
INSERT INTO test_truncate_range values (100);
|
||||
SELECT count(*) FROM test_truncate_range;
|
||||
count
|
||||
-------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
-- verify 3 shards are presents
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
|
||||
shardid
|
||||
---------
|
||||
1210003
|
||||
1210004
|
||||
1210005
|
||||
(3 rows)
|
||||
|
||||
TRUNCATE TABLE test_truncate_range;
|
||||
-- verify data is truncated from the table
|
||||
SELECT count(*) FROM test_truncate_range;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- verify 3 shards are still present
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
|
||||
shardid
|
||||
---------
|
||||
1210003
|
||||
1210004
|
||||
1210005
|
||||
(3 rows)
|
||||
|
||||
-- command can not be run inside transaction
|
||||
BEGIN; TRUNCATE TABLE test_truncate_range; COMMIT;
|
||||
ERROR: master_modify_multiple_shards cannot run inside a transaction block
|
||||
CONTEXT: SQL statement "SELECT master_modify_multiple_shards(commandText)"
|
||||
PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM
|
||||
DROP TABLE test_truncate_range;
|
||||
--
|
||||
-- truncate for hash distribution.
|
||||
-- expect shard to be present, data to be truncated
|
||||
--
|
||||
CREATE TABLE test_truncate_hash(a int);
|
||||
SELECT master_create_distributed_table('test_truncate_hash', 'a', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- verify no error is thrown when no shards are present
|
||||
TRUNCATE TABLE test_truncate_hash;
|
||||
SELECT count(*) FROM test_truncate_hash;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_truncate_hash values (1);
|
||||
ERROR: could not find any shards
|
||||
DETAIL: No shards exist for distributed table "test_truncate_hash".
|
||||
HINT: Run master_create_worker_shards to create shards and try again.
|
||||
INSERT INTO test_truncate_hash values (1001);
|
||||
ERROR: could not find any shards
|
||||
DETAIL: No shards exist for distributed table "test_truncate_hash".
|
||||
HINT: Run master_create_worker_shards to create shards and try again.
|
||||
INSERT INTO test_truncate_hash values (2000);
|
||||
ERROR: could not find any shards
|
||||
DETAIL: No shards exist for distributed table "test_truncate_hash".
|
||||
HINT: Run master_create_worker_shards to create shards and try again.
|
||||
INSERT INTO test_truncate_hash values (100);
|
||||
ERROR: could not find any shards
|
||||
DETAIL: No shards exist for distributed table "test_truncate_hash".
|
||||
HINT: Run master_create_worker_shards to create shards and try again.
|
||||
SELECT count(*) FROM test_truncate_hash;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- verify 4 shards are present
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
|
||||
shardid
|
||||
---------
|
||||
(0 rows)
|
||||
|
||||
TRUNCATE TABLE test_truncate_hash;
|
||||
SELECT master_create_worker_shards('test_truncate_hash', 4, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_truncate_hash values (1);
|
||||
INSERT INTO test_truncate_hash values (1001);
|
||||
INSERT INTO test_truncate_hash values (2000);
|
||||
INSERT INTO test_truncate_hash values (100);
|
||||
SELECT count(*) FROM test_truncate_hash;
|
||||
count
|
||||
-------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
TRUNCATE TABLE test_truncate_hash;
|
||||
-- verify data is truncated from the table
|
||||
SELECT count(*) FROM test_truncate_hash;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- verify 4 shards are still presents
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
|
||||
shardid
|
||||
---------
|
||||
1210006
|
||||
1210007
|
||||
1210008
|
||||
1210009
|
||||
(4 rows)
|
||||
|
||||
-- command can not be run inside transaction
|
||||
BEGIN; TRUNCATE TABLE test_truncate_hash; COMMIT;
|
||||
ERROR: master_modify_multiple_shards cannot run inside a transaction block
|
||||
CONTEXT: SQL statement "SELECT master_modify_multiple_shards(commandText)"
|
||||
PL/pgSQL function citus_truncate_trigger() line 17 at PERFORM
|
||||
DROP TABLE test_truncate_hash;
|
||||
-- test with table with spaces in it
|
||||
CREATE TABLE "a b hash" (a int, b int);
|
||||
SELECT master_create_distributed_table('"a b hash"', 'a', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('"a b hash"', 4, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO "a b hash" values (1, 0);
|
||||
SELECT * from "a b hash";
|
||||
a | b
|
||||
---+---
|
||||
1 | 0
|
||||
(1 row)
|
||||
|
||||
TRUNCATE TABLE "a b hash";
|
||||
SELECT * from "a b hash";
|
||||
a | b
|
||||
---+---
|
||||
(0 rows)
|
||||
|
||||
DROP TABLE "a b hash";
|
||||
-- now with append
|
||||
CREATE TABLE "a b append" (a int, b int);
|
||||
SELECT master_create_distributed_table('"a b append"', 'a', 'append');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_empty_shard('"a b append"') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||
WHERE shardid = :new_shard_id;
|
||||
SELECT master_create_empty_shard('"a b append"') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
||||
WHERE shardid = :new_shard_id;
|
||||
INSERT INTO "a b append" values (1, 1);
|
||||
INSERT INTO "a b append" values (600, 600);
|
||||
SELECT * FROM "a b append" ORDER BY a;
|
||||
a | b
|
||||
-----+-----
|
||||
1 | 1
|
||||
600 | 600
|
||||
(2 rows)
|
||||
|
||||
TRUNCATE TABLE "a b append";
|
||||
-- verify all shards are dropped
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = '"a b append"'::regclass;
|
||||
shardid
|
||||
---------
|
||||
(0 rows)
|
||||
|
||||
DROP TABLE "a b append";
|
|
@ -160,3 +160,8 @@ test: multi_schema_support
|
|||
# multi_function_evaluation tests edge-cases in master-side function pre-evaluation
|
||||
# ----------
|
||||
test: multi_function_evaluation
|
||||
|
||||
# ----------
|
||||
# multi_truncate tests truncate functionality for distributed tables
|
||||
# ----------
|
||||
test: multi_truncate
|
||||
|
|
|
@ -0,0 +1,176 @@
|
|||
--
|
||||
-- MULTI_TRUNCATE
|
||||
--
|
||||
|
||||
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1210000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1210000;
|
||||
|
||||
--
|
||||
-- truncate for append distribution
|
||||
-- expect all shards to be dropped
|
||||
--
|
||||
CREATE TABLE test_truncate_append(a int);
|
||||
SELECT master_create_distributed_table('test_truncate_append', 'a', 'append');
|
||||
|
||||
-- verify no error is thrown when no shards are present
|
||||
TRUNCATE TABLE test_truncate_append;
|
||||
|
||||
SELECT master_create_empty_shard('test_truncate_append') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||
WHERE shardid = :new_shard_id;
|
||||
|
||||
SELECT count(*) FROM test_truncate_append;
|
||||
|
||||
INSERT INTO test_truncate_append values (1);
|
||||
|
||||
SELECT count(*) FROM test_truncate_append;
|
||||
|
||||
-- create some more shards
|
||||
SELECT master_create_empty_shard('test_truncate_append');
|
||||
SELECT master_create_empty_shard('test_truncate_append');
|
||||
|
||||
-- verify 3 shards are presents
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
|
||||
|
||||
TRUNCATE TABLE test_truncate_append;
|
||||
|
||||
-- verify data is truncated from the table
|
||||
SELECT count(*) FROM test_truncate_append;
|
||||
|
||||
-- verify no shard exists anymore
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
|
||||
|
||||
-- command can not be run inside transaction
|
||||
BEGIN; TRUNCATE TABLE test_truncate_append; COMMIT;
|
||||
|
||||
DROP TABLE test_truncate_append;
|
||||
|
||||
--
|
||||
-- truncate for range distribution
|
||||
-- expect shard to be present, data to be truncated
|
||||
--
|
||||
CREATE TABLE test_truncate_range(a int);
|
||||
SELECT master_create_distributed_table('test_truncate_range', 'a', 'range');
|
||||
|
||||
-- verify no error is thrown when no shards are present
|
||||
TRUNCATE TABLE test_truncate_range;
|
||||
|
||||
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||
WHERE shardid = :new_shard_id;
|
||||
|
||||
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1500
|
||||
WHERE shardid = :new_shard_id;
|
||||
|
||||
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2500
|
||||
WHERE shardid = :new_shard_id;
|
||||
|
||||
|
||||
SELECT count(*) FROM test_truncate_range;
|
||||
|
||||
INSERT INTO test_truncate_range values (1);
|
||||
INSERT INTO test_truncate_range values (1001);
|
||||
INSERT INTO test_truncate_range values (2000);
|
||||
INSERT INTO test_truncate_range values (100);
|
||||
|
||||
SELECT count(*) FROM test_truncate_range;
|
||||
|
||||
-- verify 3 shards are presents
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
|
||||
|
||||
TRUNCATE TABLE test_truncate_range;
|
||||
|
||||
-- verify data is truncated from the table
|
||||
SELECT count(*) FROM test_truncate_range;
|
||||
|
||||
-- verify 3 shards are still present
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
|
||||
|
||||
-- command can not be run inside transaction
|
||||
BEGIN; TRUNCATE TABLE test_truncate_range; COMMIT;
|
||||
|
||||
DROP TABLE test_truncate_range;
|
||||
|
||||
|
||||
--
|
||||
-- truncate for hash distribution.
|
||||
-- expect shard to be present, data to be truncated
|
||||
--
|
||||
CREATE TABLE test_truncate_hash(a int);
|
||||
SELECT master_create_distributed_table('test_truncate_hash', 'a', 'hash');
|
||||
|
||||
-- verify no error is thrown when no shards are present
|
||||
TRUNCATE TABLE test_truncate_hash;
|
||||
|
||||
SELECT count(*) FROM test_truncate_hash;
|
||||
|
||||
INSERT INTO test_truncate_hash values (1);
|
||||
INSERT INTO test_truncate_hash values (1001);
|
||||
INSERT INTO test_truncate_hash values (2000);
|
||||
INSERT INTO test_truncate_hash values (100);
|
||||
|
||||
SELECT count(*) FROM test_truncate_hash;
|
||||
|
||||
-- verify 4 shards are present
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
|
||||
|
||||
TRUNCATE TABLE test_truncate_hash;
|
||||
|
||||
SELECT master_create_worker_shards('test_truncate_hash', 4, 1);
|
||||
|
||||
INSERT INTO test_truncate_hash values (1);
|
||||
INSERT INTO test_truncate_hash values (1001);
|
||||
INSERT INTO test_truncate_hash values (2000);
|
||||
INSERT INTO test_truncate_hash values (100);
|
||||
|
||||
SELECT count(*) FROM test_truncate_hash;
|
||||
|
||||
TRUNCATE TABLE test_truncate_hash;
|
||||
|
||||
-- verify data is truncated from the table
|
||||
SELECT count(*) FROM test_truncate_hash;
|
||||
|
||||
-- verify 4 shards are still presents
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
|
||||
|
||||
-- command can not be run inside transaction
|
||||
BEGIN; TRUNCATE TABLE test_truncate_hash; COMMIT;
|
||||
|
||||
DROP TABLE test_truncate_hash;
|
||||
|
||||
-- test with table with spaces in it
|
||||
CREATE TABLE "a b hash" (a int, b int);
|
||||
SELECT master_create_distributed_table('"a b hash"', 'a', 'hash');
|
||||
SELECT master_create_worker_shards('"a b hash"', 4, 1);
|
||||
INSERT INTO "a b hash" values (1, 0);
|
||||
SELECT * from "a b hash";
|
||||
TRUNCATE TABLE "a b hash";
|
||||
SELECT * from "a b hash";
|
||||
|
||||
DROP TABLE "a b hash";
|
||||
|
||||
-- now with append
|
||||
CREATE TABLE "a b append" (a int, b int);
|
||||
SELECT master_create_distributed_table('"a b append"', 'a', 'append');
|
||||
SELECT master_create_empty_shard('"a b append"') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||
WHERE shardid = :new_shard_id;
|
||||
|
||||
SELECT master_create_empty_shard('"a b append"') AS new_shard_id \gset
|
||||
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
||||
WHERE shardid = :new_shard_id;
|
||||
|
||||
INSERT INTO "a b append" values (1, 1);
|
||||
INSERT INTO "a b append" values (600, 600);
|
||||
|
||||
SELECT * FROM "a b append" ORDER BY a;
|
||||
|
||||
TRUNCATE TABLE "a b append";
|
||||
|
||||
-- verify all shards are dropped
|
||||
SELECT shardid FROM pg_dist_shard where logicalrelid = '"a b append"'::regclass;
|
||||
|
||||
DROP TABLE "a b append";
|
Loading…
Reference in New Issue