diff --git a/cstore_fdw.c b/cstore_fdw.c index 8ce3a7296..073a68130 100644 --- a/cstore_fdw.c +++ b/cstore_fdw.c @@ -21,9 +21,13 @@ #include "access/heapam.h" #include "access/reloptions.h" #include "access/tuptoaster.h" +#include "access/xact.h" +#include "catalog/catalog.h" +#include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_foreign_table.h" #include "catalog/pg_namespace.h" +#include "catalog/storage.h" #include "commands/copy.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -50,18 +54,20 @@ #include "parser/parser.h" #include "parser/parse_coerce.h" #include "parser/parse_type.h" +#include "storage/smgr.h" #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" +#if PG_VERSION_NUM < 120000 +#include "utils/rel.h" +#endif #if PG_VERSION_NUM >= 120000 #include "utils/snapmgr.h" #else #include "utils/tqual.h" #endif -#if PG_VERSION_NUM < 120000 -#include "utils/rel.h" -#endif +#include "utils/syscache.h" #include "cstore.h" #include "cstore_fdw.h" @@ -124,6 +130,7 @@ static void CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement); static List * DroppedCStoreFilenameList(DropStmt *dropStatement); static List * FindCStoreTables(List *tableList); static List * OpenRelationsForTruncate(List *cstoreTableList); +static void InitializeRelFileNode(Relation relation); static void TruncateCStoreTables(List *cstoreRelationList); static bool CStoreTable(Oid relationId); static bool CStoreServer(ForeignServer *server); @@ -183,6 +190,9 @@ static void CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relatio static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte); #endif +static void cstore_fdw_initrel(Relation rel); +static Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode); +static Relation cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode); PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger); PG_FUNCTION_INFO_V1(cstore_table_size); @@ -261,7 +271,7 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS) { Oid relationId = RangeVarGetRelid(createStatement->base.relation, AccessShareLock, false); - Relation relation = heap_open(relationId, AccessExclusiveLock); + Relation relation = cstore_fdw_open(relationId, AccessExclusiveLock); /* * Make sure database directory exists before creating a table. @@ -368,7 +378,7 @@ CStoreProcessUtility(Node * parseTree, const char * queryString, foreach(fileListCell, droppedTables) { char *fileName = lfirst(fileListCell); - + //TODO: relation storage is not dropped DeleteCStoreTableFiles(fileName); } } @@ -562,7 +572,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) * Open and lock the relation. We acquire ShareUpdateExclusiveLock to allow * concurrent reads, but block concurrent writes. */ - relation = heap_openrv(copyStatement->relation, ShareUpdateExclusiveLock); + relation = cstore_fdw_openrv(copyStatement->relation, ShareUpdateExclusiveLock); relationId = RelationGetRelid(relation); /* allocate column values and nulls arrays */ @@ -850,7 +860,7 @@ OpenRelationsForTruncate(List *cstoreTableList) foreach(relationCell, cstoreTableList) { RangeVar *rangeVar = (RangeVar *) lfirst(relationCell); - Relation relation = heap_openrv(rangeVar, AccessExclusiveLock); + Relation relation = cstore_fdw_openrv(rangeVar, AccessExclusiveLock); Oid relationId = relation->rd_id; AclResult aclresult = pg_class_aclcheck(relationId, GetUserId(), ACL_TRUNCATE); @@ -889,11 +899,76 @@ TruncateCStoreTables(List *cstoreRelationList) Assert(CStoreTable(relationId)); cstoreOptions = CStoreGetOptions(relationId); + if (OidIsValid(relation->rd_rel->relfilenode)) + { + RelationOpenSmgr(relation); + RelationDropStorage(relation); + } DeleteCStoreTableFiles(cstoreOptions->filename); InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId)); } } +/* + * Version 11 and earlier already create a relfilenode for foreign + * tables. Version 12 and later do not, so we need to create one manually. + */ +static void +InitializeRelFileNode(Relation relation) +{ +#if PG_VERSION_NUM >= 120000 + Relation pg_class; + HeapTuple tuple; + Form_pg_class classform; + + /* + * Get a writable copy of the pg_class tuple for the given relation. + */ + pg_class = heap_open(RelationRelationId, RowExclusiveLock); + + tuple = SearchSysCacheCopy1(RELOID, + ObjectIdGetDatum(RelationGetRelid(relation))); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "could not find tuple for relation %u", + RelationGetRelid(relation)); + classform = (Form_pg_class) GETSTRUCT(tuple); + + if (!OidIsValid(classform->relfilenode)) + { + Oid tablespace; + Oid filenode = relation->rd_id; + char persistence = relation->rd_rel->relpersistence; + RelFileNode newrnode; + SMgrRelation srel; + + if (OidIsValid(relation->rd_rel->reltablespace)) + tablespace = relation->rd_rel->reltablespace; + else + tablespace = MyDatabaseTableSpace; + + newrnode.spcNode = tablespace; + newrnode.dbNode = MyDatabaseId; + newrnode.relNode = filenode; + + srel = RelationCreateStorage(newrnode, persistence); + smgrclose(srel); + + classform->relfilenode = filenode; + classform->relpages = 0; /* it's empty until further notice */ + classform->reltuples = 0; + classform->relallvisible = 0; + classform->relfrozenxid = InvalidTransactionId; + classform->relminmxid = InvalidTransactionId; + + CatalogTupleUpdate(pg_class, &tuple->t_self, tuple); + CommandCounterIncrement(); + } + + heap_freetuple(tuple); + heap_close(pg_class, RowExclusiveLock); +#endif +} + /* * CStoreTable checks if the given table name belongs to a foreign columnar store @@ -1179,6 +1254,7 @@ cstore_clean_table_resources(PG_FUNCTION_ARGS) struct stat fileStat; int statResult = -1; + //TODO: relation storage is not dropped appendStringInfo(filePath, "%s/%s/%d/%d", DataDir, CSTORE_FDW_NAME, (int) MyDatabaseId, (int) relationId); @@ -1402,7 +1478,7 @@ static char * CStoreDefaultFilePath(Oid foreignTableId) { StringInfo cstoreFilePath = NULL; - Relation relation = relation_open(foreignTableId, AccessShareLock); + Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock); RelFileNode relationFileNode = relation->rd_node; Oid databaseOid = relationFileNode.dbNode; Oid relationFileOid = relationFileNode.relNode; @@ -1453,7 +1529,7 @@ CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId { Path *foreignScanPath = NULL; CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId); - Relation relation = heap_open(foreignTableId, AccessShareLock); + Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock); /* * We skip reading columns that are not in query. Here we assume that all @@ -1659,7 +1735,7 @@ ColumnList(RelOptInfo *baserel, Oid foreignTableId) List *restrictInfoList = baserel->baserestrictinfo; ListCell *restrictInfoCell = NULL; const AttrNumber wholeRow = 0; - Relation relation = heap_open(foreignTableId, AccessShareLock); + Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(relation); /* first add the columns used in joins and projections */ @@ -1750,10 +1826,13 @@ ColumnList(RelOptInfo *baserel, Oid foreignTableId) static void CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState) { - Oid foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation); - CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId); + Relation relation = scanState->ss.ss_currentRelation; + CStoreOptions *cstoreOptions; + Oid foreignTableId; - ExplainPropertyText("CStore File", cstoreOptions->filename, explainState); + cstore_fdw_initrel(relation); + foreignTableId = RelationGetRelid(relation); + cstoreOptions = CStoreGetOptions(foreignTableId); /* supress file size if we're not showing cost details */ if (explainState->costs) @@ -1784,6 +1863,8 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags) List *foreignPrivateList = NIL; List *whereClauseList = NIL; + cstore_fdw_initrel(currentRelation); + /* if Explain with no Analyze, do nothing */ if (executorFlags & EXEC_FLAG_EXPLAIN_ONLY) { @@ -1869,9 +1950,12 @@ CStoreAnalyzeForeignTable(Relation relation, BlockNumber *totalPageCount) { Oid foreignTableId = RelationGetRelid(relation); - CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId); + CStoreOptions *cstoreOptions; struct stat statBuffer; + cstore_fdw_initrel(relation); + cstoreOptions = CStoreGetOptions(foreignTableId); + int statResult = stat(cstoreOptions->filename, &statBuffer); if (statResult < 0) { @@ -1924,6 +2008,7 @@ CStoreAcquireSampleRows(Relation relation, int logLevel, TupleDesc tupleDescriptor = RelationGetDescr(relation); uint32 columnCount = tupleDescriptor->natts; + cstore_fdw_initrel(relation); /* create list of columns of the relation */ uint32 columnIndex = 0; @@ -2147,7 +2232,7 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela Relation relation = NULL; foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc); - relation = heap_open(foreignTableOid, ShareUpdateExclusiveLock); + relation = cstore_fdw_open(foreignTableOid, ShareUpdateExclusiveLock); cstoreOptions = CStoreGetOptions(foreignTableOid); tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc); @@ -2246,3 +2331,49 @@ CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, #endif + +/* + * Versions 12 and later do not initialize rd_node even if the relation has a + * valid relfilenode, so we need to initialize it each time a cstore FDW + * relation is opened. + */ +static void +cstore_fdw_initrel(Relation rel) +{ +#if PG_VERSION_NUM >= 120000 + if (rel->rd_rel->relfilenode == InvalidOid) + InitializeRelFileNode(rel); + + /* + * Copied code from RelationInitPhysicalAddr(), which doesn't + * work on foreign tables. + */ + if (OidIsValid(rel->rd_rel->reltablespace)) + rel->rd_node.spcNode = rel->rd_rel->reltablespace; + else + rel->rd_node.spcNode = MyDatabaseTableSpace; + + rel->rd_node.dbNode = MyDatabaseId; + rel->rd_node.relNode = rel->rd_rel->relfilenode; +#endif +} + +static Relation +cstore_fdw_open(Oid relationId, LOCKMODE lockmode) +{ + Relation rel = heap_open(relationId, lockmode); + + cstore_fdw_initrel(rel); + + return rel; +} + +static Relation +cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode) +{ + Relation rel = heap_openrv(relation, lockmode); + + cstore_fdw_initrel(rel); + + return rel; +}