refactor table ddl events scoped for shards (#4342)

Refactor internals on how Citus creates the SQL commands it sends to recreate shards.

Before, Citus collected solely ddl commands as `char *`'s to recreate a table. If they were used to create a shard they were wrapped with `worker_apply_shard_ddl_command` and sent to the workers. On the workers the UDF wrapping the ddl command would rewrite the parsetree to replace table names with their shard name equivalent.

This worked well, but poses an issue when adding columnar. Due to limitations in Postgres on creating custom options on table access methods we need to fall back on a UDF to set columnar specific options. Now, to recreate the table, we can no longer rely on having solely DDL statements to recreate a table.

A prototype was made to run this UDF wrapped in `worker_apply_shard_ddl_command`. This became pretty messy, hard to understand and subsequently hard to maintain.

This PR proposes a refactor of the internal representation of table ddl commands into a `TableDDLCommand` structure. The current implementation only supports a `char *` as its contents. Based on the use of the DDL statement (eg. creating the table -mx- or creating a shard) one of two different functions can be called to get the statement to send to the worker:
 - `GetTableDDLCommand(TableDDLCommand *command)`: This function returns the ddl command to create the table. In this implementation it will just return the `char *`. This has the same functionality as getting the old list and not wrapping it.
 - `GetShardedTableDDLCommand(TableDDLCommand *command, uint64 shardId, char *schemaName)`: This function returns the ddl command wrapped in `worker_apply_shard_ddl_command` with the `shardId` as an argument. Due to backwards compatibility it also accepts a `schemaName`. The exact purpose is not directly clear. Ideally new implementations would work with fully qualified statements and ignore the `schemaName`.

A future implementation could accept two function pointers and a `void *` for context to let the two pointers work on. This gives greater flexibility in controlling what commands get sent in which situations. Also, in the future, we could implement the intermediate step of creating the `parsetree` data structure of statements based on the contents in the catalog with a corresponding deparser. For sharded queries a mutator could be run over the parsetree to rewrite the table names to the names with the shard identifier. This will completely omit the requirement for `worker_apply_shard_ddl_command`.
pull/4350/head
Nils Dijk 2020-11-26 13:31:59 +01:00 committed by GitHub
parent 83020f444e
commit 326e6afa53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 262 additions and 59 deletions

View File

@ -287,8 +287,17 @@ GetShellTableDDLEventsForCitusLocalTable(Oid relationId)
*/
bool includeSequenceDefaults = true;
List *shellTableDDLEvents = GetFullTableCreationCommands(relationId,
includeSequenceDefaults);
List *tableDDLCommands = GetFullTableCreationCommands(relationId,
includeSequenceDefaults);
List *shellTableDDLEvents = NIL;
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)
{
Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
shellTableDDLEvents = lappend(shellTableDDLEvents,
GetTableDDLCommand(tableDDLCommand));
}
shellTableDDLEvents = list_concat(shellTableDDLEvents, foreignConstraintCommands);
return shellTableDDLEvents;

View File

@ -1639,7 +1639,7 @@ UndistributeTable(Oid relationId)
ereport(ERROR, (errmsg("could not run SPI query")));
}
preLoadCommands = lappend(preLoadCommands,
attachPartitionCommand);
makeTableDDLCommandString(attachPartitionCommand));
UndistributeTable(partitionRelationId);
}
}
@ -1648,26 +1648,31 @@ UndistributeTable(Oid relationId)
uint32 hashOfName = hash_any((unsigned char *) tempName, strlen(tempName));
AppendShardIdToName(&tempName, hashOfName);
char *tableCreationCommand = NULL;
ereport(NOTICE, (errmsg("creating a new local table for %s",
quote_qualified_identifier(schemaName, relationName))));
TableDDLCommand *tableCreationCommand = NULL;
foreach_ptr(tableCreationCommand, preLoadCommands)
{
Node *parseTree = ParseTreeNode(tableCreationCommand);
Assert(CitusIsA(tableCreationCommand, TableDDLCommand));
char *tableCreationSql = GetTableDDLCommand(tableCreationCommand);
Node *parseTree = ParseTreeNode(tableCreationSql);
RelayEventExtendNames(parseTree, schemaName, hashOfName);
CitusProcessUtility(parseTree, tableCreationCommand, PROCESS_UTILITY_TOPLEVEL,
CitusProcessUtility(parseTree, tableCreationSql, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
}
ReplaceTable(relationId, get_relname_relid(tempName, schemaId));
char *tableConstructionCommand = NULL;
TableDDLCommand *tableConstructionCommand = NULL;
foreach_ptr(tableConstructionCommand, postLoadCommands)
{
spiResult = SPI_execute(tableConstructionCommand, false, 0);
Assert(CitusIsA(tableConstructionCommand, TableDDLCommand));
char *tableConstructionSQL = GetTableDDLCommand(tableConstructionCommand);
spiResult = SPI_execute(tableConstructionSQL, false, 0);
if (spiResult != SPI_OK_UTILITY)
{
ereport(ERROR, (errmsg("could not run SPI query")));
@ -1707,7 +1712,7 @@ GetViewCreationCommandsOfTable(Oid relationId)
"CREATE VIEW %s AS %s",
qualifiedViewName,
viewDefinition);
commands = lappend(commands, query->data);
commands = lappend(commands, makeTableDDLCommandString(query->data));
}
return commands;
}

View File

@ -26,6 +26,7 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
@ -75,8 +76,9 @@ GetExplicitTriggerCommandList(Oid relationId)
{
char *createTriggerCommand = pg_get_triggerdef_command(triggerId);
createTriggerCommandList = lappend(createTriggerCommandList,
createTriggerCommand);
createTriggerCommandList = lappend(
createTriggerCommandList,
makeTableDDLCommandString(createTriggerCommand));
}
/* revert back to original search_path */

View File

@ -413,8 +413,16 @@ MetadataCreateCommands(void)
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
workerSequenceDDLCommands);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
ddlCommandList);
/* ddlCommandList contains TableDDLCommand information, need to materialize */
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, ddlCommandList)
{
Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
GetTableDDLCommand(tableDDLCommand));
}
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
tableOwnerResetCommand);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
@ -510,10 +518,18 @@ GetDistributedTableDDLEvents(Oid relationId)
List *sequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
commandList = list_concat(commandList, sequenceDDLCommands);
/* commands to create the table */
/*
* Commands to create the table, these commands are TableDDLCommands so lets
* materialize to the non-sharded version
*/
List *tableDDLCommands = GetFullTableCreationCommands(relationId,
includeSequenceDefaults);
commandList = list_concat(commandList, tableDDLCommands);
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)
{
Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand));
}
/* command to associate sequences with table */
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);

View File

@ -244,8 +244,9 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
(ListCellAndListWrapper *) functionContext->user_fctx;
if (wrapper->listCell != NULL)
{
char *ddlStatement = (char *) lfirst(wrapper->listCell);
text *ddlStatementText = cstring_to_text(ddlStatement);
TableDDLCommand *ddlStatement = (TableDDLCommand *) lfirst(wrapper->listCell);
Assert(CitusIsA(ddlStatement, TableDDLCommand));
text *ddlStatementText = cstring_to_text(GetTableDDLCommand(ddlStatement));
wrapper->listCell = lnext_compat(wrapper->list, wrapper->listCell);
@ -595,8 +596,9 @@ GetTableReplicaIdentityCommand(Oid relationId)
if (replicaIdentityCreateCommand)
{
replicaIdentityCreateCommandList = lappend(replicaIdentityCreateCommandList,
replicaIdentityCreateCommand);
replicaIdentityCreateCommandList = lappend(
replicaIdentityCreateCommandList,
makeTableDDLCommandString(replicaIdentityCreateCommand));
}
return replicaIdentityCreateCommandList;
@ -624,9 +626,11 @@ GetPreLoadTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
if (extensionDef != NULL)
{
tableDDLEventList = lappend(tableDDLEventList, extensionDef);
tableDDLEventList = lappend(tableDDLEventList,
makeTableDDLCommandString(extensionDef));
}
tableDDLEventList = lappend(tableDDLEventList, serverDef);
tableDDLEventList = lappend(tableDDLEventList,
makeTableDDLCommandString(serverDef));
}
/* fetch table schema and column option definitions */
@ -634,16 +638,19 @@ GetPreLoadTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
includeSequenceDefaults);
char *tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId);
tableDDLEventList = lappend(tableDDLEventList, tableSchemaDef);
tableDDLEventList = lappend(tableDDLEventList, makeTableDDLCommandString(
tableSchemaDef));
if (tableColumnOptionsDef != NULL)
{
tableDDLEventList = lappend(tableDDLEventList, tableColumnOptionsDef);
tableDDLEventList = lappend(tableDDLEventList, makeTableDDLCommandString(
tableColumnOptionsDef));
}
char *tableOwnerDef = TableOwnerResetCommand(relationId);
if (tableOwnerDef != NULL)
{
tableDDLEventList = lappend(tableDDLEventList, tableOwnerDef);
tableDDLEventList = lappend(tableDDLEventList, makeTableDDLCommandString(
tableOwnerDef));
}
List *policyCommands = CreatePolicyCommands(relationId);
@ -676,7 +683,8 @@ GetTableIndexAndConstraintCommands(Oid relationId)
BTEqualStrategyNumber, F_OIDEQ, relationId);
SysScanDesc scanDescriptor = systable_beginscan(pgIndex,
IndexIndrelidIndexId, true, /* indexOK */
IndexIndrelidIndexId,
true, /* indexOK */
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
@ -702,7 +710,8 @@ GetTableIndexAndConstraintCommands(Oid relationId)
}
/* append found constraint or index definition to the list */
indexDDLEventList = lappend(indexDDLEventList, statementDef);
indexDDLEventList = lappend(indexDDLEventList, makeTableDDLCommandString(
statementDef));
/* if table is clustered on this index, append definition to the list */
if (indexForm->indisclustered)
@ -710,7 +719,8 @@ GetTableIndexAndConstraintCommands(Oid relationId)
char *clusteredDef = pg_get_indexclusterdef_string(indexId);
Assert(clusteredDef != NULL);
indexDDLEventList = lappend(indexDDLEventList, clusteredDef);
indexDDLEventList = lappend(indexDDLEventList, makeTableDDLCommandString(
clusteredDef));
}
heapTuple = systable_getnext(scanDescriptor);
@ -841,3 +851,105 @@ DistributedTableReplicationIsEnabled()
{
return (ShardReplicationFactor > 1);
}
/*
 * makeTableDDLCommandString wraps a constant ddl string in a freshly made
 * TableDDLCommand node of type TABLE_DDL_COMMAND_STRING.
 *
 * The string pointer is stored as is; no copy is taken here. When the resulting
 * command is turned into a sharded table command the string is wrapped in
 * worker_apply_shard_ddl_command with the target shardId. When it is applied to an
 * un-sharded table (eg. mx) the string is used unchanged.
 */
TableDDLCommand *
makeTableDDLCommandString(char *commandStr)
{
	TableDDLCommand *stringCommand = CitusMakeNode(TableDDLCommand);

	stringCommand->commandStr = commandStr;
	stringCommand->type = TABLE_DDL_COMMAND_STRING;

	return stringCommand;
}
/*
 * GetShardedTableDDLCommandString produces the shard specific statement for a
 * TableDDLCommand that was created with makeTableDDLCommandString.
 *
 * The stored ddl string is quoted as a literal and formatted, together with shardId,
 * through one of two templates:
 *  - WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA when schemaName is NULL or equal
 *    to "public"
 *  - WORKER_APPLY_SHARD_DDL_COMMAND, which additionally receives the quoted
 *    schemaName, in every other case
 *
 * Returns the formatted statement held in a newly initialized string buffer.
 */
static char *
GetShardedTableDDLCommandString(TableDDLCommand *command, uint64 shardId,
								char *schemaName)
{
	StringInfoData shardCommand = { 0 };
	initStringInfo(&shardCommand);

	Assert(command->type == TABLE_DDL_COMMAND_STRING);

	char *quotedDDLCommand = quote_literal_cstr(command->commandStr);

	/* no schema, or the public schema: use the template without a schema argument */
	if (schemaName == NULL || strcmp(schemaName, "public") == 0)
	{
		appendStringInfo(&shardCommand, WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA,
						 shardId, quotedDDLCommand);
		return shardCommand.data;
	}

	char *quotedSchemaName = quote_literal_cstr(schemaName);
	appendStringInfo(&shardCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
					 quotedSchemaName, quotedDDLCommand);

	return shardCommand.data;
}
/*
 * GetTableDDLCommandString is the type specific implementation behind
 * GetTableDDLCommand for commands of type TABLE_DDL_COMMAND_STRING. The non-sharded
 * version of such a command is simply the string it was created with, which is
 * returned without copying.
 */
static char *
GetTableDDLCommandString(TableDDLCommand *command)
{
	Assert(command->type == TABLE_DDL_COMMAND_STRING);

	char *baseTableCommand = command->commandStr;
	return baseTableCommand;
}
/*
 * GetShardedTableDDLCommand returns the ddl command expressed by this TableDDLCommand
 * where all applicable names are transformed into the names for a shard identified by
 * shardId.
 *
 * The call is dispatched on command->type; shardId and schemaName are handed to the
 * type specific implementation unchanged. A type without a matching case results in
 * an ERROR being reported.
 */
char *
GetShardedTableDDLCommand(TableDDLCommand *command, uint64 shardId, char *schemaName)
{
/*
 * The switch intentionally has no default case, so the compiler can point out any
 * TableDDLCommandType value that is not handled here.
 */
switch (command->type)
{
case TABLE_DDL_COMMAND_STRING:
{
return GetShardedTableDDLCommandString(command, shardId, schemaName);
}
}
/* unreachable: compiler should warn/error when not all cases are covered above */
ereport(ERROR, (errmsg("unsupported TableDDLCommand: %d", command->type)));
}
/*
 * GetTableDDLCommand returns the ddl command expressed by this TableDDLCommand where all
 * table names are targeting the base table, not any shards.
 *
 * The call is dispatched on command->type to the type specific implementation. A type
 * without a matching case results in an ERROR being reported.
 */
char *
GetTableDDLCommand(TableDDLCommand *command)
{
/*
 * The switch intentionally has no default case, so the compiler can point out any
 * TableDDLCommandType value that is not handled here.
 */
switch (command->type)
{
case TABLE_DDL_COMMAND_STRING:
{
return GetTableDDLCommandString(command);
}
}
/* unreachable: compiler should warn/error when not all cases are covered above */
ereport(ERROR, (errmsg("unsupported TableDDLCommand: %d", command->type)));
}

View File

@ -979,7 +979,7 @@ RecreateTableDDLCommandList(Oid relationId)
"table")));
}
List *dropCommandList = list_make1(dropCommand->data);
List *dropCommandList = list_make1(makeTableDDLCommandString(dropCommand->data));
List *createCommandList = GetPreLoadTableCreationCommands(relationId,
includeSequenceDefaults);
List *recreateCommandList = list_concat(dropCommandList, createCommandList);
@ -996,20 +996,15 @@ RecreateTableDDLCommandList(Oid relationId)
static List *
WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId)
{
List *applyDdlCommandList = NIL;
List *applyDDLCommandList = NIL;
const char *ddlCommand = NULL;
TableDDLCommand *ddlCommand = NULL;
foreach_ptr(ddlCommand, ddlCommandList)
{
char *escapedDdlCommand = quote_literal_cstr(ddlCommand);
StringInfo applyDdlCommand = makeStringInfo();
appendStringInfo(applyDdlCommand,
WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA,
shardId, escapedDdlCommand);
applyDdlCommandList = lappend(applyDdlCommandList, applyDdlCommand->data);
Assert(CitusIsA(ddlCommand, TableDDLCommand));
char *applyDDLCommand = GetShardedTableDDLCommand(ddlCommand, shardId, NULL);
applyDDLCommandList = lappend(applyDDLCommandList, applyDDLCommand);
}
return applyDdlCommandList;
return applyDDLCommandList;
}

View File

@ -678,25 +678,13 @@ WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);
const char *ddlCommand = NULL;
TableDDLCommand *ddlCommand = NULL;
foreach_ptr(ddlCommand, ddlCommandList)
{
char *escapedDDLCommand = quote_literal_cstr(ddlCommand);
StringInfo applyDDLCommand = makeStringInfo();
if (strcmp(schemaName, "public") != 0)
{
appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
escapedSchemaName, escapedDDLCommand);
}
else
{
appendStringInfo(applyDDLCommand,
WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA, shardId,
escapedDDLCommand);
}
commandList = lappend(commandList, applyDDLCommand->data);
Assert(CitusIsA(ddlCommand, TableDDLCommand));
char *applyDDLCommand = GetShardedTableDDLCommand(ddlCommand, shardId,
schemaName);
commandList = lappend(commandList, applyDDLCommand);
}
const char *command = NULL;

View File

@ -15,6 +15,7 @@
#include "catalog/pg_type.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/errormessage.h"
#include "distributed/log_utils.h"
#include "distributed/metadata_cache.h"
@ -45,7 +46,8 @@ static const char *CitusNodeTagNamesD[] = {
"RelationShard",
"RelationRowLock",
"DeferredErrorMessage",
"GroupShardPlacement"
"GroupShardPlacement",
"TableDDLCommand"
};
const char **CitusNodeTagNames = CitusNodeTagNamesD;
@ -411,7 +413,8 @@ const ExtensibleNodeMethods nodeMethods[] =
DEFINE_NODE_METHODS_NO_READ(MultiJoin),
DEFINE_NODE_METHODS_NO_READ(MultiPartition),
DEFINE_NODE_METHODS_NO_READ(MultiCartesianProduct),
DEFINE_NODE_METHODS_NO_READ(MultiExtendedOp)
DEFINE_NODE_METHODS_NO_READ(MultiExtendedOp),
DEFINE_NODE_METHODS_NO_READ(TableDDLCommand)
};
void

View File

@ -24,6 +24,7 @@
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/errormessage.h"
#include "distributed/log_utils.h"
#include "distributed/multi_logical_planner.h"
@ -564,3 +565,20 @@ OutDeferredErrorMessage(OUTFUNC_ARGS)
WRITE_INT_FIELD(linenumber);
WRITE_STRING_FIELD(functionname);
}
/*
 * OutTableDDLCommand is the out function for TableDDLCommand nodes. After the node
 * type label it writes only the field that belongs to the variant selected by
 * node->type; for TABLE_DDL_COMMAND_STRING that is commandStr.
 *
 * NOTE(review): the WRITE_* macros and the locals they rely on are defined outside
 * this block; their exact output format is not visible here.
 */
void
OutTableDDLCommand(OUTFUNC_ARGS)
{
WRITE_LOCALS(TableDDLCommand);
WRITE_NODE_TYPE("TableDDLCommand");
/* no default case, so an unhandled TableDDLCommandType can be flagged by the compiler */
switch (node->type)
{
case TABLE_DDL_COMMAND_STRING:
{
WRITE_STRING_FIELD(commandStr);
break;
}
}
}

View File

@ -62,6 +62,7 @@ extern void OutMultiJoin(OUTFUNC_ARGS);
extern void OutMultiPartition(OUTFUNC_ARGS);
extern void OutMultiCartesianProduct(OUTFUNC_ARGS);
extern void OutMultiExtendedOp(OUTFUNC_ARGS);
extern void OutTableDDLCommand(OUTFUNC_ARGS);
extern void CopyNodeJob(COPYFUNC_ARGS);
extern void CopyNodeDistributedPlan(COPYFUNC_ARGS);

View File

@ -65,7 +65,8 @@ typedef enum CitusNodeTag
T_RelationShard,
T_RelationRowLock,
T_DeferredErrorMessage,
T_GroupShardPlacement
T_GroupShardPlacement,
T_TableDDLCommand
} CitusNodeTag;

View File

@ -83,6 +83,59 @@ typedef enum
SHARD_PLACEMENT_RANDOM = 3
} ShardPlacementPolicyType;
/*
 * TableDDLCommandType encodes the implementation used by TableDDLCommand. See comments in
 * TableDDLCommand for details.
 */
typedef enum TableDDLCommandType
{
/* the command is a plain sql ddl string, see makeTableDDLCommandString */
TABLE_DDL_COMMAND_STRING,
} TableDDLCommandType;
/*
 * TableDDLCommand holds the definition of a command to be executed to bring the table
 * and/or shard into a certain state. The command needs to be able to be serialized into
 * two versions:
 *  - one version should have the vanilla commands operating on the base table. These are
 *    used for example to create the MX table shards
 *  - the second version should replace all identifiers with an identifier containing the
 *    shard id.
 *
 * Current implementations are
 *  - command string, created via makeTableDDLCommandString. This variant contains a ddl
 *    command that will be wrapped in `worker_apply_shard_ddl_command` when applied
 *    against a shard.
 */
typedef struct TableDDLCommand
{
/* node header, TableDDLCommand objects are created via CitusMakeNode */
CitusNode node;
/* encoding the type this TableDDLCommand contains */
TableDDLCommandType type;
/*
 * This union contains one (1) typed field for every implementation for
 * TableDDLCommand. A union enforces no overloading of fields but instead requires at
 * most one of the fields to be used at any time.
 */
union
{
/*
 * commandStr is used when type is set to TABLE_DDL_COMMAND_STRING. It holds the
 * sql ddl command string representing the ddl command.
 */
char *commandStr;
};
} TableDDLCommand;
/* make functions for TableDDLCommand */
extern TableDDLCommand * makeTableDDLCommandString(char *commandStr);
extern char * GetShardedTableDDLCommand(TableDDLCommand *command, uint64 shardId,
char *schemaName);
extern char * GetTableDDLCommand(TableDDLCommand *command);
/* Config variables managed via guc.c */
extern int ShardCount;