mirror of https://github.com/citusdata/citus.git
Merge pull request #4 from citusdata/fdw-relfilenode
create relfilenode for FDWmerge-cstore-pykello
commit
b8b5d3aeee
241
cstore_fdw.c
241
cstore_fdw.c
|
@ -21,9 +21,13 @@
|
|||
#include "access/heapam.h"
|
||||
#include "access/reloptions.h"
|
||||
#include "access/tuptoaster.h"
|
||||
#include "access/xact.h"
|
||||
#include "catalog/catalog.h"
|
||||
#include "catalog/indexing.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_foreign_table.h"
|
||||
#include "catalog/pg_namespace.h"
|
||||
#include "catalog/storage.h"
|
||||
#include "commands/copy.h"
|
||||
#include "commands/dbcommands.h"
|
||||
#include "commands/defrem.h"
|
||||
|
@ -50,18 +54,20 @@
|
|||
#include "parser/parser.h"
|
||||
#include "parser/parse_coerce.h"
|
||||
#include "parser/parse_type.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "tcop/utility.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#if PG_VERSION_NUM < 120000
|
||||
#include "utils/rel.h"
|
||||
#endif
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
#include "utils/snapmgr.h"
|
||||
#else
|
||||
#include "utils/tqual.h"
|
||||
#endif
|
||||
#if PG_VERSION_NUM < 120000
|
||||
#include "utils/rel.h"
|
||||
#endif
|
||||
#include "utils/syscache.h"
|
||||
|
||||
#include "cstore.h"
|
||||
#include "cstore_fdw.h"
|
||||
|
@ -121,9 +127,10 @@ static uint64 CopyIntoCStoreTable(const CopyStmt *copyStatement,
|
|||
const char *queryString);
|
||||
static uint64 CopyOutCStoreTable(CopyStmt *copyStatement, const char *queryString);
|
||||
static void CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement);
|
||||
static List * DroppedCStoreFilenameList(DropStmt *dropStatement);
|
||||
static List * DroppedCStoreRelidList(DropStmt *dropStatement);
|
||||
static List * FindCStoreTables(List *tableList);
|
||||
static List * OpenRelationsForTruncate(List *cstoreTableList);
|
||||
static void InitializeRelFileNode(Relation relation);
|
||||
static void TruncateCStoreTables(List *cstoreRelationList);
|
||||
static bool CStoreTable(Oid relationId);
|
||||
static bool CStoreServer(ForeignServer *server);
|
||||
|
@ -183,6 +190,9 @@ static void CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relatio
|
|||
static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
|
||||
RangeTblEntry *rte);
|
||||
#endif
|
||||
static void cstore_fdw_initrel(Relation rel);
|
||||
static Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode);
|
||||
static Relation cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode);
|
||||
|
||||
PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger);
|
||||
PG_FUNCTION_INFO_V1(cstore_table_size);
|
||||
|
@ -261,7 +271,7 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
|
|||
{
|
||||
Oid relationId = RangeVarGetRelid(createStatement->base.relation,
|
||||
AccessShareLock, false);
|
||||
Relation relation = heap_open(relationId, AccessExclusiveLock);
|
||||
Relation relation = cstore_fdw_open(relationId, AccessExclusiveLock);
|
||||
|
||||
/*
|
||||
* Make sure database directory exists before creating a table.
|
||||
|
@ -359,17 +369,43 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
|
|||
}
|
||||
else
|
||||
{
|
||||
ListCell *fileListCell = NULL;
|
||||
List *droppedTables = DroppedCStoreFilenameList((DropStmt *) parseTree);
|
||||
List *dropRelids = DroppedCStoreRelidList((DropStmt *) parseTree);
|
||||
List *dropFiles = NIL;
|
||||
ListCell *lc = NULL;
|
||||
|
||||
/* drop smgr storage */
|
||||
foreach(lc, dropRelids)
|
||||
{
|
||||
Oid relid = lfirst_oid(lc);
|
||||
Relation relation = cstore_fdw_open(relid, AccessExclusiveLock);
|
||||
CStoreOptions *cstoreOptions = CStoreGetOptions(relid);
|
||||
char *defaultfilename = CStoreDefaultFilePath(relid);
|
||||
|
||||
RelationOpenSmgr(relation);
|
||||
RelationDropStorage(relation);
|
||||
heap_close(relation, AccessExclusiveLock);
|
||||
|
||||
/*
|
||||
* Skip files that are placed in default location, they are handled
|
||||
* by sql drop trigger. Both paths are generated by code, use
|
||||
* of strcmp is safe here.
|
||||
*/
|
||||
if (strcmp(defaultfilename, cstoreOptions->filename) == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
dropFiles = lappend(dropFiles, cstoreOptions->filename);
|
||||
}
|
||||
|
||||
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
|
||||
destReceiver, completionTag);
|
||||
|
||||
foreach(fileListCell, droppedTables)
|
||||
/* drop files */
|
||||
foreach(lc, dropFiles)
|
||||
{
|
||||
char *fileName = lfirst(fileListCell);
|
||||
|
||||
DeleteCStoreTableFiles(fileName);
|
||||
char *filename = lfirst(lc);
|
||||
DeleteCStoreTableFiles(filename);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -562,7 +598,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
|
|||
* Open and lock the relation. We acquire ShareUpdateExclusiveLock to allow
|
||||
* concurrent reads, but block concurrent writes.
|
||||
*/
|
||||
relation = heap_openrv(copyStatement->relation, ShareUpdateExclusiveLock);
|
||||
relation = cstore_fdw_openrv(copyStatement->relation, ShareUpdateExclusiveLock);
|
||||
relationId = RelationGetRelid(relation);
|
||||
|
||||
/* allocate column values and nulls arrays */
|
||||
|
@ -773,13 +809,13 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement)
|
|||
|
||||
|
||||
/*
|
||||
* DropppedCStoreFilenameList extracts and returns the list of cstore file names
|
||||
* DropppedCStoreRelidList extracts and returns the list of cstore relids
|
||||
* from DROP table statement
|
||||
*/
|
||||
static List *
|
||||
DroppedCStoreFilenameList(DropStmt *dropStatement)
|
||||
DroppedCStoreRelidList(DropStmt *dropStatement)
|
||||
{
|
||||
List *droppedCStoreFileList = NIL;
|
||||
List *droppedCStoreRelidList = NIL;
|
||||
|
||||
if (dropStatement->removeType == OBJECT_FOREIGN_TABLE)
|
||||
{
|
||||
|
@ -792,26 +828,13 @@ DroppedCStoreFilenameList(DropStmt *dropStatement)
|
|||
Oid relationId = RangeVarGetRelid(rangeVar, AccessShareLock, true);
|
||||
if (CStoreTable(relationId))
|
||||
{
|
||||
CStoreOptions *cstoreOptions = CStoreGetOptions(relationId);
|
||||
char *defaultfilename = CStoreDefaultFilePath(relationId);
|
||||
|
||||
/*
|
||||
* Skip files that are placed in default location, they are handled
|
||||
* by sql drop trigger. Both paths are generated by code, use
|
||||
* of strcmp is safe here.
|
||||
*/
|
||||
if (strcmp(defaultfilename, cstoreOptions->filename) == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
droppedCStoreFileList = lappend(droppedCStoreFileList,
|
||||
cstoreOptions->filename);
|
||||
droppedCStoreRelidList = lappend_oid(droppedCStoreRelidList,
|
||||
relationId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return droppedCStoreFileList;
|
||||
return droppedCStoreRelidList;
|
||||
}
|
||||
|
||||
|
||||
|
@ -850,7 +873,7 @@ OpenRelationsForTruncate(List *cstoreTableList)
|
|||
foreach(relationCell, cstoreTableList)
|
||||
{
|
||||
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
|
||||
Relation relation = heap_openrv(rangeVar, AccessExclusiveLock);
|
||||
Relation relation = cstore_fdw_openrv(rangeVar, AccessExclusiveLock);
|
||||
Oid relationId = relation->rd_id;
|
||||
AclResult aclresult = pg_class_aclcheck(relationId, GetUserId(),
|
||||
ACL_TRUNCATE);
|
||||
|
@ -889,11 +912,85 @@ TruncateCStoreTables(List *cstoreRelationList)
|
|||
Assert(CStoreTable(relationId));
|
||||
|
||||
cstoreOptions = CStoreGetOptions(relationId);
|
||||
if (OidIsValid(relation->rd_rel->relfilenode))
|
||||
{
|
||||
RelationOpenSmgr(relation);
|
||||
RelationDropStorage(relation);
|
||||
}
|
||||
DeleteCStoreTableFiles(cstoreOptions->filename);
|
||||
InitializeCStoreTableFile(relationId, relation, CStoreGetOptions(relationId));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Version 11 and earlier already create a relfilenode for foreign
|
||||
* tables. Version 12 and later do not, so we need to create one manually.
|
||||
*/
|
||||
static void
|
||||
InitializeRelFileNode(Relation relation)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
Relation pg_class;
|
||||
HeapTuple tuple;
|
||||
Form_pg_class classform;
|
||||
|
||||
/*
|
||||
* Get a writable copy of the pg_class tuple for the given relation.
|
||||
*/
|
||||
pg_class = heap_open(RelationRelationId, RowExclusiveLock);
|
||||
|
||||
tuple = SearchSysCacheCopy1(RELOID,
|
||||
ObjectIdGetDatum(RelationGetRelid(relation)));
|
||||
if (!HeapTupleIsValid(tuple))
|
||||
elog(ERROR, "could not find tuple for relation %u",
|
||||
RelationGetRelid(relation));
|
||||
classform = (Form_pg_class) GETSTRUCT(tuple);
|
||||
|
||||
if (!OidIsValid(classform->relfilenode))
|
||||
{
|
||||
Relation tmprel;
|
||||
Oid tablespace;
|
||||
Oid filenode = relation->rd_id;
|
||||
char persistence = relation->rd_rel->relpersistence;
|
||||
RelFileNode newrnode;
|
||||
SMgrRelation srel;
|
||||
|
||||
/*
|
||||
* Upgrade to AccessExclusiveLock, and hold until the end of the
|
||||
* transaction. This shouldn't happen during a read, but it's hard to
|
||||
* prove that because it happens lazily.
|
||||
*/
|
||||
tmprel = heap_open(relation->rd_id, AccessExclusiveLock);
|
||||
heap_close(tmprel, NoLock);
|
||||
|
||||
if (OidIsValid(relation->rd_rel->reltablespace))
|
||||
tablespace = relation->rd_rel->reltablespace;
|
||||
else
|
||||
tablespace = MyDatabaseTableSpace;
|
||||
|
||||
newrnode.spcNode = tablespace;
|
||||
newrnode.dbNode = MyDatabaseId;
|
||||
newrnode.relNode = filenode;
|
||||
|
||||
srel = RelationCreateStorage(newrnode, persistence);
|
||||
smgrclose(srel);
|
||||
|
||||
classform->relfilenode = filenode;
|
||||
classform->relpages = 0; /* it's empty until further notice */
|
||||
classform->reltuples = 0;
|
||||
classform->relallvisible = 0;
|
||||
classform->relfrozenxid = InvalidTransactionId;
|
||||
classform->relminmxid = InvalidTransactionId;
|
||||
|
||||
CatalogTupleUpdate(pg_class, &tuple->t_self, tuple);
|
||||
CommandCounterIncrement();
|
||||
}
|
||||
|
||||
heap_freetuple(tuple);
|
||||
heap_close(pg_class, RowExclusiveLock);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CStoreTable checks if the given table name belongs to a foreign columnar store
|
||||
|
@ -1179,6 +1276,15 @@ cstore_clean_table_resources(PG_FUNCTION_ARGS)
|
|||
struct stat fileStat;
|
||||
int statResult = -1;
|
||||
|
||||
/*
|
||||
* TODO: Event triggers do not offer the relfilenode of the
|
||||
* dropped table, and by the time the sql_drop event trigger
|
||||
* is called, the object is already gone so we can't look it
|
||||
* up. Therefore, we can't drop the Smgr storage here, which
|
||||
* means that cascaded drops of cstore foreign tables will
|
||||
* leak storage.
|
||||
*/
|
||||
|
||||
appendStringInfo(filePath, "%s/%s/%d/%d", DataDir, CSTORE_FDW_NAME,
|
||||
(int) MyDatabaseId, (int) relationId);
|
||||
|
||||
|
@ -1402,7 +1508,7 @@ static char *
|
|||
CStoreDefaultFilePath(Oid foreignTableId)
|
||||
{
|
||||
StringInfo cstoreFilePath = NULL;
|
||||
Relation relation = relation_open(foreignTableId, AccessShareLock);
|
||||
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
RelFileNode relationFileNode = relation->rd_node;
|
||||
Oid databaseOid = relationFileNode.dbNode;
|
||||
Oid relationFileOid = relationFileNode.relNode;
|
||||
|
@ -1453,7 +1559,7 @@ CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId
|
|||
{
|
||||
Path *foreignScanPath = NULL;
|
||||
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
|
||||
Relation relation = heap_open(foreignTableId, AccessShareLock);
|
||||
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
|
||||
/*
|
||||
* We skip reading columns that are not in query. Here we assume that all
|
||||
|
@ -1659,7 +1765,7 @@ ColumnList(RelOptInfo *baserel, Oid foreignTableId)
|
|||
List *restrictInfoList = baserel->baserestrictinfo;
|
||||
ListCell *restrictInfoCell = NULL;
|
||||
const AttrNumber wholeRow = 0;
|
||||
Relation relation = heap_open(foreignTableId, AccessShareLock);
|
||||
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(relation);
|
||||
|
||||
/* first add the columns used in joins and projections */
|
||||
|
@ -1750,10 +1856,13 @@ ColumnList(RelOptInfo *baserel, Oid foreignTableId)
|
|||
static void
|
||||
CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState)
|
||||
{
|
||||
Oid foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation);
|
||||
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
|
||||
Relation relation = scanState->ss.ss_currentRelation;
|
||||
CStoreOptions *cstoreOptions;
|
||||
Oid foreignTableId;
|
||||
|
||||
ExplainPropertyText("CStore File", cstoreOptions->filename, explainState);
|
||||
cstore_fdw_initrel(relation);
|
||||
foreignTableId = RelationGetRelid(relation);
|
||||
cstoreOptions = CStoreGetOptions(foreignTableId);
|
||||
|
||||
/* supress file size if we're not showing cost details */
|
||||
if (explainState->costs)
|
||||
|
@ -1784,6 +1893,8 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
|
|||
List *foreignPrivateList = NIL;
|
||||
List *whereClauseList = NIL;
|
||||
|
||||
cstore_fdw_initrel(currentRelation);
|
||||
|
||||
/* if Explain with no Analyze, do nothing */
|
||||
if (executorFlags & EXEC_FLAG_EXPLAIN_ONLY)
|
||||
{
|
||||
|
@ -1869,9 +1980,12 @@ CStoreAnalyzeForeignTable(Relation relation,
|
|||
BlockNumber *totalPageCount)
|
||||
{
|
||||
Oid foreignTableId = RelationGetRelid(relation);
|
||||
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableId);
|
||||
CStoreOptions *cstoreOptions;
|
||||
struct stat statBuffer;
|
||||
|
||||
cstore_fdw_initrel(relation);
|
||||
cstoreOptions = CStoreGetOptions(foreignTableId);
|
||||
|
||||
int statResult = stat(cstoreOptions->filename, &statBuffer);
|
||||
if (statResult < 0)
|
||||
{
|
||||
|
@ -1924,6 +2038,7 @@ CStoreAcquireSampleRows(Relation relation, int logLevel,
|
|||
TupleDesc tupleDescriptor = RelationGetDescr(relation);
|
||||
uint32 columnCount = tupleDescriptor->natts;
|
||||
|
||||
cstore_fdw_initrel(relation);
|
||||
|
||||
/* create list of columns of the relation */
|
||||
uint32 columnIndex = 0;
|
||||
|
@ -2147,7 +2262,7 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
|
|||
Relation relation = NULL;
|
||||
|
||||
foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc);
|
||||
relation = heap_open(foreignTableOid, ShareUpdateExclusiveLock);
|
||||
relation = cstore_fdw_open(foreignTableOid, ShareUpdateExclusiveLock);
|
||||
cstoreOptions = CStoreGetOptions(foreignTableOid);
|
||||
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
|
||||
|
||||
|
@ -2246,3 +2361,49 @@ CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
|
|||
|
||||
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Versions 12 and later do not initialize rd_node even if the relation has a
|
||||
* valid relfilenode, so we need to initialize it each time a cstore FDW
|
||||
* relation is opened.
|
||||
*/
|
||||
static void
|
||||
cstore_fdw_initrel(Relation rel)
|
||||
{
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
if (rel->rd_rel->relfilenode == InvalidOid)
|
||||
InitializeRelFileNode(rel);
|
||||
|
||||
/*
|
||||
* Copied code from RelationInitPhysicalAddr(), which doesn't
|
||||
* work on foreign tables.
|
||||
*/
|
||||
if (OidIsValid(rel->rd_rel->reltablespace))
|
||||
rel->rd_node.spcNode = rel->rd_rel->reltablespace;
|
||||
else
|
||||
rel->rd_node.spcNode = MyDatabaseTableSpace;
|
||||
|
||||
rel->rd_node.dbNode = MyDatabaseId;
|
||||
rel->rd_node.relNode = rel->rd_rel->relfilenode;
|
||||
#endif
|
||||
}
|
||||
|
||||
static Relation
|
||||
cstore_fdw_open(Oid relationId, LOCKMODE lockmode)
|
||||
{
|
||||
Relation rel = heap_open(relationId, lockmode);
|
||||
|
||||
cstore_fdw_initrel(rel);
|
||||
|
||||
return rel;
|
||||
}
|
||||
|
||||
static Relation
|
||||
cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode)
|
||||
{
|
||||
Relation rel = heap_openrv(relation, lockmode);
|
||||
|
||||
cstore_fdw_initrel(rel);
|
||||
|
||||
return rel;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue