mirror of https://github.com/citusdata/citus.git
fix indentation via citus_indent
parent
20a8bca426
commit
1e93e15a8d
4
cstore.h
4
cstore.h
|
@ -204,6 +204,7 @@ typedef struct TableReadState
|
||||||
TableMetadata *tableMetadata;
|
TableMetadata *tableMetadata;
|
||||||
TupleDesc tupleDescriptor;
|
TupleDesc tupleDescriptor;
|
||||||
Relation relation;
|
Relation relation;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* List of Var pointers for columns in the query. We use this both for
|
* List of Var pointers for columns in the query. We use this both for
|
||||||
* getting vector of projected columns, and also when we want to build
|
* getting vector of projected columns, and also when we want to build
|
||||||
|
@ -301,7 +302,7 @@ extern StripeSkipList * ReadStripeSkipList(Oid relid, uint64 stripe,
|
||||||
typedef struct SmgrAddr
|
typedef struct SmgrAddr
|
||||||
{
|
{
|
||||||
BlockNumber blockno;
|
BlockNumber blockno;
|
||||||
uint32 offset;
|
uint32 offset;
|
||||||
} SmgrAddr;
|
} SmgrAddr;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -320,4 +321,5 @@ logical_to_smgr(uint64 logicalOffset)
|
||||||
return addr;
|
return addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#endif /* CSTORE_H */
|
#endif /* CSTORE_H */
|
||||||
|
|
46
cstore_fdw.c
46
cstore_fdw.c
|
@ -319,14 +319,14 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
|
||||||
}
|
}
|
||||||
else if (nodeTag(parseTree) == T_DropStmt)
|
else if (nodeTag(parseTree) == T_DropStmt)
|
||||||
{
|
{
|
||||||
List *dropRelids = DroppedCStoreRelidList((DropStmt *) parseTree);
|
List *dropRelids = DroppedCStoreRelidList((DropStmt *) parseTree);
|
||||||
ListCell *lc = NULL;
|
ListCell *lc = NULL;
|
||||||
|
|
||||||
/* drop smgr storage */
|
/* drop smgr storage */
|
||||||
foreach(lc, dropRelids)
|
foreach(lc, dropRelids)
|
||||||
{
|
{
|
||||||
Oid relid = lfirst_oid(lc);
|
Oid relid = lfirst_oid(lc);
|
||||||
Relation relation = cstore_fdw_open(relid, AccessExclusiveLock);
|
Relation relation = cstore_fdw_open(relid, AccessExclusiveLock);
|
||||||
|
|
||||||
RelationOpenSmgr(relation);
|
RelationOpenSmgr(relation);
|
||||||
RelationDropStorage(relation);
|
RelationDropStorage(relation);
|
||||||
|
@ -334,7 +334,7 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
|
||||||
}
|
}
|
||||||
|
|
||||||
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
|
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
|
||||||
destReceiver, completionTag);
|
destReceiver, completionTag);
|
||||||
}
|
}
|
||||||
else if (nodeTag(parseTree) == T_TruncateStmt)
|
else if (nodeTag(parseTree) == T_TruncateStmt)
|
||||||
{
|
{
|
||||||
|
@ -833,6 +833,7 @@ TruncateCStoreTables(List *cstoreRelationList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Version 11 and earlier already assign a relfilenode for foreign
|
* Version 11 and earlier already assign a relfilenode for foreign
|
||||||
* tables. Version 12 and later do not, so we need to create one manually.
|
* tables. Version 12 and later do not, so we need to create one manually.
|
||||||
|
@ -840,26 +841,28 @@ TruncateCStoreTables(List *cstoreRelationList)
|
||||||
static void
|
static void
|
||||||
FdwNewRelFileNode(Relation relation)
|
FdwNewRelFileNode(Relation relation)
|
||||||
{
|
{
|
||||||
Relation pg_class;
|
Relation pg_class;
|
||||||
HeapTuple tuple;
|
HeapTuple tuple;
|
||||||
Form_pg_class classform;
|
Form_pg_class classform;
|
||||||
|
|
||||||
pg_class = heap_open(RelationRelationId, RowExclusiveLock);
|
pg_class = heap_open(RelationRelationId, RowExclusiveLock);
|
||||||
|
|
||||||
tuple = SearchSysCacheCopy1(RELOID,
|
tuple = SearchSysCacheCopy1(RELOID,
|
||||||
ObjectIdGetDatum(RelationGetRelid(relation)));
|
ObjectIdGetDatum(RelationGetRelid(relation)));
|
||||||
if (!HeapTupleIsValid(tuple))
|
if (!HeapTupleIsValid(tuple))
|
||||||
|
{
|
||||||
elog(ERROR, "could not find tuple for relation %u",
|
elog(ERROR, "could not find tuple for relation %u",
|
||||||
RelationGetRelid(relation));
|
RelationGetRelid(relation));
|
||||||
|
}
|
||||||
classform = (Form_pg_class) GETSTRUCT(tuple);
|
classform = (Form_pg_class) GETSTRUCT(tuple);
|
||||||
|
|
||||||
if (true)
|
if (true)
|
||||||
{
|
{
|
||||||
char persistence = relation->rd_rel->relpersistence;
|
char persistence = relation->rd_rel->relpersistence;
|
||||||
Relation tmprel;
|
Relation tmprel;
|
||||||
Oid tablespace;
|
Oid tablespace;
|
||||||
Oid filenode;
|
Oid filenode;
|
||||||
RelFileNode newrnode;
|
RelFileNode newrnode;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Upgrade to AccessExclusiveLock, and hold until the end of the
|
* Upgrade to AccessExclusiveLock, and hold until the end of the
|
||||||
|
@ -870,12 +873,18 @@ FdwNewRelFileNode(Relation relation)
|
||||||
heap_close(tmprel, NoLock);
|
heap_close(tmprel, NoLock);
|
||||||
|
|
||||||
if (OidIsValid(relation->rd_rel->relfilenode))
|
if (OidIsValid(relation->rd_rel->relfilenode))
|
||||||
|
{
|
||||||
RelationDropStorage(relation);
|
RelationDropStorage(relation);
|
||||||
|
}
|
||||||
|
|
||||||
if (OidIsValid(relation->rd_rel->reltablespace))
|
if (OidIsValid(relation->rd_rel->reltablespace))
|
||||||
|
{
|
||||||
tablespace = relation->rd_rel->reltablespace;
|
tablespace = relation->rd_rel->reltablespace;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
tablespace = MyDatabaseTableSpace;
|
tablespace = MyDatabaseTableSpace;
|
||||||
|
}
|
||||||
|
|
||||||
filenode = GetNewRelFileNode(tablespace, NULL, persistence);
|
filenode = GetNewRelFileNode(tablespace, NULL, persistence);
|
||||||
|
|
||||||
|
@ -898,6 +907,7 @@ FdwNewRelFileNode(Relation relation)
|
||||||
heap_close(pg_class, RowExclusiveLock);
|
heap_close(pg_class, RowExclusiveLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
FdwCreateStorage(Relation relation)
|
FdwCreateStorage(Relation relation)
|
||||||
{
|
{
|
||||||
|
@ -1692,7 +1702,7 @@ ColumnList(RelOptInfo *baserel, Oid foreignTableId)
|
||||||
static void
|
static void
|
||||||
CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState)
|
CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState)
|
||||||
{
|
{
|
||||||
Relation relation = scanState->ss.ss_currentRelation;
|
Relation relation = scanState->ss.ss_currentRelation;
|
||||||
|
|
||||||
cstore_fdw_initrel(relation);
|
cstore_fdw_initrel(relation);
|
||||||
|
|
||||||
|
@ -2187,16 +2197,22 @@ cstore_fdw_initrel(Relation rel)
|
||||||
{
|
{
|
||||||
#if PG_VERSION_NUM >= 120000
|
#if PG_VERSION_NUM >= 120000
|
||||||
if (rel->rd_rel->relfilenode == InvalidOid)
|
if (rel->rd_rel->relfilenode == InvalidOid)
|
||||||
|
{
|
||||||
FdwNewRelFileNode(rel);
|
FdwNewRelFileNode(rel);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copied code from RelationInitPhysicalAddr(), which doesn't
|
* Copied code from RelationInitPhysicalAddr(), which doesn't
|
||||||
* work on foreign tables.
|
* work on foreign tables.
|
||||||
*/
|
*/
|
||||||
if (OidIsValid(rel->rd_rel->reltablespace))
|
if (OidIsValid(rel->rd_rel->reltablespace))
|
||||||
|
{
|
||||||
rel->rd_node.spcNode = rel->rd_rel->reltablespace;
|
rel->rd_node.spcNode = rel->rd_rel->reltablespace;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
rel->rd_node.spcNode = MyDatabaseTableSpace;
|
rel->rd_node.spcNode = MyDatabaseTableSpace;
|
||||||
|
}
|
||||||
|
|
||||||
rel->rd_node.dbNode = MyDatabaseId;
|
rel->rd_node.dbNode = MyDatabaseId;
|
||||||
rel->rd_node.relNode = rel->rd_rel->relfilenode;
|
rel->rd_node.relNode = rel->rd_rel->relfilenode;
|
||||||
|
@ -2204,6 +2220,7 @@ cstore_fdw_initrel(Relation rel)
|
||||||
FdwCreateStorage(rel);
|
FdwCreateStorage(rel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static Relation
|
static Relation
|
||||||
cstore_fdw_open(Oid relationId, LOCKMODE lockmode)
|
cstore_fdw_open(Oid relationId, LOCKMODE lockmode)
|
||||||
{
|
{
|
||||||
|
@ -2214,6 +2231,7 @@ cstore_fdw_open(Oid relationId, LOCKMODE lockmode)
|
||||||
return rel;
|
return rel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static Relation
|
static Relation
|
||||||
cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode)
|
cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode)
|
||||||
{
|
{
|
||||||
|
|
|
@ -692,9 +692,12 @@ create_estate_for_relation(Relation rel)
|
||||||
estate->es_output_cid = GetCurrentCommandId(true);
|
estate->es_output_cid = GetCurrentCommandId(true);
|
||||||
|
|
||||||
#if PG_VERSION_NUM < 120000
|
#if PG_VERSION_NUM < 120000
|
||||||
|
|
||||||
/* Triggers might need a slot */
|
/* Triggers might need a slot */
|
||||||
if (resultRelInfo->ri_TrigDesc)
|
if (resultRelInfo->ri_TrigDesc)
|
||||||
|
{
|
||||||
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);
|
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate, NULL);
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* Prepare to catch AFTER triggers. */
|
/* Prepare to catch AFTER triggers. */
|
||||||
|
|
|
@ -1037,26 +1037,27 @@ ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeFor
|
||||||
return defaultValue;
|
return defaultValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static StringInfo
|
static StringInfo
|
||||||
ReadFromSmgr(Relation rel, uint64 offset, uint32 size)
|
ReadFromSmgr(Relation rel, uint64 offset, uint32 size)
|
||||||
{
|
{
|
||||||
StringInfo resultBuffer = makeStringInfo();
|
StringInfo resultBuffer = makeStringInfo();
|
||||||
uint64 read = 0;
|
uint64 read = 0;
|
||||||
|
|
||||||
enlargeStringInfo(resultBuffer, size);
|
enlargeStringInfo(resultBuffer, size);
|
||||||
resultBuffer->len = size;
|
resultBuffer->len = size;
|
||||||
|
|
||||||
while (read < size)
|
while (read < size)
|
||||||
{
|
{
|
||||||
Buffer buffer;
|
Buffer buffer;
|
||||||
Page page;
|
Page page;
|
||||||
PageHeader phdr;
|
PageHeader phdr;
|
||||||
uint32 to_read;
|
uint32 to_read;
|
||||||
SmgrAddr addr = logical_to_smgr(offset + read);
|
SmgrAddr addr = logical_to_smgr(offset + read);
|
||||||
|
|
||||||
buffer = ReadBuffer(rel, addr.blockno);
|
buffer = ReadBuffer(rel, addr.blockno);
|
||||||
page = BufferGetPage(buffer);
|
page = BufferGetPage(buffer);
|
||||||
phdr = (PageHeader)page;
|
phdr = (PageHeader) page;
|
||||||
|
|
||||||
to_read = Min(size - read, phdr->pd_upper - addr.offset);
|
to_read = Min(size - read, phdr->pd_upper - addr.offset);
|
||||||
memcpy(resultBuffer->data + read, page + addr.offset, to_read);
|
memcpy(resultBuffer->data + read, page + addr.offset, to_read);
|
||||||
|
@ -1067,6 +1068,7 @@ ReadFromSmgr(Relation rel, uint64 offset, uint32 size)
|
||||||
return resultBuffer;
|
return resultBuffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ResetUncompressedBlockData iterates over deserialized column block data
|
* ResetUncompressedBlockData iterates over deserialized column block data
|
||||||
* and sets valueBuffer field to empty buffer. This field is allocated in stripe
|
* and sets valueBuffer field to empty buffer. This field is allocated in stripe
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
*
|
*
|
||||||
* cstore_version_compat.h
|
* cstore_version_compat.h
|
||||||
*
|
*
|
||||||
* Compatibility macros for writing code agnostic to PostgreSQL versions
|
* Compatibility macros for writing code agnostic to PostgreSQL versions
|
||||||
*
|
*
|
||||||
* Copyright (c) 2018, Citus Data, Inc.
|
* Copyright (c) 2018, Citus Data, Inc.
|
||||||
*
|
*
|
||||||
|
|
|
@ -363,21 +363,22 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount,
|
||||||
return stripeSkipList;
|
return stripeSkipList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
|
WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
|
||||||
{
|
{
|
||||||
uint64 logicalOffset = writeState->currentFileOffset;
|
uint64 logicalOffset = writeState->currentFileOffset;
|
||||||
uint64 remaining = dataLength;
|
uint64 remaining = dataLength;
|
||||||
Relation rel = writeState->relation;
|
Relation rel = writeState->relation;
|
||||||
Buffer buffer;
|
Buffer buffer;
|
||||||
|
|
||||||
while (remaining > 0)
|
while (remaining > 0)
|
||||||
{
|
{
|
||||||
SmgrAddr addr = logical_to_smgr(logicalOffset);
|
SmgrAddr addr = logical_to_smgr(logicalOffset);
|
||||||
BlockNumber nblocks;
|
BlockNumber nblocks;
|
||||||
Page page;
|
Page page;
|
||||||
PageHeader phdr;
|
PageHeader phdr;
|
||||||
uint64 to_write;
|
uint64 to_write;
|
||||||
|
|
||||||
RelationOpenSmgr(rel);
|
RelationOpenSmgr(rel);
|
||||||
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||||
|
@ -397,7 +398,9 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
|
||||||
page = BufferGetPage(buffer);
|
page = BufferGetPage(buffer);
|
||||||
phdr = (PageHeader) page;
|
phdr = (PageHeader) page;
|
||||||
if (PageIsNew(page))
|
if (PageIsNew(page))
|
||||||
|
{
|
||||||
PageInit(page, BLCKSZ, 0);
|
PageInit(page, BLCKSZ, 0);
|
||||||
|
}
|
||||||
|
|
||||||
/* always appending */
|
/* always appending */
|
||||||
Assert(phdr->pd_lower == addr.offset);
|
Assert(phdr->pd_lower == addr.offset);
|
||||||
|
@ -434,6 +437,7 @@ WriteToSmgr(TableWriteState *writeState, char *data, uint32 dataLength)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FlushStripe flushes current stripe data into the file. The function first ensures
|
* FlushStripe flushes current stripe data into the file. The function first ensures
|
||||||
* the last data block for each column is properly serialized and compressed. Then,
|
* the last data block for each column is properly serialized and compressed. Then,
|
||||||
|
@ -832,6 +836,7 @@ AppendStripeMetadata(TableMetadata *tableMetadata, StripeMetadata stripeMetadata
|
||||||
stripeMetadataCopy);
|
stripeMetadataCopy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CopyStringInfo creates a deep copy of given source string allocating only needed
|
* CopyStringInfo creates a deep copy of given source string allocating only needed
|
||||||
* amount of memory.
|
* amount of memory.
|
||||||
|
|
Loading…
Reference in New Issue