From 2e03efd91e25b78480b11f7de3f868f2e43f4f91 Mon Sep 17 00:00:00 2001 From: jeff-davis Date: Tue, 4 Jan 2022 23:26:46 -0800 Subject: [PATCH] Columnar: move DDL hooks to citus to remove dependency. (#5547) Add a new hook ColumnarTableSetOptions_hook so that citus can get control when the columnar table options change. --- src/backend/columnar/columnar_tableam.c | 143 +----------------- .../distributed/commands/utility_hook.c | 23 +++ .../distributed/operations/node_protocol.c | 121 ++++++++++++++- src/backend/distributed/shared_library_init.c | 6 + src/include/columnar/columnar.h | 17 +-- src/include/columnar/columnar_tableam.h | 2 - .../distributed/commands/utility_hook.h | 1 + .../distributed/coordinator_protocol.h | 16 ++ 8 files changed, 176 insertions(+), 153 deletions(-) diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index df1d28584..c56c3192b 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -105,6 +105,8 @@ typedef struct IndexFetchColumnarData } IndexFetchColumnarData; +ColumnarTableSetOptions_hook_type ColumnarTableSetOptions_hook = NULL; + static object_access_hook_type PrevObjectAccessHook = NULL; static ProcessUtility_hook_type PrevProcessUtilityHook = NULL; @@ -2298,121 +2300,6 @@ ColumnarCheckLogicalReplication(Relation rel) } -/* - * CitusCreateAlterColumnarTableSet generates a portable - */ -static char * -CitusCreateAlterColumnarTableSet(char *qualifiedRelationName, - const ColumnarOptions *options) -{ - StringInfoData buf = { 0 }; - initStringInfo(&buf); - - appendStringInfo(&buf, - "SELECT alter_columnar_table_set(%s, " - "chunk_group_row_limit => %d, " - "stripe_row_limit => %lu, " - "compression_level => %d, " - "compression => %s);", - quote_literal_cstr(qualifiedRelationName), - options->chunkRowCount, - options->stripeRowCount, - options->compressionLevel, - quote_literal_cstr(CompressionTypeStr(options->compressionType))); - - return buf.data; -} - - -/* - * GetTableDDLCommandColumnar is an internal function used to turn a - * ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql - * command that will be executed against a table. The resulting command will set the - * options of the table to the same options as the relation on the coordinator. - */ -static char * -GetTableDDLCommandColumnar(void *context) -{ - ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context; - - char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName, - tableDDLContext->relationName); - - return CitusCreateAlterColumnarTableSet(qualifiedShardName, - &tableDDLContext->options); -} - - -/* - * GetShardedTableDDLCommandColumnar is an internal function used to turn a - * ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql - * command that will be executed against a shard. The resulting command will set the - * options of the shard to the same options as the relation the shard is based on. - */ -char * -GetShardedTableDDLCommandColumnar(uint64 shardId, void *context) -{ - ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context; - - /* - * AppendShardId is destructive of the original cahr *, given we want to serialize - * more than once we copy it before appending the shard id. - */ - char *relationName = pstrdup(tableDDLContext->relationName); - AppendShardIdToName(&relationName, shardId); - - char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName, - relationName); - - return CitusCreateAlterColumnarTableSet(qualifiedShardName, - &tableDDLContext->options); -} - - -/* - * ColumnarGetCustomTableOptionsDDL returns a TableDDLCommand representing a command that - * will apply the passed columnar options to the relation identified by relationId on a - * new table or shard. - */ -static TableDDLCommand * -ColumnarGetCustomTableOptionsDDL(char *schemaName, char *relationName, - ColumnarOptions options) -{ - ColumnarTableDDLContext *context = (ColumnarTableDDLContext *) palloc0( - sizeof(ColumnarTableDDLContext)); - - /* build the context */ - context->schemaName = schemaName; - context->relationName = relationName; - context->options = options; - - /* create TableDDLCommand based on the context build above */ - return makeTableDDLCommandFunction( - GetTableDDLCommandColumnar, - GetShardedTableDDLCommandColumnar, - context); -} - - -/* - * ColumnarGetTableOptionsDDL returns a TableDDLCommand representing a command that will - * apply the columnar options currently applicable to the relation identified by - * relationId on a new table or shard. - */ -TableDDLCommand * -ColumnarGetTableOptionsDDL(Oid relationId) -{ - Oid namespaceId = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(namespaceId); - char *relationName = get_rel_name(relationId); - - ColumnarOptions options = { 0 }; - ReadColumnarOptions(relationId, &options); - - return ColumnarGetCustomTableOptionsDDL(schemaName, relationName, options); -} - - /* * alter_columnar_table_set is a UDF exposed in postgres to change settings on a columnar * table. Calling this function on a non-columnar table gives an error. @@ -2522,18 +2409,9 @@ alter_columnar_table_set(PG_FUNCTION_ARGS) options.compressionLevel))); } - if (EnableDDLPropagation && IsCitusTable(relationId)) + if (ColumnarTableSetOptions_hook != NULL) { - /* when a columnar table is distributed update all settings on the shards */ - Oid namespaceId = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(namespaceId); - char *relationName = get_rel_name(relationId); - TableDDLCommand *command = ColumnarGetCustomTableOptionsDDL(schemaName, - relationName, - options); - DDLJob *ddljob = CreateCustomDDLTaskList(relationId, command); - - ExecuteDistributedDDLJob(ddljob); + ColumnarTableSetOptions_hook(relationId, options); } SetColumnarOptions(relationId, &options); @@ -2618,18 +2496,9 @@ alter_columnar_table_reset(PG_FUNCTION_ARGS) columnar_compression_level))); } - if (EnableDDLPropagation && IsCitusTable(relationId)) + if (ColumnarTableSetOptions_hook != NULL) { - /* when a columnar table is distributed update all settings on the shards */ - Oid namespaceId = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(namespaceId); - char *relationName = get_rel_name(relationId); - TableDDLCommand *command = ColumnarGetCustomTableOptionsDDL(schemaName, - relationName, - options); - DDLJob *ddljob = CreateCustomDDLTaskList(relationId, command); - - ExecuteDistributedDDLJob(ddljob); + ColumnarTableSetOptions_hook(relationId, options); } SetColumnarOptions(relationId, &options); diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index aae37b7d4..3bcdfed86 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -1432,3 +1432,26 @@ DropSchemaOrDBInProgress(void) { return activeDropSchemaOrDBs > 0; } + + +/* + * ColumnarTableSetOptionsHook propagates columnar table options to shards, if + * necessary. + */ +void +ColumnarTableSetOptionsHook(Oid relationId, ColumnarOptions options) +{ + if (EnableDDLPropagation && IsCitusTable(relationId)) + { + /* when a columnar table is distributed update all settings on the shards */ + Oid namespaceId = get_rel_namespace(relationId); + char *schemaName = get_namespace_name(namespaceId); + char *relationName = get_rel_name(relationId); + TableDDLCommand *command = ColumnarGetCustomTableOptionsDDL(schemaName, + relationName, + options); + DDLJob *ddljob = CreateCustomDDLTaskList(relationId, command); + + ExecuteDistributedDDLJob(ddljob); + } +} diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 9974fe56a..8f6b2d5bd 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -64,8 +64,6 @@ #include "utils/ruleutils.h" #include "utils/varlena.h" -#include "columnar/columnar_tableam.h" - /* Shard related configuration */ int ShardCount = 32; int ShardReplicationFactor = 1; /* desired replication factor for shards */ @@ -80,6 +78,10 @@ static void GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_ int indexFlags); static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor); +static char * CitusCreateAlterColumnarTableSet(char *qualifiedRelationName, + const ColumnarOptions *options); +static char * GetTableDDLCommandColumnar(void *context); +static TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_get_table_metadata); @@ -1018,3 +1020,118 @@ GetTableDDLCommand(TableDDLCommand *command) /* unreachable: compiler should warn/error when not all cases are covered above */ ereport(ERROR, (errmsg("unsupported TableDDLCommand: %d", command->type))); } + + +/* + * CitusCreateAlterColumnarTableSet generates a portable + */ +static char * +CitusCreateAlterColumnarTableSet(char *qualifiedRelationName, + const ColumnarOptions *options) +{ + StringInfoData buf = { 0 }; + initStringInfo(&buf); + + appendStringInfo(&buf, + "SELECT alter_columnar_table_set(%s, " + "chunk_group_row_limit => %d, " + "stripe_row_limit => %lu, " + "compression_level => %d, " + "compression => %s);", + quote_literal_cstr(qualifiedRelationName), + options->chunkRowCount, + options->stripeRowCount, + options->compressionLevel, + quote_literal_cstr(CompressionTypeStr(options->compressionType))); + + return buf.data; +} + + +/* + * GetTableDDLCommandColumnar is an internal function used to turn a + * ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql + * command that will be executed against a table. The resulting command will set the + * options of the table to the same options as the relation on the coordinator. + */ +static char * +GetTableDDLCommandColumnar(void *context) +{ + ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context; + + char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName, + tableDDLContext->relationName); + + return CitusCreateAlterColumnarTableSet(qualifiedShardName, + &tableDDLContext->options); +} + + +/* + * GetShardedTableDDLCommandColumnar is an internal function used to turn a + * ColumnarTableDDLContext stored on the context of a TableDDLCommandFunction into a sql + * command that will be executed against a shard. The resulting command will set the + * options of the shard to the same options as the relation the shard is based on. + */ +char * +GetShardedTableDDLCommandColumnar(uint64 shardId, void *context) +{ + ColumnarTableDDLContext *tableDDLContext = (ColumnarTableDDLContext *) context; + + /* + * AppendShardId is destructive of the original cahr *, given we want to serialize + * more than once we copy it before appending the shard id. + */ + char *relationName = pstrdup(tableDDLContext->relationName); + AppendShardIdToName(&relationName, shardId); + + char *qualifiedShardName = quote_qualified_identifier(tableDDLContext->schemaName, + relationName); + + return CitusCreateAlterColumnarTableSet(qualifiedShardName, + &tableDDLContext->options); +} + + +/* + * ColumnarGetCustomTableOptionsDDL returns a TableDDLCommand representing a command that + * will apply the passed columnar options to the relation identified by relationId on a + * new table or shard. + */ +TableDDLCommand * +ColumnarGetCustomTableOptionsDDL(char *schemaName, char *relationName, + ColumnarOptions options) +{ + ColumnarTableDDLContext *context = (ColumnarTableDDLContext *) palloc0( + sizeof(ColumnarTableDDLContext)); + + /* build the context */ + context->schemaName = schemaName; + context->relationName = relationName; + context->options = options; + + /* create TableDDLCommand based on the context build above */ + return makeTableDDLCommandFunction( + GetTableDDLCommandColumnar, + GetShardedTableDDLCommandColumnar, + context); +} + + +/* + * ColumnarGetTableOptionsDDL returns a TableDDLCommand representing a command that will + * apply the columnar options currently applicable to the relation identified by + * relationId on a new table or shard. + */ +static TableDDLCommand * +ColumnarGetTableOptionsDDL(Oid relationId) +{ + Oid namespaceId = get_rel_namespace(relationId); + char *schemaName = get_namespace_name(namespaceId); + char *relationName = get_rel_name(relationId); + + ColumnarOptions options = { 0 }; + ReadColumnarOptions(relationId, &options); + + return ColumnarGetCustomTableOptionsDDL(schemaName, relationName, options); +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 7ab700af6..8d82e4d69 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -311,6 +311,12 @@ _PG_init(void) original_client_auth_hook = ClientAuthentication_hook; ClientAuthentication_hook = CitusAuthHook; + /* + * When the options change on a columnar table, we may need to propagate + * the changes to shards. + */ + ColumnarTableSetOptions_hook = ColumnarTableSetOptionsHook; + InitializeMaintenanceDaemon(); /* initialize coordinated transaction management */ diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index fff84503b..5195cbfee 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -62,18 +62,6 @@ typedef struct ColumnarOptions } ColumnarOptions; -/* - * ColumnarTableDDLContext holds the instance variable for the TableDDLCommandFunction - * instance described below. - */ -typedef struct ColumnarTableDDLContext -{ - char *schemaName; - char *relationName; - ColumnarOptions options; -} ColumnarTableDDLContext; - - /* ColumnChunkSkipNode contains statistics for a ColumnChunkData. */ typedef struct ColumnChunkSkipNode { @@ -209,11 +197,16 @@ typedef struct ColumnarReadState ColumnarReadState; struct ColumnarWriteState; typedef struct ColumnarWriteState ColumnarWriteState; +/* GUCs */ extern int columnar_compression; extern int columnar_stripe_row_limit; extern int columnar_chunk_group_row_limit; extern int columnar_compression_level; +/* called when the user changes options on the given relation */ +typedef void (*ColumnarTableSetOptions_hook_type)(Oid relid, ColumnarOptions options); +extern ColumnarTableSetOptions_hook_type ColumnarTableSetOptions_hook; + extern void columnar_init_gucs(void); extern CompressionType ParseCompressionType(const char *compressionTypeString); diff --git a/src/include/columnar/columnar_tableam.h b/src/include/columnar/columnar_tableam.h index 6418e3dac..9b03da3b0 100644 --- a/src/include/columnar/columnar_tableam.h +++ b/src/include/columnar/columnar_tableam.h @@ -60,8 +60,6 @@ extern TableScanDesc columnar_beginscan_extended(Relation relation, Snapshot sna extern int64 ColumnarScanChunkGroupsFiltered(ColumnarScanDesc columnarScanDesc); extern bool ColumnarSupportsIndexAM(char *indexAMName); extern bool IsColumnarTableAmTable(Oid relationId); -extern TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId); -extern char * GetShardedTableDDLCommandColumnar(uint64 shardId, void *context); #endif /* COLUMNAR_TABLEAM_H */ diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index fc209140b..9ead0df8b 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -91,6 +91,7 @@ extern void UndistributeDisconnectedCitusLocalTables(void); extern void NotifyUtilityHookConstraintDropped(void); extern void ResetConstraintDropped(void); extern void ExecuteDistributedDDLJob(DDLJob *ddlJob); +extern void ColumnarTableSetOptionsHook(Oid relationId, ColumnarOptions options); /* forward declarations for sending custom commands to a distributed table */ extern DDLJob * CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command); diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 948770112..cceb12c40 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -21,6 +21,7 @@ #include "nodes/pg_list.h" #include "distributed/metadata_utility.h" +#include "columnar/columnar.h" /* * In our distributed database, we need a mechanism to make remote procedure @@ -181,6 +182,17 @@ struct TableDDLCommand }; }; +/* + * ColumnarTableDDLContext holds the instance variable for the TableDDLCommandFunction + * instance described below. + */ +typedef struct ColumnarTableDDLContext +{ + char *schemaName; + char *relationName; + ColumnarOptions options; +} ColumnarTableDDLContext; + /* make functions for TableDDLCommand */ extern TableDDLCommand * makeTableDDLCommandString(char *commandStr); extern TableDDLCommand * makeTableDDLCommandFunction(TableDDLFunction function, @@ -190,7 +202,11 @@ extern TableDDLCommand * makeTableDDLCommandFunction(TableDDLFunction function, extern char * GetShardedTableDDLCommand(TableDDLCommand *command, uint64 shardId, char *schemaName); +extern char * GetShardedTableDDLCommandColumnar(uint64 shardId, void *context); extern char * GetTableDDLCommand(TableDDLCommand *command); +extern TableDDLCommand * ColumnarGetCustomTableOptionsDDL(char *schemaName, + char *relationName, + ColumnarOptions options); /* Config variables managed via guc.c */