Address feedback

merge-cstore-pykello
Hadi Moshayedi 2020-10-01 21:09:47 -07:00
parent d37c717e14
commit a87c15a1e1
5 changed files with 162 additions and 100 deletions

View File

@ -16,13 +16,9 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include "access/heapam.h"
#include "catalog/objectaccess.h"
#include "catalog/storage.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/relcache.h"
#include "cstore.h" #include "cstore.h"
@ -42,11 +38,6 @@ static const struct config_enum_entry cstore_compression_options[] =
{ NULL, 0, false } { NULL, 0, false }
}; };
static object_access_hook_type prevObjectAccess = NULL;
static void ObjectAccess(ObjectAccessType access, Oid classId, Oid objectId, int subId,
void *arg);
void void
cstore_init() cstore_init()
{ {
@ -87,9 +78,6 @@ cstore_init()
NULL, NULL,
NULL, NULL,
NULL); NULL);
prevObjectAccess = object_access_hook;
object_access_hook = ObjectAccess;
} }
@ -122,71 +110,3 @@ InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions)
{ {
InitCStoreTableMetadata(relNode, cstoreOptions->blockRowCount); InitCStoreTableMetadata(relNode, cstoreOptions->blockRowCount);
} }
/*
* Implements object_access_hook. One of the places this is called is just
* before dropping an object, which allows us to clean-up resources for
* cstore tables while the pg_class record for the table is still there.
*/
static void
ObjectAccess(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg)
{
if (prevObjectAccess)
{
prevObjectAccess(access, classId, objectId, subId, arg);
}
/*
* Do nothing if this is not a DROP relation command.
*/
if (access != OAT_DROP || classId != RelationRelationId || OidIsValid(subId))
{
return;
}
if (IsCStoreFdwTable(objectId))
{
/*
* Drop both metadata and storage. We need to drop storage here since
* we manage relfilenode for FDW tables in the extension.
*/
Relation rel = cstore_fdw_open(objectId, AccessExclusiveLock);
RelationOpenSmgr(rel);
RelationDropStorage(rel);
DeleteTableMetadataRowIfExists(rel->rd_node.relNode);
/* keep the lock since we did physical changes to the relation */
relation_close(rel, NoLock);
}
else
{
Oid relNode = InvalidOid;
Relation rel = try_relation_open(objectId, AccessExclusiveLock);
if (rel == NULL)
{
return;
}
relNode = rel->rd_node.relNode;
if (IsCStoreStorage(relNode))
{
/*
* Drop only metadata for table am cstore tables. Postgres manages
* storage for these tables, so we don't need to drop that.
*/
DeleteTableMetadataRowIfExists(relNode);
/* keep the lock since we did physical changes to the relation */
relation_close(rel, NoLock);
}
else
{
/*
* For non-cstore tables, we do nothing.
* Release the lock since we haven't changed the relation.
*/
relation_close(rel, AccessExclusiveLock);
}
}
}

View File

@ -249,8 +249,6 @@ extern void cstore_init(void);
extern CompressionType ParseCompressionType(const char *compressionTypeString); extern CompressionType ParseCompressionType(const char *compressionTypeString);
extern void InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions); extern void InitializeCStoreTableFile(Oid relNode, CStoreOptions *cstoreOptions);
extern bool IsCStoreFdwTable(Oid relationId);
extern Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode);
/* Function declarations for writing to a cstore file */ /* Function declarations for writing to a cstore file */
extern TableWriteState * CStoreBeginWrite(Relation relation, extern TableWriteState * CStoreBeginWrite(Relation relation,

View File

@ -25,6 +25,7 @@
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_foreign_table.h" #include "catalog/pg_foreign_table.h"
#include "catalog/pg_namespace.h" #include "catalog/pg_namespace.h"
#include "catalog/storage.h" #include "catalog/storage.h"
@ -54,6 +55,7 @@
#include "parser/parser.h" #include "parser/parser.h"
#include "parser/parse_coerce.h" #include "parser/parse_coerce.h"
#include "parser/parse_type.h" #include "parser/parse_type.h"
#include "storage/lmgr.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "tcop/utility.h" #include "tcop/utility.h"
#include "utils/builtins.h" #include "utils/builtins.h"
@ -105,6 +107,8 @@ static const CStoreValidOption ValidOptionArray[] =
{ OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId } { OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId }
}; };
static object_access_hook_type prevObjectAccessHook = NULL;
/* local functions forward declarations */ /* local functions forward declarations */
#if PG_VERSION_NUM >= 100000 #if PG_VERSION_NUM >= 100000
static void CStoreProcessUtility(PlannedStmt *plannedStatement, const char *queryString, static void CStoreProcessUtility(PlannedStmt *plannedStatement, const char *queryString,
@ -130,7 +134,8 @@ static List * FindCStoreTables(List *tableList);
static List * OpenRelationsForTruncate(List *cstoreTableList); static List * OpenRelationsForTruncate(List *cstoreTableList);
static void FdwNewRelFileNode(Relation relation); static void FdwNewRelFileNode(Relation relation);
static void TruncateCStoreTables(List *cstoreRelationList); static void TruncateCStoreTables(List *cstoreRelationList);
static bool CStoreServer(ForeignServer *server); static bool IsCStoreFdwTable(Oid relationId);
static bool IsCStoreServer(ForeignServer *server);
static bool DistributedTable(Oid relationId); static bool DistributedTable(Oid relationId);
static bool DistributedWorkerCopy(CopyStmt *copyStatement); static bool DistributedWorkerCopy(CopyStmt *copyStatement);
static StringInfo OptionNamesString(Oid currentContextId); static StringInfo OptionNamesString(Oid currentContextId);
@ -187,7 +192,11 @@ static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte); RangeTblEntry *rte);
#endif #endif
static void cstore_fdw_initrel(Relation rel); static void cstore_fdw_initrel(Relation rel);
static Relation cstore_fdw_open(Oid relationId, LOCKMODE lockmode);
static Relation cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode); static Relation cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode);
static void CStoreFdwObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId,
int subId,
void *arg);
PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger); PG_FUNCTION_INFO_V1(cstore_ddl_event_end_trigger);
PG_FUNCTION_INFO_V1(cstore_table_size); PG_FUNCTION_INFO_V1(cstore_table_size);
@ -209,6 +218,8 @@ cstore_fdw_init()
{ {
PreviousProcessUtilityHook = ProcessUtility_hook; PreviousProcessUtilityHook = ProcessUtility_hook;
ProcessUtility_hook = CStoreProcessUtility; ProcessUtility_hook = CStoreProcessUtility;
prevObjectAccessHook = object_access_hook;
object_access_hook = CStoreFdwObjectAccessHook;
} }
@ -251,7 +262,7 @@ cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS)
bool missingOK = false; bool missingOK = false;
ForeignServer *server = GetForeignServerByName(serverName, missingOK); ForeignServer *server = GetForeignServerByName(serverName, missingOK);
if (CStoreServer(server)) if (IsCStoreServer(server))
{ {
Oid relationId = RangeVarGetRelid(createStatement->base.relation, Oid relationId = RangeVarGetRelid(createStatement->base.relation,
AccessShareLock, false); AccessShareLock, false);
@ -358,7 +369,6 @@ CStoreProcessUtility(Node * parseTree, const char * queryString,
CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo, CALL_PREVIOUS_UTILITY(parseTree, queryString, context, paramListInfo,
destReceiver, completionTag); destReceiver, completionTag);
} }
/* handle other utility statements */ /* handle other utility statements */
else else
{ {
@ -895,7 +905,7 @@ IsCStoreFdwTable(Oid relationId)
{ {
ForeignTable *foreignTable = GetForeignTable(relationId); ForeignTable *foreignTable = GetForeignTable(relationId);
ForeignServer *server = GetForeignServer(foreignTable->serverid); ForeignServer *server = GetForeignServer(foreignTable->serverid);
if (CStoreServer(server)) if (IsCStoreServer(server))
{ {
cstoreTable = true; cstoreTable = true;
} }
@ -906,11 +916,11 @@ IsCStoreFdwTable(Oid relationId)
/* /*
* CStoreServer checks if the given foreign server belongs to cstore_fdw. If it * IsCStoreServer checks if the given foreign server belongs to cstore_fdw. If it
* does, the function returns true. Otherwise, it returns false. * does, the function returns true. Otherwise, it returns false.
*/ */
static bool static bool
CStoreServer(ForeignServer *server) IsCStoreServer(ForeignServer *server)
{ {
ForeignDataWrapper *foreignDataWrapper = GetForeignDataWrapper(server->fdwid); ForeignDataWrapper *foreignDataWrapper = GetForeignDataWrapper(server->fdwid);
bool cstoreServer = false; bool cstoreServer = false;
@ -2143,7 +2153,7 @@ cstore_fdw_initrel(Relation rel)
} }
Relation static Relation
cstore_fdw_open(Oid relationId, LOCKMODE lockmode) cstore_fdw_open(Oid relationId, LOCKMODE lockmode)
{ {
Relation rel = heap_open(relationId, lockmode); Relation rel = heap_open(relationId, lockmode);
@ -2163,3 +2173,61 @@ cstore_fdw_openrv(RangeVar *relation, LOCKMODE lockmode)
return rel; return rel;
} }
/*
* Implements object_access_hook. One of the places this is called is just
* before dropping an object, which allows us to clean-up resources for
* cstore tables.
*
* When cleaning up resources, we need to have access to the pg_class record
* for the table so we can indentify the relfilenode belonging to the relation.
* We don't have access to this information in sql_drop event triggers, since
* the relation has already been dropped there. object_access_hook is called
* __before__ dropping tables, so we still have access to the pg_class
* entry here.
*
* Note that the utility hook is called once per __command__, and not for
* every object dropped, and since a drop can cascade to other objects, it
* is difficult to get full set of dropped objects in the utility hook.
* But object_access_hook is called once per dropped object, so it is
* much easier to clean-up all dropped objects here.
*/
static void
CStoreFdwObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId,
int subId, void *arg)
{
if (prevObjectAccessHook)
{
prevObjectAccessHook(access, classId, objectId, subId, arg);
}
/*
* Do nothing if this is not a DROP relation command.
*/
if (access != OAT_DROP || classId != RelationRelationId || OidIsValid(subId))
{
return;
}
/*
* Lock relation to prevent it from being dropped and to avoid
* race conditions in the next if block.
*/
LockRelationOid(objectId, AccessShareLock);
if (IsCStoreFdwTable(objectId))
{
/*
* Drop both metadata and storage. We need to drop storage here since
* we manage relfilenode for FDW tables in the extension.
*/
Relation rel = cstore_fdw_open(objectId, AccessExclusiveLock);
RelationOpenSmgr(rel);
RelationDropStorage(rel);
DeleteTableMetadataRowIfExists(rel->rd_node.relNode);
/* keep the lock since we did physical changes to the relation */
relation_close(rel, NoLock);
}
}

View File

@ -94,17 +94,6 @@ static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
#define Anum_cstore_skipnodes_value_compression_type 12 #define Anum_cstore_skipnodes_value_compression_type 12
/*
* IsCStoreStorage returns if relfilenode belongs to a cstore table.
*/
bool
IsCStoreStorage(Oid relfilenode)
{
uint64 blockRowCount = 0;
return ReadCStoreTables(relfilenode, &blockRowCount);
}
/* /*
* InitCStoreTableMetadata adds a record for the given relation in cstore_table. * InitCStoreTableMetadata adds a record for the given relation in cstore_table.
*/ */

View File

@ -14,6 +14,8 @@
#include "access/xact.h" #include "access/xact.h"
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_am.h"
#include "catalog/storage.h" #include "catalog/storage.h"
#include "catalog/storage_xlog.h" #include "catalog/storage_xlog.h"
#include "commands/progress.h" #include "commands/progress.h"
@ -30,10 +32,13 @@
#include "storage/smgr.h" #include "storage/smgr.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/syscache.h"
#include "cstore.h" #include "cstore.h"
#include "cstore_tableam.h" #include "cstore_tableam.h"
#define CSTORE_TABLEAM_NAME "cstore_tableam"
typedef struct CStoreScanDescData typedef struct CStoreScanDescData
{ {
TableScanDescData cs_base; TableScanDescData cs_base;
@ -45,6 +50,13 @@ typedef struct CStoreScanDescData *CStoreScanDesc;
static TableWriteState *CStoreWriteState = NULL; static TableWriteState *CStoreWriteState = NULL;
static ExecutorEnd_hook_type PreviousExecutorEndHook = NULL; static ExecutorEnd_hook_type PreviousExecutorEndHook = NULL;
static MemoryContext CStoreContext = NULL; static MemoryContext CStoreContext = NULL;
static object_access_hook_type prevObjectAccessHook = NULL;
/* forward declaration for static functions */
static void CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid
objectId, int subId,
void *arg);
static bool IsCStoreTableAmTable(Oid relationId);
static CStoreOptions * static CStoreOptions *
CStoreTableAMGetOptions(void) CStoreTableAMGetOptions(void)
@ -624,6 +636,8 @@ cstore_tableam_init()
{ {
PreviousExecutorEndHook = ExecutorEnd_hook; PreviousExecutorEndHook = ExecutorEnd_hook;
ExecutorEnd_hook = CStoreExecutorEnd; ExecutorEnd_hook = CStoreExecutorEnd;
prevObjectAccessHook = object_access_hook;
object_access_hook = CStoreTableAMObjectAccessHook;
} }
@ -634,6 +648,79 @@ cstore_tableam_finish()
} }
/*
* Implements object_access_hook. One of the places this is called is just
* before dropping an object, which allows us to clean-up resources for
* cstore tables.
*
* See the comments for CStoreFdwObjectAccessHook for more details.
*/
static void
CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, int
subId,
void *arg)
{
if (prevObjectAccessHook)
{
prevObjectAccessHook(access, classId, objectId, subId, arg);
}
/*
* Do nothing if this is not a DROP relation command.
*/
if (access != OAT_DROP || classId != RelationRelationId || OidIsValid(subId))
{
return;
}
/*
* Lock relation to prevent it from being dropped and to avoid
* race conditions in the next if block.
*/
LockRelationOid(objectId, AccessShareLock);
if (IsCStoreTableAmTable(objectId))
{
/*
* Drop metadata. No need to drop storage here since for
* tableam tables storage is managed by postgres.
*/
Relation rel = table_open(objectId, AccessExclusiveLock);
DeleteTableMetadataRowIfExists(rel->rd_node.relNode);
/* keep the lock since we did physical changes to the relation */
table_close(rel, NoLock);
}
}
/*
* IsCStoreTableAmTable returns true if relation has cstore_tableam
* access method. This can be called before extension creation.
*/
static bool
IsCStoreTableAmTable(Oid relationId)
{
bool result;
Relation rel;
if (!OidIsValid(relationId))
{
return false;
}
/*
* Lock relation to prevent it from being dropped &
* avoid race conditions.
*/
rel = relation_open(relationId, AccessShareLock);
result = rel->rd_tableam == GetCstoreTableAmRoutine();
relation_close(rel, NoLock);
return result;
}
static const TableAmRoutine cstore_am_methods = { static const TableAmRoutine cstore_am_methods = {
.type = T_TableAmRoutine, .type = T_TableAmRoutine,