Move worker-side identity column modification logic to metadata sync

issue/6694
Gokhan Gulbiz 2023-03-27 12:49:41 +03:00
parent 212200f066
commit 235e653754
No known key found for this signature in database
GPG Key ID: 608EF06B6BD1B45B
8 changed files with 84 additions and 67 deletions

View File

@ -98,8 +98,6 @@
* once every LOG_PER_TUPLE_AMOUNT, the copy will be logged. * once every LOG_PER_TUPLE_AMOUNT, the copy will be logged.
*/ */
#define LOG_PER_TUPLE_AMOUNT 1000000 #define LOG_PER_TUPLE_AMOUNT 1000000
/*
 * Command template that invokes the worker_modify_identity_columns UDF. The %s
 * placeholder is filled with the qualified table name as a quoted SQL literal.
 */
#define WORKER_MODIFY_IDENTITY_COLUMNS \
"SELECT pg_catalog.worker_modify_identity_columns(%s)"
/* local function forward declarations */ /* local function forward declarations */
static void CreateDistributedTableConcurrently(Oid relationId, static void CreateDistributedTableConcurrently(Oid relationId,
@ -168,7 +166,6 @@ static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMeth
char *distributionColumnName, char *distributionColumnName,
char *colocateWithTableName); char *colocateWithTableName);
static void WarnIfTableHaveNoReplicaIdentity(Oid relationId); static void WarnIfTableHaveNoReplicaIdentity(Oid relationId);
static void DistributeIdentityColumns(Oid targetRelationId);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table); PG_FUNCTION_INFO_V1(master_create_distributed_table);
@ -1203,8 +1200,6 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
bool skip_validation = true; bool skip_validation = true;
ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands, ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands,
skip_validation); skip_validation);
DistributeIdentityColumns(relationId);
} }
@ -1253,7 +1248,7 @@ EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId
foreach_oid(citusTableId, citusTableIdList) foreach_oid(citusTableId, citusTableIdList)
{ {
List *seqInfoList = NIL; List *seqInfoList = NIL;
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0); GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, DEPENDENCY_AUTO);
SequenceInfo *seqInfo = NULL; SequenceInfo *seqInfo = NULL;
foreach_ptr(seqInfo, seqInfoList) foreach_ptr(seqInfo, seqInfoList)
@ -1330,7 +1325,7 @@ EnsureRelationHasCompatibleSequenceTypes(Oid relationId)
{ {
List *seqInfoList = NIL; List *seqInfoList = NIL;
GetDependentSequencesWithRelation(relationId, &seqInfoList, 0); GetDependentSequencesWithRelation(relationId, &seqInfoList, 0, DEPENDENCY_AUTO);
EnsureDistributedSequencesHaveOneType(relationId, seqInfoList); EnsureDistributedSequencesHaveOneType(relationId, seqInfoList);
} }
@ -1844,51 +1839,6 @@ ErrorIfTableIsACatalogTable(Relation relation)
} }
/*
 * DistributeIdentityColumns marks the sequences that back the identity columns
 * of the given table as distributed objects. If the table has at least one
 * identity column, it also sends WORKER_MODIFY_IDENTITY_COLUMNS to the workers
 * with metadata so that they modify the min/max values of those sequences to
 * produce unique values on workers.
 */
static void
DistributeIdentityColumns(Oid targetRelationId)
{
	Relation relation = relation_open(targetRelationId, AccessShareLock);
	TupleDesc tupleDescriptor = RelationGetDescr(relation);

	bool missingSequenceOk = false;
	bool tableHasIdentityColumn = false;

	for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
		 attributeIndex++)
	{
		Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);

		if (attributeForm->attidentity)
		{
			tableHasIdentityColumn = true;

			Oid seqOid = getIdentitySequence(targetRelationId, attributeForm->attnum,
											 missingSequenceOk);

			ObjectAddress seqAddress = { 0 };
			ObjectAddressSet(seqAddress, RelationRelationId, seqOid);
			MarkObjectDistributed(&seqAddress);
		}
	}

	/*
	 * The tuple descriptor belongs to the relcache entry, so drop our reference
	 * only after we are done reading it. Passing NoLock does not release the
	 * AccessShareLock taken above.
	 */
	relation_close(relation, NoLock);

	if (tableHasIdentityColumn)
	{
		StringInfo stringInfo = makeStringInfo();
		char *tableName = generate_qualified_relation_name(targetRelationId);

		/* the table name is passed to the UDF as a quoted literal */
		appendStringInfo(stringInfo,
						 WORKER_MODIFY_IDENTITY_COLUMNS,
						 quote_literal_cstr(tableName));

		SendCommandToWorkersWithMetadata(stringInfo->data);
	}
}
/* /*
* EnsureLocalTableEmptyIfNecessary errors out if the table should be empty * EnsureLocalTableEmptyIfNecessary errors out if the table should be empty
* according to ShouldLocalTableBeEmpty but it is not. * according to ShouldLocalTableBeEmpty but it is not.

View File

@ -33,7 +33,8 @@
/* Local functions forward declarations for helper functions */ /* Local functions forward declarations for helper functions */
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId); static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress); static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char
depType);
static List * FilterDistributedSequences(GrantStmt *stmt); static List * FilterDistributedSequences(GrantStmt *stmt);
@ -183,7 +184,7 @@ ExtractDefaultColumnsAndOwnedSequences(Oid relationId, List **columnNameList,
char *columnName = NameStr(attributeForm->attname); char *columnName = NameStr(attributeForm->attname);
List *columnOwnedSequences = List *columnOwnedSequences =
getOwnedSequences_internal(relationId, attributeIndex + 1, 0); getOwnedSequences_internal(relationId, attributeIndex + 1, DEPENDENCY_AUTO);
if (attributeForm->atthasdef && list_length(columnOwnedSequences) == 0) if (attributeForm->atthasdef && list_length(columnOwnedSequences) == 0)
{ {
@ -453,21 +454,22 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
/* the code-path only supports a single object */ /* the code-path only supports a single object */
Assert(list_length(addresses) == 1); Assert(list_length(addresses) == 1);
/* We have already asserted that we have exactly 1 address in the addresses. */
ObjectAddress *address = linitial(addresses);
/* error out if the sequence is distributed */ /* error out if the sequence is distributed */
if (IsAnyObjectDistributed(addresses)) if (IsAnyObjectDistributed(addresses) || SequenceUsedInDistributedTable(address,
DEPENDENCY_INTERNAL))
{ {
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(
"Altering a distributed sequence is currently not supported."))); "Altering a distributed sequence is currently not supported.")));
} }
/* We have already asserted that we have exactly 1 address in the addresses. */
ObjectAddress *address = linitial(addresses);
/* /*
* error out if the sequence is used in a distributed table * error out if the sequence is used in a distributed table
* and this is an ALTER SEQUENCE .. AS .. statement * and this is an ALTER SEQUENCE .. AS .. statement
*/ */
Oid citusTableId = SequenceUsedInDistributedTable(address); Oid citusTableId = SequenceUsedInDistributedTable(address, DEPENDENCY_AUTO);
if (citusTableId != InvalidOid) if (citusTableId != InvalidOid)
{ {
List *options = stmt->options; List *options = stmt->options;
@ -497,16 +499,19 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
* SequenceUsedInDistributedTable returns true if the argument sequence * SequenceUsedInDistributedTable returns true if the argument sequence
* is used as the default value of a column in a distributed table. * is used as the default value of a column in a distributed table.
* Returns false otherwise * Returns false otherwise
* See DependencyType for the possible values of depType.
* We use DEPENDENCY_INTERNAL for sequences created by identity columns and
* DEPENDENCY_AUTO for regular sequences.
*/ */
static Oid static Oid
SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress) SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char depType)
{ {
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid; Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList) foreach_oid(citusTableId, citusTableIdList)
{ {
List *seqInfoList = NIL; List *seqInfoList = NIL;
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0); GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, depType);
SequenceInfo *seqInfo = NULL; SequenceInfo *seqInfo = NULL;
foreach_ptr(seqInfo, seqInfoList) foreach_ptr(seqInfo, seqInfoList)
{ {

View File

@ -3315,7 +3315,8 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
*/ */
AttrNumber attnum = get_attnum(relationId, command->name); AttrNumber attnum = get_attnum(relationId, command->name);
List *seqInfoList = NIL; List *seqInfoList = NIL;
GetDependentSequencesWithRelation(relationId, &seqInfoList, attnum); GetDependentSequencesWithRelation(relationId, &seqInfoList, attnum,
DEPENDENCY_AUTO);
if (seqInfoList != NIL) if (seqInfoList != NIL)
{ {
ereport(ERROR, (errmsg("cannot execute ALTER COLUMN TYPE .. command " ereport(ERROR, (errmsg("cannot execute ALTER COLUMN TYPE .. command "

View File

@ -1834,7 +1834,7 @@ static List *
GetRelationSequenceDependencyList(Oid relationId) GetRelationSequenceDependencyList(Oid relationId)
{ {
List *seqInfoList = NIL; List *seqInfoList = NIL;
GetDependentSequencesWithRelation(relationId, &seqInfoList, 0); GetDependentSequencesWithRelation(relationId, &seqInfoList, 0, DEPENDENCY_AUTO);
List *seqIdList = NIL; List *seqIdList = NIL;
SequenceInfo *seqInfo = NULL; SequenceInfo *seqInfo = NULL;

View File

@ -1586,10 +1586,13 @@ GetAttributeTypeOid(Oid relationId, AttrNumber attnum)
* For both cases, we use the intermediate AttrDefault object from pg_depend. * For both cases, we use the intermediate AttrDefault object from pg_depend.
* If attnum is specified, we only return the sequences related to that * If attnum is specified, we only return the sequences related to that
* attribute of the relationId. * attribute of the relationId.
* See DependencyType for the possible values of depType.
* We use DEPENDENCY_INTERNAL for sequences created by identity columns and
* DEPENDENCY_AUTO for regular sequences.
*/ */
void void
GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
AttrNumber attnum) AttrNumber attnum, char depType)
{ {
Assert(*seqInfoList == NIL); Assert(*seqInfoList == NIL);
@ -1626,7 +1629,7 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
if (deprec->classid == AttrDefaultRelationId && if (deprec->classid == AttrDefaultRelationId &&
deprec->objsubid == 0 && deprec->objsubid == 0 &&
deprec->refobjsubid != 0 && deprec->refobjsubid != 0 &&
deprec->deptype == DEPENDENCY_AUTO) deprec->deptype == depType)
{ {
/* /*
* We are going to generate corresponding SequenceInfo * We are going to generate corresponding SequenceInfo
@ -1635,7 +1638,7 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
attrdefResult = lappend_oid(attrdefResult, deprec->objid); attrdefResult = lappend_oid(attrdefResult, deprec->objid);
attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid); attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid);
} }
else if (deprec->deptype == DEPENDENCY_AUTO && else if (deprec->deptype == depType &&
deprec->refobjsubid != 0 && deprec->refobjsubid != 0 &&
deprec->classid == RelationRelationId && deprec->classid == RelationRelationId &&
get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE) get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE)
@ -1882,6 +1885,51 @@ SequenceDependencyCommandList(Oid relationId)
} }
/*
 * IdentitySequenceDependencyCommandList generates the command that executes
 * the WORKER_MODIFY_IDENTITY_COLUMNS UDF on workers to modify the min/max
 * values of the identity column sequences so that they produce unique values
 * on workers.
 *
 * Returns a list holding a single table DDL command when the given relation
 * has at least one identity column, and NIL otherwise.
 */
List *
IdentitySequenceDependencyCommandList(Oid targetRelationId)
{
	Relation relation = relation_open(targetRelationId, AccessShareLock);
	TupleDesc tupleDescriptor = RelationGetDescr(relation);

	bool tableHasIdentityColumn = false;
	for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
		 attributeIndex++)
	{
		Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);

		if (attributeForm->attidentity)
		{
			/* one identity column is enough to require the command */
			tableHasIdentityColumn = true;
			break;
		}
	}

	/*
	 * The tuple descriptor belongs to the relcache entry, so drop our reference
	 * only after we are done reading it. Passing NoLock does not release the
	 * AccessShareLock taken above.
	 */
	relation_close(relation, NoLock);

	if (!tableHasIdentityColumn)
	{
		return NIL;
	}

	StringInfo stringInfo = makeStringInfo();
	char *tableName = generate_qualified_relation_name(targetRelationId);

	/* the table name is passed to the UDF as a quoted literal */
	appendStringInfo(stringInfo,
					 WORKER_MODIFY_IDENTITY_COLUMNS,
					 quote_literal_cstr(tableName));

	List *commandList = NIL;
	commandList = lappend(commandList,
						  makeTableDDLCommandString(stringInfo->data));

	return commandList;
}
/* /*
* CreateSequenceDependencyCommand generates a query string for calling * CreateSequenceDependencyCommand generates a query string for calling
* worker_record_sequence_dependency on the worker to recreate a sequence->table * worker_record_sequence_dependency on the worker to recreate a sequence->table

View File

@ -497,6 +497,15 @@ GetFullTableCreationCommands(Oid relationId,
tableDDLEventList = lappend(tableDDLEventList, tableDDLEventList = lappend(tableDDLEventList,
truncateTriggerCommand); truncateTriggerCommand);
} }
/*
* For identity column sequences, we only need to modify
* their min/max values to produce unique values on the worker nodes.
*/
List *identitySequenceDependencyCommandList =
IdentitySequenceDependencyCommandList(relationId);
tableDDLEventList = list_concat(tableDDLEventList,
identitySequenceDependencyCommandList);
} }
tableDDLEventList = list_concat(tableDDLEventList, postLoadCreationCommandList); tableDDLEventList = list_concat(tableDDLEventList, postLoadCreationCommandList);

View File

@ -138,6 +138,7 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
* worker_modify_identity_columns takes a table oid, runs an ALTER SEQUENCE statement * worker_modify_identity_columns takes a table oid, runs an ALTER SEQUENCE statement
* for each identity column to adjust the minvalue and maxvalue of the sequence owned by * for each identity column to adjust the minvalue and maxvalue of the sequence owned by
* identity column such that the sequence creates globally unique values. * identity column such that the sequence creates globally unique values.
* We use table oid instead of sequence name to avoid any potential conflicts between sequences of different tables. This way, we can safely iterate through identity columns on a specific table without any issues. While this may introduce a small amount of business logic to workers, it's a much safer approach overall.
*/ */
Datum Datum
worker_modify_identity_columns(PG_FUNCTION_ARGS) worker_modify_identity_columns(PG_FUNCTION_ARGS)

View File

@ -101,11 +101,12 @@ extern void SyncNodeMetadataToNodesMain(Datum main_arg);
extern void SignalMetadataSyncDaemon(Oid database, int sig); extern void SignalMetadataSyncDaemon(Oid database, int sig);
extern bool ShouldInitiateMetadataSync(bool *lockFailure); extern bool ShouldInitiateMetadataSync(bool *lockFailure);
extern List * SequenceDependencyCommandList(Oid relationId); extern List * SequenceDependencyCommandList(Oid relationId);
extern List * IdentitySequenceDependencyCommandList(Oid targetRelationId);
extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName); extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName);
extern List * GetSequencesFromAttrDef(Oid attrdefOid); extern List * GetSequencesFromAttrDef(Oid attrdefOid);
extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
AttrNumber attnum); AttrNumber attnum, char depType);
extern List * GetDependentFunctionsWithRelation(Oid relationId); extern List * GetDependentFunctionsWithRelation(Oid relationId);
extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum); extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
extern void SetLocalEnableMetadataSync(bool state); extern void SetLocalEnableMetadataSync(bool state);
@ -146,6 +147,8 @@ extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
"placementid = EXCLUDED.placementid" "placementid = EXCLUDED.placementid"
#define METADATA_SYNC_CHANNEL "metadata_sync" #define METADATA_SYNC_CHANNEL "metadata_sync"
/*
 * Command template that invokes the worker_modify_identity_columns UDF. The %s
 * placeholder is filled with the qualified table name as a quoted SQL literal.
 */
#define WORKER_MODIFY_IDENTITY_COLUMNS \
"SELECT pg_catalog.worker_modify_identity_columns(%s)"
/* controlled via GUC */ /* controlled via GUC */
extern char *EnableManualMetadataChangesForUser; extern char *EnableManualMetadataChangesForUser;