create relfilenode for FDW

merge-cstore-pykello
Jeff Davis 2020-09-09 12:44:41 -07:00
parent 407892a9dd
commit e9045227cd
1 changed files with 146 additions and 15 deletions

View File

@ -21,9 +21,13 @@
#include "access/heapam.h"
#include "access/reloptions.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_foreign_table.h"
#include "catalog/pg_namespace.h"
#include "catalog/storage.h"
#include "commands/copy.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
@ -50,18 +54,20 @@
#include "parser/parser.h"
#include "parser/parse_coerce.h"
#include "parser/parse_type.h"
#include "storage/smgr.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#if PG_VERSION_NUM < 120000
#include "utils/rel.h"
#endif
#if PG_VERSION_NUM >= 120000
#include "utils/snapmgr.h"
#else
#include "utils/tqual.h"
#endif
#if PG_VERSION_NUM < 120000
#include "utils/rel.h"
#endif
#include "utils/syscache.h"
#include "cstore.h"
#include "cstore_fdw.h"
@ -124,6 +130,7 @@ static void CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement);
static List * DroppedCStoreFilenameList(DropStmt *dropStatement);
static List * FindCStoreTables(List *tableList);
static List * OpenRelationsForTruncate(List *cstoreTableList);
static void InitializeRelFileNode(Relation relation);
static void TruncateCStoreTables(List *cstoreRelationList);
static bool CStoreTable(Oid relationId);
static bool CStoreServer(ForeignServer *server);
@ -183,6 +190,9 @@ static void CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relatio
static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
#endif
static void cstore_fdw_initrel(Relation rel);
static Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode);
static Relation cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode);
PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger);
PG_FUNCTION_INFO_V1(cstore_table_size);
@ -261,7 +271,7 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
{
Oid relationId = RangeVarGetRelid(createStatement->base.relation,
AccessShareLock, false);
Relation relation = heap_open(relationId, AccessExclusiveLock);
Relation relation = cstore_fdw_open(relationId, AccessExclusiveLock);
/*
* Make sure database directory exists before creating a table.
@ -368,7 +378,7 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
foreach(fileListCell, droppedTables)
{
char *fileName = lfirst(fileListCell);
//TODO: relation storage is not dropped
DeleteCStoreTableFiles(fileName);
}
}
@ -562,7 +572,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
* Open and lock the relation. We acquire ShareUpdateExclusiveLock to allow
* concurrent reads, but block concurrent writes.
*/
relation = heap_openrv(copyStatement->relation, ShareUpdateExclusiveLock);
relation = cstore_fdw_openrv(copyStatement->relation, ShareUpdateExclusiveLock);
relationId = RelationGetRelid(relation);
/* allocate column values and nulls arrays */
@ -850,7 +860,7 @@ OpenRelationsForTruncate(List *cstoreTableList)
foreach(relationCell, cstoreTableList)
{
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
Relation relation = heap_openrv(rangeVar, AccessExclusiveLock);
Relation relation = cstore_fdw_openrv(rangeVar, AccessExclusiveLock);
Oid relationId = relation->rd_id;
AclResult aclresult = pg_class_aclcheck(relationId, GetUserId(),
ACL_TRUNCATE);
@ -889,11 +899,76 @@ TruncateCStoreTables(List *cstoreRelationList)
Assert(CStoreTable(relationId));
cstoreOptions = CStoreGetOptions(relationId);
if (OidIsValid(relation->rd_rel->relfilenode))
{
RelationOpenSmgr(relation);
RelationDropStorage(relation);
}
DeleteCStoreTableFiles(cstoreOptions->filename);
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
}
}
/*
* Version 11 and earlier already create a relfilenode for foreign
* tables. Version 12 and later do not, so we need to create one manually.
*/
static void
InitializeRelFileNode(Relation relation)
{
#if PG_VERSION_NUM >= 120000
Relation pg_class;
HeapTuple tuple;
Form_pg_class classform;
/*
* Get a writable copy of the pg_class tuple for the given relation.
*/
pg_class = heap_open(RelationRelationId, RowExclusiveLock);
tuple = SearchSysCacheCopy1(RELOID,
ObjectIdGetDatum(RelationGetRelid(relation)));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "could not find tuple for relation %u",
RelationGetRelid(relation));
classform = (Form_pg_class) GETSTRUCT(tuple);
if (!OidIsValid(classform->relfilenode))
{
Oid tablespace;
Oid filenode = relation->rd_id;
char persistence = relation->rd_rel->relpersistence;
RelFileNode newrnode;
SMgrRelation srel;
if (OidIsValid(relation->rd_rel->reltablespace))
tablespace = relation->rd_rel->reltablespace;
else
tablespace = MyDatabaseTableSpace;
newrnode.spcNode = tablespace;
newrnode.dbNode = MyDatabaseId;
newrnode.relNode = filenode;
srel = RelationCreateStorage(newrnode, persistence);
smgrclose(srel);
classform->relfilenode = filenode;
classform->relpages = 0; /* it's empty until further notice */
classform->reltuples = 0;
classform->relallvisible = 0;
classform->relfrozenxid = InvalidTransactionId;
classform->relminmxid = InvalidTransactionId;
CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
CommandCounterIncrement();
}
heap_freetuple(tuple);
heap_close(pg_class, RowExclusiveLock);
#endif
}
/*
* CStoreTable checks if the given table name belongs to a foreign columnar store
@ -1179,6 +1254,7 @@ cstore_clean_table_resources(PG_FUNCTION_ARGS)
struct stat fileStat;
int statResult = -1;
//TODO: relation storage is not dropped
appendStringInfo(filePath, "%s/%s/%d/%d", DataDir, CSTORE_FDW_NAME,
(int) MyDatabaseId, (int) relationId);
@ -1402,7 +1478,7 @@ static char *
CStoreDefaultFilePath(Oid foreignTableId)
{
StringInfo cstoreFilePath = NULL;
Relation relation = relation_open(foreignTableId, AccessShareLock);
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
RelFileNode relationFileNode = relation->rd_node;
Oid databaseOid = relationFileNode.dbNode;
Oid relationFileOid = relationFileNode.relNode;
@ -1453,7 +1529,7 @@ CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId
{
Path *foreignScanPath = NULL;
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
Relation relation = heap_open(foreignTableId, AccessShareLock);
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
/*
* We skip reading columns that are not in query. Here we assume that all
@ -1659,7 +1735,7 @@ ColumnList(RelOptInfo *baserel, Oid foreignTableId)
List *restrictInfoList = baserel->baserestrictinfo;
ListCell *restrictInfoCell = NULL;
const AttrNumber wholeRow = 0;
Relation relation = heap_open(foreignTableId, AccessShareLock);
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
/* first add the columns used in joins and projections */
@ -1750,10 +1826,13 @@ ColumnList(RelOptInfo *baserel, Oid foreignTableId)
static void
CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState)
{
Oid foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation);
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
Relation relation = scanState->ss.ss_currentRelation;
CStoreOptions *cstoreOptions;
Oid foreignTableId;
ExplainPropertyText("CStore File", cstoreOptions->filename, explainState);
cstore_fdw_initrel(relation);
foreignTableId = RelationGetRelid(relation);
cstoreOptions = CStoreGetOptions(foreignTableId);
/* supress file size if we're not showing cost details */
if (explainState->costs)
@ -1784,6 +1863,8 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
List *foreignPrivateList = NIL;
List *whereClauseList = NIL;
cstore_fdw_initrel(currentRelation);
/* if Explain with no Analyze, do nothing */
if (executorFlags & EXEC_FLAG_EXPLAIN_ONLY)
{
@ -1869,9 +1950,12 @@ CStoreAnalyzeForeignTable(Relation relation,
BlockNumber *totalPageCount)
{
Oid foreignTableId = RelationGetRelid(relation);
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
CStoreOptions *cstoreOptions;
struct stat statBuffer;
cstore_fdw_initrel(relation);
cstoreOptions = CStoreGetOptions(foreignTableId);
int statResult = stat(cstoreOptions->filename, &statBuffer);
if (statResult < 0)
{
@ -1924,6 +2008,7 @@ CStoreAcquireSampleRows(Relation relation, int logLevel,
TupleDesc tupleDescriptor = RelationGetDescr(relation);
uint32 columnCount = tupleDescriptor->natts;
cstore_fdw_initrel(relation);
/* create list of columns of the relation */
uint32 columnIndex = 0;
@ -2147,7 +2232,7 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
Relation relation = NULL;
foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc);
relation = heap_open(foreignTableOid, ShareUpdateExclusiveLock);
relation = cstore_fdw_open(foreignTableOid, ShareUpdateExclusiveLock);
cstoreOptions = CStoreGetOptions(foreignTableOid);
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
@ -2246,3 +2331,49 @@ CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
#endif
/*
* Versions 12 and later do not initialize rd_node even if the relation has a
* valid relfilenode, so we need to initialize it each time a cstore FDW
* relation is opened.
*/
static void
cstore_fdw_initrel(Relation rel)
{
#if PG_VERSION_NUM >= 120000
if (rel->rd_rel->relfilenode == InvalidOid)
InitializeRelFileNode(rel);
/*
* Copied code from RelationInitPhysicalAddr(), which doesn't
* work on foreign tables.
*/
if (OidIsValid(rel->rd_rel->reltablespace))
rel->rd_node.spcNode = rel->rd_rel->reltablespace;
else
rel->rd_node.spcNode = MyDatabaseTableSpace;
rel->rd_node.dbNode = MyDatabaseId;
rel->rd_node.relNode = rel->rd_rel->relfilenode;
#endif
}
static Relation
cstore_fdw_open(Oid relationId, LOCKMODE lockmode)
{
Relation rel = heap_open(relationId, lockmode);
cstore_fdw_initrel(rel);
return rel;
}
static Relation
cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode)
{
Relation rel = heap_openrv(relation, lockmode);
cstore_fdw_initrel(rel);
return rel;
}