Support temporary distributed tables

temp_tables
Marco Slot 2017-08-22 22:25:54 +02:00
parent c68bd7efa7
commit 6fc545c87b
13 changed files with 174 additions and 13 deletions

View File

@ -28,6 +28,7 @@
#include "catalog/pg_trigger.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
@ -44,6 +45,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "executor/executor.h"
@ -295,6 +297,13 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
Relation colocatedRelation = NULL;
char relationPersistence = get_rel_persistence(relationId);
if (relationPersistence == RELPERSISTENCE_TEMP && !KnownTemporaryTable(relationId))
{
ereport(ERROR, (errmsg("can only distribute temporary tables with "
"ON COMMIT DROP")));
}
replicationModel = AppropriateReplicationModel(distributionMethod, viaDeprecatedAPI);
/*
@ -800,9 +809,17 @@ EnsureSchemaExistsOnAllNodes(Oid relationId)
StringInfo applySchemaCreationDDL = makeStringInfo();
Oid schemaId = get_rel_namespace(relationId);
const char *createSchemaDDL = CreateSchemaDDLCommand(schemaId);
const char *createSchemaDDL = NULL;
uint64 connectionFlag = FORCE_NEW_CONNECTION;
char tablePersistence = get_rel_persistence(relationId);
if (tablePersistence == RELPERSISTENCE_TEMP)
{
/* cannot create temporary table schemas */
return;
}
createSchemaDDL = CreateSchemaDDLCommand(schemaId);
if (createSchemaDDL == NULL)
{
return;

View File

@ -1062,13 +1062,24 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop
char *schemaName = copyStatement->relation->schemaname;
char *relationName = copyStatement->relation->relname;
Oid tableId = InvalidOid;
char tablePersistence = 0;
char *shardName = pstrdup(relationName);
char *shardQualifiedName = NULL;
AppendShardIdToName(&shardName, shardId);
tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
tablePersistence = get_rel_persistence(tableId);
if (tablePersistence != RELPERSISTENCE_TEMP)
{
shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
}
else
{
shardQualifiedName = (char *) quote_identifier(shardName);
}
appendStringInfo(command, "COPY %s ", shardQualifiedName);

View File

@ -519,6 +519,34 @@ multi_ProcessUtility(PlannedStmt *pstmt,
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
}
/* keep track of transaction-scoped temporary tables */
if (IsA(parsetree, CreateStmt))
{
CreateStmt *createStatement = (CreateStmt *) parsetree;
if (createStatement->oncommit == ONCOMMIT_DROP)
{
bool missingOK = false;
Oid relationId = RangeVarGetRelid(createStatement->relation, NoLock,
missingOK);
RegisterTemporaryTable(relationId);
}
}
else if (IsA(parsetree, CreateTableAsStmt))
{
CreateTableAsStmt *createTableAsStmt = (CreateTableAsStmt *) parsetree;
IntoClause *into = createTableAsStmt->into;
if (into->onCommit == ONCOMMIT_DROP)
{
bool missingOK = false;
Oid relationId = RangeVarGetRelid(into->rel, NoLock, missingOK);
RegisterTemporaryTable(relationId);
}
}
/* after local command has completed, finish by executing worker DDLJobs, if any */
if (ddlJobs != NIL)
{

View File

@ -496,6 +496,7 @@ GetTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
{
List *tableDDLEventList = NIL;
char tableType = 0;
char tablePersistence = 0;
char *tableSchemaDef = NULL;
char *tableColumnOptionsDef = NULL;
char *createSchemaCommand = NULL;
@ -525,13 +526,17 @@ GetTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
tableDDLEventList = lappend(tableDDLEventList, serverDef);
}
/* create schema if the table is not in the default namespace (public) */
/* create schema if the table is not temporary */
tablePersistence = get_rel_persistence(relationId);
if (tablePersistence != RELPERSISTENCE_TEMP)
{
schemaId = get_rel_namespace(relationId);
createSchemaCommand = CreateSchemaDDLCommand(schemaId);
if (createSchemaCommand != NULL)
{
tableDDLEventList = lappend(tableDDLEventList, createSchemaCommand);
}
}
/* fetch table schema and column option definitions */
tableSchemaDef = pg_get_tableschemadef_string(relationId, includeSequenceDefaults);

View File

@ -573,6 +573,13 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
char *escapedSchemaName = quote_literal_cstr(schemaName);
ListCell *ddlCommandCell = NULL;
ListCell *foreignConstraintCommandCell = NULL;
bool temporaryTable = false;
char tablePersistence = get_rel_persistence(relationId);
if (tablePersistence == RELPERSISTENCE_TEMP)
{
temporaryTable = true;
}
foreach(ddlCommandCell, ddlCommandList)
{
@ -580,7 +587,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
char *escapedDDLCommand = quote_literal_cstr(ddlCommand);
StringInfo applyDDLCommand = makeStringInfo();
if (strcmp(schemaName, "public") != 0)
if (strcmp(schemaName, "public") != 0 && !temporaryTable)
{
appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
escapedSchemaName, escapedDDLCommand);

View File

@ -174,6 +174,7 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
Oid schemaId = InvalidOid;
char *relationName = NULL;
char *schemaName = NULL;
char relationPersistence = 0;
bool replaceRteWithNullValues = false;
ListCell *relationShardCell = NULL;
RelationShard *relationShard = NULL;
@ -236,8 +237,12 @@ UpdateRelationToShardNames(Node *node, List *relationShardList)
relationName = get_rel_name(relationId);
AppendShardIdToName(&relationName, shardId);
relationPersistence = get_rel_persistence(relationId);
if (relationPersistence != RELPERSISTENCE_TEMP)
{
schemaId = get_rel_namespace(relationId);
schemaName = get_namespace_name(schemaId);
}
ModifyRangeTblExtraData(newRte, CITUS_RTE_SHARD, schemaName, relationName, NIL);

View File

@ -3978,6 +3978,7 @@ FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment)
Oid relationId = rangeTableEntry->relid;
char *relationName = get_rel_name(relationId);
char relationPersistence = get_rel_persistence(relationId);
/*
* If the table is not in the default namespace (public), we include it in
@ -3985,7 +3986,8 @@ FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment)
*/
Oid schemaId = get_rel_namespace(relationId);
schemaName = get_namespace_name(schemaId);
if (strncmp(schemaName, "public", NAMEDATALEN) == 0)
if (strncmp(schemaName, "public", NAMEDATALEN) == 0 ||
relationPersistence == RELPERSISTENCE_TEMP)
{
schemaName = NULL;
}

View File

@ -174,7 +174,10 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
char **relationSchemaName = &(createStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
if (createStmt->relation->relpersistence != RELPERSISTENCE_TEMP)
{
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
}
AppendShardIdToName(relationName, shardId);
break;

View File

@ -22,11 +22,13 @@
#include "distributed/backend_data.h"
#include "distributed/connection_management.h"
#include "distributed/hash_helpers.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/transaction_management.h"
#include "distributed/placement_connection.h"
#include "utils/hsearch.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@ -45,6 +47,9 @@ dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);
/* stack of active sub-transactions */
static List *activeSubXacts = NIL;
/* list of temporary tables in the current transaction */
static List *temporaryTables = NIL;
/*
* Should this coordinated transaction use 2PC? Set by
* CoordinatedTransactionUse2PC(), e.g. if DDL was issued and
@ -61,6 +66,11 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction
static void AdjustMaxPreparedTransactions(void);
static void PushSubXact(SubTransactionId subId);
static void PopSubXact(SubTransactionId subId);
static void DropTemporaryTables(void);
/* TODO: make a proper function that we can call */
extern Datum worker_drop_distributed_table(PG_FUNCTION_ARGS);
/*
@ -211,6 +221,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
dlist_init(&InProgressTransactions);
CoordinatedTransactionUses2PC = false;
UnSetDistributedTransactionId();
temporaryTables = NIL;
break;
}
@ -228,6 +240,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
case XACT_EVENT_PRE_COMMIT:
{
DropTemporaryTables();
/* nothing further to do if there's no managed remote xacts */
if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
{
@ -409,3 +423,45 @@ ActiveSubXacts(void)
return activeSubXactsReversed;
}
void
RegisterTemporaryTable(Oid relationId)
{
MemoryContext oldContext = MemoryContextSwitchTo(CurTransactionContext);
temporaryTables = lappend_oid(temporaryTables, relationId);
MemoryContextSwitchTo(oldContext);
}
bool
KnownTemporaryTable(Oid relationId)
{
return list_member_oid(temporaryTables, relationId);
}
static void
DropTemporaryTables(void)
{
ListCell *tableCell = NULL;
foreach(tableCell, temporaryTables)
{
Oid relationId = lfirst_oid(tableCell);
if (get_rel_name(relationId) == NULL)
{
/* temp table was already dropped */
continue;
}
if (IsDistributedTable(relationId))
{
Datum relationIdDatum = DatumGetObjectId(relationId);
DirectFunctionCall1(worker_drop_distributed_table, relationIdDatum);
}
}
temporaryTables = NIL;
}

View File

@ -315,6 +315,10 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
{
appendStringInfoString(&buffer, "UNLOGGED ");
}
else if (relation->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
{
appendStringInfoString(&buffer, "TEMPORARY ");
}
appendStringInfo(&buffer, "TABLE %s (", relationName);
}
@ -473,6 +477,13 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
appendStringInfo(&buffer, " PARTITION BY %s ", partitioningInformation);
}
#endif
if (relation->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
{
/* we always drop temporary tables at the end of the transaction */
appendStringInfoString(&buffer, " ON COMMIT DROP");
}
relation_close(relation, AccessShareLock);
return (buffer.data);

View File

@ -7496,7 +7496,13 @@ generate_relation_or_shard_name(Oid relid, Oid distrelid, int64 shardid,
if (shardid > 0)
{
Oid schemaOid = get_rel_namespace(relid);
char *schemaName = get_namespace_name(schemaOid);
char *schemaName = NULL;
char relpersistence = get_rel_persistence(relid);
if (relpersistence != RELPERSISTENCE_TEMP)
{
schemaName = get_namespace_name(schemaOid);
}
AppendShardIdToName(&relname, shardid);
@ -7563,6 +7569,10 @@ generate_relation_name(Oid relid, List *namespaces)
if (!need_qual)
need_qual = !RelationIsVisible(relid);
/* temporary tables should never be qualified */
if (reltup->relpersistence == RELPERSISTENCE_TEMP)
need_qual = false;
if (need_qual)
nspname = get_namespace_name(reltup->relnamespace);
else

View File

@ -7280,6 +7280,10 @@ generate_relation_name(Oid relid, List *namespaces)
if (!need_qual)
need_qual = !RelationIsVisible(relid);
/* temporary tables should never be qualified */
if (reltup->relpersistence == RELPERSISTENCE_TEMP)
need_qual = false;
if (need_qual)
nspname = get_namespace_name(reltup->relnamespace);
else

View File

@ -80,6 +80,8 @@ extern void InitializeTransactionManagement(void);
/* other functions */
extern List * ActiveSubXacts(void);
extern void RegisterTemporaryTable(Oid relationId);
extern bool KnownTemporaryTable(Oid relationId);
#endif /* TRANSACTION_MANAGMENT_H */