mirror of https://github.com/citusdata/citus.git
Add support for truncate statement
parent
ad9d0bb69c
commit
2eec0167be
|
@ -7,7 +7,7 @@ MODULE_big = citus
|
||||||
EXTENSION = citus
|
EXTENSION = citus
|
||||||
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
||||||
5.2-1
|
5.2-1 5.2-2
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -51,8 +51,11 @@ $(EXTENSION)--5.1-7.sql: $(EXTENSION)--5.1-6.sql $(EXTENSION)--5.1-6--5.1-7.sql
|
||||||
$(EXTENSION)--5.1-8.sql: $(EXTENSION)--5.1-7.sql $(EXTENSION)--5.1-7--5.1-8.sql
|
$(EXTENSION)--5.1-8.sql: $(EXTENSION)--5.1-7.sql $(EXTENSION)--5.1-7--5.1-8.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--5.2-1.sql: $(EXTENSION)--5.1-8.sql $(EXTENSION)--5.1-8--5.2-1.sql
|
$(EXTENSION)--5.2-1.sql: $(EXTENSION)--5.1-8.sql $(EXTENSION)--5.1-8--5.2-1.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--5.2-2.sql: $(EXTENSION)--5.2-1.sql $(EXTENSION)--5.2-1--5.2-2.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
SHLIB_LINK = $(libpq)
|
SHLIB_LINK = $(libpq)
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_truncate_trigger()
|
||||||
|
RETURNS trigger
|
||||||
|
LANGUAGE plpgsql
|
||||||
|
SET search_path = 'pg_catalog'
|
||||||
|
AS $cdbtt$
|
||||||
|
DECLARE
|
||||||
|
partitionType char;
|
||||||
|
commandText text;
|
||||||
|
BEGIN
|
||||||
|
SELECT partmethod INTO partitionType
|
||||||
|
FROM pg_dist_partition WHERE logicalrelid = TG_RELID;
|
||||||
|
IF FOUND THEN
|
||||||
|
IF (partitionType = 'a') THEN
|
||||||
|
PERFORM master_drop_all_shards(TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME);
|
||||||
|
ELSE
|
||||||
|
SELECT format('truncate table %s.%s', TG_TABLE_SCHEMA, TG_TABLE_NAME)
|
||||||
|
INTO commandText;
|
||||||
|
PERFORM master_modify_multiple_shards(commandText);
|
||||||
|
END IF;
|
||||||
|
END IF;
|
||||||
|
RETURN NEW;
|
||||||
|
END;
|
||||||
|
$cdbtt$;
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '5.2-1'
|
default_version = '5.2-2'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -23,8 +23,10 @@
|
||||||
#include "catalog/pg_enum.h"
|
#include "catalog/pg_enum.h"
|
||||||
#include "catalog/pg_extension.h"
|
#include "catalog/pg_extension.h"
|
||||||
#include "catalog/pg_opclass.h"
|
#include "catalog/pg_opclass.h"
|
||||||
|
#include "catalog/pg_trigger.h"
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
#include "commands/extension.h"
|
#include "commands/extension.h"
|
||||||
|
#include "commands/trigger.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/pg_dist_partition.h"
|
#include "distributed/pg_dist_partition.h"
|
||||||
|
@ -36,6 +38,7 @@
|
||||||
#include "parser/parse_expr.h"
|
#include "parser/parse_expr.h"
|
||||||
#include "parser/parse_node.h"
|
#include "parser/parse_node.h"
|
||||||
#include "parser/parse_relation.h"
|
#include "parser/parse_relation.h"
|
||||||
|
#include "parser/parser.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/fmgroids.h"
|
#include "utils/fmgroids.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
|
@ -51,6 +54,7 @@ static void RecordDistributedRelationDependencies(Oid distributedRelationId,
|
||||||
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
|
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
|
||||||
int16 supportFunctionNumber);
|
int16 supportFunctionNumber);
|
||||||
static bool LocalTableEmpty(Oid tableId);
|
static bool LocalTableEmpty(Oid tableId);
|
||||||
|
static void CreateTruncateTrigger(Oid relationId, char *qualifiedRelationName);
|
||||||
|
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
|
@ -78,6 +82,9 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
Relation distributedRelation = NULL;
|
Relation distributedRelation = NULL;
|
||||||
TupleDesc relationDesc = NULL;
|
TupleDesc relationDesc = NULL;
|
||||||
char *distributedRelationName = NULL;
|
char *distributedRelationName = NULL;
|
||||||
|
Oid distributedRelationSchemaOid = InvalidOid;
|
||||||
|
char *distributedRelationSchema = NULL;
|
||||||
|
char *qualifiedRelationName = NULL;
|
||||||
char relationKind = '\0';
|
char relationKind = '\0';
|
||||||
|
|
||||||
Relation pgDistPartition = NULL;
|
Relation pgDistPartition = NULL;
|
||||||
|
@ -102,6 +109,11 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
distributedRelation = relation_open(distributedRelationId, AccessExclusiveLock);
|
distributedRelation = relation_open(distributedRelationId, AccessExclusiveLock);
|
||||||
relationDesc = RelationGetDescr(distributedRelation);
|
relationDesc = RelationGetDescr(distributedRelation);
|
||||||
distributedRelationName = RelationGetRelationName(distributedRelation);
|
distributedRelationName = RelationGetRelationName(distributedRelation);
|
||||||
|
distributedRelationSchemaOid = RelationGetNamespace(distributedRelation);
|
||||||
|
distributedRelationSchema = get_namespace_name(distributedRelationSchemaOid);
|
||||||
|
|
||||||
|
qualifiedRelationName = quote_qualified_identifier(distributedRelationSchema,
|
||||||
|
distributedRelationName);
|
||||||
|
|
||||||
EnsureTableOwner(distributedRelationId);
|
EnsureTableOwner(distributedRelationId);
|
||||||
|
|
||||||
|
@ -297,6 +309,15 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
heap_close(pgDistPartition, NoLock);
|
heap_close(pgDistPartition, NoLock);
|
||||||
relation_close(distributedRelation, NoLock);
|
relation_close(distributedRelation, NoLock);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PostgreSQL supports truncate trigger for regular relations only.
|
||||||
|
* Truncate on foreign tables is not supported.
|
||||||
|
*/
|
||||||
|
if (relationKind == RELKIND_RELATION)
|
||||||
|
{
|
||||||
|
CreateTruncateTrigger(distributedRelationId, qualifiedRelationName);
|
||||||
|
}
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -475,3 +496,32 @@ LocalTableEmpty(Oid tableId)
|
||||||
|
|
||||||
return localTableEmpty;
|
return localTableEmpty;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* CreateTruncateTrigger creates a truncate trigger on table identified by relationId
|
||||||
|
* and assigns citus_truncate_trigger() as handler. The new trigger is named as
|
||||||
|
* citus_truncate_trigger_on_ + schemaName.tableName. Trigger name for relation my_table
|
||||||
|
* from schema my_schema will be citus_truncate_trigger_on_my_schema.my_table to prevent
|
||||||
|
* name conflicts.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
CreateTruncateTrigger(Oid relationId, char *qualifiedRelationName)
|
||||||
|
{
|
||||||
|
CreateTrigStmt *trigger = NULL;
|
||||||
|
StringInfo triggerName = makeStringInfo();
|
||||||
|
appendStringInfo(triggerName, "citus_truncate_trigger_on_%s", qualifiedRelationName);
|
||||||
|
|
||||||
|
trigger = makeNode(CreateTrigStmt);
|
||||||
|
trigger->trigname = triggerName->data;
|
||||||
|
trigger->relation = NULL;
|
||||||
|
trigger->funcname = SystemFuncName("citus_truncate_trigger");
|
||||||
|
trigger->args = NIL;
|
||||||
|
trigger->row = false;
|
||||||
|
trigger->timing = TRIGGER_TYPE_BEFORE;
|
||||||
|
trigger->events = TRIGGER_TYPE_TRUNCATE;
|
||||||
|
trigger->columns = NIL;
|
||||||
|
trigger->whenClause = NULL;
|
||||||
|
trigger->isconstraint = false;
|
||||||
|
|
||||||
|
CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid, false);
|
||||||
|
}
|
||||||
|
|
|
@ -110,6 +110,7 @@ static void ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement);
|
||||||
static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement);
|
static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement);
|
||||||
static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
|
static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
|
||||||
static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
|
static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
|
||||||
|
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
|
||||||
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
|
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
|
||||||
static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
|
static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
|
||||||
|
|
||||||
|
@ -201,6 +202,11 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
ErrorIfDistributedAlterSeqOwnedBy((AlterSeqStmt *) parsetree);
|
ErrorIfDistributedAlterSeqOwnedBy((AlterSeqStmt *) parsetree);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (IsA(parsetree, TruncateStmt))
|
||||||
|
{
|
||||||
|
ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree);
|
||||||
|
}
|
||||||
|
|
||||||
/* ddl commands are propagated to workers only if EnableDDLPropagation is set */
|
/* ddl commands are propagated to workers only if EnableDDLPropagation is set */
|
||||||
if (EnableDDLPropagation)
|
if (EnableDDLPropagation)
|
||||||
{
|
{
|
||||||
|
@ -1044,6 +1050,33 @@ ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ErrorIfUnsupportedTruncateStmt errors out if the command attempts to
|
||||||
|
* truncate a distributed foreign table.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
|
||||||
|
{
|
||||||
|
List *relationList = truncateStatement->relations;
|
||||||
|
ListCell *relationCell = NULL;
|
||||||
|
foreach(relationCell, relationList)
|
||||||
|
{
|
||||||
|
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
|
||||||
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, true);
|
||||||
|
char relationKind = get_rel_relkind(relationId);
|
||||||
|
if (IsDistributedTable(relationId) &&
|
||||||
|
relationKind == RELKIND_FOREIGN_TABLE)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("truncating distributed foreign tables is "
|
||||||
|
"currently unsupported"),
|
||||||
|
errhint("Use master_drop_all_shards to remove "
|
||||||
|
"foreign table's shards.")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER
|
* OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER
|
||||||
* SEQUENCE command, extracting the first OWNED BY option it encounters. The
|
* SEQUENCE command, extracting the first OWNED BY option it encounters. The
|
||||||
|
|
|
@ -59,7 +59,10 @@ static void LockShardsForModify(List *shardIntervalList);
|
||||||
static bool HasReplication(List *shardIntervalList);
|
static bool HasReplication(List *shardIntervalList);
|
||||||
static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId);
|
static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId);
|
||||||
static int SendQueryToPlacements(char *shardQueryString,
|
static int SendQueryToPlacements(char *shardQueryString,
|
||||||
ShardConnections *shardConnections);
|
ShardConnections *shardConnections,
|
||||||
|
bool returnTupleCount);
|
||||||
|
static void deparse_truncate_query(Query *query, Oid distrelid, int64 shardid, StringInfo
|
||||||
|
buffer);
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
|
PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
|
||||||
|
|
||||||
|
@ -88,6 +91,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
|
||||||
List *shardIntervalList = NIL;
|
List *shardIntervalList = NIL;
|
||||||
List *prunedShardIntervalList = NIL;
|
List *prunedShardIntervalList = NIL;
|
||||||
int32 affectedTupleCount = 0;
|
int32 affectedTupleCount = 0;
|
||||||
|
bool validateModifyQuery = true;
|
||||||
|
|
||||||
PreventTransactionChain(isTopLevel, "master_modify_multiple_shards");
|
PreventTransactionChain(isTopLevel, "master_modify_multiple_shards");
|
||||||
|
|
||||||
|
@ -104,10 +108,29 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
|
||||||
relationId = RangeVarGetRelid(updateStatement->relation, NoLock, failOK);
|
relationId = RangeVarGetRelid(updateStatement->relation, NoLock, failOK);
|
||||||
EnsureTablePermissions(relationId, ACL_UPDATE);
|
EnsureTablePermissions(relationId, ACL_UPDATE);
|
||||||
}
|
}
|
||||||
|
else if (IsA(queryTreeNode, TruncateStmt))
|
||||||
|
{
|
||||||
|
TruncateStmt *truncateStatement = (TruncateStmt *) queryTreeNode;
|
||||||
|
List *relationList = truncateStatement->relations;
|
||||||
|
RangeVar *rangeVar = NULL;
|
||||||
|
|
||||||
|
if (list_length(relationList) != 1)
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("master_modify_multiple_shards() can truncate only "
|
||||||
|
"one table")));
|
||||||
|
}
|
||||||
|
|
||||||
|
rangeVar = (RangeVar *) linitial(relationList);
|
||||||
|
relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
|
||||||
|
EnsureTablePermissions(relationId, ACL_TRUNCATE);
|
||||||
|
validateModifyQuery = false;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("query \"%s\" is not a delete nor update statement",
|
ereport(ERROR, (errmsg("query \"%s\" is not a delete, update, or truncate "
|
||||||
queryString)));
|
"statement", queryString)));
|
||||||
}
|
}
|
||||||
|
|
||||||
CheckDistributedTable(relationId);
|
CheckDistributedTable(relationId);
|
||||||
|
@ -115,7 +138,10 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
|
||||||
queryTreeList = pg_analyze_and_rewrite(queryTreeNode, queryString, NULL, 0);
|
queryTreeList = pg_analyze_and_rewrite(queryTreeNode, queryString, NULL, 0);
|
||||||
modifyQuery = (Query *) linitial(queryTreeList);
|
modifyQuery = (Query *) linitial(queryTreeList);
|
||||||
|
|
||||||
ErrorIfModifyQueryNotSupported(modifyQuery);
|
if (validateModifyQuery)
|
||||||
|
{
|
||||||
|
ErrorIfModifyQueryNotSupported(modifyQuery);
|
||||||
|
}
|
||||||
|
|
||||||
/* reject queries with a returning list */
|
/* reject queries with a returning list */
|
||||||
if (list_length(modifyQuery->returningList) > 0)
|
if (list_length(modifyQuery->returningList) > 0)
|
||||||
|
@ -215,9 +241,17 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
|
||||||
int affectedTupleCount = 0;
|
int affectedTupleCount = 0;
|
||||||
char *relationOwner = TableOwner(relationId);
|
char *relationOwner = TableOwner(relationId);
|
||||||
ListCell *shardIntervalCell = NULL;
|
ListCell *shardIntervalCell = NULL;
|
||||||
|
bool truncateCommand = false;
|
||||||
|
bool requestTupleCount = true;
|
||||||
|
|
||||||
OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner);
|
OpenTransactionsToAllShardPlacements(shardIntervalList, relationOwner);
|
||||||
|
|
||||||
|
if (query->commandType == CMD_UTILITY && IsA(query->utilityStmt, TruncateStmt))
|
||||||
|
{
|
||||||
|
truncateCommand = true;
|
||||||
|
requestTupleCount = false;
|
||||||
|
}
|
||||||
|
|
||||||
foreach(shardIntervalCell, shardIntervalList)
|
foreach(shardIntervalCell, shardIntervalList)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(
|
ShardInterval *shardInterval = (ShardInterval *) lfirst(
|
||||||
|
@ -233,10 +267,19 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
|
||||||
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
|
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
|
||||||
Assert(shardConnectionsFound);
|
Assert(shardConnectionsFound);
|
||||||
|
|
||||||
deparse_shard_query(query, relationId, shardId, shardQueryString);
|
if (truncateCommand)
|
||||||
|
{
|
||||||
|
deparse_truncate_query(query, relationId, shardId, shardQueryString);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
deparse_shard_query(query, relationId, shardId, shardQueryString);
|
||||||
|
}
|
||||||
|
|
||||||
shardQueryStringData = shardQueryString->data;
|
shardQueryStringData = shardQueryString->data;
|
||||||
shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
|
shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
|
||||||
shardConnections);
|
shardConnections,
|
||||||
|
requestTupleCount);
|
||||||
affectedTupleCount += shardAffectedTupleCount;
|
affectedTupleCount += shardAffectedTupleCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -247,13 +290,41 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* deparse_truncate_query creates sql representation of a truncate statement. The
|
||||||
|
* function only generated basic truncate statement of the form
|
||||||
|
* 'truncate table <table_name>' it ignores all options. It also assumes that
|
||||||
|
* there is only one relation in the relation list.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
deparse_truncate_query(Query *query, Oid distrelid, int64 shardid, StringInfo buffer)
|
||||||
|
{
|
||||||
|
TruncateStmt *truncateStatement = NULL;
|
||||||
|
RangeVar *relation = NULL;
|
||||||
|
char *qualifiedName = NULL;
|
||||||
|
|
||||||
|
Assert(query->commandType == CMD_UTILITY);
|
||||||
|
Assert(IsA(query->utilityStmt, TruncateStmt));
|
||||||
|
|
||||||
|
truncateStatement = (TruncateStmt *) query->utilityStmt;
|
||||||
|
|
||||||
|
Assert(list_length(truncateStatement->relations) == 1);
|
||||||
|
|
||||||
|
relation = (RangeVar *) linitial(truncateStatement->relations);
|
||||||
|
qualifiedName = quote_qualified_identifier(relation->schemaname,
|
||||||
|
relation->relname);
|
||||||
|
appendStringInfo(buffer, "TRUNCATE TABLE %s_" UINT64_FORMAT, qualifiedName, shardid);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendQueryToPlacements sends the given query string to all given placement
|
* SendQueryToPlacements sends the given query string to all given placement
|
||||||
* connections of a shard. CommitRemoteTransactions or AbortRemoteTransactions
|
* connections of a shard. CommitRemoteTransactions or AbortRemoteTransactions
|
||||||
* should be called after all queries have been sent successfully.
|
* should be called after all queries have been sent successfully.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections)
|
SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections,
|
||||||
|
bool returnTupleCount)
|
||||||
{
|
{
|
||||||
uint64 shardId = shardConnections->shardId;
|
uint64 shardId = shardConnections->shardId;
|
||||||
List *connectionList = shardConnections->connectionList;
|
List *connectionList = shardConnections->connectionList;
|
||||||
|
@ -262,6 +333,11 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
|
||||||
|
|
||||||
Assert(connectionList != NIL);
|
Assert(connectionList != NIL);
|
||||||
|
|
||||||
|
if (!returnTupleCount)
|
||||||
|
{
|
||||||
|
shardAffectedTupleCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
foreach(connectionCell, connectionList)
|
foreach(connectionCell, connectionList)
|
||||||
{
|
{
|
||||||
TransactionConnection *transactionConnection =
|
TransactionConnection *transactionConnection =
|
||||||
|
@ -282,21 +358,25 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
|
||||||
}
|
}
|
||||||
|
|
||||||
placementAffectedTupleString = PQcmdTuples(result);
|
placementAffectedTupleString = PQcmdTuples(result);
|
||||||
placementAffectedTupleCount = pg_atoi(placementAffectedTupleString,
|
|
||||||
sizeof(int32), 0);
|
|
||||||
|
|
||||||
if ((shardAffectedTupleCount == -1) ||
|
if (returnTupleCount)
|
||||||
(shardAffectedTupleCount == placementAffectedTupleCount))
|
|
||||||
{
|
{
|
||||||
shardAffectedTupleCount = placementAffectedTupleCount;
|
placementAffectedTupleCount = pg_atoi(placementAffectedTupleString,
|
||||||
}
|
sizeof(int32), 0);
|
||||||
else
|
|
||||||
{
|
if ((shardAffectedTupleCount == -1) ||
|
||||||
ereport(ERROR,
|
(shardAffectedTupleCount == placementAffectedTupleCount))
|
||||||
(errmsg("modified %d tuples, but expected to modify %d",
|
{
|
||||||
placementAffectedTupleCount, shardAffectedTupleCount),
|
shardAffectedTupleCount = placementAffectedTupleCount;
|
||||||
errdetail("Affected tuple counts at placements of shard "
|
}
|
||||||
UINT64_FORMAT " are different.", shardId)));
|
else
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errmsg("modified %d tuples, but expected to modify %d",
|
||||||
|
placementAffectedTupleCount, shardAffectedTupleCount),
|
||||||
|
errdetail("Affected tuple counts at placements of shard "
|
||||||
|
UINT64_FORMAT " are different.", shardId)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
|
|
@ -172,6 +172,8 @@ COMMIT;
|
||||||
--------+--------+-----------
|
--------+--------+-----------
|
||||||
id | bigint | not null
|
id | bigint | not null
|
||||||
name | text | not null
|
name | text | not null
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.labs" BEFORE TRUNCATE ON labs FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
SELECT * FROM labs WHERE id = 6;
|
SELECT * FROM labs WHERE id = 6;
|
||||||
id | name
|
id | name
|
||||||
|
@ -223,6 +225,8 @@ COMMIT;
|
||||||
--------+--------+-----------
|
--------+--------+-----------
|
||||||
id | bigint | not null
|
id | bigint | not null
|
||||||
name | text | not null
|
name | text | not null
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.labs" BEFORE TRUNCATE ON labs FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
SELECT * FROM labs WHERE id = 12;
|
SELECT * FROM labs WHERE id = 12;
|
||||||
id | name
|
id | name
|
||||||
|
@ -242,6 +246,8 @@ COMMIT;
|
||||||
--------+--------+-----------
|
--------+--------+-----------
|
||||||
id | bigint | not null
|
id | bigint | not null
|
||||||
name | text | not null
|
name | text | not null
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.labs" BEFORE TRUNCATE ON labs FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
SELECT * FROM labs WHERE id = 12;
|
SELECT * FROM labs WHERE id = 12;
|
||||||
id | name
|
id | name
|
||||||
|
|
|
@ -1534,7 +1534,7 @@ DROP MATERIALIZED VIEW mv_articles_hash;
|
||||||
DEBUG: drop auto-cascades to type mv_articles_hash
|
DEBUG: drop auto-cascades to type mv_articles_hash
|
||||||
DEBUG: drop auto-cascades to type mv_articles_hash[]
|
DEBUG: drop auto-cascades to type mv_articles_hash[]
|
||||||
DEBUG: drop auto-cascades to rule _RETURN on materialized view mv_articles_hash
|
DEBUG: drop auto-cascades to rule _RETURN on materialized view mv_articles_hash
|
||||||
DEBUG: EventTriggerInvoke 16727
|
DEBUG: EventTriggerInvoke 16729
|
||||||
CREATE MATERIALIZED VIEW mv_articles_hash_error AS
|
CREATE MATERIALIZED VIEW mv_articles_hash_error AS
|
||||||
SELECT * FROM articles_hash WHERE author_id in (1,2);
|
SELECT * FROM articles_hash WHERE author_id in (1,2);
|
||||||
NOTICE: cannot use shard pruning with ANY/ALL (array expression)
|
NOTICE: cannot use shard pruning with ANY/ALL (array expression)
|
||||||
|
|
|
@ -572,6 +572,8 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
|
||||||
n_regionkey | integer | not null
|
n_regionkey | integer | not null
|
||||||
n_comment | character varying(152) |
|
n_comment | character varying(152) |
|
||||||
new_col | integer |
|
new_col | integer |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_test_schema_support.nation_hash" BEFORE TRUNCATE ON test_schema_support.nation_hash FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
\d test_schema_support.nation_hash_1190003;
|
\d test_schema_support.nation_hash_1190003;
|
||||||
|
@ -599,6 +601,8 @@ ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS new_col;
|
||||||
n_name | character(25) | not null
|
n_name | character(25) | not null
|
||||||
n_regionkey | integer | not null
|
n_regionkey | integer | not null
|
||||||
n_comment | character varying(152) |
|
n_comment | character varying(152) |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_test_schema_support.nation_hash" BEFORE TRUNCATE ON test_schema_support.nation_hash FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
\d test_schema_support.nation_hash_1190003;
|
\d test_schema_support.nation_hash_1190003;
|
||||||
|
@ -626,6 +630,8 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
|
||||||
n_regionkey | integer | not null
|
n_regionkey | integer | not null
|
||||||
n_comment | character varying(152) |
|
n_comment | character varying(152) |
|
||||||
new_col | integer |
|
new_col | integer |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_test_schema_support.nation_hash" BEFORE TRUNCATE ON nation_hash FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
\d test_schema_support.nation_hash_1190003;
|
\d test_schema_support.nation_hash_1190003;
|
||||||
|
@ -654,6 +660,8 @@ ALTER TABLE nation_hash DROP COLUMN IF EXISTS new_col;
|
||||||
n_name | character(25) | not null
|
n_name | character(25) | not null
|
||||||
n_regionkey | integer | not null
|
n_regionkey | integer | not null
|
||||||
n_comment | character varying(152) |
|
n_comment | character varying(152) |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_test_schema_support.nation_hash" BEFORE TRUNCATE ON nation_hash FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
\d test_schema_support.nation_hash_1190003;
|
\d test_schema_support.nation_hash_1190003;
|
||||||
|
@ -683,6 +691,8 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
|
||||||
n_comment | character varying(152) |
|
n_comment | character varying(152) |
|
||||||
Indexes:
|
Indexes:
|
||||||
"index1" btree (n_name)
|
"index1" btree (n_name)
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_test_schema_support.nation_hash" BEFORE TRUNCATE ON test_schema_support.nation_hash FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
\d test_schema_support.nation_hash_1190003;
|
\d test_schema_support.nation_hash_1190003;
|
||||||
|
@ -710,6 +720,8 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
|
||||||
n_name | character(25) | not null
|
n_name | character(25) | not null
|
||||||
n_regionkey | integer | not null
|
n_regionkey | integer | not null
|
||||||
n_comment | character varying(152) |
|
n_comment | character varying(152) |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_test_schema_support.nation_hash" BEFORE TRUNCATE ON test_schema_support.nation_hash FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
\d test_schema_support.nation_hash_1190003;
|
\d test_schema_support.nation_hash_1190003;
|
||||||
|
@ -739,6 +751,8 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
|
||||||
n_comment | character varying(152) |
|
n_comment | character varying(152) |
|
||||||
Indexes:
|
Indexes:
|
||||||
"index1" btree (n_name)
|
"index1" btree (n_name)
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_test_schema_support.nation_hash" BEFORE TRUNCATE ON nation_hash FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
\d test_schema_support.nation_hash_1190003;
|
\d test_schema_support.nation_hash_1190003;
|
||||||
|
@ -767,6 +781,8 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
|
||||||
n_name | character(25) | not null
|
n_name | character(25) | not null
|
||||||
n_regionkey | integer | not null
|
n_regionkey | integer | not null
|
||||||
n_comment | character varying(152) |
|
n_comment | character varying(152) |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_test_schema_support.nation_hash" BEFORE TRUNCATE ON nation_hash FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
\d test_schema_support.nation_hash_1190003;
|
\d test_schema_support.nation_hash_1190003;
|
||||||
|
|
|
@ -0,0 +1,219 @@
|
||||||
|
--
|
||||||
|
-- MULTI_TRUNCATE
|
||||||
|
--
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1210000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1210000;
|
||||||
|
--
|
||||||
|
-- truncate for append distribution
|
||||||
|
-- expect all shards to be dropped
|
||||||
|
--
|
||||||
|
CREATE TABLE test_truncate_append(a int);
|
||||||
|
SELECT master_create_distributed_table('test_truncate_append', 'a', 'append');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify no error is thrown when no shards are present
|
||||||
|
TRUNCATE TABLE test_truncate_append;
|
||||||
|
SELECT master_create_empty_shard('test_truncate_append') AS new_shard_id \gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT count(*) FROM test_truncate_append;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_truncate_append values (1);
|
||||||
|
SELECT count(*) FROM test_truncate_append;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- create some more shards
|
||||||
|
SELECT master_create_empty_shard('test_truncate_append');
|
||||||
|
master_create_empty_shard
|
||||||
|
---------------------------
|
||||||
|
1210001
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('test_truncate_append');
|
||||||
|
master_create_empty_shard
|
||||||
|
---------------------------
|
||||||
|
1210002
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify 3 shards are presents
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
|
||||||
|
shardid
|
||||||
|
---------
|
||||||
|
1210000
|
||||||
|
1210001
|
||||||
|
1210002
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
TRUNCATE TABLE test_truncate_append;
|
||||||
|
-- verify data is truncated from the table
|
||||||
|
SELECT count(*) FROM test_truncate_append;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify no shard exists anymore
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
|
||||||
|
shardid
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
DROP TABLE test_truncate_append;
|
||||||
|
--
|
||||||
|
-- truncate for range distribution
|
||||||
|
-- expect shard to be present, data to be truncated
|
||||||
|
--
|
||||||
|
CREATE TABLE test_truncate_range(a int);
|
||||||
|
SELECT master_create_distributed_table('test_truncate_range', 'a', 'range');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify no error is thrown when no shards are present
|
||||||
|
TRUNCATE TABLE test_truncate_range;
|
||||||
|
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1500
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2500
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT count(*) FROM test_truncate_range;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_truncate_range values (1);
|
||||||
|
INSERT INTO test_truncate_range values (1001);
|
||||||
|
INSERT INTO test_truncate_range values (2000);
|
||||||
|
INSERT INTO test_truncate_range values (100);
|
||||||
|
SELECT count(*) FROM test_truncate_range;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify 3 shards are presents
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
|
||||||
|
shardid
|
||||||
|
---------
|
||||||
|
1210003
|
||||||
|
1210004
|
||||||
|
1210005
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
TRUNCATE TABLE test_truncate_range;
|
||||||
|
-- verify data is truncated from the table
|
||||||
|
SELECT count(*) FROM test_truncate_range;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify 3 shards are still present
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
|
||||||
|
shardid
|
||||||
|
---------
|
||||||
|
1210003
|
||||||
|
1210004
|
||||||
|
1210005
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
DROP TABLE test_truncate_range;
|
||||||
|
--
|
||||||
|
-- truncate for hash distribution.
|
||||||
|
-- expect shard to be present, data to be truncated
|
||||||
|
--
|
||||||
|
CREATE TABLE test_truncate_hash(a int);
|
||||||
|
SELECT master_create_distributed_table('test_truncate_hash', 'a', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify no error is thrown when no shards are present
|
||||||
|
TRUNCATE TABLE test_truncate_hash;
|
||||||
|
SELECT count(*) FROM test_truncate_hash;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_truncate_hash values (1);
|
||||||
|
ERROR: could not find any shards
|
||||||
|
DETAIL: No shards exist for distributed table "test_truncate_hash".
|
||||||
|
HINT: Run master_create_worker_shards to create shards and try again.
|
||||||
|
INSERT INTO test_truncate_hash values (1001);
|
||||||
|
ERROR: could not find any shards
|
||||||
|
DETAIL: No shards exist for distributed table "test_truncate_hash".
|
||||||
|
HINT: Run master_create_worker_shards to create shards and try again.
|
||||||
|
INSERT INTO test_truncate_hash values (2000);
|
||||||
|
ERROR: could not find any shards
|
||||||
|
DETAIL: No shards exist for distributed table "test_truncate_hash".
|
||||||
|
HINT: Run master_create_worker_shards to create shards and try again.
|
||||||
|
INSERT INTO test_truncate_hash values (100);
|
||||||
|
ERROR: could not find any shards
|
||||||
|
DETAIL: No shards exist for distributed table "test_truncate_hash".
|
||||||
|
HINT: Run master_create_worker_shards to create shards and try again.
|
||||||
|
SELECT count(*) FROM test_truncate_hash;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify 4 shards are present
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
|
||||||
|
shardid
|
||||||
|
---------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
TRUNCATE TABLE test_truncate_hash;
|
||||||
|
SELECT master_create_worker_shards('test_truncate_hash', 4, 1);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO test_truncate_hash values (1);
|
||||||
|
INSERT INTO test_truncate_hash values (1001);
|
||||||
|
INSERT INTO test_truncate_hash values (2000);
|
||||||
|
INSERT INTO test_truncate_hash values (100);
|
||||||
|
SELECT count(*) FROM test_truncate_hash;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
TRUNCATE TABLE test_truncate_hash;
|
||||||
|
-- verify data is truncated from the table
|
||||||
|
SELECT count(*) FROM test_truncate_hash;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify 4 shards are still presents
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
|
||||||
|
shardid
|
||||||
|
---------
|
||||||
|
1210006
|
||||||
|
1210007
|
||||||
|
1210008
|
||||||
|
1210009
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
DROP TABLE test_truncate_hash;
|
|
@ -160,3 +160,8 @@ test: multi_schema_support
|
||||||
# multi_function_evaluation tests edge-cases in master-side function pre-evaluation
|
# multi_function_evaluation tests edge-cases in master-side function pre-evaluation
|
||||||
# ----------
|
# ----------
|
||||||
test: multi_function_evaluation
|
test: multi_function_evaluation
|
||||||
|
|
||||||
|
# ----------
|
||||||
|
# multi_truncate tests truncate functionality for distributed tables
|
||||||
|
# ----------
|
||||||
|
test: multi_truncate
|
||||||
|
|
|
@ -103,6 +103,8 @@ ORDER BY attnum;
|
||||||
int_column1 | integer | default 1
|
int_column1 | integer | default 1
|
||||||
int_column2 | integer | default 2
|
int_column2 | integer | default 2
|
||||||
null_column | integer |
|
null_column | integer |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.lineitem_alter" BEFORE TRUNCATE ON lineitem_alter FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
SELECT float_column, count(*) FROM lineitem_alter GROUP BY float_column;
|
SELECT float_column, count(*) FROM lineitem_alter GROUP BY float_column;
|
||||||
float_column | count
|
float_column | count
|
||||||
|
@ -164,6 +166,8 @@ ALTER TABLE lineitem_alter ALTER COLUMN int_column2 SET NOT NULL;
|
||||||
int_column1 | integer |
|
int_column1 | integer |
|
||||||
int_column2 | integer | not null default 2
|
int_column2 | integer | not null default 2
|
||||||
null_column | integer |
|
null_column | integer |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.lineitem_alter" BEFORE TRUNCATE ON lineitem_alter FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
-- Drop default so that NULLs will be inserted for this column
|
-- Drop default so that NULLs will be inserted for this column
|
||||||
ALTER TABLE lineitem_alter ALTER COLUMN int_column2 DROP DEFAULT;
|
ALTER TABLE lineitem_alter ALTER COLUMN int_column2 DROP DEFAULT;
|
||||||
|
@ -199,6 +203,8 @@ ALTER TABLE lineitem_alter ALTER COLUMN int_column2 DROP NOT NULL;
|
||||||
int_column1 | integer |
|
int_column1 | integer |
|
||||||
int_column2 | integer |
|
int_column2 | integer |
|
||||||
null_column | integer |
|
null_column | integer |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.lineitem_alter" BEFORE TRUNCATE ON lineitem_alter FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
-- \copy should succeed now
|
-- \copy should succeed now
|
||||||
\copy lineitem_alter (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
\copy lineitem_alter (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment) FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
|
||||||
|
@ -242,6 +248,8 @@ ALTER TABLE lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE FLOAT;
|
||||||
int_column1 | integer |
|
int_column1 | integer |
|
||||||
int_column2 | double precision |
|
int_column2 | double precision |
|
||||||
null_column | integer |
|
null_column | integer |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.lineitem_alter" BEFORE TRUNCATE ON lineitem_alter FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
SELECT int_column2, pg_typeof(int_column2), count(*) from lineitem_alter GROUP BY int_column2;
|
SELECT int_column2, pg_typeof(int_column2), count(*) from lineitem_alter GROUP BY int_column2;
|
||||||
int_column2 | pg_typeof | count
|
int_column2 | pg_typeof | count
|
||||||
|
@ -288,6 +296,8 @@ ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS int_column2;
|
||||||
l_shipmode | character(10) | not null
|
l_shipmode | character(10) | not null
|
||||||
l_comment | character varying(44) | not null
|
l_comment | character varying(44) | not null
|
||||||
null_column | integer |
|
null_column | integer |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.lineitem_alter" BEFORE TRUNCATE ON lineitem_alter FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
-- Verify that we can execute commands with multiple subcommands
|
-- Verify that we can execute commands with multiple subcommands
|
||||||
ALTER TABLE lineitem_alter ADD COLUMN int_column1 INTEGER,
|
ALTER TABLE lineitem_alter ADD COLUMN int_column1 INTEGER,
|
||||||
|
@ -315,6 +325,8 @@ ALTER TABLE lineitem_alter ADD COLUMN int_column1 INTEGER,
|
||||||
null_column | integer |
|
null_column | integer |
|
||||||
int_column1 | integer |
|
int_column1 | integer |
|
||||||
int_column2 | integer |
|
int_column2 | integer |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.lineitem_alter" BEFORE TRUNCATE ON lineitem_alter FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
ALTER TABLE lineitem_alter ADD COLUMN int_column3 INTEGER,
|
ALTER TABLE lineitem_alter ADD COLUMN int_column3 INTEGER,
|
||||||
ALTER COLUMN int_column1 SET STATISTICS 10;
|
ALTER COLUMN int_column1 SET STATISTICS 10;
|
||||||
|
@ -342,6 +354,8 @@ ALTER TABLE lineitem_alter DROP COLUMN int_column1, DROP COLUMN int_column2;
|
||||||
l_shipmode | character(10) | not null
|
l_shipmode | character(10) | not null
|
||||||
l_comment | character varying(44) | not null
|
l_comment | character varying(44) | not null
|
||||||
null_column | integer |
|
null_column | integer |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.lineitem_alter" BEFORE TRUNCATE ON lineitem_alter FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
-- Verify that we cannot execute alter commands on the distribution column
|
-- Verify that we cannot execute alter commands on the distribution column
|
||||||
ALTER TABLE lineitem_alter ALTER COLUMN l_orderkey DROP NOT NULL;
|
ALTER TABLE lineitem_alter ALTER COLUMN l_orderkey DROP NOT NULL;
|
||||||
|
@ -411,6 +425,8 @@ ERROR: renaming distributed tables or their objects is currently unsupported
|
||||||
l_shipmode | character(10) | not null
|
l_shipmode | character(10) | not null
|
||||||
l_comment | character varying(44) | not null
|
l_comment | character varying(44) | not null
|
||||||
null_column | integer |
|
null_column | integer |
|
||||||
|
Triggers:
|
||||||
|
"citus_truncate_trigger_on_public.lineitem_alter" BEFORE TRUNCATE ON lineitem_alter FOR EACH STATEMENT EXECUTE PROCEDURE citus_truncate_trigger()
|
||||||
|
|
||||||
-- verify that non-propagated ddl commands are allowed inside a transaction block
|
-- verify that non-propagated ddl commands are allowed inside a transaction block
|
||||||
SET citus.enable_ddl_propagation to false;
|
SET citus.enable_ddl_propagation to false;
|
||||||
|
|
|
@ -846,6 +846,7 @@ FROM
|
||||||
hasdone) AS subquery_top
|
hasdone) AS subquery_top
|
||||||
GROUP BY
|
GROUP BY
|
||||||
hasdone;
|
hasdone;
|
||||||
|
|
||||||
QUERY PLAN
|
QUERY PLAN
|
||||||
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||||
Distributed Query into pg_merge_job_270015
|
Distributed Query into pg_merge_job_270015
|
||||||
|
|
|
@ -0,0 +1,133 @@
|
||||||
|
--
|
||||||
|
-- MULTI_TRUNCATE
|
||||||
|
--
|
||||||
|
|
||||||
|
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1210000;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1210000;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- truncate for append distribution
|
||||||
|
-- expect all shards to be dropped
|
||||||
|
--
|
||||||
|
CREATE TABLE test_truncate_append(a int);
|
||||||
|
SELECT master_create_distributed_table('test_truncate_append', 'a', 'append');
|
||||||
|
|
||||||
|
-- verify no error is thrown when no shards are present
|
||||||
|
TRUNCATE TABLE test_truncate_append;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('test_truncate_append') AS new_shard_id \gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_truncate_append;
|
||||||
|
|
||||||
|
INSERT INTO test_truncate_append values (1);
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_truncate_append;
|
||||||
|
|
||||||
|
-- create some more shards
|
||||||
|
SELECT master_create_empty_shard('test_truncate_append');
|
||||||
|
SELECT master_create_empty_shard('test_truncate_append');
|
||||||
|
|
||||||
|
-- verify 3 shards are presents
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
|
||||||
|
|
||||||
|
TRUNCATE TABLE test_truncate_append;
|
||||||
|
|
||||||
|
-- verify data is truncated from the table
|
||||||
|
SELECT count(*) FROM test_truncate_append;
|
||||||
|
|
||||||
|
-- verify no shard exists anymore
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
|
||||||
|
|
||||||
|
DROP TABLE test_truncate_append;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- truncate for range distribution
|
||||||
|
-- expect shard to be present, data to be truncated
|
||||||
|
--
|
||||||
|
CREATE TABLE test_truncate_range(a int);
|
||||||
|
SELECT master_create_distributed_table('test_truncate_range', 'a', 'range');
|
||||||
|
|
||||||
|
-- verify no error is thrown when no shards are present
|
||||||
|
TRUNCATE TABLE test_truncate_range;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1500
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2500
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_truncate_range;
|
||||||
|
|
||||||
|
INSERT INTO test_truncate_range values (1);
|
||||||
|
INSERT INTO test_truncate_range values (1001);
|
||||||
|
INSERT INTO test_truncate_range values (2000);
|
||||||
|
INSERT INTO test_truncate_range values (100);
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_truncate_range;
|
||||||
|
|
||||||
|
-- verify 3 shards are presents
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
|
||||||
|
|
||||||
|
TRUNCATE TABLE test_truncate_range;
|
||||||
|
|
||||||
|
-- verify data is truncated from the table
|
||||||
|
SELECT count(*) FROM test_truncate_range;
|
||||||
|
|
||||||
|
-- verify 3 shards are still present
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
|
||||||
|
|
||||||
|
DROP TABLE test_truncate_range;
|
||||||
|
|
||||||
|
|
||||||
|
--
|
||||||
|
-- truncate for hash distribution.
|
||||||
|
-- expect shard to be present, data to be truncated
|
||||||
|
--
|
||||||
|
CREATE TABLE test_truncate_hash(a int);
|
||||||
|
SELECT master_create_distributed_table('test_truncate_hash', 'a', 'hash');
|
||||||
|
|
||||||
|
-- verify no error is thrown when no shards are present
|
||||||
|
TRUNCATE TABLE test_truncate_hash;
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_truncate_hash;
|
||||||
|
|
||||||
|
INSERT INTO test_truncate_hash values (1);
|
||||||
|
INSERT INTO test_truncate_hash values (1001);
|
||||||
|
INSERT INTO test_truncate_hash values (2000);
|
||||||
|
INSERT INTO test_truncate_hash values (100);
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_truncate_hash;
|
||||||
|
|
||||||
|
-- verify 4 shards are present
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
|
||||||
|
|
||||||
|
TRUNCATE TABLE test_truncate_hash;
|
||||||
|
|
||||||
|
SELECT master_create_worker_shards('test_truncate_hash', 4, 1);
|
||||||
|
|
||||||
|
INSERT INTO test_truncate_hash values (1);
|
||||||
|
INSERT INTO test_truncate_hash values (1001);
|
||||||
|
INSERT INTO test_truncate_hash values (2000);
|
||||||
|
INSERT INTO test_truncate_hash values (100);
|
||||||
|
|
||||||
|
SELECT count(*) FROM test_truncate_hash;
|
||||||
|
|
||||||
|
TRUNCATE TABLE test_truncate_hash;
|
||||||
|
|
||||||
|
-- verify data is truncated from the table
|
||||||
|
SELECT count(*) FROM test_truncate_hash;
|
||||||
|
|
||||||
|
-- verify 4 shards are still presents
|
||||||
|
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
|
||||||
|
|
||||||
|
DROP TABLE test_truncate_hash;
|
Loading…
Reference in New Issue