From 6fc545c87b34145286a87722d726123d4457edf6 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 22 Aug 2017 22:25:54 +0200 Subject: [PATCH] Support temporary distributed tables --- .../commands/create_distributed_table.c | 19 ++++++- src/backend/distributed/commands/multi_copy.c | 13 ++++- .../distributed/executor/multi_utility.c | 28 ++++++++++ .../distributed/master/master_node_protocol.c | 15 +++-- .../master/master_stage_protocol.c | 9 ++- .../distributed/planner/deparse_shard_query.c | 9 ++- .../planner/multi_physical_planner.c | 4 +- .../distributed/relay/relay_event_utility.c | 5 +- .../transaction/transaction_management.c | 56 +++++++++++++++++++ .../distributed/utils/citus_ruleutils.c | 11 ++++ src/backend/distributed/utils/ruleutils_10.c | 12 +++- src/backend/distributed/utils/ruleutils_96.c | 4 ++ .../distributed/transaction_management.h | 2 + 13 files changed, 174 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 247e41277..82d6f3b03 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -28,6 +28,7 @@ #include "catalog/pg_trigger.h" #include "commands/defrem.h" #include "commands/extension.h" +#include "commands/tablecmds.h" #include "commands/trigger.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" @@ -44,6 +45,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/reference_table_utils.h" #include "distributed/remote_commands.h" +#include "distributed/transaction_management.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" #include "executor/executor.h" @@ -295,6 +297,13 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio Relation colocatedRelation = NULL; + char relationPersistence = get_rel_persistence(relationId); + if (relationPersistence == RELPERSISTENCE_TEMP && !KnownTemporaryTable(relationId)) + { + ereport(ERROR, (errmsg("can only distribute temporary tables with " + "ON COMMIT DROP"))); + } + replicationModel = AppropriateReplicationModel(distributionMethod, viaDeprecatedAPI); /* @@ -800,9 +809,17 @@ EnsureSchemaExistsOnAllNodes(Oid relationId) StringInfo applySchemaCreationDDL = makeStringInfo(); Oid schemaId = get_rel_namespace(relationId); - const char *createSchemaDDL = CreateSchemaDDLCommand(schemaId); + const char *createSchemaDDL = NULL; uint64 connectionFlag = FORCE_NEW_CONNECTION; + char tablePersistence = get_rel_persistence(relationId); + if (tablePersistence == RELPERSISTENCE_TEMP) + { + /* cannot create temporary table schemas */ + return; + } + + createSchemaDDL = CreateSchemaDDLCommand(schemaId); if (createSchemaDDL == NULL) { return; diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index e1dd6dc13..74a4dc3fb 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -1062,13 +1062,24 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop char *schemaName = copyStatement->relation->schemaname; char *relationName = copyStatement->relation->relname; + Oid tableId = InvalidOid; + char tablePersistence = 0; char *shardName = pstrdup(relationName); char *shardQualifiedName = NULL; AppendShardIdToName(&shardName, shardId); - shardQualifiedName = quote_qualified_identifier(schemaName, shardName); + tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false); + tablePersistence = get_rel_persistence(tableId); + if (tablePersistence != RELPERSISTENCE_TEMP) + { + shardQualifiedName = quote_qualified_identifier(schemaName, shardName); + } + else + { + shardQualifiedName = (char *) quote_identifier(shardName); + } appendStringInfo(command, "COPY %s ", shardQualifiedName); diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 636073c1b..5c5d48cea 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -519,6 +519,34 @@ multi_ProcessUtility(PlannedStmt *pstmt, SetUserIdAndSecContext(savedUserId, savedSecurityContext); } + /* keep track of transaction-scoped temporary tables */ + if (IsA(parsetree, CreateStmt)) + { + CreateStmt *createStatement = (CreateStmt *) parsetree; + + if (createStatement->oncommit == ONCOMMIT_DROP) + { + bool missingOK = false; + Oid relationId = RangeVarGetRelid(createStatement->relation, NoLock, + missingOK); + + RegisterTemporaryTable(relationId); + } + } + else if (IsA(parsetree, CreateTableAsStmt)) + { + CreateTableAsStmt *createTableAsStmt = (CreateTableAsStmt *) parsetree; + IntoClause *into = createTableAsStmt->into; + + if (into->onCommit == ONCOMMIT_DROP) + { + bool missingOK = false; + Oid relationId = RangeVarGetRelid(into->rel, NoLock, missingOK); + + RegisterTemporaryTable(relationId); + } + } + /* after local command has completed, finish by executing worker DDLJobs, if any */ if (ddlJobs != NIL) { diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index bcc89b281..51d22d5aa 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -496,6 +496,7 @@ GetTableCreationCommands(Oid relationId, bool includeSequenceDefaults) { List *tableDDLEventList = NIL; char tableType = 0; + char tablePersistence = 0; char *tableSchemaDef = NULL; char *tableColumnOptionsDef = NULL; char *createSchemaCommand = NULL; @@ -525,12 +526,16 @@ GetTableCreationCommands(Oid relationId, bool includeSequenceDefaults) tableDDLEventList = lappend(tableDDLEventList, serverDef); } - /* create schema if the table is not in the default namespace (public) */ - schemaId = get_rel_namespace(relationId); - createSchemaCommand = CreateSchemaDDLCommand(schemaId); - if (createSchemaCommand != NULL) + /* create schema if the table is not temporary */ + tablePersistence = get_rel_persistence(relationId); + if (tablePersistence != RELPERSISTENCE_TEMP) { - tableDDLEventList = lappend(tableDDLEventList, createSchemaCommand); + schemaId = get_rel_namespace(relationId); + createSchemaCommand = CreateSchemaDDLCommand(schemaId); + if (createSchemaCommand != NULL) + { + tableDDLEventList = lappend(tableDDLEventList, createSchemaCommand); + } } /* fetch table schema and column option definitions */ diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 2f964d603..777b6aa4c 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -573,6 +573,13 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma char *escapedSchemaName = quote_literal_cstr(schemaName); ListCell *ddlCommandCell = NULL; ListCell *foreignConstraintCommandCell = NULL; + bool temporaryTable = false; + + char tablePersistence = get_rel_persistence(relationId); + if (tablePersistence == RELPERSISTENCE_TEMP) + { + temporaryTable = true; + } foreach(ddlCommandCell, ddlCommandList) { @@ -580,7 +587,7 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma char *escapedDDLCommand = quote_literal_cstr(ddlCommand); StringInfo applyDDLCommand = makeStringInfo(); - if (strcmp(schemaName, "public") != 0) + if (strcmp(schemaName, "public") != 0 && !temporaryTable) { appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId, escapedSchemaName, escapedDDLCommand); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index dc06ff81a..d8df574d6 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -174,6 +174,7 @@ UpdateRelationToShardNames(Node *node, List *relationShardList) Oid schemaId = InvalidOid; char *relationName = NULL; char *schemaName = NULL; + char relationPersistence = 0; bool replaceRteWithNullValues = false; ListCell *relationShardCell = NULL; RelationShard *relationShard = NULL; @@ -236,8 +237,12 @@ UpdateRelationToShardNames(Node *node, List *relationShardList) relationName = get_rel_name(relationId); AppendShardIdToName(&relationName, shardId); - schemaId = get_rel_namespace(relationId); - schemaName = get_namespace_name(schemaId); + relationPersistence = get_rel_persistence(relationId); + if (relationPersistence != RELPERSISTENCE_TEMP) + { + schemaId = get_rel_namespace(relationId); + schemaName = get_namespace_name(schemaId); + } ModifyRangeTblExtraData(newRte, CITUS_RTE_SHARD, schemaName, relationName, NIL); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 9c0d1341f..698ec1c32 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -3978,6 +3978,7 @@ FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment) Oid relationId = rangeTableEntry->relid; char *relationName = get_rel_name(relationId); + char relationPersistence = get_rel_persistence(relationId); /* * If the table is not in the default namespace (public), we include it in @@ -3985,7 +3986,8 @@ FragmentAlias(RangeTblEntry *rangeTableEntry, RangeTableFragment *fragment) */ Oid schemaId = get_rel_namespace(relationId); schemaName = get_namespace_name(schemaId); - if (strncmp(schemaName, "public", NAMEDATALEN) == 0) + if (strncmp(schemaName, "public", NAMEDATALEN) == 0 || + relationPersistence == RELPERSISTENCE_TEMP) { schemaName = NULL; } diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index b09e20df7..8a576d0c5 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -174,7 +174,10 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) char **relationSchemaName = &(createStmt->relation->schemaname); /* prefix with schema name if it is not added already */ - SetSchemaNameIfNotExist(relationSchemaName, schemaName); + if (createStmt->relation->relpersistence != RELPERSISTENCE_TEMP) + { + SetSchemaNameIfNotExist(relationSchemaName, schemaName); + } AppendShardIdToName(relationName, shardId); break; diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 9d7d0b30c..3bea3b200 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -22,11 +22,13 @@ #include "distributed/backend_data.h" #include "distributed/connection_management.h" #include "distributed/hash_helpers.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_shard_transaction.h" #include "distributed/transaction_management.h" #include "distributed/placement_connection.h" #include "utils/hsearch.h" #include "utils/guc.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" @@ -45,6 +47,9 @@ dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions); /* stack of active sub-transactions */ static List *activeSubXacts = NIL; +/* list of temporary tables in the current transaction */ +static List *temporaryTables = NIL; + /* * Should this coordinated transaction use 2PC? Set by * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and @@ -61,6 +66,11 @@ static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransaction static void AdjustMaxPreparedTransactions(void); static void PushSubXact(SubTransactionId subId); static void PopSubXact(SubTransactionId subId); +static void DropTemporaryTables(void); + + +/* TODO: make a proper function that we can call */ +extern Datum worker_drop_distributed_table(PG_FUNCTION_ARGS); /* @@ -211,6 +221,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) dlist_init(&InProgressTransactions); CoordinatedTransactionUses2PC = false; UnSetDistributedTransactionId(); + + temporaryTables = NIL; break; } @@ -228,6 +240,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) case XACT_EVENT_PRE_COMMIT: { + DropTemporaryTables(); + /* nothing further to do if there's no managed remote xacts */ if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) { @@ -409,3 +423,45 @@ ActiveSubXacts(void) return activeSubXactsReversed; } + + +void +RegisterTemporaryTable(Oid relationId) +{ + MemoryContext oldContext = MemoryContextSwitchTo(CurTransactionContext); + temporaryTables = lappend_oid(temporaryTables, relationId); + MemoryContextSwitchTo(oldContext); +} + + +bool +KnownTemporaryTable(Oid relationId) +{ + return list_member_oid(temporaryTables, relationId); +} + + +static void +DropTemporaryTables(void) +{ + ListCell *tableCell = NULL; + + foreach(tableCell, temporaryTables) + { + Oid relationId = lfirst_oid(tableCell); + + if (get_rel_name(relationId) == NULL) + { + /* temp table was already dropped */ + continue; + } + + if (IsDistributedTable(relationId)) + { + Datum relationIdDatum = DatumGetObjectId(relationId); + DirectFunctionCall1(worker_drop_distributed_table, relationIdDatum); + } + } + + temporaryTables = NIL; +} diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index 4cd3b7a1a..5df44796d 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -315,6 +315,10 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) { appendStringInfoString(&buffer, "UNLOGGED "); } + else if (relation->rd_rel->relpersistence == RELPERSISTENCE_TEMP) + { + appendStringInfoString(&buffer, "TEMPORARY "); + } appendStringInfo(&buffer, "TABLE %s (", relationName); } @@ -473,6 +477,13 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) appendStringInfo(&buffer, " PARTITION BY %s ", partitioningInformation); } #endif + + if (relation->rd_rel->relpersistence == RELPERSISTENCE_TEMP) + { + /* we always drop temporary tables at the end of the transaction */ + appendStringInfoString(&buffer, " ON COMMIT DROP"); + } + relation_close(relation, AccessShareLock); return (buffer.data); diff --git a/src/backend/distributed/utils/ruleutils_10.c b/src/backend/distributed/utils/ruleutils_10.c index 6dfdad67c..6925fc974 100644 --- a/src/backend/distributed/utils/ruleutils_10.c +++ b/src/backend/distributed/utils/ruleutils_10.c @@ -7496,7 +7496,13 @@ generate_relation_or_shard_name(Oid relid, Oid distrelid, int64 shardid, if (shardid > 0) { Oid schemaOid = get_rel_namespace(relid); - char *schemaName = get_namespace_name(schemaOid); + char *schemaName = NULL; + + char relpersistence = get_rel_persistence(relid); + if (relpersistence != RELPERSISTENCE_TEMP) + { + schemaName = get_namespace_name(schemaOid); + } AppendShardIdToName(&relname, shardid); @@ -7563,6 +7569,10 @@ generate_relation_name(Oid relid, List *namespaces) if (!need_qual) need_qual = !RelationIsVisible(relid); + /* temporary tables should never be qualified */ + if (reltup->relpersistence == RELPERSISTENCE_TEMP) + need_qual = false; + if (need_qual) nspname = get_namespace_name(reltup->relnamespace); else diff --git a/src/backend/distributed/utils/ruleutils_96.c b/src/backend/distributed/utils/ruleutils_96.c index 0f19fe73e..f8e93f08a 100644 --- a/src/backend/distributed/utils/ruleutils_96.c +++ b/src/backend/distributed/utils/ruleutils_96.c @@ -7280,6 +7280,10 @@ generate_relation_name(Oid relid, List *namespaces) if (!need_qual) need_qual = !RelationIsVisible(relid); + /* temporary tables should never be qualified */ + if (reltup->relpersistence == RELPERSISTENCE_TEMP) + need_qual = false; + if (need_qual) nspname = get_namespace_name(reltup->relnamespace); else diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 731befdfc..ff147e6f1 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -80,6 +80,8 @@ extern void InitializeTransactionManagement(void); /* other functions */ extern List * ActiveSubXacts(void); +extern void RegisterTemporaryTable(Oid relationId); +extern bool KnownTemporaryTable(Oid relationId); #endif /* TRANSACTION_MANAGMENT_H */