mirror of https://github.com/citusdata/citus.git
Update columnar code to support PG 15.
May require additional tweaks as release gets closer.pull/5859/head
parent
7dc0a94293
commit
45e27de88e
|
@ -116,8 +116,6 @@ columnar_storage_info(PG_FUNCTION_ARGS)
|
||||||
RelationGetRelationName(rel))));
|
RelationGetRelationName(rel))));
|
||||||
}
|
}
|
||||||
|
|
||||||
RelationOpenSmgr(rel);
|
|
||||||
|
|
||||||
Datum values[STORAGE_INFO_NATTS] = { 0 };
|
Datum values[STORAGE_INFO_NATTS] = { 0 };
|
||||||
bool nulls[STORAGE_INFO_NATTS] = { 0 };
|
bool nulls[STORAGE_INFO_NATTS] = { 0 };
|
||||||
|
|
||||||
|
|
|
@ -1433,7 +1433,7 @@ DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple)
|
||||||
simple_heap_delete(state->rel, tid);
|
simple_heap_delete(state->rel, tid);
|
||||||
|
|
||||||
/* execute AFTER ROW DELETE Triggers to enforce constraints */
|
/* execute AFTER ROW DELETE Triggers to enforce constraints */
|
||||||
ExecARDeleteTriggers(estate, resultRelInfo, tid, NULL, NULL);
|
ExecARDeleteTriggers_compat(estate, resultRelInfo, tid, NULL, NULL, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1738,11 +1738,10 @@ ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
RelationOpenSmgr(rel);
|
BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||||
BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
|
||||||
if (nblocks < 2)
|
if (nblocks < 2)
|
||||||
{
|
{
|
||||||
ColumnarStorageInit(rel->rd_smgr, ColumnarMetadataNewStorageId());
|
ColumnarStorageInit(RelationGetSmgr(rel), ColumnarMetadataNewStorageId());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,8 @@
|
||||||
#include "storage/bufmgr.h"
|
#include "storage/bufmgr.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
|
|
||||||
|
#include "columnar/columnar_version_compat.h"
|
||||||
|
|
||||||
#include "columnar/columnar.h"
|
#include "columnar/columnar.h"
|
||||||
#include "columnar/columnar_storage.h"
|
#include "columnar/columnar_storage.h"
|
||||||
|
|
||||||
|
@ -354,8 +356,7 @@ ColumnarStorageGetReservedOffset(Relation rel, bool force)
|
||||||
bool
|
bool
|
||||||
ColumnarStorageIsCurrent(Relation rel)
|
ColumnarStorageIsCurrent(Relation rel)
|
||||||
{
|
{
|
||||||
RelationOpenSmgr(rel);
|
BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||||
BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
|
||||||
|
|
||||||
if (nblocks < 2)
|
if (nblocks < 2)
|
||||||
{
|
{
|
||||||
|
@ -439,8 +440,7 @@ ColumnarStorageReserveData(Relation rel, uint64 amount)
|
||||||
PhysicalAddr final = LogicalToPhysical(nextReservation - 1);
|
PhysicalAddr final = LogicalToPhysical(nextReservation - 1);
|
||||||
|
|
||||||
/* extend with new pages */
|
/* extend with new pages */
|
||||||
RelationOpenSmgr(rel);
|
BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||||
BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
|
||||||
|
|
||||||
while (nblocks <= final.blockno)
|
while (nblocks <= final.blockno)
|
||||||
{
|
{
|
||||||
|
@ -547,8 +547,7 @@ ColumnarStorageTruncate(Relation rel, uint64 newDataReservation)
|
||||||
rel->rd_id, newDataReservation);
|
rel->rd_id, newDataReservation);
|
||||||
}
|
}
|
||||||
|
|
||||||
RelationOpenSmgr(rel);
|
BlockNumber old_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||||
BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
|
||||||
if (old_rel_pages == 0)
|
if (old_rel_pages == 0)
|
||||||
{
|
{
|
||||||
/* nothing to do */
|
/* nothing to do */
|
||||||
|
@ -627,8 +626,7 @@ ColumnarOverwriteMetapage(Relation relation, ColumnarMetapage columnarMetapage)
|
||||||
static ColumnarMetapage
|
static ColumnarMetapage
|
||||||
ColumnarMetapageRead(Relation rel, bool force)
|
ColumnarMetapageRead(Relation rel, bool force)
|
||||||
{
|
{
|
||||||
RelationOpenSmgr(rel);
|
BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||||
BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
|
||||||
if (nblocks == 0)
|
if (nblocks == 0)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -913,8 +913,7 @@ columnar_relation_nontransactional_truncate(Relation rel)
|
||||||
RelationTruncate(rel, 0);
|
RelationTruncate(rel, 0);
|
||||||
|
|
||||||
uint64 storageId = ColumnarMetadataNewStorageId();
|
uint64 storageId = ColumnarMetadataNewStorageId();
|
||||||
RelationOpenSmgr(rel);
|
ColumnarStorageInit(RelationGetSmgr(rel), storageId);
|
||||||
ColumnarStorageInit(rel->rd_smgr, storageId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1136,8 +1135,7 @@ LogRelationStats(Relation rel, int elevel)
|
||||||
totalStripeLength += stripe->dataLength;
|
totalStripeLength += stripe->dataLength;
|
||||||
}
|
}
|
||||||
|
|
||||||
RelationOpenSmgr(rel);
|
uint64 relPages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||||
uint64 relPages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
|
||||||
RelationCloseSmgr(rel);
|
RelationCloseSmgr(rel);
|
||||||
|
|
||||||
Datum storageId = DirectFunctionCall1(columnar_relation_storageid,
|
Datum storageId = DirectFunctionCall1(columnar_relation_storageid,
|
||||||
|
@ -1239,8 +1237,7 @@ TruncateColumnar(Relation rel, int elevel)
|
||||||
uint64 newDataReservation = Max(GetHighestUsedAddress(rel->rd_node) + 1,
|
uint64 newDataReservation = Max(GetHighestUsedAddress(rel->rd_node) + 1,
|
||||||
ColumnarFirstLogicalOffset);
|
ColumnarFirstLogicalOffset);
|
||||||
|
|
||||||
RelationOpenSmgr(rel);
|
BlockNumber old_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||||
BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
|
||||||
|
|
||||||
if (!ColumnarStorageTruncate(rel, newDataReservation))
|
if (!ColumnarStorageTruncate(rel, newDataReservation))
|
||||||
{
|
{
|
||||||
|
@ -1248,8 +1245,7 @@ TruncateColumnar(Relation rel, int elevel)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
RelationOpenSmgr(rel);
|
BlockNumber new_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||||
BlockNumber new_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We can release the exclusive lock as soon as we have truncated.
|
* We can release the exclusive lock as soon as we have truncated.
|
||||||
|
@ -1784,20 +1780,17 @@ columnar_relation_size(Relation rel, ForkNumber forkNumber)
|
||||||
|
|
||||||
uint64 nblocks = 0;
|
uint64 nblocks = 0;
|
||||||
|
|
||||||
/* Open it at the smgr level if not already done */
|
|
||||||
RelationOpenSmgr(rel);
|
|
||||||
|
|
||||||
/* InvalidForkNumber indicates returning the size for all forks */
|
/* InvalidForkNumber indicates returning the size for all forks */
|
||||||
if (forkNumber == InvalidForkNumber)
|
if (forkNumber == InvalidForkNumber)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < MAX_FORKNUM; i++)
|
for (int i = 0; i < MAX_FORKNUM; i++)
|
||||||
{
|
{
|
||||||
nblocks += smgrnblocks(rel->rd_smgr, i);
|
nblocks += smgrnblocks(RelationGetSmgr(rel), i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
nblocks = smgrnblocks(rel->rd_smgr, forkNumber);
|
nblocks = smgrnblocks(RelationGetSmgr(rel), forkNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
return nblocks * BLCKSZ;
|
return nblocks * BLCKSZ;
|
||||||
|
@ -1819,8 +1812,7 @@ columnar_estimate_rel_size(Relation rel, int32 *attr_widths,
|
||||||
double *allvisfrac)
|
double *allvisfrac)
|
||||||
{
|
{
|
||||||
CheckCitusColumnarVersion(ERROR);
|
CheckCitusColumnarVersion(ERROR);
|
||||||
RelationOpenSmgr(rel);
|
*pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||||
*pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
|
|
||||||
*tuples = ColumnarTableRowCount(rel);
|
*tuples = ColumnarTableRowCount(rel);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -2295,18 +2287,30 @@ detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isnull)
|
||||||
static void
|
static void
|
||||||
ColumnarCheckLogicalReplication(Relation rel)
|
ColumnarCheckLogicalReplication(Relation rel)
|
||||||
{
|
{
|
||||||
|
bool pubActionInsert = false;
|
||||||
|
|
||||||
if (!is_publishable_relation(rel))
|
if (!is_publishable_relation(rel))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
|
{
|
||||||
|
PublicationDesc pubdesc;
|
||||||
|
|
||||||
|
RelationBuildPublicationDesc(rel, &pubdesc);
|
||||||
|
pubActionInsert = pubdesc.pubactions.pubupdate;
|
||||||
|
}
|
||||||
|
#else
|
||||||
if (rel->rd_pubactions == NULL)
|
if (rel->rd_pubactions == NULL)
|
||||||
{
|
{
|
||||||
GetRelationPublicationActions(rel);
|
GetRelationPublicationActions(rel);
|
||||||
Assert(rel->rd_pubactions != NULL);
|
Assert(rel->rd_pubactions != NULL);
|
||||||
}
|
}
|
||||||
|
pubActionInsert = rel->rd_pubactions->pubinsert;
|
||||||
|
#endif
|
||||||
|
|
||||||
if (rel->rd_pubactions->pubinsert)
|
if (pubActionInsert)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg(
|
errmsg(
|
||||||
|
|
|
@ -14,6 +14,26 @@
|
||||||
|
|
||||||
#include "distributed/pg_version_constants.h"
|
#include "distributed/pg_version_constants.h"
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
|
#define ExecARDeleteTriggers_compat(a, b, c, d, e, f) \
|
||||||
|
ExecARDeleteTriggers(a, b, c, d, e, f)
|
||||||
|
#else
|
||||||
|
#define ExecARDeleteTriggers_compat(a, b, c, d, e, f) \
|
||||||
|
ExecARDeleteTriggers(a, b, c, d, e)
|
||||||
|
#include "storage/smgr.h"
|
||||||
|
static inline SMgrRelation
|
||||||
|
RelationGetSmgr(Relation rel)
|
||||||
|
{
|
||||||
|
if (unlikely(rel->rd_smgr == NULL))
|
||||||
|
{
|
||||||
|
smgrsetowner(&(rel->rd_smgr), smgropen(rel->rd_node, rel->rd_backend));
|
||||||
|
}
|
||||||
|
return rel->rd_smgr;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_14
|
#if PG_VERSION_NUM >= PG_VERSION_14
|
||||||
#define ColumnarProcessUtility_compat(a, b, c, d, e, f, g, h) \
|
#define ColumnarProcessUtility_compat(a, b, c, d, e, f, g, h) \
|
||||||
ColumnarProcessUtility(a, b, c, d, e, f, g, h)
|
ColumnarProcessUtility(a, b, c, d, e, f, g, h)
|
||||||
|
|
Loading…
Reference in New Issue