mirror of https://github.com/citusdata/citus.git
fix style
parent
3bb6554976
commit
b6d4a1bbe2
|
@ -136,8 +136,6 @@ static void
|
|||
CStoreSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti,
|
||||
RangeTblEntry *rte)
|
||||
{
|
||||
Relation relation;
|
||||
|
||||
/* call into previous hook if assigned */
|
||||
if (PreviousSetRelPathlistHook)
|
||||
{
|
||||
|
@ -161,7 +159,7 @@ CStoreSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti,
|
|||
* If that is the case we want to insert an extra path that pushes down the projection
|
||||
* into the scan of the table to minimize the data read.
|
||||
*/
|
||||
relation = RelationIdGetRelation(rte->relid);
|
||||
Relation relation = RelationIdGetRelation(rte->relid);
|
||||
if (relation->rd_tableam == GetCstoreTableAmRoutine())
|
||||
{
|
||||
Path *customPath = CreateCStoreScanPath(rel, rte);
|
||||
|
@ -181,19 +179,17 @@ CreateCStoreScanPath(RelOptInfo *rel, RangeTblEntry *rte)
|
|||
{
|
||||
CStoreScanPath *cspath = (CStoreScanPath *) newNode(sizeof(CStoreScanPath),
|
||||
T_CustomPath);
|
||||
CustomPath *cpath;
|
||||
Path *path;
|
||||
|
||||
/*
|
||||
* popuate custom path information
|
||||
*/
|
||||
cpath = &cspath->custom_path;
|
||||
CustomPath *cpath = &cspath->custom_path;
|
||||
cpath->methods = &CStoreScanPathMethods;
|
||||
|
||||
/*
|
||||
* populate generic path information
|
||||
*/
|
||||
path = &cpath->path;
|
||||
Path *path = &cpath->path;
|
||||
path->pathtype = T_CustomScan;
|
||||
path->parent = rel;
|
||||
path->pathtarget = rel->reltarget;
|
||||
|
@ -329,18 +325,14 @@ static TupleTableSlot *
|
|||
CStoreScanNext(CStoreScanState *cstorescanstate)
|
||||
{
|
||||
CustomScanState *node = (CustomScanState *) cstorescanstate;
|
||||
TableScanDesc scandesc;
|
||||
EState *estate;
|
||||
ScanDirection direction;
|
||||
TupleTableSlot *slot;
|
||||
|
||||
/*
|
||||
* get information from the estate and scan state
|
||||
*/
|
||||
scandesc = node->ss.ss_currentScanDesc;
|
||||
estate = node->ss.ps.state;
|
||||
direction = estate->es_direction;
|
||||
slot = node->ss.ss_ScanTupleSlot;
|
||||
TableScanDesc scandesc = node->ss.ss_currentScanDesc;
|
||||
EState *estate = node->ss.ps.state;
|
||||
ScanDirection direction = estate->es_direction;
|
||||
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
|
||||
|
||||
if (scandesc == NULL)
|
||||
{
|
||||
|
@ -394,12 +386,10 @@ CStoreScan_ExecCustomScan(CustomScanState *node)
|
|||
static void
|
||||
CStoreScan_EndCustomScan(CustomScanState *node)
|
||||
{
|
||||
TableScanDesc scanDesc;
|
||||
|
||||
/*
|
||||
* get information from node
|
||||
*/
|
||||
scanDesc = node->ss.ss_currentScanDesc;
|
||||
TableScanDesc scanDesc = node->ss.ss_currentScanDesc;
|
||||
|
||||
/*
|
||||
* Free the exprcontext
|
||||
|
|
|
@ -255,17 +255,14 @@ cstore_fdw_finish()
|
|||
Datum
|
||||
cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
|
||||
{
|
||||
EventTriggerData *triggerData = NULL;
|
||||
Node *parseTree = NULL;
|
||||
|
||||
/* error if event trigger manager did not call this function */
|
||||
if (!CALLED_AS_EVENT_TRIGGER(fcinfo))
|
||||
{
|
||||
ereport(ERROR, (errmsg("trigger not fired by event trigger manager")));
|
||||
}
|
||||
|
||||
triggerData = (EventTriggerData *) fcinfo->context;
|
||||
parseTree = triggerData->parsetree;
|
||||
EventTriggerData *triggerData = (EventTriggerData *) fcinfo->context;
|
||||
Node *parseTree = triggerData->parsetree;
|
||||
|
||||
if (nodeTag(parseTree) == T_CreateForeignTableStmt)
|
||||
{
|
||||
|
@ -495,16 +492,9 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
|
|||
{
|
||||
uint64 processedRowCount = 0;
|
||||
Relation relation = NULL;
|
||||
Oid relationId = InvalidOid;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
uint32 columnCount = 0;
|
||||
CopyState copyState = NULL;
|
||||
bool nextRowFound = true;
|
||||
Datum *columnValues = NULL;
|
||||
bool *columnNulls = NULL;
|
||||
TableWriteState *writeState = NULL;
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
MemoryContext tupleContext = NULL;
|
||||
|
||||
/* Only superuser can copy from or to local file */
|
||||
CheckSuperuserPrivilegesForCopy(copyStatement);
|
||||
|
@ -516,15 +506,15 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
|
|||
* concurrent reads and writes.
|
||||
*/
|
||||
relation = cstore_fdw_openrv(copyStatement->relation, RowExclusiveLock);
|
||||
relationId = RelationGetRelid(relation);
|
||||
Oid relationId = RelationGetRelid(relation);
|
||||
|
||||
/* allocate column values and nulls arrays */
|
||||
tupleDescriptor = RelationGetDescr(relation);
|
||||
columnCount = tupleDescriptor->natts;
|
||||
columnValues = palloc0(columnCount * sizeof(Datum));
|
||||
columnNulls = palloc0(columnCount * sizeof(bool));
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(relation);
|
||||
uint32 columnCount = tupleDescriptor->natts;
|
||||
Datum *columnValues = palloc0(columnCount * sizeof(Datum));
|
||||
bool *columnNulls = palloc0(columnCount * sizeof(bool));
|
||||
|
||||
cstoreOptions = CStoreGetOptions(relationId);
|
||||
CStoreOptions *cstoreOptions = CStoreGetOptions(relationId);
|
||||
|
||||
/*
|
||||
* We create a new memory context called tuple context, and read and write
|
||||
|
@ -533,9 +523,9 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
|
|||
* allocated for each row, and don't bloat memory usage with large input
|
||||
* files.
|
||||
*/
|
||||
tupleContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"CStore COPY Row Memory Context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
MemoryContext tupleContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"CStore COPY Row Memory Context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
/* init state to read from COPY data source */
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
|
@ -606,10 +596,6 @@ static uint64
|
|||
CopyOutCStoreTable(CopyStmt *copyStatement, const char *queryString)
|
||||
{
|
||||
uint64 processedCount = 0;
|
||||
RangeVar *relation = NULL;
|
||||
char *qualifiedName = NULL;
|
||||
List *queryList = NIL;
|
||||
Node *rawQuery = NULL;
|
||||
|
||||
StringInfo newQuerySubstring = makeStringInfo();
|
||||
|
||||
|
@ -621,14 +607,14 @@ CopyOutCStoreTable(CopyStmt *copyStatement, const char *queryString)
|
|||
"...' instead")));
|
||||
}
|
||||
|
||||
relation = copyStatement->relation;
|
||||
qualifiedName = quote_qualified_identifier(relation->schemaname,
|
||||
relation->relname);
|
||||
RangeVar *relation = copyStatement->relation;
|
||||
char *qualifiedName = quote_qualified_identifier(relation->schemaname,
|
||||
relation->relname);
|
||||
appendStringInfo(newQuerySubstring, "select * from %s", qualifiedName);
|
||||
queryList = raw_parser(newQuerySubstring->data);
|
||||
List *queryList = raw_parser(newQuerySubstring->data);
|
||||
|
||||
/* take the first parse tree */
|
||||
rawQuery = linitial(queryList);
|
||||
Node *rawQuery = linitial(queryList);
|
||||
|
||||
/*
|
||||
* Set the relation field to NULL so that COPY command works on
|
||||
|
@ -674,7 +660,6 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement)
|
|||
{
|
||||
ObjectType objectType = alterStatement->relkind;
|
||||
RangeVar *relationRangeVar = alterStatement->relation;
|
||||
Oid relationId = InvalidOid;
|
||||
List *commandList = alterStatement->cmds;
|
||||
ListCell *commandCell = NULL;
|
||||
|
||||
|
@ -684,7 +669,7 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement)
|
|||
return;
|
||||
}
|
||||
|
||||
relationId = RangeVarGetRelid(relationRangeVar, AccessShareLock, true);
|
||||
Oid relationId = RangeVarGetRelid(relationRangeVar, AccessShareLock, true);
|
||||
if (!IsCStoreFdwTable(relationId))
|
||||
{
|
||||
return;
|
||||
|
@ -700,7 +685,6 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement)
|
|||
Oid targetTypeId = typenameTypeId(NULL, columnDef->typeName);
|
||||
char *typeName = TypeNameToString(columnDef->typeName);
|
||||
AttrNumber attributeNumber = get_attnum(relationId, columnName);
|
||||
Oid currentTypeId = InvalidOid;
|
||||
|
||||
if (attributeNumber <= 0)
|
||||
{
|
||||
|
@ -708,7 +692,7 @@ CStoreProcessAlterTableCommand(AlterTableStmt *alterStatement)
|
|||
continue;
|
||||
}
|
||||
|
||||
currentTypeId = get_atttype(relationId, attributeNumber);
|
||||
Oid currentTypeId = get_atttype(relationId, attributeNumber);
|
||||
|
||||
/*
|
||||
* We are only interested in implicit coersion type compatibility.
|
||||
|
@ -811,34 +795,28 @@ TruncateCStoreTables(List *cstoreRelationList)
|
|||
static void
|
||||
FdwNewRelFileNode(Relation relation)
|
||||
{
|
||||
Relation pg_class;
|
||||
HeapTuple tuple;
|
||||
Form_pg_class classform;
|
||||
Relation pg_class = heap_open(RelationRelationId, RowExclusiveLock);
|
||||
|
||||
pg_class = heap_open(RelationRelationId, RowExclusiveLock);
|
||||
|
||||
tuple = SearchSysCacheCopy1(RELOID,
|
||||
ObjectIdGetDatum(RelationGetRelid(relation)));
|
||||
HeapTuple tuple = SearchSysCacheCopy1(RELOID,
|
||||
ObjectIdGetDatum(RelationGetRelid(relation)));
|
||||
if (!HeapTupleIsValid(tuple))
|
||||
{
|
||||
elog(ERROR, "could not find tuple for relation %u",
|
||||
RelationGetRelid(relation));
|
||||
}
|
||||
classform = (Form_pg_class) GETSTRUCT(tuple);
|
||||
Form_pg_class classform = (Form_pg_class) GETSTRUCT(tuple);
|
||||
|
||||
if (true)
|
||||
{
|
||||
char persistence = relation->rd_rel->relpersistence;
|
||||
Relation tmprel;
|
||||
Oid tablespace;
|
||||
Oid filenode;
|
||||
|
||||
/*
|
||||
* Upgrade to AccessExclusiveLock, and hold until the end of the
|
||||
* transaction. This shouldn't happen during a read, but it's hard to
|
||||
* prove that because it happens lazily.
|
||||
*/
|
||||
tmprel = heap_open(relation->rd_id, AccessExclusiveLock);
|
||||
Relation tmprel = heap_open(relation->rd_id, AccessExclusiveLock);
|
||||
heap_close(tmprel, NoLock);
|
||||
|
||||
if (OidIsValid(relation->rd_rel->relfilenode))
|
||||
|
@ -856,7 +834,7 @@ FdwNewRelFileNode(Relation relation)
|
|||
tablespace = MyDatabaseTableSpace;
|
||||
}
|
||||
|
||||
filenode = GetNewRelFileNode(tablespace, NULL, persistence);
|
||||
Oid filenode = GetNewRelFileNode(tablespace, NULL, persistence);
|
||||
|
||||
classform->relfilenode = filenode;
|
||||
classform->relpages = 0; /* it's empty until further notice */
|
||||
|
@ -886,9 +864,8 @@ FdwCreateStorage(Relation relation)
|
|||
if (!smgrexists(relation->rd_smgr, MAIN_FORKNUM))
|
||||
{
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
SMgrRelation srel;
|
||||
srel = RelationCreateStorage(relation->rd_node,
|
||||
relation->rd_rel->relpersistence);
|
||||
SMgrRelation srel = RelationCreateStorage(relation->rd_node,
|
||||
relation->rd_rel->relpersistence);
|
||||
smgrclose(srel);
|
||||
#else
|
||||
RelationCreateStorage(relation->rd_node,
|
||||
|
@ -906,14 +883,13 @@ bool
|
|||
IsCStoreFdwTable(Oid relationId)
|
||||
{
|
||||
bool cstoreTable = false;
|
||||
char relationKind = 0;
|
||||
|
||||
if (relationId == InvalidOid)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
relationKind = get_rel_relkind(relationId);
|
||||
char relationKind = get_rel_relkind(relationId);
|
||||
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
ForeignTable *foreignTable = GetForeignTable(relationId);
|
||||
|
@ -956,13 +932,8 @@ IsCStoreServer(ForeignServer *server)
|
|||
static bool
|
||||
DistributedTable(Oid relationId)
|
||||
{
|
||||
bool distributedTable = false;
|
||||
Oid partitionOid = InvalidOid;
|
||||
Relation heapRelation = NULL;
|
||||
TableScanDesc scanDesc = NULL;
|
||||
const int scanKeyCount = 1;
|
||||
ScanKeyData scanKey[1];
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
bool missingOK = true;
|
||||
Oid extensionOid = get_extension_oid(CITUS_EXTENSION_NAME, missingOK);
|
||||
|
@ -972,23 +943,25 @@ DistributedTable(Oid relationId)
|
|||
return false;
|
||||
}
|
||||
|
||||
partitionOid = get_relname_relid(CITUS_PARTITION_TABLE_NAME, PG_CATALOG_NAMESPACE);
|
||||
Oid partitionOid = get_relname_relid(CITUS_PARTITION_TABLE_NAME,
|
||||
PG_CATALOG_NAMESPACE);
|
||||
if (partitionOid == InvalidOid)
|
||||
{
|
||||
/* the pg_dist_partition table does not exist */
|
||||
return false;
|
||||
}
|
||||
|
||||
heapRelation = heap_open(partitionOid, AccessShareLock);
|
||||
Relation heapRelation = heap_open(partitionOid, AccessShareLock);
|
||||
|
||||
ScanKeyInit(&scanKey[0], ATTR_NUM_PARTITION_RELATION_ID, InvalidStrategy,
|
||||
F_OIDEQ, ObjectIdGetDatum(relationId));
|
||||
|
||||
scanDesc = table_beginscan(heapRelation, SnapshotSelf, scanKeyCount, scanKey);
|
||||
TableScanDesc scanDesc = table_beginscan(heapRelation, SnapshotSelf, scanKeyCount,
|
||||
scanKey);
|
||||
|
||||
heapTuple = heap_getnext(scanDesc, ForwardScanDirection);
|
||||
HeapTuple heapTuple = heap_getnext(scanDesc, ForwardScanDirection);
|
||||
|
||||
distributedTable = HeapTupleIsValid(heapTuple);
|
||||
bool distributedTable = HeapTupleIsValid(heapTuple);
|
||||
|
||||
table_endscan(scanDesc);
|
||||
relation_close(heapRelation, AccessShareLock);
|
||||
|
@ -1027,17 +1000,15 @@ cstore_table_size(PG_FUNCTION_ARGS)
|
|||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
bool cstoreTable = IsCStoreFdwTable(relationId);
|
||||
Relation relation;
|
||||
BlockNumber nblocks;
|
||||
|
||||
if (!cstoreTable)
|
||||
{
|
||||
ereport(ERROR, (errmsg("relation is not a cstore table")));
|
||||
}
|
||||
|
||||
relation = cstore_fdw_open(relationId, AccessShareLock);
|
||||
Relation relation = cstore_fdw_open(relationId, AccessShareLock);
|
||||
RelationOpenSmgr(relation);
|
||||
nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
|
||||
BlockNumber nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
|
||||
heap_close(relation, AccessShareLock);
|
||||
PG_RETURN_INT64(nblocks * BLCKSZ);
|
||||
}
|
||||
|
@ -1205,20 +1176,16 @@ GetSlotHeapTuple(TupleTableSlot *tts)
|
|||
static CStoreOptions *
|
||||
CStoreGetOptions(Oid foreignTableId)
|
||||
{
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
CompressionType compressionType = cstore_compression;
|
||||
int32 stripeRowCount = cstore_stripe_row_count;
|
||||
int32 blockRowCount = cstore_block_row_count;
|
||||
char *compressionTypeString = NULL;
|
||||
char *stripeRowCountString = NULL;
|
||||
char *blockRowCountString = NULL;
|
||||
|
||||
compressionTypeString = CStoreGetOptionValue(foreignTableId,
|
||||
OPTION_NAME_COMPRESSION_TYPE);
|
||||
stripeRowCountString = CStoreGetOptionValue(foreignTableId,
|
||||
OPTION_NAME_STRIPE_ROW_COUNT);
|
||||
blockRowCountString = CStoreGetOptionValue(foreignTableId,
|
||||
OPTION_NAME_BLOCK_ROW_COUNT);
|
||||
char *compressionTypeString = CStoreGetOptionValue(foreignTableId,
|
||||
OPTION_NAME_COMPRESSION_TYPE);
|
||||
char *stripeRowCountString = CStoreGetOptionValue(foreignTableId,
|
||||
OPTION_NAME_STRIPE_ROW_COUNT);
|
||||
char *blockRowCountString = CStoreGetOptionValue(foreignTableId,
|
||||
OPTION_NAME_BLOCK_ROW_COUNT);
|
||||
|
||||
ValidateForeignTableOptions(compressionTypeString,
|
||||
stripeRowCountString, blockRowCountString);
|
||||
|
@ -1237,7 +1204,7 @@ CStoreGetOptions(Oid foreignTableId)
|
|||
blockRowCount = pg_atoi(blockRowCountString, sizeof(int32), 0);
|
||||
}
|
||||
|
||||
cstoreOptions = palloc0(sizeof(CStoreOptions));
|
||||
CStoreOptions *cstoreOptions = palloc0(sizeof(CStoreOptions));
|
||||
cstoreOptions->compressionType = compressionType;
|
||||
cstoreOptions->stripeRowCount = stripeRowCount;
|
||||
cstoreOptions->blockRowCount = blockRowCount;
|
||||
|
@ -1254,14 +1221,12 @@ CStoreGetOptions(Oid foreignTableId)
|
|||
static char *
|
||||
CStoreGetOptionValue(Oid foreignTableId, const char *optionName)
|
||||
{
|
||||
ForeignTable *foreignTable = NULL;
|
||||
ForeignServer *foreignServer = NULL;
|
||||
List *optionList = NIL;
|
||||
ListCell *optionCell = NULL;
|
||||
char *optionValue = NULL;
|
||||
|
||||
foreignTable = GetForeignTable(foreignTableId);
|
||||
foreignServer = GetForeignServer(foreignTable->serverid);
|
||||
ForeignTable *foreignTable = GetForeignTable(foreignTableId);
|
||||
ForeignServer *foreignServer = GetForeignServer(foreignTable->serverid);
|
||||
|
||||
optionList = list_concat(optionList, foreignTable->options);
|
||||
optionList = list_concat(optionList, foreignServer->options);
|
||||
|
@ -1451,8 +1416,6 @@ CStoreGetForeignPlan(PlannerInfo * root, RelOptInfo * baserel, Oid foreignTableI
|
|||
#endif
|
||||
{
|
||||
ForeignScan *foreignScan = NULL;
|
||||
List *columnList = NIL;
|
||||
List *foreignPrivateList = NIL;
|
||||
|
||||
/*
|
||||
* Although we skip row blocks that are refuted by the WHERE clause, but
|
||||
|
@ -1469,8 +1432,8 @@ CStoreGetForeignPlan(PlannerInfo * root, RelOptInfo * baserel, Oid foreignTableI
|
|||
* in executor's callback functions, so we get the column list here and put
|
||||
* it into foreign scan node's private list.
|
||||
*/
|
||||
columnList = ColumnList(baserel, foreignTableId);
|
||||
foreignPrivateList = list_make1(columnList);
|
||||
List *columnList = ColumnList(baserel, foreignTableId);
|
||||
List *foreignPrivateList = list_make1(columnList);
|
||||
|
||||
/* create the foreign scan node */
|
||||
#if PG_VERSION_NUM >= 90500
|
||||
|
@ -1525,10 +1488,8 @@ TupleCountEstimate(Relation relation, RelOptInfo *baserel)
|
|||
static BlockNumber
|
||||
PageCount(Relation relation)
|
||||
{
|
||||
BlockNumber nblocks;
|
||||
|
||||
RelationOpenSmgr(relation);
|
||||
nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
|
||||
BlockNumber nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
|
||||
|
||||
return (nblocks > 0) ? nblocks : 1;
|
||||
}
|
||||
|
@ -1655,9 +1616,8 @@ CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState
|
|||
/* supress file size if we're not showing cost details */
|
||||
if (explainState->costs)
|
||||
{
|
||||
long nblocks;
|
||||
RelationOpenSmgr(relation);
|
||||
nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
|
||||
long nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
|
||||
ExplainPropertyLong("CStore File Size", (long) (nblocks * BLCKSZ),
|
||||
explainState);
|
||||
}
|
||||
|
@ -1668,15 +1628,8 @@ CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState
|
|||
static void
|
||||
CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
|
||||
{
|
||||
TableReadState *readState = NULL;
|
||||
Oid foreignTableId = InvalidOid;
|
||||
Relation currentRelation = scanState->ss.ss_currentRelation;
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(currentRelation);
|
||||
List *columnList = NIL;
|
||||
ForeignScan *foreignScan = NULL;
|
||||
List *foreignPrivateList = NIL;
|
||||
List *whereClauseList = NIL;
|
||||
Relation relation = NULL;
|
||||
|
||||
cstore_fdw_initrel(currentRelation);
|
||||
|
||||
|
@ -1686,15 +1639,16 @@ CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags)
|
|||
return;
|
||||
}
|
||||
|
||||
foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation);
|
||||
Oid foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation);
|
||||
|
||||
foreignScan = (ForeignScan *) scanState->ss.ps.plan;
|
||||
foreignPrivateList = (List *) foreignScan->fdw_private;
|
||||
whereClauseList = foreignScan->scan.plan.qual;
|
||||
ForeignScan *foreignScan = (ForeignScan *) scanState->ss.ps.plan;
|
||||
List *foreignPrivateList = (List *) foreignScan->fdw_private;
|
||||
List *whereClauseList = foreignScan->scan.plan.qual;
|
||||
|
||||
columnList = (List *) linitial(foreignPrivateList);
|
||||
relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
readState = CStoreBeginRead(relation, tupleDescriptor, columnList, whereClauseList);
|
||||
List *columnList = (List *) linitial(foreignPrivateList);
|
||||
Relation relation = cstore_fdw_open(foreignTableId, AccessShareLock);
|
||||
TableReadState *readState = CStoreBeginRead(relation, tupleDescriptor, columnList,
|
||||
whereClauseList);
|
||||
|
||||
scanState->fdw_state = (void *) readState;
|
||||
}
|
||||
|
@ -1710,7 +1664,6 @@ CStoreIterateForeignScan(ForeignScanState *scanState)
|
|||
{
|
||||
TableReadState *readState = (TableReadState *) scanState->fdw_state;
|
||||
TupleTableSlot *tupleSlot = scanState->ss.ss_ScanTupleSlot;
|
||||
bool nextRowFound = false;
|
||||
|
||||
TupleDesc tupleDescriptor = tupleSlot->tts_tupleDescriptor;
|
||||
Datum *columnValues = tupleSlot->tts_values;
|
||||
|
@ -1723,7 +1676,7 @@ CStoreIterateForeignScan(ForeignScanState *scanState)
|
|||
|
||||
ExecClearTuple(tupleSlot);
|
||||
|
||||
nextRowFound = CStoreReadNextRow(readState, columnValues, columnNulls);
|
||||
bool nextRowFound = CStoreReadNextRow(readState, columnValues, columnNulls);
|
||||
if (nextRowFound)
|
||||
{
|
||||
ExecStoreVirtualTuple(tupleSlot);
|
||||
|
@ -1797,13 +1750,9 @@ CStoreAcquireSampleRows(Relation relation, int logLevel,
|
|||
double selectionState = 0;
|
||||
MemoryContext oldContext = CurrentMemoryContext;
|
||||
MemoryContext tupleContext = NULL;
|
||||
Datum *columnValues = NULL;
|
||||
bool *columnNulls = NULL;
|
||||
TupleTableSlot *scanTupleSlot = NULL;
|
||||
List *columnList = NIL;
|
||||
List *foreignPrivateList = NULL;
|
||||
ForeignScanState *scanState = NULL;
|
||||
ForeignScan *foreignScan = NULL;
|
||||
char *relationName = NULL;
|
||||
int executorFlags = 0;
|
||||
uint32 columnIndex = 0;
|
||||
|
@ -1829,13 +1778,13 @@ CStoreAcquireSampleRows(Relation relation, int logLevel,
|
|||
}
|
||||
|
||||
/* setup foreign scan plan node */
|
||||
foreignPrivateList = list_make1(columnList);
|
||||
foreignScan = makeNode(ForeignScan);
|
||||
List *foreignPrivateList = list_make1(columnList);
|
||||
ForeignScan *foreignScan = makeNode(ForeignScan);
|
||||
foreignScan->fdw_private = foreignPrivateList;
|
||||
|
||||
/* set up tuple slot */
|
||||
columnValues = palloc0(columnCount * sizeof(Datum));
|
||||
columnNulls = palloc0(columnCount * sizeof(bool));
|
||||
Datum *columnValues = palloc0(columnCount * sizeof(Datum));
|
||||
bool *columnNulls = palloc0(columnCount * sizeof(bool));
|
||||
#if PG_VERSION_NUM >= 120000
|
||||
scanTupleSlot = MakeTupleTableSlot(NULL, &TTSOpsVirtual);
|
||||
#elif PG_VERSION_NUM >= 110000
|
||||
|
@ -1968,13 +1917,12 @@ CStorePlanForeignModify(PlannerInfo *plannerInfo, ModifyTable *plan,
|
|||
if (plan->operation == CMD_INSERT)
|
||||
{
|
||||
ListCell *tableCell = NULL;
|
||||
Query *query = NULL;
|
||||
|
||||
/*
|
||||
* Only insert operation with select subquery is supported. Other forms
|
||||
* of insert, update, and delete operations are not supported.
|
||||
*/
|
||||
query = plannerInfo->parse;
|
||||
Query *query = plannerInfo->parse;
|
||||
foreach(tableCell, query->rtable)
|
||||
{
|
||||
RangeTblEntry *tableEntry = lfirst(tableCell);
|
||||
|
@ -2027,22 +1975,16 @@ CStoreBeginForeignModify(ModifyTableState *modifyTableState,
|
|||
static void
|
||||
CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *relationInfo)
|
||||
{
|
||||
Oid foreignTableOid = InvalidOid;
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
TableWriteState *writeState = NULL;
|
||||
Relation relation = NULL;
|
||||
Oid foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc);
|
||||
Relation relation = cstore_fdw_open(foreignTableOid, RowExclusiveLock);
|
||||
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableOid);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
|
||||
|
||||
foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc);
|
||||
relation = cstore_fdw_open(foreignTableOid, RowExclusiveLock);
|
||||
cstoreOptions = CStoreGetOptions(foreignTableOid);
|
||||
tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
|
||||
|
||||
writeState = CStoreBeginWrite(relation,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount,
|
||||
tupleDescriptor);
|
||||
TableWriteState *writeState = CStoreBeginWrite(relation,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount,
|
||||
tupleDescriptor);
|
||||
|
||||
relationInfo->ri_FdwState = (void *) writeState;
|
||||
}
|
||||
|
@ -2057,11 +1999,10 @@ CStoreExecForeignInsert(EState *executorState, ResultRelInfo *relationInfo,
|
|||
TupleTableSlot *tupleSlot, TupleTableSlot *planSlot)
|
||||
{
|
||||
TableWriteState *writeState = (TableWriteState *) relationInfo->ri_FdwState;
|
||||
HeapTuple heapTuple;
|
||||
|
||||
Assert(writeState != NULL);
|
||||
|
||||
heapTuple = GetSlotHeapTuple(tupleSlot);
|
||||
HeapTuple heapTuple = GetSlotHeapTuple(tupleSlot);
|
||||
|
||||
if (HeapTupleHasExternal(heapTuple))
|
||||
{
|
||||
|
|
|
@ -130,9 +130,6 @@ void
|
|||
InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCount,
|
||||
CompressionType compression)
|
||||
{
|
||||
Oid cstoreDataFilesOid = InvalidOid;
|
||||
Relation cstoreDataFiles = NULL;
|
||||
ModifyState *modifyState = NULL;
|
||||
NameData compressionName = { 0 };
|
||||
|
||||
namestrcpy(&compressionName, CompressionTypeStr(compression));
|
||||
|
@ -149,10 +146,10 @@ InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCoun
|
|||
|
||||
DeleteDataFileMetadataRowIfExists(relfilenode);
|
||||
|
||||
cstoreDataFilesOid = CStoreDataFilesRelationId();
|
||||
cstoreDataFiles = heap_open(cstoreDataFilesOid, RowExclusiveLock);
|
||||
Oid cstoreDataFilesOid = CStoreDataFilesRelationId();
|
||||
Relation cstoreDataFiles = heap_open(cstoreDataFilesOid, RowExclusiveLock);
|
||||
|
||||
modifyState = StartModifyRelation(cstoreDataFiles);
|
||||
ModifyState *modifyState = StartModifyRelation(cstoreDataFiles);
|
||||
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||
FinishModifyRelation(modifyState);
|
||||
|
||||
|
@ -169,9 +166,6 @@ UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCo
|
|||
const int scanKeyCount = 1;
|
||||
ScanKeyData scanKey[1];
|
||||
bool indexOK = true;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
Form_cstore_data_files metadata = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
Datum values[Natts_cstore_data_files] = { 0 };
|
||||
bool isnull[Natts_cstore_data_files] = { 0 };
|
||||
bool replace[Natts_cstore_data_files] = { 0 };
|
||||
|
@ -182,19 +176,19 @@ UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCo
|
|||
ScanKeyInit(&scanKey[0], Anum_cstore_data_files_relfilenode, BTEqualStrategyNumber,
|
||||
F_INT8EQ, ObjectIdGetDatum(relfilenode));
|
||||
|
||||
scanDescriptor = systable_beginscan(cstoreDataFiles,
|
||||
CStoreDataFilesIndexRelationId(),
|
||||
indexOK,
|
||||
NULL, scanKeyCount, scanKey);
|
||||
SysScanDesc scanDescriptor = systable_beginscan(cstoreDataFiles,
|
||||
CStoreDataFilesIndexRelationId(),
|
||||
indexOK,
|
||||
NULL, scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
if (heapTuple == NULL)
|
||||
{
|
||||
ereport(ERROR, (errmsg("relfilenode %d doesn't belong to a cstore table",
|
||||
relfilenode)));
|
||||
}
|
||||
|
||||
metadata = (Form_cstore_data_files) GETSTRUCT(heapTuple);
|
||||
Form_cstore_data_files metadata = (Form_cstore_data_files) GETSTRUCT(heapTuple);
|
||||
|
||||
bool changed = false;
|
||||
if (metadata->block_row_count != blockRowCount)
|
||||
|
@ -250,14 +244,11 @@ SaveStripeSkipList(Oid relfilenode, uint64 stripe, StripeSkipList *stripeSkipLis
|
|||
{
|
||||
uint32 columnIndex = 0;
|
||||
uint32 blockIndex = 0;
|
||||
Oid cstoreSkipNodesOid = InvalidOid;
|
||||
Relation cstoreSkipNodes = NULL;
|
||||
ModifyState *modifyState = NULL;
|
||||
uint32 columnCount = stripeSkipList->columnCount;
|
||||
|
||||
cstoreSkipNodesOid = CStoreSkipNodesRelationId();
|
||||
cstoreSkipNodes = heap_open(cstoreSkipNodesOid, RowExclusiveLock);
|
||||
modifyState = StartModifyRelation(cstoreSkipNodes);
|
||||
Oid cstoreSkipNodesOid = CStoreSkipNodesRelationId();
|
||||
Relation cstoreSkipNodes = heap_open(cstoreSkipNodesOid, RowExclusiveLock);
|
||||
ModifyState *modifyState = StartModifyRelation(cstoreSkipNodes);
|
||||
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
|
@ -316,28 +307,24 @@ StripeSkipList *
|
|||
ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
|
||||
uint32 blockCount)
|
||||
{
|
||||
StripeSkipList *skipList = NULL;
|
||||
int32 columnIndex = 0;
|
||||
Oid cstoreSkipNodesOid = InvalidOid;
|
||||
Relation cstoreSkipNodes = NULL;
|
||||
Relation index = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
uint32 columnCount = tupleDescriptor->natts;
|
||||
ScanKeyData scanKey[2];
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
|
||||
cstoreSkipNodesOid = CStoreSkipNodesRelationId();
|
||||
cstoreSkipNodes = heap_open(cstoreSkipNodesOid, AccessShareLock);
|
||||
index = index_open(CStoreSkipNodesIndexRelationId(), AccessShareLock);
|
||||
Oid cstoreSkipNodesOid = CStoreSkipNodesRelationId();
|
||||
Relation cstoreSkipNodes = heap_open(cstoreSkipNodesOid, AccessShareLock);
|
||||
Relation index = index_open(CStoreSkipNodesIndexRelationId(), AccessShareLock);
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_skipnodes_relfilenode,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||
ScanKeyInit(&scanKey[1], Anum_cstore_skipnodes_stripe,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
|
||||
|
||||
scanDescriptor = systable_beginscan_ordered(cstoreSkipNodes, index, NULL, 2, scanKey);
|
||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(cstoreSkipNodes, index, NULL,
|
||||
2, scanKey);
|
||||
|
||||
skipList = palloc0(sizeof(StripeSkipList));
|
||||
StripeSkipList *skipList = palloc0(sizeof(StripeSkipList));
|
||||
skipList->blockCount = blockCount;
|
||||
skipList->columnCount = columnCount;
|
||||
skipList->blockSkipNodeArray = palloc0(columnCount * sizeof(ColumnBlockSkipNode *));
|
||||
|
@ -349,18 +336,14 @@ ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
|
|||
|
||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||
{
|
||||
int32 attr = 0;
|
||||
int32 blockIndex = 0;
|
||||
ColumnBlockSkipNode *skipNode = NULL;
|
||||
|
||||
Datum datumArray[Natts_cstore_skipnodes];
|
||||
bool isNullArray[Natts_cstore_skipnodes];
|
||||
|
||||
heap_deform_tuple(heapTuple, RelationGetDescr(cstoreSkipNodes), datumArray,
|
||||
isNullArray);
|
||||
|
||||
attr = DatumGetInt32(datumArray[Anum_cstore_skipnodes_attr - 1]);
|
||||
blockIndex = DatumGetInt32(datumArray[Anum_cstore_skipnodes_block - 1]);
|
||||
int32 attr = DatumGetInt32(datumArray[Anum_cstore_skipnodes_attr - 1]);
|
||||
int32 blockIndex = DatumGetInt32(datumArray[Anum_cstore_skipnodes_block - 1]);
|
||||
|
||||
if (attr <= 0 || attr > columnCount)
|
||||
{
|
||||
|
@ -376,7 +359,8 @@ ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor,
|
|||
|
||||
columnIndex = attr - 1;
|
||||
|
||||
skipNode = &skipList->blockSkipNodeArray[columnIndex][blockIndex];
|
||||
ColumnBlockSkipNode *skipNode =
|
||||
&skipList->blockSkipNodeArray[columnIndex][blockIndex];
|
||||
skipNode->rowCount = DatumGetInt64(datumArray[Anum_cstore_skipnodes_row_count -
|
||||
1]);
|
||||
skipNode->valueBlockOffset =
|
||||
|
@ -507,12 +491,11 @@ GetHighestUsedAddressAndId(Oid relfilenode,
|
|||
uint64 *highestUsedId)
|
||||
{
|
||||
ListCell *stripeMetadataCell = NULL;
|
||||
List *stripeMetadataList = NIL;
|
||||
|
||||
SnapshotData SnapshotDirty;
|
||||
InitDirtySnapshot(SnapshotDirty);
|
||||
|
||||
stripeMetadataList = ReadDataFileStripeList(relfilenode, &SnapshotDirty);
|
||||
List *stripeMetadataList = ReadDataFileStripeList(relfilenode, &SnapshotDirty);
|
||||
|
||||
*highestUsedId = 0;
|
||||
*highestUsedAddress = 0;
|
||||
|
@ -538,14 +521,7 @@ ReserveStripe(Relation rel, uint64 sizeBytes,
|
|||
uint64 blockCount, uint64 blockRowCount)
|
||||
{
|
||||
StripeMetadata stripe = { 0 };
|
||||
Oid relfilenode = InvalidOid;
|
||||
uint64 currLogicalHigh = 0;
|
||||
SmgrAddr currSmgrHigh;
|
||||
uint64 nblocks = 0;
|
||||
uint64 resLogicalStart = 0;
|
||||
SmgrAddr resSmgrStart;
|
||||
uint64 resLogicalEnd = 0;
|
||||
SmgrAddr resSmgrEnd;
|
||||
uint64 highestId = 0;
|
||||
|
||||
/*
|
||||
|
@ -556,18 +532,18 @@ ReserveStripe(Relation rel, uint64 sizeBytes,
|
|||
*/
|
||||
LockRelation(rel, ShareUpdateExclusiveLock);
|
||||
|
||||
relfilenode = rel->rd_node.relNode;
|
||||
Oid relfilenode = rel->rd_node.relNode;
|
||||
GetHighestUsedAddressAndId(relfilenode, &currLogicalHigh, &highestId);
|
||||
currSmgrHigh = logical_to_smgr(currLogicalHigh);
|
||||
SmgrAddr currSmgrHigh = logical_to_smgr(currLogicalHigh);
|
||||
|
||||
resSmgrStart = next_block_start(currSmgrHigh);
|
||||
resLogicalStart = smgr_to_logical(resSmgrStart);
|
||||
SmgrAddr resSmgrStart = next_block_start(currSmgrHigh);
|
||||
uint64 resLogicalStart = smgr_to_logical(resSmgrStart);
|
||||
|
||||
resLogicalEnd = resLogicalStart + sizeBytes - 1;
|
||||
resSmgrEnd = logical_to_smgr(resLogicalEnd);
|
||||
uint64 resLogicalEnd = resLogicalStart + sizeBytes - 1;
|
||||
SmgrAddr resSmgrEnd = logical_to_smgr(resLogicalEnd);
|
||||
|
||||
RelationOpenSmgr(rel);
|
||||
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
uint64 nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
|
||||
while (resSmgrEnd.blockno >= nblocks)
|
||||
{
|
||||
|
@ -602,34 +578,29 @@ static List *
|
|||
ReadDataFileStripeList(Oid relfilenode, Snapshot snapshot)
|
||||
{
|
||||
List *stripeMetadataList = NIL;
|
||||
Oid cstoreStripesOid = InvalidOid;
|
||||
Relation cstoreStripes = NULL;
|
||||
Relation index = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
HeapTuple heapTuple;
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_stripes_relfilenode,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||
|
||||
cstoreStripesOid = CStoreStripesRelationId();
|
||||
cstoreStripes = heap_open(cstoreStripesOid, AccessShareLock);
|
||||
index = index_open(CStoreStripesIndexRelationId(), AccessShareLock);
|
||||
tupleDescriptor = RelationGetDescr(cstoreStripes);
|
||||
Oid cstoreStripesOid = CStoreStripesRelationId();
|
||||
Relation cstoreStripes = heap_open(cstoreStripesOid, AccessShareLock);
|
||||
Relation index = index_open(CStoreStripesIndexRelationId(), AccessShareLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(cstoreStripes);
|
||||
|
||||
scanDescriptor = systable_beginscan_ordered(cstoreStripes, index, snapshot, 1,
|
||||
scanKey);
|
||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(cstoreStripes, index,
|
||||
snapshot, 1,
|
||||
scanKey);
|
||||
|
||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||
{
|
||||
StripeMetadata *stripeMetadata = NULL;
|
||||
Datum datumArray[Natts_cstore_stripes];
|
||||
bool isNullArray[Natts_cstore_stripes];
|
||||
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
stripeMetadata = palloc0(sizeof(StripeMetadata));
|
||||
StripeMetadata *stripeMetadata = palloc0(sizeof(StripeMetadata));
|
||||
stripeMetadata->id = DatumGetInt64(datumArray[Anum_cstore_stripes_stripe - 1]);
|
||||
stripeMetadata->fileOffset = DatumGetInt64(
|
||||
datumArray[Anum_cstore_stripes_file_offset - 1]);
|
||||
|
@ -663,19 +634,13 @@ static bool
|
|||
ReadCStoreDataFiles(Oid relfilenode, DataFileMetadata *metadata)
|
||||
{
|
||||
bool found = false;
|
||||
Oid cstoreDataFilesOid = InvalidOid;
|
||||
Relation cstoreDataFiles = NULL;
|
||||
Relation index = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_cstore_data_files_relfilenode,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||
|
||||
cstoreDataFilesOid = CStoreDataFilesRelationId();
|
||||
cstoreDataFiles = try_relation_open(cstoreDataFilesOid, AccessShareLock);
|
||||
Oid cstoreDataFilesOid = CStoreDataFilesRelationId();
|
||||
Relation cstoreDataFiles = try_relation_open(cstoreDataFilesOid, AccessShareLock);
|
||||
if (cstoreDataFiles == NULL)
|
||||
{
|
||||
/*
|
||||
|
@ -685,7 +650,7 @@ ReadCStoreDataFiles(Oid relfilenode, DataFileMetadata *metadata)
|
|||
return false;
|
||||
}
|
||||
|
||||
index = try_relation_open(CStoreDataFilesIndexRelationId(), AccessShareLock);
|
||||
Relation index = try_relation_open(CStoreDataFilesIndexRelationId(), AccessShareLock);
|
||||
if (index == NULL)
|
||||
{
|
||||
heap_close(cstoreDataFiles, NoLock);
|
||||
|
@ -694,11 +659,12 @@ ReadCStoreDataFiles(Oid relfilenode, DataFileMetadata *metadata)
|
|||
return false;
|
||||
}
|
||||
|
||||
tupleDescriptor = RelationGetDescr(cstoreDataFiles);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(cstoreDataFiles);
|
||||
|
||||
scanDescriptor = systable_beginscan_ordered(cstoreDataFiles, index, NULL, 1, scanKey);
|
||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(cstoreDataFiles, index, NULL,
|
||||
1, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
if (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Datum datumArray[Natts_cstore_data_files];
|
||||
|
@ -707,13 +673,11 @@ ReadCStoreDataFiles(Oid relfilenode, DataFileMetadata *metadata)
|
|||
|
||||
if (metadata)
|
||||
{
|
||||
Name compressionName = NULL;
|
||||
|
||||
metadata->blockRowCount = DatumGetInt32(
|
||||
datumArray[Anum_cstore_data_files_block_row_count - 1]);
|
||||
metadata->stripeRowCount = DatumGetInt32(
|
||||
datumArray[Anum_cstore_data_files_stripe_row_count - 1]);
|
||||
compressionName = DatumGetName(
|
||||
Name compressionName = DatumGetName(
|
||||
datumArray[Anum_cstore_data_files_compression - 1]);
|
||||
metadata->compression = ParseCompressionType(NameStr(*compressionName));
|
||||
}
|
||||
|
@ -734,12 +698,7 @@ ReadCStoreDataFiles(Oid relfilenode, DataFileMetadata *metadata)
|
|||
void
|
||||
DeleteDataFileMetadataRowIfExists(Oid relfilenode)
|
||||
{
|
||||
Oid cstoreDataFilesOid = InvalidOid;
|
||||
Relation cstoreDataFiles = NULL;
|
||||
Relation index = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
/*
|
||||
* During a restore for binary upgrade, metadata tables and indexes may or
|
||||
|
@ -753,19 +712,20 @@ DeleteDataFileMetadataRowIfExists(Oid relfilenode)
|
|||
ScanKeyInit(&scanKey[0], Anum_cstore_data_files_relfilenode,
|
||||
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(relfilenode));
|
||||
|
||||
cstoreDataFilesOid = CStoreDataFilesRelationId();
|
||||
cstoreDataFiles = try_relation_open(cstoreDataFilesOid, AccessShareLock);
|
||||
Oid cstoreDataFilesOid = CStoreDataFilesRelationId();
|
||||
Relation cstoreDataFiles = try_relation_open(cstoreDataFilesOid, AccessShareLock);
|
||||
if (cstoreDataFiles == NULL)
|
||||
{
|
||||
/* extension has been dropped */
|
||||
return;
|
||||
}
|
||||
|
||||
index = index_open(CStoreDataFilesIndexRelationId(), AccessShareLock);
|
||||
Relation index = index_open(CStoreDataFilesIndexRelationId(), AccessShareLock);
|
||||
|
||||
scanDescriptor = systable_beginscan_ordered(cstoreDataFiles, index, NULL, 1, scanKey);
|
||||
SysScanDesc scanDescriptor = systable_beginscan_ordered(cstoreDataFiles, index, NULL,
|
||||
1, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
if (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
ModifyState *modifyState = StartModifyRelation(cstoreDataFiles);
|
||||
|
@ -785,13 +745,12 @@ DeleteDataFileMetadataRowIfExists(Oid relfilenode)
|
|||
static ModifyState *
|
||||
StartModifyRelation(Relation rel)
|
||||
{
|
||||
ModifyState *modifyState = NULL;
|
||||
EState *estate = create_estate_for_relation(rel);
|
||||
|
||||
/* ExecSimpleRelationInsert, ... require caller to open indexes */
|
||||
ExecOpenIndices(estate->es_result_relation_info, false);
|
||||
|
||||
modifyState = palloc(sizeof(ModifyState));
|
||||
ModifyState *modifyState = palloc(sizeof(ModifyState));
|
||||
modifyState->rel = rel;
|
||||
modifyState->estate = estate;
|
||||
|
||||
|
@ -869,13 +828,11 @@ FinishModifyRelation(ModifyState *state)
|
|||
static EState *
|
||||
create_estate_for_relation(Relation rel)
|
||||
{
|
||||
EState *estate;
|
||||
ResultRelInfo *resultRelInfo;
|
||||
RangeTblEntry *rte;
|
||||
|
||||
estate = CreateExecutorState();
|
||||
EState *estate = CreateExecutorState();
|
||||
|
||||
rte = makeNode(RangeTblEntry);
|
||||
RangeTblEntry *rte = makeNode(RangeTblEntry);
|
||||
rte->rtekind = RTE_RELATION;
|
||||
rte->relid = RelationGetRelid(rel);
|
||||
rte->relkind = rel->rd_rel->relkind;
|
||||
|
|
|
@ -84,23 +84,20 @@ TableReadState *
|
|||
CStoreBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
||||
List *projectedColumnList, List *whereClauseList)
|
||||
{
|
||||
TableReadState *readState = NULL;
|
||||
DataFileMetadata *datafileMetadata = NULL;
|
||||
MemoryContext stripeReadContext = NULL;
|
||||
Oid relNode = relation->rd_node.relNode;
|
||||
|
||||
datafileMetadata = ReadDataFileMetadata(relNode, false);
|
||||
DataFileMetadata *datafileMetadata = ReadDataFileMetadata(relNode, false);
|
||||
|
||||
/*
|
||||
* We allocate all stripe specific data in the stripeReadContext, and reset
|
||||
* this memory context before loading a new stripe. This is to avoid memory
|
||||
* leaks.
|
||||
*/
|
||||
stripeReadContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"Stripe Read Memory Context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
MemoryContext stripeReadContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"Stripe Read Memory Context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
readState = palloc0(sizeof(TableReadState));
|
||||
TableReadState *readState = palloc0(sizeof(TableReadState));
|
||||
readState->relation = relation;
|
||||
readState->datafileMetadata = datafileMetadata;
|
||||
readState->projectedColumnList = projectedColumnList;
|
||||
|
@ -125,8 +122,6 @@ CStoreBeginRead(Relation relation, TupleDesc tupleDescriptor,
|
|||
bool
|
||||
CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNulls)
|
||||
{
|
||||
uint32 blockIndex = 0;
|
||||
uint32 blockRowIndex = 0;
|
||||
StripeMetadata *stripeMetadata = readState->currentStripeMetadata;
|
||||
MemoryContext oldContext = NULL;
|
||||
|
||||
|
@ -138,7 +133,6 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
|
|||
*/
|
||||
while (readState->stripeBuffers == NULL)
|
||||
{
|
||||
StripeBuffers *stripeBuffers = NULL;
|
||||
List *stripeMetadataList = readState->datafileMetadata->stripeMetadataList;
|
||||
uint32 stripeCount = list_length(stripeMetadataList);
|
||||
|
||||
|
@ -153,11 +147,14 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
|
|||
readState->blockData = NULL;
|
||||
|
||||
stripeMetadata = list_nth(stripeMetadataList, readState->readStripeCount);
|
||||
stripeBuffers = LoadFilteredStripeBuffers(readState->relation,
|
||||
stripeMetadata,
|
||||
readState->tupleDescriptor,
|
||||
readState->projectedColumnList,
|
||||
readState->whereClauseList);
|
||||
StripeBuffers *stripeBuffers = LoadFilteredStripeBuffers(readState->relation,
|
||||
stripeMetadata,
|
||||
readState->
|
||||
tupleDescriptor,
|
||||
readState->
|
||||
projectedColumnList,
|
||||
readState->
|
||||
whereClauseList);
|
||||
readState->readStripeCount++;
|
||||
readState->currentStripeMetadata = stripeMetadata;
|
||||
|
||||
|
@ -172,17 +169,15 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu
|
|||
}
|
||||
}
|
||||
|
||||
blockIndex = readState->stripeReadRowCount / stripeMetadata->blockRowCount;
|
||||
blockRowIndex = readState->stripeReadRowCount % stripeMetadata->blockRowCount;
|
||||
uint32 blockIndex = readState->stripeReadRowCount / stripeMetadata->blockRowCount;
|
||||
uint32 blockRowIndex = readState->stripeReadRowCount % stripeMetadata->blockRowCount;
|
||||
|
||||
if (blockIndex != readState->deserializedBlockIndex)
|
||||
{
|
||||
uint32 lastBlockIndex = 0;
|
||||
uint32 blockRowCount = 0;
|
||||
uint32 stripeRowCount = 0;
|
||||
|
||||
stripeRowCount = stripeMetadata->rowCount;
|
||||
lastBlockIndex = stripeRowCount / stripeMetadata->blockRowCount;
|
||||
uint32 stripeRowCount = stripeMetadata->rowCount;
|
||||
uint32 lastBlockIndex = stripeRowCount / stripeMetadata->blockRowCount;
|
||||
if (blockIndex == lastBlockIndex)
|
||||
{
|
||||
blockRowCount = stripeRowCount % stripeMetadata->blockRowCount;
|
||||
|
@ -317,11 +312,11 @@ FreeBlockData(BlockData *blockData)
|
|||
uint64
|
||||
CStoreTableRowCount(Relation relation)
|
||||
{
|
||||
DataFileMetadata *datafileMetadata = NULL;
|
||||
ListCell *stripeMetadataCell = NULL;
|
||||
uint64 totalRowCount = 0;
|
||||
|
||||
datafileMetadata = ReadDataFileMetadata(relation->rd_node.relNode, false);
|
||||
DataFileMetadata *datafileMetadata = ReadDataFileMetadata(relation->rd_node.relNode,
|
||||
false);
|
||||
|
||||
foreach(stripeMetadataCell, datafileMetadata->stripeMetadataList)
|
||||
{
|
||||
|
@ -343,8 +338,6 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
|||
TupleDesc tupleDescriptor, List *projectedColumnList,
|
||||
List *whereClauseList)
|
||||
{
|
||||
StripeBuffers *stripeBuffers = NULL;
|
||||
ColumnBuffers **columnBuffersArray = NULL;
|
||||
uint32 columnIndex = 0;
|
||||
uint32 columnCount = tupleDescriptor->natts;
|
||||
|
||||
|
@ -363,7 +356,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
|||
selectedBlockMask);
|
||||
|
||||
/* load column data for projected columns */
|
||||
columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
|
||||
ColumnBuffers **columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
|
||||
|
||||
for (columnIndex = 0; columnIndex < stripeMetadata->columnCount; columnIndex++)
|
||||
{
|
||||
|
@ -383,7 +376,7 @@ LoadFilteredStripeBuffers(Relation relation, StripeMetadata *stripeMetadata,
|
|||
}
|
||||
}
|
||||
|
||||
stripeBuffers = palloc0(sizeof(StripeBuffers));
|
||||
StripeBuffers *stripeBuffers = palloc0(sizeof(StripeBuffers));
|
||||
stripeBuffers->columnCount = columnCount;
|
||||
stripeBuffers->rowCount = StripeSkipListRowCount(selectedBlockSkipList);
|
||||
stripeBuffers->columnBuffersArray = columnBuffersArray;
|
||||
|
@ -432,7 +425,6 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
|
|||
uint32 blockCount, uint64 stripeOffset,
|
||||
Form_pg_attribute attributeForm)
|
||||
{
|
||||
ColumnBuffers *columnBuffers = NULL;
|
||||
uint32 blockIndex = 0;
|
||||
ColumnBlockBuffers **blockBuffersArray =
|
||||
palloc0(blockCount * sizeof(ColumnBlockBuffers *));
|
||||
|
@ -470,7 +462,7 @@ LoadColumnBuffers(Relation relation, ColumnBlockSkipNode *blockSkipNodeArray,
|
|||
blockBuffersArray[blockIndex]->valueCompressionType = compressionType;
|
||||
}
|
||||
|
||||
columnBuffers = palloc0(sizeof(ColumnBuffers));
|
||||
ColumnBuffers *columnBuffers = palloc0(sizeof(ColumnBuffers));
|
||||
columnBuffers->blockBuffersArray = blockBuffersArray;
|
||||
|
||||
return columnBuffers;
|
||||
|
@ -486,34 +478,31 @@ static bool *
|
|||
SelectedBlockMask(StripeSkipList *stripeSkipList, List *projectedColumnList,
|
||||
List *whereClauseList)
|
||||
{
|
||||
bool *selectedBlockMask = NULL;
|
||||
ListCell *columnCell = NULL;
|
||||
uint32 blockIndex = 0;
|
||||
List *restrictInfoList = BuildRestrictInfoList(whereClauseList);
|
||||
|
||||
selectedBlockMask = palloc0(stripeSkipList->blockCount * sizeof(bool));
|
||||
bool *selectedBlockMask = palloc0(stripeSkipList->blockCount * sizeof(bool));
|
||||
memset(selectedBlockMask, true, stripeSkipList->blockCount * sizeof(bool));
|
||||
|
||||
foreach(columnCell, projectedColumnList)
|
||||
{
|
||||
Var *column = lfirst(columnCell);
|
||||
uint32 columnIndex = column->varattno - 1;
|
||||
FmgrInfo *comparisonFunction = NULL;
|
||||
Node *baseConstraint = NULL;
|
||||
|
||||
/* if this column's data type doesn't have a comparator, skip it */
|
||||
comparisonFunction = GetFunctionInfoOrNull(column->vartype, BTREE_AM_OID,
|
||||
BTORDER_PROC);
|
||||
FmgrInfo *comparisonFunction = GetFunctionInfoOrNull(column->vartype,
|
||||
BTREE_AM_OID,
|
||||
BTORDER_PROC);
|
||||
if (comparisonFunction == NULL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
baseConstraint = BuildBaseConstraint(column);
|
||||
Node *baseConstraint = BuildBaseConstraint(column);
|
||||
for (blockIndex = 0; blockIndex < stripeSkipList->blockCount; blockIndex++)
|
||||
{
|
||||
bool predicateRefuted = false;
|
||||
List *constraintList = NIL;
|
||||
ColumnBlockSkipNode *blockSkipNodeArray =
|
||||
stripeSkipList->blockSkipNodeArray[columnIndex];
|
||||
ColumnBlockSkipNode *blockSkipNode = &blockSkipNodeArray[blockIndex];
|
||||
|
@ -530,7 +519,7 @@ SelectedBlockMask(StripeSkipList *stripeSkipList, List *projectedColumnList,
|
|||
UpdateConstraint(baseConstraint, blockSkipNode->minimumValue,
|
||||
blockSkipNode->maximumValue);
|
||||
|
||||
constraintList = list_make1(baseConstraint);
|
||||
List *constraintList = list_make1(baseConstraint);
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
predicateRefuted = predicate_refuted_by(constraintList, restrictInfoList,
|
||||
false);
|
||||
|
@ -558,24 +547,21 @@ FmgrInfo *
|
|||
GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId, int16 procedureId)
|
||||
{
|
||||
FmgrInfo *functionInfo = NULL;
|
||||
Oid operatorClassId = InvalidOid;
|
||||
Oid operatorFamilyId = InvalidOid;
|
||||
Oid operatorId = InvalidOid;
|
||||
|
||||
/* get default operator class from pg_opclass for datum type */
|
||||
operatorClassId = GetDefaultOpClass(typeId, accessMethodId);
|
||||
Oid operatorClassId = GetDefaultOpClass(typeId, accessMethodId);
|
||||
if (operatorClassId == InvalidOid)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
operatorFamilyId = get_opclass_family(operatorClassId);
|
||||
Oid operatorFamilyId = get_opclass_family(operatorClassId);
|
||||
if (operatorFamilyId == InvalidOid)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
operatorId = get_opfamily_proc(operatorFamilyId, typeId, typeId, procedureId);
|
||||
Oid operatorId = get_opfamily_proc(operatorFamilyId, typeId, typeId, procedureId);
|
||||
if (operatorId != InvalidOid)
|
||||
{
|
||||
functionInfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo));
|
||||
|
@ -601,10 +587,9 @@ BuildRestrictInfoList(List *whereClauseList)
|
|||
ListCell *qualCell = NULL;
|
||||
foreach(qualCell, whereClauseList)
|
||||
{
|
||||
RestrictInfo *restrictInfo = NULL;
|
||||
Node *qualNode = (Node *) lfirst(qualCell);
|
||||
|
||||
restrictInfo = make_simple_restrictinfo((Expr *) qualNode);
|
||||
RestrictInfo *restrictInfo = make_simple_restrictinfo((Expr *) qualNode);
|
||||
restrictInfoList = lappend(restrictInfoList, restrictInfo);
|
||||
}
|
||||
|
||||
|
@ -622,14 +607,10 @@ BuildRestrictInfoList(List *whereClauseList)
|
|||
static Node *
|
||||
BuildBaseConstraint(Var *variable)
|
||||
{
|
||||
Node *baseConstraint = NULL;
|
||||
OpExpr *lessThanExpr = NULL;
|
||||
OpExpr *greaterThanExpr = NULL;
|
||||
OpExpr *lessThanExpr = MakeOpExpression(variable, BTLessEqualStrategyNumber);
|
||||
OpExpr *greaterThanExpr = MakeOpExpression(variable, BTGreaterEqualStrategyNumber);
|
||||
|
||||
lessThanExpr = MakeOpExpression(variable, BTLessEqualStrategyNumber);
|
||||
greaterThanExpr = MakeOpExpression(variable, BTGreaterEqualStrategyNumber);
|
||||
|
||||
baseConstraint = make_and_qual((Node *) lessThanExpr, (Node *) greaterThanExpr);
|
||||
Node *baseConstraint = make_and_qual((Node *) lessThanExpr, (Node *) greaterThanExpr);
|
||||
|
||||
return baseConstraint;
|
||||
}
|
||||
|
@ -648,22 +629,19 @@ MakeOpExpression(Var *variable, int16 strategyNumber)
|
|||
Oid collationId = variable->varcollid;
|
||||
|
||||
Oid accessMethodId = BTREE_AM_OID;
|
||||
Oid operatorId = InvalidOid;
|
||||
Const *constantValue = NULL;
|
||||
OpExpr *expression = NULL;
|
||||
|
||||
/* Load the operator from system catalogs */
|
||||
operatorId = GetOperatorByType(typeId, accessMethodId, strategyNumber);
|
||||
Oid operatorId = GetOperatorByType(typeId, accessMethodId, strategyNumber);
|
||||
|
||||
constantValue = makeNullConst(typeId, typeModId, collationId);
|
||||
Const *constantValue = makeNullConst(typeId, typeModId, collationId);
|
||||
|
||||
/* Now make the expression with the given variable and a null constant */
|
||||
expression = (OpExpr *) make_opclause(operatorId,
|
||||
InvalidOid, /* no result type yet */
|
||||
false, /* no return set */
|
||||
(Expr *) variable,
|
||||
(Expr *) constantValue,
|
||||
InvalidOid, collationId);
|
||||
OpExpr *expression = (OpExpr *) make_opclause(operatorId,
|
||||
InvalidOid, /* no result type yet */
|
||||
false, /* no return set */
|
||||
(Expr *) variable,
|
||||
(Expr *) constantValue,
|
||||
InvalidOid, collationId);
|
||||
|
||||
/* Set implementing function id and result type */
|
||||
expression->opfuncid = get_opcode(operatorId);
|
||||
|
@ -707,14 +685,12 @@ UpdateConstraint(Node *baseConstraint, Datum minValue, Datum maxValue)
|
|||
|
||||
Node *minNode = get_rightop((Expr *) greaterThanExpr);
|
||||
Node *maxNode = get_rightop((Expr *) lessThanExpr);
|
||||
Const *minConstant = NULL;
|
||||
Const *maxConstant = NULL;
|
||||
|
||||
Assert(IsA(minNode, Const));
|
||||
Assert(IsA(maxNode, Const));
|
||||
|
||||
minConstant = (Const *) minNode;
|
||||
maxConstant = (Const *) maxNode;
|
||||
Const *minConstant = (Const *) minNode;
|
||||
Const *maxConstant = (Const *) maxNode;
|
||||
|
||||
minConstant->constvalue = minValue;
|
||||
maxConstant->constvalue = maxValue;
|
||||
|
@ -735,8 +711,6 @@ static StripeSkipList *
|
|||
SelectedBlockSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
|
||||
bool *selectedBlockMask)
|
||||
{
|
||||
StripeSkipList *SelectedBlockSkipList = NULL;
|
||||
ColumnBlockSkipNode **selectedBlockSkipNodeArray = NULL;
|
||||
uint32 selectedBlockCount = 0;
|
||||
uint32 blockIndex = 0;
|
||||
uint32 columnIndex = 0;
|
||||
|
@ -750,7 +724,9 @@ SelectedBlockSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
|
|||
}
|
||||
}
|
||||
|
||||
selectedBlockSkipNodeArray = palloc0(columnCount * sizeof(ColumnBlockSkipNode *));
|
||||
ColumnBlockSkipNode **selectedBlockSkipNodeArray = palloc0(columnCount *
|
||||
sizeof(ColumnBlockSkipNode
|
||||
*));
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
uint32 selectedBlockIndex = 0;
|
||||
|
@ -779,7 +755,7 @@ SelectedBlockSkipList(StripeSkipList *stripeSkipList, bool *projectedColumnMask,
|
|||
}
|
||||
}
|
||||
|
||||
SelectedBlockSkipList = palloc0(sizeof(StripeSkipList));
|
||||
StripeSkipList *SelectedBlockSkipList = palloc0(sizeof(StripeSkipList));
|
||||
SelectedBlockSkipList->blockSkipNodeArray = selectedBlockSkipNodeArray;
|
||||
SelectedBlockSkipList->blockCount = selectedBlockCount;
|
||||
SelectedBlockSkipList->columnCount = stripeSkipList->columnCount;
|
||||
|
@ -882,14 +858,12 @@ DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCou
|
|||
|
||||
for (datumIndex = 0; datumIndex < datumCount; datumIndex++)
|
||||
{
|
||||
char *currentDatumDataPointer = NULL;
|
||||
|
||||
if (!existsArray[datumIndex])
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
currentDatumDataPointer = datumBuffer->data + currentDatumDataOffset;
|
||||
char *currentDatumDataPointer = datumBuffer->data + currentDatumDataOffset;
|
||||
|
||||
datumArray[datumIndex] = fetch_att(currentDatumDataPointer, datumTypeByValue,
|
||||
datumTypeLength);
|
||||
|
@ -940,11 +914,10 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex,
|
|||
{
|
||||
ColumnBlockBuffers *blockBuffers =
|
||||
columnBuffers->blockBuffersArray[blockIndex];
|
||||
StringInfo valueBuffer = NULL;
|
||||
|
||||
/* decompress and deserialize current block's data */
|
||||
valueBuffer = DecompressBuffer(blockBuffers->valueBuffer,
|
||||
blockBuffers->valueCompressionType);
|
||||
StringInfo valueBuffer = DecompressBuffer(blockBuffers->valueBuffer,
|
||||
blockBuffers->valueCompressionType);
|
||||
|
||||
if (blockBuffers->valueCompressionType != COMPRESSION_NONE)
|
||||
{
|
||||
|
@ -1045,17 +1018,13 @@ ReadFromSmgr(Relation rel, uint64 offset, uint32 size)
|
|||
|
||||
while (read < size)
|
||||
{
|
||||
Buffer buffer;
|
||||
Page page;
|
||||
PageHeader phdr;
|
||||
uint32 to_read;
|
||||
SmgrAddr addr = logical_to_smgr(offset + read);
|
||||
|
||||
buffer = ReadBuffer(rel, addr.blockno);
|
||||
page = BufferGetPage(buffer);
|
||||
phdr = (PageHeader) page;
|
||||
Buffer buffer = ReadBuffer(rel, addr.blockno);
|
||||
Page page = BufferGetPage(buffer);
|
||||
PageHeader phdr = (PageHeader) page;
|
||||
|
||||
to_read = Min(size - read, phdr->pd_upper - addr.offset);
|
||||
uint32 to_read = Min(size - read, phdr->pd_upper - addr.offset);
|
||||
memcpy(resultBuffer->data + read, page + addr.offset, to_read);
|
||||
ReleaseBuffer(buffer);
|
||||
read += to_read;
|
||||
|
|
|
@ -125,13 +125,10 @@ CStoreTableAMDefaultOptions()
|
|||
static CStoreOptions *
|
||||
CStoreTableAMGetOptions(Relation rel)
|
||||
{
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
DataFileMetadata *metadata = NULL;
|
||||
|
||||
Assert(rel != NULL);
|
||||
|
||||
cstoreOptions = palloc0(sizeof(CStoreOptions));
|
||||
metadata = ReadDataFileMetadata(rel->rd_node.relNode, false);
|
||||
CStoreOptions *cstoreOptions = palloc0(sizeof(CStoreOptions));
|
||||
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, false);
|
||||
cstoreOptions->compressionType = metadata->compression;
|
||||
cstoreOptions->stripeRowCount = metadata->stripeRowCount;
|
||||
cstoreOptions->blockRowCount = metadata->blockRowCount;
|
||||
|
@ -213,15 +210,14 @@ RelationColumnList(Relation rel)
|
|||
int32 vartypmod = tupdesc->attrs[i].atttypmod;
|
||||
Oid varcollid = tupdesc->attrs[i].attcollation;
|
||||
Index varlevelsup = 0;
|
||||
Var *var;
|
||||
|
||||
if (tupdesc->attrs[i].attisdropped)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
var = makeVar(varno, varattno, vartype, vartypmod,
|
||||
varcollid, varlevelsup);
|
||||
Var *var = makeVar(varno, varattno, vartype, vartypmod,
|
||||
varcollid, varlevelsup);
|
||||
columnList = lappend(columnList, var);
|
||||
}
|
||||
|
||||
|
@ -242,7 +238,6 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
|
|||
ParallelTableScanDesc parallel_scan,
|
||||
uint32 flags)
|
||||
{
|
||||
TableScanDesc scandesc;
|
||||
int natts = relation->rd_att->natts;
|
||||
Bitmapset *attr_needed = NULL;
|
||||
|
||||
|
@ -251,8 +246,9 @@ cstore_beginscan(Relation relation, Snapshot snapshot,
|
|||
/* the cstore access method does not use the flags, they are specific to heap */
|
||||
flags = 0;
|
||||
|
||||
scandesc = cstore_beginscan_extended(relation, snapshot, nkeys, key, parallel_scan,
|
||||
flags, attr_needed, NULL);
|
||||
TableScanDesc scandesc = cstore_beginscan_extended(relation, snapshot, nkeys, key,
|
||||
parallel_scan,
|
||||
flags, attr_needed, NULL);
|
||||
|
||||
pfree(attr_needed);
|
||||
|
||||
|
@ -267,9 +263,7 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
|
|||
uint32 flags, Bitmapset *attr_needed, List *scanQual)
|
||||
{
|
||||
TupleDesc tupdesc = relation->rd_att;
|
||||
TableReadState *readState = NULL;
|
||||
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
|
||||
List *columnList = NIL;
|
||||
List *neededColumnList = NIL;
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
|
||||
ListCell *columnCell = NULL;
|
||||
|
@ -281,7 +275,7 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
|
|||
scan->cs_base.rs_flags = flags;
|
||||
scan->cs_base.rs_parallel = parallel_scan;
|
||||
|
||||
columnList = RelationColumnList(relation);
|
||||
List *columnList = RelationColumnList(relation);
|
||||
|
||||
/* only collect columns that we need for the scan */
|
||||
foreach(columnCell, columnList)
|
||||
|
@ -293,7 +287,8 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
|
|||
}
|
||||
}
|
||||
|
||||
readState = CStoreBeginRead(relation, tupdesc, neededColumnList, scanQual);
|
||||
TableReadState *readState = CStoreBeginRead(relation, tupdesc, neededColumnList,
|
||||
scanQual);
|
||||
|
||||
scan->cs_readState = readState;
|
||||
|
||||
|
@ -324,13 +319,12 @@ static bool
|
|||
cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
|
||||
{
|
||||
CStoreScanDesc scan = (CStoreScanDesc) sscan;
|
||||
bool nextRowFound;
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
|
||||
|
||||
ExecClearTuple(slot);
|
||||
|
||||
nextRowFound = CStoreReadNextRow(scan->cs_readState, slot->tts_values,
|
||||
slot->tts_isnull);
|
||||
bool nextRowFound = CStoreReadNextRow(scan->cs_readState, slot->tts_values,
|
||||
slot->tts_isnull);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
|
@ -443,12 +437,11 @@ static void
|
|||
cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
||||
int options, BulkInsertState bistate)
|
||||
{
|
||||
HeapTuple heapTuple;
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
|
||||
|
||||
cstore_init_write_state(relation);
|
||||
|
||||
heapTuple = ExecCopySlotHeapTuple(slot);
|
||||
HeapTuple heapTuple = ExecCopySlotHeapTuple(slot);
|
||||
if (HeapTupleHasExternal(heapTuple))
|
||||
{
|
||||
/* detoast any toasted attributes */
|
||||
|
@ -559,7 +552,6 @@ cstore_relation_set_new_filenode(Relation rel,
|
|||
TransactionId *freezeXid,
|
||||
MultiXactId *minmulti)
|
||||
{
|
||||
SMgrRelation srel;
|
||||
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, true);
|
||||
uint64 blockRowCount = 0;
|
||||
uint64 stripeRowCount = 0;
|
||||
|
@ -587,7 +579,7 @@ cstore_relation_set_new_filenode(Relation rel,
|
|||
Assert(persistence == RELPERSISTENCE_PERMANENT);
|
||||
*freezeXid = RecentXmin;
|
||||
*minmulti = GetOldestMultiXactId();
|
||||
srel = RelationCreateStorage(*newrnode, persistence);
|
||||
SMgrRelation srel = RelationCreateStorage(*newrnode, persistence);
|
||||
InitCStoreDataFileMetadata(newrnode->relNode, blockRowCount, stripeRowCount,
|
||||
compression);
|
||||
smgrclose(srel);
|
||||
|
@ -639,11 +631,6 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
double *tups_vacuumed,
|
||||
double *tups_recently_dead)
|
||||
{
|
||||
TableWriteState *writeState = NULL;
|
||||
TableReadState *readState = NULL;
|
||||
CStoreOptions *cstoreOptions = NULL;
|
||||
Datum *values = NULL;
|
||||
bool *nulls = NULL;
|
||||
TupleDesc sourceDesc = RelationGetDescr(OldHeap);
|
||||
TupleDesc targetDesc = RelationGetDescr(NewHeap);
|
||||
|
||||
|
@ -664,7 +651,7 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
* relation first.
|
||||
*/
|
||||
|
||||
cstoreOptions = CStoreTableAMGetOptions(OldHeap);
|
||||
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(OldHeap);
|
||||
|
||||
UpdateCStoreDataFileMetadata(NewHeap->rd_node.relNode,
|
||||
cstoreOptions->blockRowCount,
|
||||
|
@ -673,16 +660,17 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
|||
|
||||
cstoreOptions = CStoreTableAMGetOptions(NewHeap);
|
||||
|
||||
writeState = CStoreBeginWrite(NewHeap,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount,
|
||||
targetDesc);
|
||||
TableWriteState *writeState = CStoreBeginWrite(NewHeap,
|
||||
cstoreOptions->compressionType,
|
||||
cstoreOptions->stripeRowCount,
|
||||
cstoreOptions->blockRowCount,
|
||||
targetDesc);
|
||||
|
||||
readState = CStoreBeginRead(OldHeap, sourceDesc, RelationColumnList(OldHeap), NULL);
|
||||
TableReadState *readState = CStoreBeginRead(OldHeap, sourceDesc, RelationColumnList(
|
||||
OldHeap), NULL);
|
||||
|
||||
values = palloc0(sourceDesc->natts * sizeof(Datum));
|
||||
nulls = palloc0(sourceDesc->natts * sizeof(bool));
|
||||
Datum *values = palloc0(sourceDesc->natts * sizeof(Datum));
|
||||
bool *nulls = palloc0(sourceDesc->natts * sizeof(bool));
|
||||
|
||||
*num_tuples = 0;
|
||||
|
||||
|
@ -727,7 +715,6 @@ cstore_vacuum_rel(Relation rel, VacuumParams *params,
|
|||
static void
|
||||
LogRelationStats(Relation rel, int elevel)
|
||||
{
|
||||
DataFileMetadata *datafileMetadata = NULL;
|
||||
ListCell *stripeMetadataCell = NULL;
|
||||
Oid relfilenode = rel->rd_node.relNode;
|
||||
StringInfo infoBuf = makeStringInfo();
|
||||
|
@ -736,13 +723,11 @@ LogRelationStats(Relation rel, int elevel)
|
|||
uint64 totalStripeLength = 0;
|
||||
uint64 tupleCount = 0;
|
||||
uint64 blockCount = 0;
|
||||
uint64 relPages = 0;
|
||||
int stripeCount = 0;
|
||||
TupleDesc tupdesc = RelationGetDescr(rel);
|
||||
uint64 droppedBlocksWithData = 0;
|
||||
|
||||
datafileMetadata = ReadDataFileMetadata(relfilenode, false);
|
||||
stripeCount = list_length(datafileMetadata->stripeMetadataList);
|
||||
DataFileMetadata *datafileMetadata = ReadDataFileMetadata(relfilenode, false);
|
||||
int stripeCount = list_length(datafileMetadata->stripeMetadataList);
|
||||
|
||||
foreach(stripeMetadataCell, datafileMetadata->stripeMetadataList)
|
||||
{
|
||||
|
@ -777,7 +762,7 @@ LogRelationStats(Relation rel, int elevel)
|
|||
}
|
||||
|
||||
RelationOpenSmgr(rel);
|
||||
relPages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
uint64 relPages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
RelationCloseSmgr(rel);
|
||||
|
||||
appendStringInfo(infoBuf, "total file size: %ld, total data size: %ld\n",
|
||||
|
@ -815,9 +800,6 @@ static void
|
|||
TruncateCStore(Relation rel, int elevel)
|
||||
{
|
||||
PGRUsage ru0;
|
||||
BlockNumber old_rel_pages = 0;
|
||||
BlockNumber new_rel_pages = 0;
|
||||
SmgrAddr highestPhysicalAddress;
|
||||
|
||||
pg_rusage_init(&ru0);
|
||||
|
||||
|
@ -851,7 +833,7 @@ TruncateCStore(Relation rel, int elevel)
|
|||
}
|
||||
|
||||
RelationOpenSmgr(rel);
|
||||
old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
RelationCloseSmgr(rel);
|
||||
|
||||
/*
|
||||
|
@ -859,10 +841,10 @@ TruncateCStore(Relation rel, int elevel)
|
|||
* new stripes be added beyond highestPhysicalAddress while
|
||||
* we're truncating.
|
||||
*/
|
||||
highestPhysicalAddress =
|
||||
SmgrAddr highestPhysicalAddress =
|
||||
logical_to_smgr(GetHighestUsedAddress(rel->rd_node.relNode));
|
||||
|
||||
new_rel_pages = highestPhysicalAddress.blockno + 1;
|
||||
BlockNumber new_rel_pages = highestPhysicalAddress.blockno + 1;
|
||||
if (new_rel_pages == old_rel_pages)
|
||||
{
|
||||
UnlockRelation(rel, AccessExclusiveLock);
|
||||
|
@ -1104,11 +1086,9 @@ CStoreTableAMProcessUtility(PlannedStmt * plannedStatement,
|
|||
if (nodeTag(parseTree) == T_CreateTrigStmt)
|
||||
{
|
||||
CreateTrigStmt *createTrigStmt = (CreateTrigStmt *) parseTree;
|
||||
Relation rel;
|
||||
bool isCStore;
|
||||
|
||||
rel = relation_openrv(createTrigStmt->relation, AccessShareLock);
|
||||
isCStore = rel->rd_tableam == GetCstoreTableAmRoutine();
|
||||
Relation rel = relation_openrv(createTrigStmt->relation, AccessShareLock);
|
||||
bool isCStore = rel->rd_tableam == GetCstoreTableAmRoutine();
|
||||
relation_close(rel, AccessShareLock);
|
||||
|
||||
if (isCStore &&
|
||||
|
@ -1201,9 +1181,6 @@ CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId
|
|||
static bool
|
||||
IsCStoreTableAmTable(Oid relationId)
|
||||
{
|
||||
bool result;
|
||||
Relation rel;
|
||||
|
||||
if (!OidIsValid(relationId))
|
||||
{
|
||||
return false;
|
||||
|
@ -1213,8 +1190,8 @@ IsCStoreTableAmTable(Oid relationId)
|
|||
* Lock relation to prevent it from being dropped &
|
||||
* avoid race conditions.
|
||||
*/
|
||||
rel = relation_open(relationId, AccessShareLock);
|
||||
result = rel->rd_tableam == GetCstoreTableAmRoutine();
|
||||
Relation rel = relation_open(relationId, AccessShareLock);
|
||||
bool result = rel->rd_tableam == GetCstoreTableAmRoutine();
|
||||
relation_close(rel, NoLock);
|
||||
|
||||
return result;
|
||||
|
@ -1317,9 +1294,6 @@ Datum
|
|||
alter_cstore_table_set(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
int blockRowCount = 0;
|
||||
int stripeRowCount = 0;
|
||||
CompressionType compression = COMPRESSION_TYPE_INVALID;
|
||||
|
||||
Relation rel = table_open(relationId, AccessExclusiveLock); /* ALTER TABLE LOCK */
|
||||
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, true);
|
||||
|
@ -1329,9 +1303,9 @@ alter_cstore_table_set(PG_FUNCTION_ARGS)
|
|||
quote_identifier(RelationGetRelationName(rel)))));
|
||||
}
|
||||
|
||||
blockRowCount = metadata->blockRowCount;
|
||||
stripeRowCount = metadata->stripeRowCount;
|
||||
compression = metadata->compression;
|
||||
int blockRowCount = metadata->blockRowCount;
|
||||
int stripeRowCount = metadata->stripeRowCount;
|
||||
CompressionType compression = metadata->compression;
|
||||
|
||||
/* block_row_count => not null */
|
||||
if (!PG_ARGISNULL(1))
|
||||
|
@ -1375,9 +1349,6 @@ Datum
|
|||
alter_cstore_table_reset(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
int blockRowCount = 0;
|
||||
int stripeRowCount = 0;
|
||||
CompressionType compression = COMPRESSION_TYPE_INVALID;
|
||||
|
||||
Relation rel = table_open(relationId, AccessExclusiveLock); /* ALTER TABLE LOCK */
|
||||
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, true);
|
||||
|
@ -1387,9 +1358,9 @@ alter_cstore_table_reset(PG_FUNCTION_ARGS)
|
|||
quote_identifier(RelationGetRelationName(rel)))));
|
||||
}
|
||||
|
||||
blockRowCount = metadata->blockRowCount;
|
||||
stripeRowCount = metadata->stripeRowCount;
|
||||
compression = metadata->compression;
|
||||
int blockRowCount = metadata->blockRowCount;
|
||||
int stripeRowCount = metadata->stripeRowCount;
|
||||
CompressionType compression = metadata->compression;
|
||||
|
||||
/* block_row_count => true */
|
||||
if (!PG_ARGISNULL(1) && PG_GETARG_BOOL(1))
|
||||
|
|
|
@ -61,17 +61,11 @@ CStoreBeginWrite(Relation relation,
|
|||
uint64 stripeMaxRowCount, uint32 blockRowCount,
|
||||
TupleDesc tupleDescriptor)
|
||||
{
|
||||
TableWriteState *writeState = NULL;
|
||||
FmgrInfo **comparisonFunctionArray = NULL;
|
||||
MemoryContext stripeWriteContext = NULL;
|
||||
uint32 columnCount = 0;
|
||||
uint32 columnIndex = 0;
|
||||
bool *columnMaskArray = NULL;
|
||||
BlockData *blockData = NULL;
|
||||
|
||||
/* get comparison function pointers for each of the columns */
|
||||
columnCount = tupleDescriptor->natts;
|
||||
comparisonFunctionArray = palloc0(columnCount * sizeof(FmgrInfo *));
|
||||
uint32 columnCount = tupleDescriptor->natts;
|
||||
FmgrInfo **comparisonFunctionArray = palloc0(columnCount * sizeof(FmgrInfo *));
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
FmgrInfo *comparisonFunction = NULL;
|
||||
|
@ -94,16 +88,17 @@ CStoreBeginWrite(Relation relation,
|
|||
* reset this memory context once we have flushed the stripe to the file.
|
||||
* This is to avoid memory leaks.
|
||||
*/
|
||||
stripeWriteContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"Stripe Write Memory Context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
MemoryContext stripeWriteContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"Stripe Write Memory Context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
columnMaskArray = palloc(columnCount * sizeof(bool));
|
||||
bool *columnMaskArray = palloc(columnCount * sizeof(bool));
|
||||
memset(columnMaskArray, true, columnCount);
|
||||
|
||||
blockData = CreateEmptyBlockData(columnCount, columnMaskArray, blockRowCount);
|
||||
BlockData *blockData = CreateEmptyBlockData(columnCount, columnMaskArray,
|
||||
blockRowCount);
|
||||
|
||||
writeState = palloc0(sizeof(TableWriteState));
|
||||
TableWriteState *writeState = palloc0(sizeof(TableWriteState));
|
||||
writeState->relation = relation;
|
||||
writeState->compressionType = compressionType;
|
||||
writeState->stripeMaxRowCount = stripeMaxRowCount;
|
||||
|
@ -132,8 +127,6 @@ void
|
|||
CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNulls)
|
||||
{
|
||||
uint32 columnIndex = 0;
|
||||
uint32 blockIndex = 0;
|
||||
uint32 blockRowIndex = 0;
|
||||
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
|
||||
StripeSkipList *stripeSkipList = writeState->stripeSkipList;
|
||||
uint32 columnCount = writeState->tupleDescriptor->natts;
|
||||
|
@ -161,8 +154,8 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
|
|||
}
|
||||
}
|
||||
|
||||
blockIndex = stripeBuffers->rowCount / blockRowCount;
|
||||
blockRowIndex = stripeBuffers->rowCount % blockRowCount;
|
||||
uint32 blockIndex = stripeBuffers->rowCount / blockRowCount;
|
||||
uint32 blockRowIndex = stripeBuffers->rowCount % blockRowCount;
|
||||
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
|
@ -257,7 +250,6 @@ static StripeBuffers *
|
|||
CreateEmptyStripeBuffers(uint32 stripeMaxRowCount, uint32 blockRowCount,
|
||||
uint32 columnCount)
|
||||
{
|
||||
StripeBuffers *stripeBuffers = NULL;
|
||||
uint32 columnIndex = 0;
|
||||
uint32 maxBlockCount = (stripeMaxRowCount / blockRowCount) + 1;
|
||||
ColumnBuffers **columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *));
|
||||
|
@ -280,7 +272,7 @@ CreateEmptyStripeBuffers(uint32 stripeMaxRowCount, uint32 blockRowCount,
|
|||
columnBuffersArray[columnIndex]->blockBuffersArray = blockBuffersArray;
|
||||
}
|
||||
|
||||
stripeBuffers = palloc0(sizeof(StripeBuffers));
|
||||
StripeBuffers *stripeBuffers = palloc0(sizeof(StripeBuffers));
|
||||
stripeBuffers->columnBuffersArray = columnBuffersArray;
|
||||
stripeBuffers->columnCount = columnCount;
|
||||
stripeBuffers->rowCount = 0;
|
||||
|
@ -298,7 +290,6 @@ static StripeSkipList *
|
|||
CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount,
|
||||
uint32 columnCount)
|
||||
{
|
||||
StripeSkipList *stripeSkipList = NULL;
|
||||
uint32 columnIndex = 0;
|
||||
uint32 maxBlockCount = (stripeMaxRowCount / blockRowCount) + 1;
|
||||
|
||||
|
@ -310,7 +301,7 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 blockRowCount,
|
|||
palloc0(maxBlockCount * sizeof(ColumnBlockSkipNode));
|
||||
}
|
||||
|
||||
stripeSkipList = palloc0(sizeof(StripeSkipList));
|
||||
StripeSkipList *stripeSkipList = palloc0(sizeof(StripeSkipList));
|
||||
stripeSkipList->columnCount = columnCount;
|
||||
stripeSkipList->blockCount = 0;
|
||||
stripeSkipList->blockSkipNodeArray = blockSkipNodeArray;
|
||||
|
@ -328,13 +319,9 @@ WriteToSmgr(Relation rel, uint64 logicalOffset, char *data, uint32 dataLength)
|
|||
while (remaining > 0)
|
||||
{
|
||||
SmgrAddr addr = logical_to_smgr(logicalOffset);
|
||||
BlockNumber nblocks;
|
||||
Page page;
|
||||
PageHeader phdr;
|
||||
uint64 to_write;
|
||||
|
||||
RelationOpenSmgr(rel);
|
||||
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
||||
Assert(addr.blockno < nblocks);
|
||||
(void) nblocks; /* keep compiler quiet */
|
||||
RelationCloseSmgr(rel);
|
||||
|
@ -342,8 +329,8 @@ WriteToSmgr(Relation rel, uint64 logicalOffset, char *data, uint32 dataLength)
|
|||
buffer = ReadBuffer(rel, addr.blockno);
|
||||
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
|
||||
|
||||
page = BufferGetPage(buffer);
|
||||
phdr = (PageHeader) page;
|
||||
Page page = BufferGetPage(buffer);
|
||||
PageHeader phdr = (PageHeader) page;
|
||||
if (PageIsNew(page))
|
||||
{
|
||||
PageInit(page, BLCKSZ, 0);
|
||||
|
@ -366,7 +353,7 @@ WriteToSmgr(Relation rel, uint64 logicalOffset, char *data, uint32 dataLength)
|
|||
|
||||
START_CRIT_SECTION();
|
||||
|
||||
to_write = Min(phdr->pd_upper - phdr->pd_lower, remaining);
|
||||
uint64 to_write = Min(phdr->pd_upper - phdr->pd_lower, remaining);
|
||||
memcpy(page + phdr->pd_lower, data, to_write);
|
||||
phdr->pd_lower += to_write;
|
||||
|
||||
|
@ -374,8 +361,6 @@ WriteToSmgr(Relation rel, uint64 logicalOffset, char *data, uint32 dataLength)
|
|||
|
||||
if (RelationNeedsWAL(rel))
|
||||
{
|
||||
XLogRecPtr recptr = 0;
|
||||
|
||||
XLogBeginInsert();
|
||||
|
||||
/*
|
||||
|
@ -384,7 +369,7 @@ WriteToSmgr(Relation rel, uint64 logicalOffset, char *data, uint32 dataLength)
|
|||
*/
|
||||
XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE);
|
||||
|
||||
recptr = XLogInsert(RM_GENERIC_ID, 0);
|
||||
XLogRecPtr recptr = XLogInsert(RM_GENERIC_ID, 0);
|
||||
PageSetLSN(page, recptr);
|
||||
}
|
||||
|
||||
|
@ -420,7 +405,6 @@ FlushStripe(TableWriteState *writeState)
|
|||
uint32 blockRowCount = writeState->blockRowCount;
|
||||
uint32 lastBlockIndex = stripeBuffers->rowCount / blockRowCount;
|
||||
uint32 lastBlockRowCount = stripeBuffers->rowCount % blockRowCount;
|
||||
uint64 currentFileOffset = 0;
|
||||
uint64 stripeSize = 0;
|
||||
uint64 stripeRowCount = 0;
|
||||
|
||||
|
@ -477,7 +461,7 @@ FlushStripe(TableWriteState *writeState)
|
|||
stripeRowCount, columnCount, blockCount,
|
||||
blockRowCount);
|
||||
|
||||
currentFileOffset = stripeMetadata.fileOffset;
|
||||
uint64 currentFileOffset = stripeMetadata.fileOffset;
|
||||
|
||||
/*
|
||||
* Each stripe has only one section:
|
||||
|
@ -531,11 +515,10 @@ FlushStripe(TableWriteState *writeState)
|
|||
static StringInfo
|
||||
SerializeBoolArray(bool *boolArray, uint32 boolArrayLength)
|
||||
{
|
||||
StringInfo boolArrayBuffer = NULL;
|
||||
uint32 boolArrayIndex = 0;
|
||||
uint32 byteCount = (boolArrayLength + 7) / 8;
|
||||
|
||||
boolArrayBuffer = makeStringInfo();
|
||||
StringInfo boolArrayBuffer = makeStringInfo();
|
||||
enlargeStringInfo(boolArrayBuffer, byteCount);
|
||||
boolArrayBuffer->len = byteCount;
|
||||
memset(boolArrayBuffer->data, 0, byteCount);
|
||||
|
@ -564,11 +547,10 @@ SerializeSingleDatum(StringInfo datumBuffer, Datum datum, bool datumTypeByValue,
|
|||
{
|
||||
uint32 datumLength = att_addlength_datum(0, datumTypeLength, datum);
|
||||
uint32 datumLengthAligned = att_align_nominal(datumLength, datumTypeAlign);
|
||||
char *currentDatumDataPointer = NULL;
|
||||
|
||||
enlargeStringInfo(datumBuffer, datumLengthAligned);
|
||||
|
||||
currentDatumDataPointer = datumBuffer->data + datumBuffer->len;
|
||||
char *currentDatumDataPointer = datumBuffer->data + datumBuffer->len;
|
||||
memset(currentDatumDataPointer, 0, datumLengthAligned);
|
||||
|
||||
if (datumTypeLength > 0)
|
||||
|
@ -624,11 +606,9 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou
|
|||
{
|
||||
ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex];
|
||||
ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex];
|
||||
StringInfo serializedValueBuffer = NULL;
|
||||
CompressionType actualCompressionType = COMPRESSION_NONE;
|
||||
bool compressed = false;
|
||||
|
||||
serializedValueBuffer = blockData->valueBufferArray[columnIndex];
|
||||
StringInfo serializedValueBuffer = blockData->valueBufferArray[columnIndex];
|
||||
|
||||
/* the only other supported compression type is pg_lz for now */
|
||||
Assert(requestedCompressionType == COMPRESSION_NONE ||
|
||||
|
@ -638,8 +618,8 @@ SerializeBlockData(TableWriteState *writeState, uint32 blockIndex, uint32 rowCou
|
|||
* if serializedValueBuffer is be compressed, update serializedValueBuffer
|
||||
* with compressed data and store compression type.
|
||||
*/
|
||||
compressed = CompressBuffer(serializedValueBuffer, compressionBuffer,
|
||||
requestedCompressionType);
|
||||
bool compressed = CompressBuffer(serializedValueBuffer, compressionBuffer,
|
||||
requestedCompressionType);
|
||||
if (compressed)
|
||||
{
|
||||
serializedValueBuffer = compressionBuffer;
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-insert s1-begin s1-insert s2-vacuum s1-commit s2-select
|
||||
step s1-insert:
|
||||
step s1-insert:
|
||||
INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||
|
||||
step s1-begin:
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert:
|
||||
step s1-insert:
|
||||
INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||
|
||||
s2: INFO: statistics for "test_vacuum_vs_insert":
|
||||
|
@ -16,38 +16,38 @@ total row count: 3, stripe count: 1, average rows per stripe: 3
|
|||
block count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0
|
||||
|
||||
s2: INFO: "test_vacuum_vs_insert": stopping truncate due to conflicting lock request
|
||||
step s2-vacuum:
|
||||
step s2-vacuum:
|
||||
VACUUM VERBOSE test_vacuum_vs_insert;
|
||||
|
||||
step s1-commit:
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-select:
|
||||
step s2-select:
|
||||
SELECT * FROM test_vacuum_vs_insert;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
|
||||
starting permutation: s1-insert s1-begin s1-insert s2-vacuum-full s1-commit s2-select
|
||||
step s1-insert:
|
||||
step s1-insert:
|
||||
INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||
|
||||
step s1-begin:
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert:
|
||||
step s1-insert:
|
||||
INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||
|
||||
step s2-vacuum-full:
|
||||
step s2-vacuum-full:
|
||||
VACUUM FULL VERBOSE test_vacuum_vs_insert;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
s2: INFO: vacuuming "public.test_vacuum_vs_insert"
|
||||
|
@ -55,14 +55,14 @@ s2: INFO: "test_vacuum_vs_insert": found 0 removable, 6 nonremovable row versio
|
|||
DETAIL: 0 dead row versions cannot be removed yet.
|
||||
CPU: user: 0.00 s, system: 0.00 s, elapsed: 0.00 s.
|
||||
step s2-vacuum-full: <... completed>
|
||||
step s2-select:
|
||||
step s2-select:
|
||||
SELECT * FROM test_vacuum_vs_insert;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
|
|
|
@ -1,142 +1,142 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-insert s2-insert s1-select s2-select s1-commit s2-commit s1-select
|
||||
step s1-begin:
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert:
|
||||
step s1-insert:
|
||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||
|
||||
step s2-insert:
|
||||
step s2-insert:
|
||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
||||
|
||||
step s1-select:
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
step s2-select:
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
step s2-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
step s1-commit:
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-commit:
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
1 2
|
||||
2 4
|
||||
3 6
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
|
||||
starting permutation: s1-begin s2-begin s1-copy s2-insert s1-select s2-select s1-commit s2-commit s1-select
|
||||
step s1-begin:
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-copy:
|
||||
step s1-copy:
|
||||
COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13';
|
||||
|
||||
step s2-insert:
|
||||
step s2-insert:
|
||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
||||
|
||||
step s1-select:
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
11
|
||||
12
|
||||
13
|
||||
step s2-select:
|
||||
11
|
||||
12
|
||||
13
|
||||
step s2-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
step s1-commit:
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-commit:
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
11
|
||||
12
|
||||
13
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
11
|
||||
12
|
||||
13
|
||||
|
||||
starting permutation: s1-begin s2-begin s2-insert s1-copy s1-select s2-select s1-commit s2-commit s1-select
|
||||
step s1-begin:
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-begin:
|
||||
step s2-begin:
|
||||
BEGIN;
|
||||
|
||||
step s2-insert:
|
||||
step s2-insert:
|
||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
||||
|
||||
step s1-copy:
|
||||
step s1-copy:
|
||||
COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13';
|
||||
|
||||
step s1-select:
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
11
|
||||
12
|
||||
13
|
||||
step s2-select:
|
||||
11
|
||||
12
|
||||
13
|
||||
step s2-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
step s1-commit:
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-commit:
|
||||
step s2-commit:
|
||||
COMMIT;
|
||||
|
||||
step s1-select:
|
||||
step s1-select:
|
||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||
|
||||
a b
|
||||
a b
|
||||
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
11
|
||||
12
|
||||
13
|
||||
4 8
|
||||
5 10
|
||||
6 12
|
||||
11
|
||||
12
|
||||
13
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
Parsed test spec with 1 sessions
|
||||
|
||||
starting permutation: s1a
|
||||
step s1a:
|
||||
step s1a:
|
||||
CREATE EXTENSION cstore_fdw;
|
||||
|
||||
|
|
|
@ -6,19 +6,19 @@ CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;
|
|||
|
||||
|
||||
-- Validator tests
|
||||
CREATE FOREIGN TABLE test_validator_invalid_option ()
|
||||
SERVER cstore_server
|
||||
CREATE FOREIGN TABLE test_validator_invalid_option ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(bad_option_name '1'); -- ERROR
|
||||
|
||||
CREATE FOREIGN TABLE test_validator_invalid_stripe_row_count ()
|
||||
CREATE FOREIGN TABLE test_validator_invalid_stripe_row_count ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(stripe_row_count '0'); -- ERROR
|
||||
|
||||
CREATE FOREIGN TABLE test_validator_invalid_block_row_count ()
|
||||
CREATE FOREIGN TABLE test_validator_invalid_block_row_count ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(block_row_count '0'); -- ERROR
|
||||
|
||||
CREATE FOREIGN TABLE test_validator_invalid_compression_type ()
|
||||
CREATE FOREIGN TABLE test_validator_invalid_compression_type ()
|
||||
SERVER cstore_server
|
||||
OPTIONS(compression 'invalid_compression'); -- ERROR
|
||||
|
||||
|
|
|
@ -125,7 +125,7 @@ SELECT count(*) FROM truncate_schema.truncate_tbl;
|
|||
TRUNCATE TABLE truncate_schema.truncate_tbl;
|
||||
SELECT count(*) FROM truncate_schema.truncate_tbl;
|
||||
|
||||
-- switch to super user, grant truncate to truncate_user
|
||||
-- switch to super user, grant truncate to truncate_user
|
||||
\c - :current_user
|
||||
GRANT TRUNCATE ON TABLE truncate_schema.truncate_tbl TO truncate_user;
|
||||
|
||||
|
|
|
@ -118,7 +118,7 @@ SELECT count(*) FROM truncate_schema.truncate_tbl;
|
|||
TRUNCATE TABLE truncate_schema.truncate_tbl;
|
||||
SELECT count(*) FROM truncate_schema.truncate_tbl;
|
||||
|
||||
-- switch to super user, grant truncate to truncate_user
|
||||
-- switch to super user, grant truncate to truncate_user
|
||||
\c - :current_user
|
||||
GRANT TRUNCATE ON TABLE truncate_schema.truncate_tbl TO truncate_user;
|
||||
|
||||
|
|
Loading…
Reference in New Issue