diff --git a/src/backend/distributed/commands/create_citus_local_table.c b/src/backend/distributed/commands/create_citus_local_table.c index 691f88890..352c3197e 100644 --- a/src/backend/distributed/commands/create_citus_local_table.c +++ b/src/backend/distributed/commands/create_citus_local_table.c @@ -287,8 +287,17 @@ GetShellTableDDLEventsForCitusLocalTable(Oid relationId) */ bool includeSequenceDefaults = true; - List *shellTableDDLEvents = GetFullTableCreationCommands(relationId, - includeSequenceDefaults); + List *tableDDLCommands = GetFullTableCreationCommands(relationId, + includeSequenceDefaults); + + List *shellTableDDLEvents = NIL; + TableDDLCommand *tableDDLCommand = NULL; + foreach_ptr(tableDDLCommand, tableDDLCommands) + { + Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); + shellTableDDLEvents = lappend(shellTableDDLEvents, + GetTableDDLCommand(tableDDLCommand)); + } shellTableDDLEvents = list_concat(shellTableDDLEvents, foreignConstraintCommands); return shellTableDDLEvents; diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index a57940aaa..a8f15da81 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1639,7 +1639,7 @@ UndistributeTable(Oid relationId) ereport(ERROR, (errmsg("could not run SPI query"))); } preLoadCommands = lappend(preLoadCommands, - attachPartitionCommand); + makeTableDDLCommandString(attachPartitionCommand)); UndistributeTable(partitionRelationId); } } @@ -1648,26 +1648,31 @@ UndistributeTable(Oid relationId) uint32 hashOfName = hash_any((unsigned char *) tempName, strlen(tempName)); AppendShardIdToName(&tempName, hashOfName); - char *tableCreationCommand = NULL; ereport(NOTICE, (errmsg("creating a new local table for %s", quote_qualified_identifier(schemaName, relationName)))); + TableDDLCommand *tableCreationCommand = NULL; foreach_ptr(tableCreationCommand, preLoadCommands) { - Node *parseTree = ParseTreeNode(tableCreationCommand); + Assert(CitusIsA(tableCreationCommand, TableDDLCommand)); + + char *tableCreationSql = GetTableDDLCommand(tableCreationCommand); + Node *parseTree = ParseTreeNode(tableCreationSql); RelayEventExtendNames(parseTree, schemaName, hashOfName); - CitusProcessUtility(parseTree, tableCreationCommand, PROCESS_UTILITY_TOPLEVEL, + CitusProcessUtility(parseTree, tableCreationSql, PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); } ReplaceTable(relationId, get_relname_relid(tempName, schemaId)); - char *tableConstructionCommand = NULL; + TableDDLCommand *tableConstructionCommand = NULL; foreach_ptr(tableConstructionCommand, postLoadCommands) { - spiResult = SPI_execute(tableConstructionCommand, false, 0); + Assert(CitusIsA(tableConstructionCommand, TableDDLCommand)); + char *tableConstructionSQL = GetTableDDLCommand(tableConstructionCommand); + spiResult = SPI_execute(tableConstructionSQL, false, 0); if (spiResult != SPI_OK_UTILITY) { ereport(ERROR, (errmsg("could not run SPI query"))); @@ -1707,7 +1712,7 @@ GetViewCreationCommandsOfTable(Oid relationId) "CREATE VIEW %s AS %s", qualifiedViewName, viewDefinition); - commands = lappend(commands, query->data); + commands = lappend(commands, makeTableDDLCommandString(query->data)); } return commands; } diff --git a/src/backend/distributed/commands/trigger.c b/src/backend/distributed/commands/trigger.c index 42f52605f..88d5402f8 100644 --- a/src/backend/distributed/commands/trigger.c +++ b/src/backend/distributed/commands/trigger.c @@ -26,6 +26,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" +#include "distributed/coordinator_protocol.h" #include "distributed/deparser.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" @@ -75,8 +76,9 @@ GetExplicitTriggerCommandList(Oid relationId) { char *createTriggerCommand = pg_get_triggerdef_command(triggerId); - createTriggerCommandList = lappend(createTriggerCommandList, - createTriggerCommand); + createTriggerCommandList = lappend( + createTriggerCommandList, + makeTableDDLCommandString(createTriggerCommand)); } /* revert back to original search_path */ diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 8e5394f72..93a222409 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -413,8 +413,16 @@ MetadataCreateCommands(void) metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, workerSequenceDDLCommands); - metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, - ddlCommandList); + + /* ddlCommandList contains TableDDLCommand information, need to materialize */ + TableDDLCommand *tableDDLCommand = NULL; + foreach_ptr(tableDDLCommand, ddlCommandList) + { + Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); + metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, + GetTableDDLCommand(tableDDLCommand)); + } + metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, tableOwnerResetCommand); metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, @@ -510,10 +518,18 @@ GetDistributedTableDDLEvents(Oid relationId) List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId); commandList = list_concat(commandList, sequenceDDLCommands); - /* commands to create the table */ + /* + * Commands to create the table, these commands are TableDDLCommands so lets + * materialize to the non-sharded version + */ List *tableDDLCommands = GetFullTableCreationCommands(relationId, includeSequenceDefaults); - commandList = list_concat(commandList, tableDDLCommands); + TableDDLCommand *tableDDLCommand = NULL; + foreach_ptr(tableDDLCommand, tableDDLCommands) + { + Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); + commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); + } /* command to associate sequences with table */ List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId); diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index ef8cf1b50..17c385f62 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -244,8 +244,9 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) (ListCellAndListWrapper *) functionContext->user_fctx; if (wrapper->listCell != NULL) { - char *ddlStatement = (char *) lfirst(wrapper->listCell); - text *ddlStatementText = cstring_to_text(ddlStatement); + TableDDLCommand *ddlStatement = (TableDDLCommand *) lfirst(wrapper->listCell); + Assert(CitusIsA(ddlStatement, TableDDLCommand)); + text *ddlStatementText = cstring_to_text(GetTableDDLCommand(ddlStatement)); wrapper->listCell = lnext_compat(wrapper->list, wrapper->listCell); @@ -595,8 +596,9 @@ GetTableReplicaIdentityCommand(Oid relationId) if (replicaIdentityCreateCommand) { - replicaIdentityCreateCommandList = lappend(replicaIdentityCreateCommandList, - replicaIdentityCreateCommand); + replicaIdentityCreateCommandList = lappend( + replicaIdentityCreateCommandList, + makeTableDDLCommandString(replicaIdentityCreateCommand)); } return replicaIdentityCreateCommandList; @@ -624,9 +626,11 @@ GetPreLoadTableCreationCommands(Oid relationId, bool includeSequenceDefaults) if (extensionDef != NULL) { - tableDDLEventList = lappend(tableDDLEventList, extensionDef); + tableDDLEventList = lappend(tableDDLEventList, + makeTableDDLCommandString(extensionDef)); } - tableDDLEventList = lappend(tableDDLEventList, serverDef); + tableDDLEventList = lappend(tableDDLEventList, + makeTableDDLCommandString(serverDef)); } /* fetch table schema and column option definitions */ @@ -634,16 +638,19 @@ GetPreLoadTableCreationCommands(Oid relationId, bool includeSequenceDefaults) includeSequenceDefaults); char *tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId); - tableDDLEventList = lappend(tableDDLEventList, tableSchemaDef); + tableDDLEventList = lappend(tableDDLEventList, makeTableDDLCommandString( + tableSchemaDef)); if (tableColumnOptionsDef != NULL) { - tableDDLEventList = lappend(tableDDLEventList, tableColumnOptionsDef); + tableDDLEventList = lappend(tableDDLEventList, makeTableDDLCommandString( + tableColumnOptionsDef)); } char *tableOwnerDef = TableOwnerResetCommand(relationId); if (tableOwnerDef != NULL) { - tableDDLEventList = lappend(tableDDLEventList, tableOwnerDef); + tableDDLEventList = lappend(tableDDLEventList, makeTableDDLCommandString( + tableOwnerDef)); } List *policyCommands = CreatePolicyCommands(relationId); @@ -676,7 +683,8 @@ GetTableIndexAndConstraintCommands(Oid relationId) BTEqualStrategyNumber, F_OIDEQ, relationId); SysScanDesc scanDescriptor = systable_beginscan(pgIndex, - IndexIndrelidIndexId, true, /* indexOK */ + IndexIndrelidIndexId, + true, /* indexOK */ NULL, scanKeyCount, scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); @@ -702,7 +710,8 @@ GetTableIndexAndConstraintCommands(Oid relationId) } /* append found constraint or index definition to the list */ - indexDDLEventList = lappend(indexDDLEventList, statementDef); + indexDDLEventList = lappend(indexDDLEventList, makeTableDDLCommandString( + statementDef)); /* if table is clustered on this index, append definition to the list */ if (indexForm->indisclustered) @@ -710,7 +719,8 @@ GetTableIndexAndConstraintCommands(Oid relationId) char *clusteredDef = pg_get_indexclusterdef_string(indexId); Assert(clusteredDef != NULL); - indexDDLEventList = lappend(indexDDLEventList, clusteredDef); + indexDDLEventList = lappend(indexDDLEventList, makeTableDDLCommandString( + clusteredDef)); } heapTuple = systable_getnext(scanDescriptor); @@ -841,3 +851,105 @@ DistributedTableReplicationIsEnabled() { return (ShardReplicationFactor > 1); } + + +/* + * makeTableDDLCommandString creates a TableDDLCommand based on a constant string. If the + * TableDDLCommand is turned into a sharded table command the constant will be wrapped in + * worker_apply_shard_ddl_command with the target shardId. If the command applies to an + * un-sharded table (eg. mx) the command is applied as is. + */ +TableDDLCommand * +makeTableDDLCommandString(char *commandStr) +{ + TableDDLCommand *command = CitusMakeNode(TableDDLCommand); + + command->type = TABLE_DDL_COMMAND_STRING; + command->commandStr = commandStr; + + return command; +} + + +/* + * GetShardedTableDDLCommandString is the internal function for TableDDLCommand objects + * created with makeTableDDLCommandString. + */ +static char * +GetShardedTableDDLCommandString(TableDDLCommand *command, uint64 shardId, + char *schemaName) +{ + StringInfoData buf = { 0 }; + initStringInfo(&buf); + + Assert(command->type == TABLE_DDL_COMMAND_STRING); + + char *escapedDDLCommand = quote_literal_cstr(command->commandStr); + + if (schemaName != NULL && strcmp(schemaName, "public") != 0) + { + char *escapedSchemaName = quote_literal_cstr(schemaName); + appendStringInfo(&buf, WORKER_APPLY_SHARD_DDL_COMMAND, shardId, escapedSchemaName, + escapedDDLCommand); + } + else + { + appendStringInfo(&buf, WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA, shardId, + escapedDDLCommand); + } + + return buf.data; +} + + +/* + * GetTableDDLCommandString is the internal function for TableDDLCommand objects created + * with makeTableDDLCommandString to return the non-sharded version of the ddl command. + */ +static char * +GetTableDDLCommandString(TableDDLCommand *command) +{ + Assert(command->type == TABLE_DDL_COMMAND_STRING); + return command->commandStr; +} + + +/* + * GetShardedTableDDLCommand returns the ddl command expressed by this TableDDLCommand + * where all applicable names are transformed into the names for a shard identified by + * shardId + */ +char * +GetShardedTableDDLCommand(TableDDLCommand *command, uint64 shardId, char *schemaName) +{ + switch (command->type) + { + case TABLE_DDL_COMMAND_STRING: + { + return GetShardedTableDDLCommandString(command, shardId, schemaName); + } + } + + /* unreachable: compiler should warn/error when not all cases are covered above */ + ereport(ERROR, (errmsg("unsupported TableDDLCommand: %d", command->type))); +} + + +/* + * GetTableDDLCommand returns the ddl command expressed by this TableDDLCommand where all + * table names are targeting the base table, not any shards. + */ +char * +GetTableDDLCommand(TableDDLCommand *command) +{ + switch (command->type) + { + case TABLE_DDL_COMMAND_STRING: + { + return GetTableDDLCommandString(command); + } + } + + /* unreachable: compiler should warn/error when not all cases are covered above */ + ereport(ERROR, (errmsg("unsupported TableDDLCommand: %d", command->type))); +} diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 79b355ce8..75d12b530 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -979,7 +979,7 @@ RecreateTableDDLCommandList(Oid relationId) "table"))); } - List *dropCommandList = list_make1(dropCommand->data); + List *dropCommandList = list_make1(makeTableDDLCommandString(dropCommand->data)); List *createCommandList = GetPreLoadTableCreationCommands(relationId, includeSequenceDefaults); List *recreateCommandList = list_concat(dropCommandList, createCommandList); @@ -996,20 +996,15 @@ RecreateTableDDLCommandList(Oid relationId) static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId) { - List *applyDdlCommandList = NIL; + List *applyDDLCommandList = NIL; - const char *ddlCommand = NULL; + TableDDLCommand *ddlCommand = NULL; foreach_ptr(ddlCommand, ddlCommandList) { - char *escapedDdlCommand = quote_literal_cstr(ddlCommand); - - StringInfo applyDdlCommand = makeStringInfo(); - appendStringInfo(applyDdlCommand, - WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA, - shardId, escapedDdlCommand); - - applyDdlCommandList = lappend(applyDdlCommandList, applyDdlCommand->data); + Assert(CitusIsA(ddlCommand, TableDDLCommand)); + char *applyDDLCommand = GetShardedTableDDLCommand(ddlCommand, shardId, NULL); + applyDDLCommandList = lappend(applyDDLCommandList, applyDDLCommand); } - return applyDdlCommandList; + return applyDDLCommandList; } diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index 0793170dc..71c178d16 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -678,25 +678,13 @@ WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, char *schemaName = get_namespace_name(schemaId); char *escapedSchemaName = quote_literal_cstr(schemaName); - const char *ddlCommand = NULL; + TableDDLCommand *ddlCommand = NULL; foreach_ptr(ddlCommand, ddlCommandList) { - char *escapedDDLCommand = quote_literal_cstr(ddlCommand); - StringInfo applyDDLCommand = makeStringInfo(); - - if (strcmp(schemaName, "public") != 0) - { - appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId, - escapedSchemaName, escapedDDLCommand); - } - else - { - appendStringInfo(applyDDLCommand, - WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA, shardId, - escapedDDLCommand); - } - - commandList = lappend(commandList, applyDDLCommand->data); + Assert(CitusIsA(ddlCommand, TableDDLCommand)); + char *applyDDLCommand = GetShardedTableDDLCommand(ddlCommand, shardId, + schemaName); + commandList = lappend(commandList, applyDDLCommand); } const char *command = NULL; diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index 2efd9b003..6b8635e13 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -15,6 +15,7 @@ #include "catalog/pg_type.h" #include "distributed/citus_nodes.h" #include "distributed/citus_nodefuncs.h" +#include "distributed/coordinator_protocol.h" #include "distributed/errormessage.h" #include "distributed/log_utils.h" #include "distributed/metadata_cache.h" @@ -45,7 +46,8 @@ static const char *CitusNodeTagNamesD[] = { "RelationShard", "RelationRowLock", "DeferredErrorMessage", - "GroupShardPlacement" + "GroupShardPlacement", + "TableDDLCommand" }; const char **CitusNodeTagNames = CitusNodeTagNamesD; @@ -411,7 +413,8 @@ const ExtensibleNodeMethods nodeMethods[] = DEFINE_NODE_METHODS_NO_READ(MultiJoin), DEFINE_NODE_METHODS_NO_READ(MultiPartition), DEFINE_NODE_METHODS_NO_READ(MultiCartesianProduct), - DEFINE_NODE_METHODS_NO_READ(MultiExtendedOp) + DEFINE_NODE_METHODS_NO_READ(MultiExtendedOp), + DEFINE_NODE_METHODS_NO_READ(TableDDLCommand) }; void diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 1043c06a7..b02c7daf7 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -24,6 +24,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" +#include "distributed/coordinator_protocol.h" #include "distributed/errormessage.h" #include "distributed/log_utils.h" #include "distributed/multi_logical_planner.h" @@ -564,3 +565,20 @@ OutDeferredErrorMessage(OUTFUNC_ARGS) WRITE_INT_FIELD(linenumber); WRITE_STRING_FIELD(functionname); } + + +void +OutTableDDLCommand(OUTFUNC_ARGS) +{ + WRITE_LOCALS(TableDDLCommand); + WRITE_NODE_TYPE("TableDDLCommand"); + + switch (node->type) + { + case TABLE_DDL_COMMAND_STRING: + { + WRITE_STRING_FIELD(commandStr); + break; + } + } +} diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index 9dbafeb6f..caeda3a72 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -62,6 +62,7 @@ extern void OutMultiJoin(OUTFUNC_ARGS); extern void OutMultiPartition(OUTFUNC_ARGS); extern void OutMultiCartesianProduct(OUTFUNC_ARGS); extern void OutMultiExtendedOp(OUTFUNC_ARGS); +extern void OutTableDDLCommand(OUTFUNC_ARGS); extern void CopyNodeJob(COPYFUNC_ARGS); extern void CopyNodeDistributedPlan(COPYFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index fe244082c..888133a89 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -65,7 +65,8 @@ typedef enum CitusNodeTag T_RelationShard, T_RelationRowLock, T_DeferredErrorMessage, - T_GroupShardPlacement + T_GroupShardPlacement, + T_TableDDLCommand } CitusNodeTag; diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index a0efc6cea..9bc819265 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -83,6 +83,59 @@ typedef enum SHARD_PLACEMENT_RANDOM = 3 } ShardPlacementPolicyType; +/* + * TableDDLCommandType encodes the implementation used by TableDDLCommand. See comments in + * TableDDLCpmmand for details. + */ +typedef enum TableDDLCommandType +{ + TABLE_DDL_COMMAND_STRING, +} TableDDLCommandType; + + +/* + * TableDDLCommand holds the definition of a command to be executed to bring the table and + * or shard into a certain state. The command needs to be able to serialized into two + * versions: + * - one version should have the vanilla commands operating on the base table. These are + * used for example to create the MX table shards + * - the second versions should replace all identifiers with an identifier containing the + * shard id. + * + * Current implementations are + * - command string, created via makeTableDDLCommandString. This variant contains a ddl + * command that will be wrapped in `worker_apply_shard_ddl_command` when applied + * against a shard. + */ +typedef struct TableDDLCommand +{ + CitusNode node; + + /* encoding the type this TableDDLCommand contains */ + TableDDLCommandType type; + + /* + * This union contains one (1) typed field for every implementation for + * TableDDLCommand. A union enforces no overloading of fields but instead requiers at + * most one of the fields to be used at any time. + */ + union + { + /* + * CommandStr is used when type is set to TABLE_DDL_COMMAND_STRING. It holds the + * sql ddl command string representing the ddl command. + */ + char *commandStr; + }; +} TableDDLCommand; + +/* make functions for TableDDLCommand */ +extern TableDDLCommand * makeTableDDLCommandString(char *commandStr); + +extern char * GetShardedTableDDLCommand(TableDDLCommand *command, uint64 shardId, + char *schemaName); +extern char * GetTableDDLCommand(TableDDLCommand *command); + /* Config variables managed via guc.c */ extern int ShardCount;