Support truncate command for distributed relations

pull/729/head
Murat Tuncer 2016-08-09 18:40:37 +03:00
parent 3ea352e5f9
commit cf48dfe5f5
5 changed files with 576 additions and 20 deletions

View File

@ -53,6 +53,7 @@
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "nodes/value.h"
#include "server/fmgr.h"
#include "storage/lmgr.h"
#include "storage/lock.h"
#include "tcop/dest.h"
@ -103,6 +104,7 @@ static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
const char *alterObjectSchemaCommand,
bool isTopLevel);
static Node * ProcessTruncateStmt(TruncateStmt *truncateStatement);
/* Local functions forward declarations for unsupported command checks */
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
@ -201,6 +203,16 @@ multi_ProcessUtility(Node *parsetree,
ErrorIfDistributedAlterSeqOwnedBy((AlterSeqStmt *) parsetree);
}
if (IsA(parsetree, TruncateStmt))
{
parsetree = ProcessTruncateStmt((TruncateStmt *) parsetree);
if (parsetree == NULL)
{
return;
}
}
/* ddl commands are propagated to workers only if EnableDDLPropagation is set */
if (EnableDDLPropagation)
{
@ -747,6 +759,112 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
}
/*
* ProcessTruncateStmt processes truncate statement for distributed relations.
* The function first checks if the truncate statement is issued on a distributed
* relation, and bails out if it cannot find one. It expects to find a single
* distributed relation to be truncated. the function errors out if there are more
* than one distributed relation, or there is a mixture of distributed/local
* relations in the truncate target list.
*
* Actual truncate operation differs based on the distribution method. The function
* calls master_drop_all_shards to for append distributed relation to drop all shards.
* Truncate on hash or range distributed tables is handled by the function
* master_modify_multiple_shards with truncate table statement as a parameter.
*
* The function disregards any parameters that truncate command may have.
*/
static Node *
ProcessTruncateStmt(TruncateStmt *truncateStatement)
{
List *relationList = truncateStatement->relations;
List *distributedRelationList = NIL;
ListCell *relationCell = NULL;
RangeVar *relationRangeVar = NULL;
bool missingOK = false;
Oid relationId = InvalidOid;
char partitionMethod = '\0';
char *schemaName = NULL;
Oid schemaId = InvalidOid;
char *relationName = NULL;
foreach(relationCell, relationList)
{
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
relationId = RangeVarGetRelid(rangeVar, NoLock, true);
if (IsDistributedTable(relationId))
{
distributedRelationList = lappend(distributedRelationList, rangeVar);
}
}
/* nothing to do if no distributed table is present */
if (distributedRelationList == NIL)
{
return (Node *) truncateStatement;
}
/* error if there are more than one table to be truncated */
if (list_length(distributedRelationList) != 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("truncate on multiple distributed relations is "
"currently unsupported"),
errhint("Truncate one distributed table at a time.")));
}
/* distributed table must be the only table in the truncate command */
if (list_length(relationList) != 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("truncate including distributed and regular relations is "
"currently unsupported"),
errhint("Truncate them separately.")));
}
relationRangeVar = (RangeVar *) linitial(distributedRelationList);
/* verify relation exists */
relationId = RangeVarGetRelid(relationRangeVar, NoLock, missingOK);
schemaId = get_rel_namespace(relationId);
schemaName = get_namespace_name(schemaId);
relationName = get_rel_name(relationId);
partitionMethod = PartitionMethod(relationId);
if (partitionMethod == DISTRIBUTE_BY_APPEND)
{
text *relationNameText = cstring_to_text(relationName);
Datum relationNameDatum = PointerGetDatum(relationNameText);
text *schemaNameText = cstring_to_text(schemaName);
Datum schemaNameDatum = PointerGetDatum(schemaNameText);
Datum relationIdDatum = ObjectIdGetDatum(relationId);
DirectFunctionCall3(master_drop_all_shards, relationIdDatum, relationNameDatum,
schemaNameDatum);
}
else
{
StringInfo truncateString = makeStringInfo();
char *qualifiedName = NULL;
text *truncateText = NULL;
Datum truncateTextDatum = Int32GetDatum(0);
qualifiedName = quote_qualified_identifier(schemaName, relationName);
appendStringInfo(truncateString, "TRUNCATE TABLE %s", qualifiedName);
truncateText = cstring_to_text(truncateString->data);
truncateTextDatum = PointerGetDatum(truncateText);
DirectFunctionCall1(master_modify_multiple_shards, truncateTextDatum);
}
return NULL;
}
/*
* ErrorIfUnsupportedIndexStmt checks if the corresponding index statement is
* supported for distributed tables and errors out if it is not.

View File

@ -59,7 +59,10 @@ static void LockShardsForModify(List *shardIntervalList);
static bool HasReplication(List *shardIntervalList);
static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId);
static int SendQueryToPlacements(char *shardQueryString,
ShardConnections *shardConnections);
ShardConnections *shardConnections,
bool returnTupleCount);
static void deparse_truncate_query(Query *query, Oid distrelid, int64 shardid, StringInfo
buffer);
PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
@ -88,6 +91,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
List *shardIntervalList = NIL;
List *prunedShardIntervalList = NIL;
int32 affectedTupleCount = 0;
bool validateModifyQuery = true;
PreventTransactionChain(isTopLevel, "master_modify_multiple_shards");
@ -104,10 +108,29 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
relationId = RangeVarGetRelid(updateStatement->relation, NoLock, failOK);
EnsureTablePermissions(relationId, ACL_UPDATE);
}
else if (IsA(queryTreeNode, TruncateStmt))
{
TruncateStmt *truncateStatement = (TruncateStmt *) queryTreeNode;
List *relationList = truncateStatement->relations;
RangeVar *rangeVar = NULL;
if (list_length(relationList) != 1)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("master_modify_multiple_shards() can truncate only "
"one table")));
}
rangeVar = (RangeVar *) linitial(relationList);
relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
EnsureTablePermissions(relationId, ACL_TRUNCATE);
validateModifyQuery = false;
}
else
{
ereport(ERROR, (errmsg("query \"%s\" is not a delete nor update statement",
queryString)));
ereport(ERROR, (errmsg("query \"%s\" is not a delete, update, or truncate "
"statement", queryString)));
}
CheckDistributedTable(relationId);
@ -115,7 +138,10 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
queryTreeList = pg_analyze_and_rewrite(queryTreeNode, queryString, NULL, 0);
modifyQuery = (Query *) linitial(queryTreeList);
if (validateModifyQuery)
{
ErrorIfModifyQueryNotSupported(modifyQuery);
}
/* reject queries with a returning list */
if (list_length(modifyQuery->returningList) > 0)
@ -216,6 +242,9 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
char *relationOwner = TableOwner(relationId);
HTAB *shardConnectionHash = NULL;
ListCell *shardIntervalCell = NULL;
bool truncateCommand = false;
bool requestTupleCount = true;
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
@ -224,6 +253,12 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
MemoryContextSwitchTo(oldContext);
if (query->commandType == CMD_UTILITY && IsA(query->utilityStmt, TruncateStmt))
{
truncateCommand = true;
requestTupleCount = false;
}
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(
@ -241,10 +276,19 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
&shardConnectionsFound);
Assert(shardConnectionsFound);
if (truncateCommand)
{
deparse_truncate_query(query, relationId, shardId, shardQueryString);
}
else
{
deparse_shard_query(query, relationId, shardId, shardQueryString);
}
shardQueryStringData = shardQueryString->data;
shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
shardConnections);
shardConnections,
requestTupleCount);
affectedTupleCount += shardAffectedTupleCount;
}
@ -255,13 +299,41 @@ SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
}
/*
* deparse_truncate_query creates sql representation of a truncate statement. The
* function only generated basic truncate statement of the form
* 'truncate table <table_name>' it ignores all options. It also assumes that
* there is only one relation in the relation list.
*/
void
deparse_truncate_query(Query *query, Oid distrelid, int64 shardid, StringInfo buffer)
{
TruncateStmt *truncateStatement = NULL;
RangeVar *relation = NULL;
char *qualifiedName = NULL;
Assert(query->commandType == CMD_UTILITY);
Assert(IsA(query->utilityStmt, TruncateStmt));
truncateStatement = (TruncateStmt *) query->utilityStmt;
Assert(list_length(truncateStatement->relations) == 1);
relation = (RangeVar *) linitial(truncateStatement->relations);
qualifiedName = quote_qualified_identifier(relation->schemaname,
relation->relname);
appendStringInfo(buffer, "TRUNCATE TABLE %s_" UINT64_FORMAT, qualifiedName, shardid);
}
/*
* SendQueryToPlacements sends the given query string to all given placement
* connections of a shard. CommitRemoteTransactions or AbortRemoteTransactions
* should be called after all queries have been sent successfully.
*/
static int
SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections)
SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections,
bool returnTupleCount)
{
uint64 shardId = shardConnections->shardId;
List *connectionList = shardConnections->connectionList;
@ -270,6 +342,11 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
Assert(connectionList != NIL);
if (!returnTupleCount)
{
shardAffectedTupleCount = 0;
}
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
@ -290,6 +367,9 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
}
placementAffectedTupleString = PQcmdTuples(result);
if (returnTupleCount)
{
placementAffectedTupleCount = pg_atoi(placementAffectedTupleString,
sizeof(int32), 0);
@ -306,6 +386,7 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
errdetail("Affected tuple counts at placements of shard "
UINT64_FORMAT " are different.", shardId)));
}
}
PQclear(result);

View File

@ -0,0 +1,219 @@
--
-- MULTI_TRUNCATE
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1210000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1210000;
--
-- truncate for append distribution
-- expect all shards to be dropped
--
CREATE TABLE test_truncate_append(a int);
SELECT master_create_distributed_table('test_truncate_append', 'a', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- verify no error is thrown when no shards are present
TRUNCATE TABLE test_truncate_append;
SELECT master_create_empty_shard('test_truncate_append') AS new_shard_id \gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
WHERE shardid = :new_shard_id;
SELECT count(*) FROM test_truncate_append;
count
-------
0
(1 row)
INSERT INTO test_truncate_append values (1);
SELECT count(*) FROM test_truncate_append;
count
-------
1
(1 row)
-- create some more shards
SELECT master_create_empty_shard('test_truncate_append');
master_create_empty_shard
---------------------------
1210001
(1 row)
SELECT master_create_empty_shard('test_truncate_append');
master_create_empty_shard
---------------------------
1210002
(1 row)
-- verify 3 shards are presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
shardid
---------
1210000
1210001
1210002
(3 rows)
TRUNCATE TABLE test_truncate_append;
-- verify data is truncated from the table
SELECT count(*) FROM test_truncate_append;
count
-------
(1 row)
-- verify no shard exists anymore
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
shardid
---------
(0 rows)
DROP TABLE test_truncate_append;
--
-- truncate for range distribution
-- expect shard to be present, data to be truncated
--
CREATE TABLE test_truncate_range(a int);
SELECT master_create_distributed_table('test_truncate_range', 'a', 'range');
master_create_distributed_table
---------------------------------
(1 row)
-- verify no error is thrown when no shards are present
TRUNCATE TABLE test_truncate_range;
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1500
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2500
WHERE shardid = :new_shard_id;
SELECT count(*) FROM test_truncate_range;
count
-------
0
(1 row)
INSERT INTO test_truncate_range values (1);
INSERT INTO test_truncate_range values (1001);
INSERT INTO test_truncate_range values (2000);
INSERT INTO test_truncate_range values (100);
SELECT count(*) FROM test_truncate_range;
count
-------
4
(1 row)
-- verify 3 shards are presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
shardid
---------
1210003
1210004
1210005
(3 rows)
TRUNCATE TABLE test_truncate_range;
-- verify data is truncated from the table
SELECT count(*) FROM test_truncate_range;
count
-------
0
(1 row)
-- verify 3 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
shardid
---------
1210003
1210004
1210005
(3 rows)
DROP TABLE test_truncate_range;
--
-- truncate for hash distribution.
-- expect shard to be present, data to be truncated
--
CREATE TABLE test_truncate_hash(a int);
SELECT master_create_distributed_table('test_truncate_hash', 'a', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
-- verify no error is thrown when no shards are present
TRUNCATE TABLE test_truncate_hash;
SELECT count(*) FROM test_truncate_hash;
count
-------
0
(1 row)
INSERT INTO test_truncate_hash values (1);
ERROR: could not find any shards
DETAIL: No shards exist for distributed table "test_truncate_hash".
HINT: Run master_create_worker_shards to create shards and try again.
INSERT INTO test_truncate_hash values (1001);
ERROR: could not find any shards
DETAIL: No shards exist for distributed table "test_truncate_hash".
HINT: Run master_create_worker_shards to create shards and try again.
INSERT INTO test_truncate_hash values (2000);
ERROR: could not find any shards
DETAIL: No shards exist for distributed table "test_truncate_hash".
HINT: Run master_create_worker_shards to create shards and try again.
INSERT INTO test_truncate_hash values (100);
ERROR: could not find any shards
DETAIL: No shards exist for distributed table "test_truncate_hash".
HINT: Run master_create_worker_shards to create shards and try again.
SELECT count(*) FROM test_truncate_hash;
count
-------
0
(1 row)
-- verify 4 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
shardid
---------
(0 rows)
TRUNCATE TABLE test_truncate_hash;
SELECT master_create_worker_shards('test_truncate_hash', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
INSERT INTO test_truncate_hash values (1);
INSERT INTO test_truncate_hash values (1001);
INSERT INTO test_truncate_hash values (2000);
INSERT INTO test_truncate_hash values (100);
SELECT count(*) FROM test_truncate_hash;
count
-------
4
(1 row)
TRUNCATE TABLE test_truncate_hash;
-- verify data is truncated from the table
SELECT count(*) FROM test_truncate_hash;
count
-------
0
(1 row)
-- verify 4 shards are still presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
shardid
---------
1210006
1210007
1210008
1210009
(4 rows)
DROP TABLE test_truncate_hash;

View File

@ -160,3 +160,8 @@ test: multi_schema_support
# multi_function_evaluation tests edge-cases in master-side function pre-evaluation
# ----------
test: multi_function_evaluation
# ----------
# multi_truncate tests truncate functionality for distributed tables
# ----------
test: multi_truncate

View File

@ -0,0 +1,133 @@
--
-- MULTI_TRUNCATE
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1210000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1210000;
--
-- truncate for append distribution
-- expect all shards to be dropped
--
CREATE TABLE test_truncate_append(a int);
SELECT master_create_distributed_table('test_truncate_append', 'a', 'append');
-- verify no error is thrown when no shards are present
TRUNCATE TABLE test_truncate_append;
SELECT master_create_empty_shard('test_truncate_append') AS new_shard_id \gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
WHERE shardid = :new_shard_id;
SELECT count(*) FROM test_truncate_append;
INSERT INTO test_truncate_append values (1);
SELECT count(*) FROM test_truncate_append;
-- create some more shards
SELECT master_create_empty_shard('test_truncate_append');
SELECT master_create_empty_shard('test_truncate_append');
-- verify 3 shards are presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
TRUNCATE TABLE test_truncate_append;
-- verify data is truncated from the table
SELECT count(*) FROM test_truncate_append;
-- verify no shard exists anymore
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass;
DROP TABLE test_truncate_append;
--
-- truncate for range distribution
-- expect shard to be present, data to be truncated
--
CREATE TABLE test_truncate_range(a int);
SELECT master_create_distributed_table('test_truncate_range', 'a', 'range');
-- verify no error is thrown when no shards are present
TRUNCATE TABLE test_truncate_range;
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1500
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('test_truncate_range') AS new_shard_id \gset
UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2500
WHERE shardid = :new_shard_id;
SELECT count(*) FROM test_truncate_range;
INSERT INTO test_truncate_range values (1);
INSERT INTO test_truncate_range values (1001);
INSERT INTO test_truncate_range values (2000);
INSERT INTO test_truncate_range values (100);
SELECT count(*) FROM test_truncate_range;
-- verify 3 shards are presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
TRUNCATE TABLE test_truncate_range;
-- verify data is truncated from the table
SELECT count(*) FROM test_truncate_range;
-- verify 3 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass;
DROP TABLE test_truncate_range;
--
-- truncate for hash distribution.
-- expect shard to be present, data to be truncated
--
CREATE TABLE test_truncate_hash(a int);
SELECT master_create_distributed_table('test_truncate_hash', 'a', 'hash');
-- verify no error is thrown when no shards are present
TRUNCATE TABLE test_truncate_hash;
SELECT count(*) FROM test_truncate_hash;
INSERT INTO test_truncate_hash values (1);
INSERT INTO test_truncate_hash values (1001);
INSERT INTO test_truncate_hash values (2000);
INSERT INTO test_truncate_hash values (100);
SELECT count(*) FROM test_truncate_hash;
-- verify 4 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
TRUNCATE TABLE test_truncate_hash;
SELECT master_create_worker_shards('test_truncate_hash', 4, 1);
INSERT INTO test_truncate_hash values (1);
INSERT INTO test_truncate_hash values (1001);
INSERT INTO test_truncate_hash values (2000);
INSERT INTO test_truncate_hash values (100);
SELECT count(*) FROM test_truncate_hash;
TRUNCATE TABLE test_truncate_hash;
-- verify data is truncated from the table
SELECT count(*) FROM test_truncate_hash;
-- verify 4 shards are still presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass;
DROP TABLE test_truncate_hash;