Propagate identity columns as-is to workers.

issue/6694
Gokhan Gulbiz 2023-02-28 15:00:45 +03:00
parent 17ad61678f
commit afed03336d
No known key found for this signature in database
GPG Key ID: 608EF06B6BD1B45B
7 changed files with 125 additions and 205 deletions

View File

@ -399,6 +399,8 @@ UndistributeTable(TableConversionParameters *params)
ErrorIfUnsupportedCascadeObjects(params->relationId);
ErrorIfTableHasIdentityColumn(params->relationId);
params->conversionType = UNDISTRIBUTE_TABLE;
params->shardCountIsNull = true;
TableConversionState *con = CreateTableConversion(params);
@ -431,6 +433,7 @@ AlterDistributedTable(TableConversionParameters *params)
EnsureHashDistributedTable(params->relationId);
ErrorIfUnsupportedCascadeObjects(params->relationId);
ErrorIfTableHasIdentityColumn(params->relationId);
params->conversionType = ALTER_DISTRIBUTED_TABLE;
TableConversionState *con = CreateTableConversion(params);
@ -494,6 +497,7 @@ AlterTableSetAccessMethod(TableConversionParameters *params)
}
ErrorIfUnsupportedCascadeObjects(params->relationId);
ErrorIfTableHasIdentityColumn(params->relationId);
params->conversionType = ALTER_TABLE_SET_ACCESS_METHOD;
params->shardCountIsNull = true;
@ -1537,97 +1541,6 @@ CreateMaterializedViewDDLCommand(Oid matViewOid)
return query->data;
}
/*
* This function marks all the identity sequences as distributed on the given table.
*/
static void
MarkIdentitiesAsDistributed(Oid targetRelationId)
{
Relation relation = relation_open(targetRelationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
relation_close(relation, NoLock);
bool missingSequenceOk = false;
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
if (attributeForm->attidentity)
{
Oid seqOid = getIdentitySequence(targetRelationId, attributeForm->attnum,
missingSequenceOk);
ObjectAddress seqAddress = { 0 };
ObjectAddressSet(seqAddress, RelationRelationId, seqOid);
MarkObjectDistributed(&seqAddress);
}
}
}
/*
* This function returns sql statements to rename identites on the given table
*/
static void
PrepareRenameIdentitiesCommands(Oid sourceRelationId, Oid targetRelationId,
List **outCoordinatorCommands, List **outWorkerCommands)
{
Relation targetRelation = relation_open(targetRelationId, AccessShareLock);
TupleDesc targetTupleDescriptor = RelationGetDescr(targetRelation);
relation_close(targetRelation, NoLock);
bool missingSequenceOk = false;
for (int attributeIndex = 0; attributeIndex < targetTupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(targetTupleDescriptor,
attributeIndex);
if (attributeForm->attidentity)
{
char *columnName = NameStr(attributeForm->attname);
Oid targetSequenceOid = getIdentitySequence(targetRelationId,
attributeForm->attnum,
missingSequenceOk);
char *targetSequenceName = generate_relation_name(targetSequenceOid, NIL);
Oid sourceSequenceOid = getIdentitySequence(sourceRelationId,
attributeForm->attnum,
missingSequenceOk);
char *sourceSequenceName = generate_relation_name(sourceSequenceOid, NIL);
/* to rename sequence on the coordinator */
*outCoordinatorCommands = lappend(*outCoordinatorCommands, psprintf(
"SET citus.enable_ddl_propagation TO OFF; ALTER SEQUENCE %s RENAME TO %s; RESET citus.enable_ddl_propagation;",
quote_identifier(
targetSequenceName),
quote_identifier(
sourceSequenceName)));
/* update workers to use existing sequence and drop the new one generated by PG */
bool missingTableOk = true;
*outWorkerCommands = lappend(*outWorkerCommands,
GetAlterColumnWithNextvalDefaultCmd(
sourceSequenceOid, sourceRelationId,
columnName,
missingTableOk));
/* drop the sequence generated by identity column */
*outWorkerCommands = lappend(*outWorkerCommands, psprintf(
"DROP SEQUENCE IF EXISTS %s",
quote_identifier(
targetSequenceName)));
}
}
}
/*
* ReplaceTable replaces the source table with the target table.
* It moves all the rows of the source table to target table with INSERT SELECT.
@ -1686,24 +1599,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
ExecuteQueryViaSPI(query->data, SPI_OK_INSERT);
}
/*
* Drop identity dependencies (sequences marked as DEPENDENCY_INTERNAL) on the workers
* to keep their states after the source table is dropped.
*/
List *ownedIdentitySequences = getOwnedSequences_internal(sourceId, 0,
DEPENDENCY_INTERNAL);
if (ownedIdentitySequences != NIL && ShouldSyncTableMetadata(sourceId))
{
char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName);
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
quote_literal_cstr(qualifiedTableName));
SendCommandToWorkersWithMetadata(command->data);
}
/*
* Modify regular sequence dependencies (sequences marked as DEPENDENCY_AUTO)
*/
@ -1763,23 +1658,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
quote_qualified_identifier(schemaName, sourceName))));
}
/*
* We need to prepare rename identities commands before dropping the original table,
* otherwise we can't find the original names of the identity sequences.
* We prepare separate commands for the coordinator and the workers because:
* In the coordinator, we simply need to rename the identity sequences
* to their names on the old table, because right now the identity
* sequences have default names generated by Postgres with the creation of the new table
* In the workers, we have not dropped the original identity sequences,
* so what we do is we alter the columns and set their default to the
* original identity sequences, and after that we drop the new sequences.
*/
List *coordinatorCommandsToRenameIdentites = NIL;
List *workerCommandsToRenameIdentites = NIL;
PrepareRenameIdentitiesCommands(sourceId, targetId,
&coordinatorCommandsToRenameIdentites,
&workerCommandsToRenameIdentites);
resetStringInfo(query);
appendStringInfo(query, "DROP %sTABLE %s CASCADE",
IsForeignTable(sourceId) ? "FOREIGN " : "",
@ -1797,27 +1675,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
quote_qualified_identifier(schemaName, targetName),
quote_identifier(sourceName));
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);
char *coordinatorCommand = NULL;
foreach_ptr(coordinatorCommand, coordinatorCommandsToRenameIdentites)
{
ExecuteQueryViaSPI(coordinatorCommand, SPI_OK_UTILITY);
}
char *workerCommand = NULL;
foreach_ptr(workerCommand, workerCommandsToRenameIdentites)
{
SendCommandToWorkersWithMetadata(workerCommand);
}
/*
* To preserve identity sequences states in case of redistributing the table again,
* we don't drop them when we undistribute a table. To maintain consistency and
* avoid future problems if we redistribute the table, we want to apply all changes happening to
* the identity sequence in the coordinator to their corresponding sequences in the workers as well.
* That's why we have to mark identity sequences as distributed
*/
MarkIdentitiesAsDistributed(targetId);
}

View File

@ -98,6 +98,7 @@
* once every LOG_PER_TUPLE_AMOUNT, the copy will be logged.
*/
#define LOG_PER_TUPLE_AMOUNT 1000000
#define WORKER_MODIFY_IDENTITY_COMMAND "SELECT worker_modify_identity_columns(%s)"
/* local function forward declarations */
static void CreateDistributedTableConcurrently(Oid relationId,
@ -160,6 +161,7 @@ static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMeth
char *distributionColumnName,
char *colocateWithTableName);
static void WarnIfTableHaveNoReplicaIdentity(Oid relationId);
static void MarkIdentitiesAsDistributed(Oid targetRelationId);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
@ -222,6 +224,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
EnsureCitusTableCanBeCreated(relationId);
/* enable create_distributed_table on an empty node */
InsertCoordinatorIfClusterEmpty();
@ -301,6 +304,8 @@ create_distributed_table_concurrently(PG_FUNCTION_ARGS)
shardCountIsStrict = true;
}
ErrorIfTableHasUnsupportedIdentityColumn(relationId);
CreateDistributedTableConcurrently(relationId, distributionColumnName,
distributionMethod,
colocateWithTableName,
@ -963,6 +968,8 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName)
{
ErrorIfTableHasUnsupportedIdentityColumn(relationId);
/*
* EnsureTableNotDistributed errors out when relation is a citus table but
* we don't want to ask user to first undistribute their citus local tables
@ -1157,6 +1164,8 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
bool skip_validation = true;
ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands,
skip_validation);
MarkIdentitiesAsDistributed(relationId);
}
@ -1796,6 +1805,52 @@ ErrorIfTableIsACatalogTable(Relation relation)
}
/*
* This function marks all the identity sequences as distributed on the given table.
*/
static void
MarkIdentitiesAsDistributed(Oid targetRelationId)
{
Relation relation = relation_open(targetRelationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
relation_close(relation, NoLock);
bool missingSequenceOk = false;
bool tableHasIdentityColumn = false;
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
if (attributeForm->attidentity)
{
tableHasIdentityColumn = true;
Oid seqOid = getIdentitySequence(targetRelationId, attributeForm->attnum,
missingSequenceOk);
ObjectAddress seqAddress = { 0 };
ObjectAddressSet(seqAddress, RelationRelationId, seqOid);
MarkObjectDistributed(&seqAddress);
}
}
if (tableHasIdentityColumn)
{
StringInfo stringInfo = makeStringInfo();
char *tableName = generate_qualified_relation_name(targetRelationId);
appendStringInfo(stringInfo,
WORKER_MODIFY_IDENTITY_COMMAND,
quote_literal_cstr(tableName));
SendCommandToWorkersWithMetadata(stringInfo->data);
}
}
/*
* EnsureLocalTableEmptyIfNecessary errors out if the function should be empty
* according to ShouldLocalTableBeEmpty but it is not.

View File

@ -370,7 +370,7 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
bool creatingShellTableOnRemoteNode = true;
List *tableDDLCommands = GetFullTableCreationCommands(relationId,
WORKER_NEXTVAL_SEQUENCE_DEFAULTS,
INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS,
INCLUDE_IDENTITY,
creatingShellTableOnRemoteNode);
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)

View File

@ -1378,29 +1378,6 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
}
}
/*
* We check for ADD COLUMN .. GENERATED .. AS IDENTITY expr
* since it uses a sequence as an internal dependency
* we should deparse the statement
*/
constraint = NULL;
foreach_ptr(constraint, columnConstraints)
{
if (constraint->contype == CONSTR_IDENTITY)
{
deparseAT = true;
useInitialDDLCommandString = false;
/*
* Since we don't support constraints for AT_AddColumn
* we have to set is_not_null to true explicitly for identity columns
*/
ColumnDef *newColDef = copyObject(columnDefinition);
newColDef->constraints = NULL;
newColDef->is_not_null = true;
newCmd->def = (Node *) newColDef;
}
}
/*
* We check for ADD COLUMN .. SERIAL pseudo-type
@ -2539,34 +2516,6 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
}
}
}
/*
* We check for ADD COLUMN .. GENERATED AS IDENTITY expr
* since it uses a seqeunce as an internal dependency
*/
constraint = NULL;
foreach_ptr(constraint, columnConstraints)
{
if (constraint->contype == CONSTR_IDENTITY)
{
AttrNumber attnum = get_attnum(relationId,
columnDefinition->colname);
bool missing_ok = false;
Oid seqOid = getIdentitySequence(relationId, attnum, missing_ok);
if (ShouldSyncTableMetadata(relationId))
{
needMetadataSyncForNewSequences = true;
alterTableDefaultNextvalCmd =
GetAddColumnWithNextvalDefaultCmd(seqOid,
relationId,
columnDefinition
->colname,
columnDefinition
->typeName);
}
}
}
}
/*
* We check for ALTER COLUMN .. SET DEFAULT nextval('user_defined_seq')
@ -3222,6 +3171,17 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
{
if (columnConstraint->contype == CONSTR_IDENTITY)
{
/*
* We currently don't support adding an identity column for an MX table
*/
if (ShouldSyncTableMetadata(relationId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot execute ADD COLUMN commands involving identity"
" columns when metadata is synchronized to workers")));
}
/*
* Currently we don't support backfilling the new identity column with default values
* if the table is not empty
@ -4011,3 +3971,50 @@ MakeNameListFromRangeVar(const RangeVar *rel)
return list_make1(makeString(rel->relname));
}
}
/*
* ErrorIfTableHasUnsupportedIdentityColumn errors out if the given table has any identity column other than bigint identity column
*/
void
ErrorIfTableHasUnsupportedIdentityColumn(Oid relationId)
{
Relation relation = relation_open(relationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
relation_close(relation, NoLock);
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
if (attributeForm->attidentity && attributeForm->atttypid != INT8OID)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot complete operation on a table with smallint/int identity column")));
}
}
}
/*
* ErrorIfTableHasIdentityColumn errors out if the given table has identity column
*/
void
ErrorIfTableHasIdentityColumn(Oid relationId)
{
Relation relation = relation_open(relationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
relation_close(relation, NoLock);
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);
if (attributeForm->attidentity)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot complete operation on a table with identity column")));
}
}
}

View File

@ -422,7 +422,6 @@ pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults
else if (includeIdentityDefaults == INCLUDE_IDENTITY)
{
Form_pg_sequence pgSequenceForm = pg_get_sequencedef(seqOid);
uint64 sequenceStart = nextval_internal(seqOid, false);
char *sequenceDef = psprintf(
" GENERATED %s AS IDENTITY (INCREMENT BY " INT64_FORMAT \
" MINVALUE " INT64_FORMAT " MAXVALUE "
@ -433,7 +432,8 @@ pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults
"ALWAYS" : "BY DEFAULT",
pgSequenceForm->seqincrement,
pgSequenceForm->seqmin,
pgSequenceForm->seqmax, sequenceStart,
pgSequenceForm->seqmax,
pgSequenceForm->seqstart,
pgSequenceForm->seqcache,
pgSequenceForm->seqcycle ? "" : "NO ");

View File

@ -1635,8 +1635,7 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
attrdefResult = lappend_oid(attrdefResult, deprec->objid);
attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid);
}
else if ((deprec->deptype == DEPENDENCY_AUTO || deprec->deptype ==
DEPENDENCY_INTERNAL) &&
else if (deprec->deptype == DEPENDENCY_AUTO &&
deprec->refobjsubid != 0 &&
deprec->classid == RelationRelationId &&
get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE)
@ -2605,8 +2604,7 @@ CreateShellTableOnWorkers(Oid relationId)
List *commandList = list_make1(DISABLE_DDL_PROPAGATION);
IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults =
INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults = INCLUDE_IDENTITY;
bool creatingShellTableOnRemoteNode = true;
List *tableDDLCommands = GetFullTableCreationCommands(relationId,

View File

@ -566,6 +566,9 @@ extern bool ConstrTypeCitusCanDefaultName(ConstrType constrType);
extern char * GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId,
char *colname, bool missingTableOk);
extern void ErrorIfTableHasUnsupportedIdentityColumn(Oid relationId);
extern void ErrorIfTableHasIdentityColumn(Oid relationId);
/* text_search.c - forward declarations */
extern List * GetCreateTextSearchConfigStatements(const ObjectAddress *address);
extern List * GetCreateTextSearchDictionaryStatements(const ObjectAddress *address);