Separate citus and citus_columnar modules.

Now the modules are independent. Columnar can be loaded by itself, or
along with citus.
custom-rmgr-15
Jeff Davis 2022-03-15 16:48:49 -07:00 committed by Jeff Davis
parent 1c83b19159
commit 18ec9f6138
9 changed files with 112 additions and 34 deletions

View File

@ -103,8 +103,8 @@ typedef struct IndexFetchColumnarData
MemoryContext scanContext; MemoryContext scanContext;
} IndexFetchColumnarData; } IndexFetchColumnarData;
/* available to other extensions using find_rendezvous_variable() */
ColumnarTableSetOptions_hook_type ColumnarTableSetOptions_hook = NULL; static ColumnarTableSetOptions_hook_type ColumnarTableSetOptions_hook = NULL;
static object_access_hook_type PrevObjectAccessHook = NULL; static object_access_hook_type PrevObjectAccessHook = NULL;
static ProcessUtility_hook_type PrevProcessUtilityHook = NULL; static ProcessUtility_hook_type PrevProcessUtilityHook = NULL;
@ -1910,6 +1910,11 @@ ColumnarSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
void void
columnar_tableam_init() columnar_tableam_init()
{ {
ColumnarTableSetOptions_hook_type **ColumnarTableSetOptions_hook_ptr =
(ColumnarTableSetOptions_hook_type **) find_rendezvous_variable(
"ColumnarTableSetOptions_hook");
*ColumnarTableSetOptions_hook_ptr = &ColumnarTableSetOptions_hook;
RegisterXactCallback(ColumnarXactCallback, NULL); RegisterXactCallback(ColumnarXactCallback, NULL);
RegisterSubXactCallback(ColumnarSubXactCallback, NULL); RegisterSubXactCallback(ColumnarSubXactCallback, NULL);

View File

@ -19,8 +19,6 @@ DATA_built = $(generated_sql_files)
# directories with source files # directories with source files
SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib test transaction utils worker SUBDIRS = . commands connection ddl deparser executor metadata operations planner progress relay safeclib test transaction utils worker
# columnar modules
SUBDIRS += ../columnar
# enterprise modules # enterprise modules
SUBDIRS += SUBDIRS +=
@ -30,11 +28,9 @@ ENSURE_SUBDIRS_EXIST := $(shell mkdir -p $(SUBDIRS))
# That patsubst rule searches all directories listed in SUBDIRS for .c # That patsubst rule searches all directories listed in SUBDIRS for .c
# files, and adds the corresponding .o files to OBJS # files, and adds the corresponding .o files to OBJS
ALL_OBJS = \ OBJS += \
$(patsubst $(citus_abs_srcdir)/%.c,%.o,$(foreach dir,$(SUBDIRS), $(sort $(wildcard $(citus_abs_srcdir)/$(dir)/*.c)))) $(patsubst $(citus_abs_srcdir)/%.c,%.o,$(foreach dir,$(SUBDIRS), $(sort $(wildcard $(citus_abs_srcdir)/$(dir)/*.c))))
OBJS += $(filter-out ../columnar/mod.o,$(ALL_OBJS))
# be explicit about the default target # be explicit about the default target
all: all:

View File

@ -52,6 +52,7 @@
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h" #include "distributed/relation_access_tracking.h"
#include "distributed/shared_library_init.h"
#include "distributed/shard_utils.h" #include "distributed/shard_utils.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
@ -687,7 +688,7 @@ ConvertTable(TableConversionState *con)
strcmp(con->originalAccessMethod, "columnar") == 0) strcmp(con->originalAccessMethod, "columnar") == 0)
{ {
ColumnarOptions options = { 0 }; ColumnarOptions options = { 0 };
ReadColumnarOptions(con->relationId, &options); ExternReadColumnarOptions(con->relationId, &options);
ColumnarTableDDLContext *context = (ColumnarTableDDLContext *) palloc0( ColumnarTableDDLContext *context = (ColumnarTableDDLContext *) palloc0(
sizeof(ColumnarTableDDLContext)); sizeof(ColumnarTableDDLContext));
@ -843,7 +844,7 @@ DropIndexesNotSupportedByColumnar(Oid relationId, bool suppressNoticeMessages)
foreach_oid(indexId, indexIdList) foreach_oid(indexId, indexIdList)
{ {
char *indexAmName = GetIndexAccessMethodName(indexId); char *indexAmName = GetIndexAccessMethodName(indexId);
if (ColumnarSupportsIndexAM(indexAmName)) if (ExternColumnarSupportsIndexAM(indexAmName))
{ {
continue; continue;
} }

View File

@ -48,6 +48,7 @@
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/namespace_utils.h" #include "distributed/namespace_utils.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/shared_library_init.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "foreign/foreign.h" #include "foreign/foreign.h"
@ -613,7 +614,7 @@ GetPreLoadTableCreationCommands(Oid relationId,
/* add columnar options for cstore tables */ /* add columnar options for cstore tables */
if (accessMethod == NULL && IsColumnarTableAmTable(relationId)) if (accessMethod == NULL && ExternIsColumnarTableAmTable(relationId))
{ {
TableDDLCommand *cstoreOptionsDDL = ColumnarGetTableOptionsDDL(relationId); TableDDLCommand *cstoreOptionsDDL = ColumnarGetTableOptionsDDL(relationId);
if (cstoreOptionsDDL != NULL) if (cstoreOptionsDDL != NULL)
@ -1047,7 +1048,8 @@ CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,
options->chunkRowCount, options->chunkRowCount,
options->stripeRowCount, options->stripeRowCount,
options->compressionLevel, options->compressionLevel,
quote_literal_cstr(CompressionTypeStr(options->compressionType))); quote_literal_cstr(ExternCompressionTypeStr(
options->compressionType)));
return buf.data; return buf.data;
} }
@ -1136,7 +1138,7 @@ ColumnarGetTableOptionsDDL(Oid relationId)
char *relationName = get_rel_name(relationId); char *relationName = get_rel_name(relationId);
ColumnarOptions options = { 0 }; ColumnarOptions options = { 0 };
ReadColumnarOptions(relationId, &options); ExternReadColumnarOptions(relationId, &options);
return ColumnarGetCustomTableOptionsDDL(schemaName, relationName, options); return ColumnarGetCustomTableOptionsDDL(schemaName, relationName, options);
} }

View File

@ -100,6 +100,37 @@
/* marks shared object as one loadable by the postgres version compiled against */ /* marks shared object as one loadable by the postgres version compiled against */
PG_MODULE_MAGIC; PG_MODULE_MAGIC;
ColumnarSupportsIndexAM_type ExternColumnarSupportsIndexAM = NULL;
CompressionTypeStr_type ExternCompressionTypeStr = NULL;
IsColumnarTableAmTable_type ExternIsColumnarTableAmTable = NULL;
ReadColumnarOptions_type ExternReadColumnarOptions = NULL;
/*
* Define "pass-through" functions so that a SQL function defined as one of
* these symbols in the citus module can use the definition in the columnar
* module.
*/
#define DEFINE_COLUMNAR_PASSTHROUGH_FUNC(funcname) \
static PGFunction CppConcat(extern_, funcname); \
PG_FUNCTION_INFO_V1(funcname); \
Datum funcname(PG_FUNCTION_ARGS) \
{ \
return CppConcat(extern_, funcname)(fcinfo); \
}
#define INIT_COLUMNAR_PASSTHROUGH_SYMBOL(funcname) \
CppConcat(extern_, funcname) = load_external_function( \
COLUMNAR_MODULE_NAME, # funcname, true, &handle)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(columnar_handler)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(alter_columnar_table_set)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(alter_columnar_table_reset)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(upgrade_columnar_storage)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(downgrade_columnar_storage)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(columnar_relation_storageid)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(columnar_storage_info)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(columnar_store_memory_stats)
DEFINE_COLUMNAR_PASSTHROUGH_FUNC(test_columnar_storage_write_new_page)
#define DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE 9999999 #define DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE 9999999
static char *CitusVersion = CITUS_VERSION; static char *CitusVersion = CITUS_VERSION;
@ -323,12 +354,6 @@ _PG_init(void)
original_client_auth_hook = ClientAuthentication_hook; original_client_auth_hook = ClientAuthentication_hook;
ClientAuthentication_hook = CitusAuthHook; ClientAuthentication_hook = CitusAuthHook;
/*
* When the options change on a columnar table, we may need to propagate
* the changes to shards.
*/
ColumnarTableSetOptions_hook = ColumnarTableSetOptionsHook;
InitializeMaintenanceDaemon(); InitializeMaintenanceDaemon();
/* initialize coordinated transaction management */ /* initialize coordinated transaction management */
@ -357,7 +382,56 @@ _PG_init(void)
{ {
DoInitialCleanup(); DoInitialCleanup();
} }
columnar_init();
/* ensure columnar module is loaded at the right time */
load_file(COLUMNAR_MODULE_NAME, false);
/*
* Now, acquire symbols from columnar module. First, acquire
* the address of the set options hook, and set it so that we
* can propagate options changes.
*/
ColumnarTableSetOptions_hook_type **ColumnarTableSetOptions_hook_ptr =
(ColumnarTableSetOptions_hook_type **) find_rendezvous_variable(
"ColumnarTableSetOptions_hook");
/* rendezvous variable registered during columnar initialization */
Assert(ColumnarTableSetOptions_hook_ptr != NULL);
Assert(*ColumnarTableSetOptions_hook_ptr != NULL);
**ColumnarTableSetOptions_hook_ptr = ColumnarTableSetOptionsHook;
/*
* Acquire symbols for columnar functions that citus calls.
*/
void *handle = NULL;
ExternColumnarSupportsIndexAM = (ColumnarSupportsIndexAM_type) (void *)
load_external_function(COLUMNAR_MODULE_NAME,
"ColumnarSupportsIndexAM",
true, &handle);
ExternCompressionTypeStr = (CompressionTypeStr_type) (void *)
load_external_function(COLUMNAR_MODULE_NAME,
"CompressionTypeStr",
true, &handle);
ExternIsColumnarTableAmTable = (IsColumnarTableAmTable_type) (void *)
load_external_function(COLUMNAR_MODULE_NAME,
"IsColumnarTableAmTable",
true, &handle);
ExternReadColumnarOptions = (ReadColumnarOptions_type) (void *)
load_external_function(COLUMNAR_MODULE_NAME,
"ReadColumnarOptions",
true, &handle);
/* initialize symbols for "pass-through" functions */
INIT_COLUMNAR_PASSTHROUGH_SYMBOL(columnar_handler);
INIT_COLUMNAR_PASSTHROUGH_SYMBOL(alter_columnar_table_set);
INIT_COLUMNAR_PASSTHROUGH_SYMBOL(alter_columnar_table_reset);
INIT_COLUMNAR_PASSTHROUGH_SYMBOL(upgrade_columnar_storage);
INIT_COLUMNAR_PASSTHROUGH_SYMBOL(downgrade_columnar_storage);
INIT_COLUMNAR_PASSTHROUGH_SYMBOL(columnar_relation_storageid);
INIT_COLUMNAR_PASSTHROUGH_SYMBOL(columnar_storage_info);
INIT_COLUMNAR_PASSTHROUGH_SYMBOL(columnar_store_memory_stats);
INIT_COLUMNAR_PASSTHROUGH_SYMBOL(test_columnar_storage_write_new_page);
} }

View File

@ -25,6 +25,8 @@
#include "columnar/columnar_compression.h" #include "columnar/columnar_compression.h"
#include "columnar/columnar_metadata.h" #include "columnar/columnar_metadata.h"
#define COLUMNAR_MODULE_NAME "citus_columnar"
/* Defines for valid option names */ /* Defines for valid option names */
#define OPTION_NAME_COMPRESSION_TYPE "compression" #define OPTION_NAME_COMPRESSION_TYPE "compression"
#define OPTION_NAME_STRIPE_ROW_COUNT "stripe_row_limit" #define OPTION_NAME_STRIPE_ROW_COUNT "stripe_row_limit"
@ -187,6 +189,10 @@ typedef enum StripeWriteStateEnum
STRIPE_WRITE_IN_PROGRESS STRIPE_WRITE_IN_PROGRESS
} StripeWriteStateEnum; } StripeWriteStateEnum;
typedef bool (*ColumnarSupportsIndexAM_type)(char *);
typedef const char *(*CompressionTypeStr_type)(CompressionType);
typedef bool (*IsColumnarTableAmTable_type)(Oid);
typedef bool (*ReadColumnarOptions_type)(Oid, ColumnarOptions *);
/* ColumnarReadState represents state of a columnar scan. */ /* ColumnarReadState represents state of a columnar scan. */
struct ColumnarReadState; struct ColumnarReadState;
@ -205,7 +211,6 @@ extern int columnar_compression_level;
/* called when the user changes options on the given relation */ /* called when the user changes options on the given relation */
typedef void (*ColumnarTableSetOptions_hook_type)(Oid relid, ColumnarOptions options); typedef void (*ColumnarTableSetOptions_hook_type)(Oid relid, ColumnarOptions options);
extern ColumnarTableSetOptions_hook_type ColumnarTableSetOptions_hook;
extern void columnar_init(void); extern void columnar_init(void);
extern void columnar_init_gucs(void); extern void columnar_init_gucs(void);
@ -316,5 +321,4 @@ extern bool PendingWritesInUpperTransactions(Oid relfilenode,
SubTransactionId currentSubXid); SubTransactionId currentSubXid);
extern MemoryContext GetWriteContextForDebug(void); extern MemoryContext GetWriteContextForDebug(void);
#endif /* COLUMNAR_H */ #endif /* COLUMNAR_H */

View File

@ -11,10 +11,17 @@
#ifndef SHARED_LIBRARY_INIT_H #ifndef SHARED_LIBRARY_INIT_H
#define SHARED_LIBRARY_INIT_H #define SHARED_LIBRARY_INIT_H
#include "columnar/columnar.h"
#define GUC_STANDARD 0 #define GUC_STANDARD 0
#define MAX_SHARD_COUNT 64000 #define MAX_SHARD_COUNT 64000
#define MAX_SHARD_REPLICATION_FACTOR 100 #define MAX_SHARD_REPLICATION_FACTOR 100
extern ColumnarSupportsIndexAM_type ExternColumnarSupportsIndexAM;
extern CompressionTypeStr_type ExternCompressionTypeStr;
extern IsColumnarTableAmTable_type ExternIsColumnarTableAmTable;
extern ReadColumnarOptions_type ExternReadColumnarOptions;
extern void StartupCitusBackend(void); extern void StartupCitusBackend(void);
#endif /* SHARED_LIBRARY_INIT_H */ #endif /* SHARED_LIBRARY_INIT_H */

View File

@ -185,11 +185,5 @@ SELECT columnar_test_helpers.columnar_metadata_has_storage_id(:columnar_temp_sto
f f
(1 row) (1 row)
-- -- make sure citus_columnar can be loaded
-- Will throw error due to redefining GUCs, because citus is already
-- loaded and has already initialized columnar code. This test is to
-- ensure that the citus_columnar module is built and present, and
-- that it can be loaded without link failures.
--
LOAD 'citus_columnar'; LOAD 'citus_columnar';
ERROR: attempt to redefine parameter "columnar.compression"

View File

@ -130,10 +130,5 @@ SELECT COUNT(*)=0 FROM columnar_temp;
-- since we deleted all the rows, we shouldn't have any stripes for table -- since we deleted all the rows, we shouldn't have any stripes for table
SELECT columnar_test_helpers.columnar_metadata_has_storage_id(:columnar_temp_storage_id); SELECT columnar_test_helpers.columnar_metadata_has_storage_id(:columnar_temp_storage_id);
-- -- make sure citus_columnar can be loaded
-- Will throw error due to redefining GUCs, because citus is already
-- loaded and has already initialized columnar code. This test is to
-- ensure that the citus_columnar module is built and present, and
-- that it can be loaded without link failures.
--
LOAD 'citus_columnar'; LOAD 'citus_columnar';