Columnar: move DDL hooks to citus to remove dependency. (#5547)

Add a new hook ColumnarTableSetOptions_hook so that citus can get
control when the columnar table options change.
pull/5595/head
jeff-davis 2022-01-04 23:26:46 -08:00 committed by GitHub
parent c9292cfad1
commit 2e03efd91e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 176 additions and 153 deletions

View File

@ -105,6 +105,8 @@ typedef struct IndexFetchColumnarData
} IndexFetchColumnarData;
ColumnarTableSetOptions_hook_type ColumnarTableSetOptions_hook = NULL;
static object_access_hook_type PrevObjectAccessHook = NULL;
static ProcessUtility_hook_type PrevProcessUtilityHook = NULL;
@ -2298,121 +2300,6 @@ ColumnarCheckLogicalReplication(Relation rel)
}
/*
* CitusCreateAlterColumnarTableSet generates a portable
*/
static char *
CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,
const ColumnarOptions *options)
{
StringInfoData buf = { 0 };
initStringInfo(&buf);
appendStringInfo(&buf,
"SELECT alter_columnar_table_set(%s, "
"chunk_group_row_limit => %d, "
"stripe_row_limit => %lu, "
"compression_level => %d, "
"compression => %s);",
quote_literal_cstr(qualifiedRelationName),
options->chunkRowCount,
options->stripeRowCount,
options->compressionLevel,
quote_literal_cstr(CompressionTypeStr(options->compressionType)));
return buf.data;
}
/*
* GetTableDDLCommandColumnar is an internal function used to turn a
* ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql
* command that will be executed against a table. The resulting command will set the
* options of the table to the same options as the relation on the coordinator.
*/
static char *
GetTableDDLCommandColumnar(void *context)
{
ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context;
char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName,
tableDDLContext->relationName);
return CitusCreateAlterColumnarTableSet(qualifiedShardName,
&tableDDLContext->options);
}
/*
* GetShardedTableDDLCommandColumnar is an internal function used to turn a
* ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql
* command that will be executed against a shard. The resulting command will set the
* options of the shard to the same options as the relation the shard is based on.
*/
char *
GetShardedTableDDLCommandColumnar(uint64 shardId, void *context)
{
ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context;
/*
* AppendShardId is destructive of the original cahr *, given we want to serialize
* more than once we copy it before appending the shard id.
*/
char *relationName = pstrdup(tableDDLContext->relationName);
AppendShardIdToName(&relationName, shardId);
char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName,
relationName);
return CitusCreateAlterColumnarTableSet(qualifiedShardName,
&tableDDLContext->options);
}
/*
* ColumnarGetCustomTableOptionsDDL returns a TableDDLCommand representing a command that
* will apply the passed columnar options to the relation identified by relationId on a
* new table or shard.
*/
static TableDDLCommand *
ColumnarGetCustomTableOptionsDDL(char *schemaName, char *relationName,
ColumnarOptions options)
{
ColumnarTableDDLContext *context = (ColumnarTableDDLContext *) palloc0(
sizeof(ColumnarTableDDLContext));
/* build the context */
context->schemaName = schemaName;
context->relationName = relationName;
context->options = options;
/* create TableDDLCommand based on the context build above */
return makeTableDDLCommandFunction(
GetTableDDLCommandColumnar,
GetShardedTableDDLCommandColumnar,
context);
}
/*
* ColumnarGetTableOptionsDDL returns a TableDDLCommand representing a command that will
* apply the columnar options currently applicable to the relation identified by
* relationId on a new table or shard.
*/
TableDDLCommand *
ColumnarGetTableOptionsDDL(Oid relationId)
{
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
ColumnarOptions options = { 0 };
ReadColumnarOptions(relationId, &options);
return ColumnarGetCustomTableOptionsDDL(schemaName, relationName, options);
}
/*
* alter_columnar_table_set is a UDF exposed in postgres to change settings on a columnar
* table. Calling this function on a non-columnar table gives an error.
@ -2522,18 +2409,9 @@ alter_columnar_table_set(PG_FUNCTION_ARGS)
options.compressionLevel)));
}
if (EnableDDLPropagation && IsCitusTable(relationId))
if (ColumnarTableSetOptions_hook != NULL)
{
/* when a columnar table is distributed update all settings on the shards */
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
TableDDLCommand *command = ColumnarGetCustomTableOptionsDDL(schemaName,
relationName,
options);
DDLJob *ddljob = CreateCustomDDLTaskList(relationId, command);
ExecuteDistributedDDLJob(ddljob);
ColumnarTableSetOptions_hook(relationId, options);
}
SetColumnarOptions(relationId, &options);
@ -2618,18 +2496,9 @@ alter_columnar_table_reset(PG_FUNCTION_ARGS)
columnar_compression_level)));
}
if (EnableDDLPropagation && IsCitusTable(relationId))
if (ColumnarTableSetOptions_hook != NULL)
{
/* when a columnar table is distributed update all settings on the shards */
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
TableDDLCommand *command = ColumnarGetCustomTableOptionsDDL(schemaName,
relationName,
options);
DDLJob *ddljob = CreateCustomDDLTaskList(relationId, command);
ExecuteDistributedDDLJob(ddljob);
ColumnarTableSetOptions_hook(relationId, options);
}
SetColumnarOptions(relationId, &options);

View File

@ -1432,3 +1432,26 @@ DropSchemaOrDBInProgress(void)
{
return activeDropSchemaOrDBs > 0;
}
/*
* ColumnarTableSetOptionsHook propagates columnar table options to shards, if
* necessary.
*/
void
ColumnarTableSetOptionsHook(Oid relationId, ColumnarOptions options)
{
if (EnableDDLPropagation && IsCitusTable(relationId))
{
/* when a columnar table is distributed update all settings on the shards */
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
TableDDLCommand *command = ColumnarGetCustomTableOptionsDDL(schemaName,
relationName,
options);
DDLJob *ddljob = CreateCustomDDLTaskList(relationId, command);
ExecuteDistributedDDLJob(ddljob);
}
}

View File

@ -64,8 +64,6 @@
#include "utils/ruleutils.h"
#include "utils/varlena.h"
#include "columnar/columnar_tableam.h"
/* Shard related configuration */
int ShardCount = 32;
int ShardReplicationFactor = 1; /* desired replication factor for shards */
@ -80,6 +78,10 @@ static void GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_
int indexFlags);
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
static char * CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,
const ColumnarOptions *options);
static char * GetTableDDLCommandColumnar(void *context);
static TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_get_table_metadata);
@ -1018,3 +1020,118 @@ GetTableDDLCommand(TableDDLCommand *command)
/* unreachable: compiler should warn/error when not all cases are covered above */
ereport(ERROR, (errmsg("unsupported TableDDLCommand: %d", command->type)));
}
/*
* CitusCreateAlterColumnarTableSet generates a portable
*/
static char *
CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,
const ColumnarOptions *options)
{
StringInfoData buf = { 0 };
initStringInfo(&buf);
appendStringInfo(&buf,
"SELECT alter_columnar_table_set(%s, "
"chunk_group_row_limit => %d, "
"stripe_row_limit => %lu, "
"compression_level => %d, "
"compression => %s);",
quote_literal_cstr(qualifiedRelationName),
options->chunkRowCount,
options->stripeRowCount,
options->compressionLevel,
quote_literal_cstr(CompressionTypeStr(options->compressionType)));
return buf.data;
}
/*
* GetTableDDLCommandColumnar is an internal function used to turn a
* ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql
* command that will be executed against a table. The resulting command will set the
* options of the table to the same options as the relation on the coordinator.
*/
static char *
GetTableDDLCommandColumnar(void *context)
{
ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context;
char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName,
tableDDLContext->relationName);
return CitusCreateAlterColumnarTableSet(qualifiedShardName,
&tableDDLContext->options);
}
/*
* GetShardedTableDDLCommandColumnar is an internal function used to turn a
* ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql
* command that will be executed against a shard. The resulting command will set the
* options of the shard to the same options as the relation the shard is based on.
*/
char *
GetShardedTableDDLCommandColumnar(uint64 shardId, void *context)
{
ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context;
/*
* AppendShardId is destructive of the original cahr *, given we want to serialize
* more than once we copy it before appending the shard id.
*/
char *relationName = pstrdup(tableDDLContext->relationName);
AppendShardIdToName(&relationName, shardId);
char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName,
relationName);
return CitusCreateAlterColumnarTableSet(qualifiedShardName,
&tableDDLContext->options);
}
/*
* ColumnarGetCustomTableOptionsDDL returns a TableDDLCommand representing a command that
* will apply the passed columnar options to the relation identified by relationId on a
* new table or shard.
*/
TableDDLCommand *
ColumnarGetCustomTableOptionsDDL(char *schemaName, char *relationName,
ColumnarOptions options)
{
ColumnarTableDDLContext *context = (ColumnarTableDDLContext *) palloc0(
sizeof(ColumnarTableDDLContext));
/* build the context */
context->schemaName = schemaName;
context->relationName = relationName;
context->options = options;
/* create TableDDLCommand based on the context build above */
return makeTableDDLCommandFunction(
GetTableDDLCommandColumnar,
GetShardedTableDDLCommandColumnar,
context);
}
/*
* ColumnarGetTableOptionsDDL returns a TableDDLCommand representing a command that will
* apply the columnar options currently applicable to the relation identified by
* relationId on a new table or shard.
*/
static TableDDLCommand *
ColumnarGetTableOptionsDDL(Oid relationId)
{
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
ColumnarOptions options = { 0 };
ReadColumnarOptions(relationId, &options);
return ColumnarGetCustomTableOptionsDDL(schemaName, relationName, options);
}

View File

@ -311,6 +311,12 @@ _PG_init(void)
original_client_auth_hook = ClientAuthentication_hook;
ClientAuthentication_hook = CitusAuthHook;
/*
* When the options change on a columnar table, we may need to propagate
* the changes to shards.
*/
ColumnarTableSetOptions_hook = ColumnarTableSetOptionsHook;
InitializeMaintenanceDaemon();
/* initialize coordinated transaction management */

View File

@ -62,18 +62,6 @@ typedef struct ColumnarOptions
} ColumnarOptions;
/*
* ColumnarTableDDLContext holds the instance variable for the TableDDLCommandFunction
* instance described below.
*/
typedef struct ColumnarTableDDLContext
{
char *schemaName;
char *relationName;
ColumnarOptions options;
} ColumnarTableDDLContext;
/* ColumnChunkSkipNode contains statistics for a ColumnChunkData. */
typedef struct ColumnChunkSkipNode
{
@ -209,11 +197,16 @@ typedef struct ColumnarReadState ColumnarReadState;
struct ColumnarWriteState;
typedef struct ColumnarWriteState ColumnarWriteState;
/* GUCs */
extern int columnar_compression;
extern int columnar_stripe_row_limit;
extern int columnar_chunk_group_row_limit;
extern int columnar_compression_level;
/* called when the user changes options on the given relation */
typedef void (*ColumnarTableSetOptions_hook_type)(Oid relid, ColumnarOptions options);
extern ColumnarTableSetOptions_hook_type ColumnarTableSetOptions_hook;
extern void columnar_init_gucs(void);
extern CompressionType ParseCompressionType(const char *compressionTypeString);

View File

@ -60,8 +60,6 @@ extern TableScanDesc columnar_beginscan_extended(Relation relation, Snapshot sna
extern int64 ColumnarScanChunkGroupsFiltered(ColumnarScanDesc columnarScanDesc);
extern bool ColumnarSupportsIndexAM(char *indexAMName);
extern bool IsColumnarTableAmTable(Oid relationId);
extern TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId);
extern char * GetShardedTableDDLCommandColumnar(uint64 shardId, void *context);
#endif /* COLUMNAR_TABLEAM_H */

View File

@ -91,6 +91,7 @@ extern void UndistributeDisconnectedCitusLocalTables(void);
extern void NotifyUtilityHookConstraintDropped(void);
extern void ResetConstraintDropped(void);
extern void ExecuteDistributedDDLJob(DDLJob *ddlJob);
extern void ColumnarTableSetOptionsHook(Oid relationId, ColumnarOptions options);
/* forward declarations for sending custom commands to a distributed table */
extern DDLJob * CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command);

View File

@ -21,6 +21,7 @@
#include "nodes/pg_list.h"
#include "distributed/metadata_utility.h"
#include "columnar/columnar.h"
/*
* In our distributed database, we need a mechanism to make remote procedure
@ -181,6 +182,17 @@ struct TableDDLCommand
};
};
/*
* ColumnarTableDDLContext holds the instance variable for the TableDDLCommandFunction
* instance described below.
*/
typedef struct ColumnarTableDDLContext
{
char *schemaName;
char *relationName;
ColumnarOptions options;
} ColumnarTableDDLContext;
/* make functions for TableDDLCommand */
extern TableDDLCommand * makeTableDDLCommandString(char *commandStr);
extern TableDDLCommand * makeTableDDLCommandFunction(TableDDLFunction function,
@ -190,7 +202,11 @@ extern TableDDLCommand * makeTableDDLCommandFunction(TableDDLFunction function,
extern char * GetShardedTableDDLCommand(TableDDLCommand *command, uint64 shardId,
char *schemaName);
extern char * GetShardedTableDDLCommandColumnar(uint64 shardId, void *context);
extern char * GetTableDDLCommand(TableDDLCommand *command);
extern TableDDLCommand * ColumnarGetCustomTableOptionsDDL(char *schemaName,
char *relationName,
ColumnarOptions options);
/* Config variables managed via guc.c */