From a69abe3be0dee728111d942a966838ad2c2b1bd2 Mon Sep 17 00:00:00 2001 From: Naisila Puka <37271756+naisila@users.noreply.github.com> Date: Thu, 9 Sep 2021 23:41:07 +0300 Subject: [PATCH] Fixes bug about int and smallint sequences on MX (#5254) * Introduce worker_nextval udf for int&smallint column defaults * Fix current tests and add new ones for worker_nextval --- .../distributed/commands/alter_table.c | 4 +- .../citus_add_local_table_to_metadata.c | 2 +- src/backend/distributed/commands/table.c | 135 +++++++++- .../distributed/commands/utility_hook.c | 5 +- .../distributed/deparser/citus_ruleutils.c | 29 +- .../distributed/metadata/metadata_sync.c | 16 +- .../distributed/operations/node_protocol.c | 14 +- .../distributed/operations/repair_shards.c | 2 +- .../distributed/operations/stage_protocol.c | 4 +- .../distributed/sql/citus--10.1-1--10.2-1.sql | 1 + .../sql/downgrades/citus--10.2-1--10.1-1.sql | 1 + .../sql/udfs/worker_nextval/10.2-1.sql | 6 + .../sql/udfs/worker_nextval/latest.sql | 6 + .../worker/worker_data_fetch_protocol.c | 36 ++- src/include/distributed/citus_ruleutils.h | 4 +- src/include/distributed/commands.h | 1 + .../distributed/coordinator_protocol.h | 26 +- src/include/distributed/worker_protocol.h | 2 + src/test/regress/expected/multi_extension.out | 3 +- .../regress/expected/multi_metadata_sync.out | 30 ++- .../expected/multi_sequence_default.out | 255 +++++++++++++++--- .../expected/upgrade_list_citus_objects.out | 3 +- src/test/regress/sql/multi_metadata_sync.sql | 11 + .../regress/sql/multi_sequence_default.sql | 101 ++++++- 24 files changed, 596 insertions(+), 101 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/worker_nextval/10.2-1.sql create mode 100644 src/backend/distributed/sql/udfs/worker_nextval/latest.sql diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 0f3a2be3b..df105c1e8 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -539,7 +539,9 @@ ConvertTable(TableConversionState *con) } char *newAccessMethod = con->accessMethod ? con->accessMethod : con->originalAccessMethod; - List *preLoadCommands = GetPreLoadTableCreationCommands(con->relationId, true, + IncludeSequenceDefaults includeSequenceDefaults = NEXTVAL_SEQUENCE_DEFAULTS; + List *preLoadCommands = GetPreLoadTableCreationCommands(con->relationId, + includeSequenceDefaults, newAccessMethod); if (con->accessMethod && strcmp(con->accessMethod, "columnar") == 0) diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index 6dfe6cbc9..303f2ea3a 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -463,7 +463,7 @@ GetShellTableDDLEventsForCitusLocalTable(Oid relationId) * Include DEFAULT clauses for columns getting their default values from * a sequence. */ - bool includeSequenceDefaults = true; + IncludeSequenceDefaults includeSequenceDefaults = NEXTVAL_SEQUENCE_DEFAULTS; List *tableDDLCommands = GetFullTableCreationCommands(relationId, includeSequenceDefaults); diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 81fe8935e..e5a8101ea 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -42,6 +42,7 @@ #include "lib/stringinfo.h" #include "nodes/parsenodes.h" #include "parser/parse_expr.h" +#include "parser/parse_type.h" #include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -95,8 +96,11 @@ static void SetInterShardDDLTaskPlacementList(Task *task, static void SetInterShardDDLTaskRelationShardList(Task *task, ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); -static Oid GetSequenceOid(Oid relationId, AttrNumber attnum); static Oid get_attrdef_oid(Oid relationId, AttrNumber attnum); +static char * GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, + char *colname); +static char * GetAddColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, + char *colname, TypeName *typeName); /* @@ -654,6 +658,14 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, */ bool deparseAT = false; bool propagateCommandToWorkers = true; + + /* + * Sometimes we want to run a different DDL Command string in MX workers + * For example, in cases where worker_nextval should be used instead + * of nextval() in column defaults with type int and smallint + */ + bool useInitialDDLCommandString = true; + AlterTableStmt *newStmt = copyObject(alterTableStatement); AlterTableCmd *newCmd = makeNode(AlterTableCmd); @@ -763,6 +775,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, if (contain_nextval_expression_walker(expr, NULL)) { deparseAT = true; + useInitialDDLCommandString = false; /* the new column definition will have no constraint */ ColumnDef *newColDef = copyObject(columnDefinition); @@ -832,6 +845,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, if (contain_nextval_expression_walker(expr, NULL)) { propagateCommandToWorkers = false; + useInitialDDLCommandString = false; } } else if (alterTableType == AT_AttachPartition) @@ -928,7 +942,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, sqlForTaskList = DeparseTreeNode((Node *) newStmt); } - ddlJob->commandString = alterTableCommand; + ddlJob->commandString = useInitialDDLCommandString ? alterTableCommand : NULL; if (OidIsValid(rightRelationId)) { @@ -1640,6 +1654,11 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) EnsureDependenciesExistOnAllNodes(&tableAddress); } + /* for the new sequences coming with this ALTER TABLE statement */ + bool needMetadataSyncForNewSequences = false; + + char *alterTableDefaultNextvalCmd = NULL; + List *commandList = alterTableStatement->cmds; AlterTableCmd *command = NULL; foreach_ptr(command, commandList) @@ -1728,8 +1747,16 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) if (ShouldSyncTableMetadata(relationId) && ClusterHasKnownMetadataWorkers()) { + needMetadataSyncForNewSequences = true; MarkSequenceDistributedAndPropagateDependencies( seqOid); + alterTableDefaultNextvalCmd = + GetAddColumnWithNextvalDefaultCmd(seqOid, + relationId, + columnDefinition + ->colname, + columnDefinition + ->typeName); } } } @@ -1761,15 +1788,17 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) if (ShouldSyncTableMetadata(relationId) && ClusterHasKnownMetadataWorkers()) { + needMetadataSyncForNewSequences = true; MarkSequenceDistributedAndPropagateDependencies(seqOid); + alterTableDefaultNextvalCmd = GetAlterColumnWithNextvalDefaultCmd( + seqOid, relationId, command->name); } } } } } - /* for the new sequences coming with this ALTER TABLE statement */ - if (ShouldSyncTableMetadata(relationId) && ClusterHasKnownMetadataWorkers()) + if (needMetadataSyncForNewSequences) { List *sequenceCommandList = NIL; @@ -1787,6 +1816,16 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) SendCommandToWorkersWithMetadata(sequenceCommand); } + /* + * It's easy to retrieve the sequence id to create the proper commands + * in postprocess, after the dependency between the sequence and the table + * has been created. We already return ddlJobs in PreprocessAlterTableStmt, + * hence we can't return ddlJobs in PostprocessAlterTableStmt. + * That's why we execute the following here instead of + * in ExecuteDistributedDDLJob + */ + SendCommandToWorkersWithMetadata(alterTableDefaultNextvalCmd); + SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION); } } @@ -1797,7 +1836,7 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) * of the attribute with given attnum of the given table relationId * If there is no sequence used it returns InvalidOid. */ -static Oid +Oid GetSequenceOid(Oid relationId, AttrNumber attnum) { /* get attrdefoid from the given relationId and attnum */ @@ -1821,15 +1860,10 @@ GetSequenceOid(Oid relationId, AttrNumber attnum) if (list_length(sequencesFromAttrDef) > 1) { /* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */ - if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) - { - ereport(ERROR, (errmsg( - "More than one sequence in a column default" - " is not supported for adding local tables to metadata"))); - } ereport(ERROR, (errmsg( "More than one sequence in a column default" - " is not supported for distribution"))); + " is not supported for distribution " + "or for adding local tables to metadata"))); } return lfirst_oid(list_head(sequencesFromAttrDef)); @@ -1886,6 +1920,83 @@ get_attrdef_oid(Oid relationId, AttrNumber attnum) } +/* + * GetAlterColumnWithNextvalDefaultCmd returns a string representing: + * ALTER TABLE ALTER COLUMN .. SET DEFAULT nextval() + * If sequence type is not bigint, we use worker_nextval() instead of nextval(). + */ +static char * +GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, char *colname) +{ + char *qualifiedSequenceName = generate_qualified_relation_name(sequenceOid); + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + + char *nextvalFunctionName = "nextval"; + bool useWorkerNextval = (pg_get_sequencedef(sequenceOid)->seqtypid != INT8OID); + if (useWorkerNextval) + { + /* + * We use worker_nextval for int and smallint types. + * Check issue #5126 and PR #5254 for details. + * https://github.com/citusdata/citus/issues/5126 + */ + nextvalFunctionName = "worker_nextval"; + } + + StringInfoData str = { 0 }; + initStringInfo(&str); + appendStringInfo(&str, "ALTER TABLE %s ALTER COLUMN %s " + "SET DEFAULT %s(%s::regclass)", + qualifiedRelationName, colname, + quote_qualified_identifier("pg_catalog", nextvalFunctionName), + quote_literal_cstr(qualifiedSequenceName)); + + return str.data; +} + + +/* + * GetAddColumnWithNextvalDefaultCmd returns a string representing: + * ALTER TABLE ADD COLUMN .. DEFAULT nextval() + * If sequence type is not bigint, we use worker_nextval() instead of nextval(). + */ +static char * +GetAddColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, char *colname, + TypeName *typeName) +{ + char *qualifiedSequenceName = generate_qualified_relation_name(sequenceOid); + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + + char *nextvalFunctionName = "nextval"; + bool useWorkerNextval = (pg_get_sequencedef(sequenceOid)->seqtypid != INT8OID); + if (useWorkerNextval) + { + /* + * We use worker_nextval for int and smallint types. + * Check issue #5126 and PR #5254 for details. + * https://github.com/citusdata/citus/issues/5126 + */ + nextvalFunctionName = "worker_nextval"; + } + + int32 typmod = 0; + Oid typeOid = InvalidOid; + bits16 formatFlags = FORMAT_TYPE_TYPEMOD_GIVEN | FORMAT_TYPE_FORCE_QUALIFY; + typenameTypeIdAndMod(NULL, typeName, &typeOid, &typmod); + + StringInfoData str = { 0 }; + initStringInfo(&str); + appendStringInfo(&str, + "ALTER TABLE %s ADD COLUMN %s %s " + "DEFAULT %s(%s::regclass)", qualifiedRelationName, colname, + format_type_extended(typeOid, typmod, formatFlags), + quote_qualified_identifier("pg_catalog", nextvalFunctionName), + quote_literal_cstr(qualifiedSequenceName)); + + return str.data; +} + + void ErrorUnsupportedAlterTableAddColumn(Oid relationId, AlterTableCmd *command, Constraint *constraint) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 82837ba59..12fdc5f45 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -891,7 +891,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) SendCommandToWorkersWithMetadata(setSearchPathCommand); } - SendCommandToWorkersWithMetadata((char *) ddlJob->commandString); + if (ddlJob->commandString != NULL) + { + SendCommandToWorkersWithMetadata((char *) ddlJob->commandString); + } } ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported); diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 6e185eda1..88b12deb1 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -251,12 +251,14 @@ pg_get_sequencedef(Oid sequenceRelationId) * definition includes table's schema, default column values, not null and check * constraints. The definition does not include constraints that trigger index * creations; specifically, unique and primary key constraints are excluded. - * When the flag includeSequenceDefaults is set, the function also creates + * When includeSequenceDefaults is NEXTVAL_SEQUENCE_DEFAULTS, the function also creates * DEFAULT clauses for columns getting their default values from a sequence. + * When it's WORKER_NEXTVAL_SEQUENCE_DEFAULTS, the function creates the DEFAULT + * clause using worker_nextval('sequence') and not nextval('sequence') */ char * -pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults, - char *accessMethod) +pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults + includeSequenceDefaults, char *accessMethod) { bool firstAttributePrinted = false; AttrNumber defaultValueIndex = 0; @@ -374,7 +376,26 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults, } else { - appendStringInfo(&buffer, " DEFAULT %s", defaultString); + Oid seqOid = GetSequenceOid(tableRelationId, defaultValue->adnum); + if (includeSequenceDefaults == WORKER_NEXTVAL_SEQUENCE_DEFAULTS && + seqOid != InvalidOid && + pg_get_sequencedef(seqOid)->seqtypid != INT8OID) + { + /* + * We use worker_nextval for int and smallint types. + * Check issue #5126 and PR #5254 for details. + * https://github.com/citusdata/citus/issues/5126 + */ + char *sequenceName = generate_qualified_relation_name( + seqOid); + appendStringInfo(&buffer, + " DEFAULT worker_nextval(%s::regclass)", + quote_literal_cstr(sequenceName)); + } + else + { + appendStringInfo(&buffer, " DEFAULT %s", defaultString); + } } } } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 6fe5ac58d..87d0528f5 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -497,7 +497,7 @@ MetadataCreateCommands(void) List *propagatedTableList = NIL; bool includeNodesFromOtherClusters = true; List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters); - bool includeSequenceDefaults = true; + IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS; /* make sure we have deterministic output for our tests */ workerNodeList = SortList(workerNodeList, CompareWorkerNodes); @@ -651,7 +651,7 @@ GetDistributedTableDDLEvents(Oid relationId) CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); List *commandList = NIL; - bool includeSequenceDefaults = true; + IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS; /* if the table is owned by an extension we only propagate pg_dist_* records */ bool tableOwnedByExtension = IsTableOwnedByExtension(relationId); @@ -1287,14 +1287,10 @@ GetDependentSequencesWithRelation(Oid relationId, List **attnumList, /* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */ if (list_length(sequencesFromAttrDef) > 1) { - if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) - { - ereport(ERROR, (errmsg( - "More than one sequence in a column default" - " is not supported for adding local tables to metadata"))); - } - ereport(ERROR, (errmsg("More than one sequence in a column default" - " is not supported for distribution"))); + ereport(ERROR, (errmsg( + "More than one sequence in a column default" + " is not supported for distribution " + "or for adding local tables to metadata"))); } if (list_length(sequencesFromAttrDef) == 1) diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 1638489e7..5c09ee59f 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -216,7 +216,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) { text *relationName = PG_GETARG_TEXT_P(0); Oid relationId = ResolveRelationId(relationName, false); - bool includeSequenceDefaults = true; + IncludeSequenceDefaults includeSequenceDefaults = NEXTVAL_SEQUENCE_DEFAULTS; /* create a function context for cross-call persistence */ @@ -533,16 +533,19 @@ ResolveRelationId(text *relationName, bool missingOk) /* - * GetFullTableCreationCommands takes in a relationId, includeSequenceDefaults flag, + * GetFullTableCreationCommands takes in a relationId, includeSequenceDefaults, * and returns the list of DDL commands needed to reconstruct the relation. - * When the flag includeSequenceDefaults is set, the function also creates + * When includeSequenceDefaults is NEXTVAL_SEQUENCE_DEFAULTS, the function also creates * DEFAULT clauses for columns getting their default values from a sequence. + * When it's WORKER_NEXTVAL_SEQUENCE_DEFAULTS, the function creates the DEFAULT + * clause using worker_nextval('sequence') and not nextval('sequence') * These DDL commands are all palloced; and include the table's schema * definition, optional column storage and statistics definitions, and index * constraint and trigger definitions. */ List * -GetFullTableCreationCommands(Oid relationId, bool includeSequenceDefaults) +GetFullTableCreationCommands(Oid relationId, IncludeSequenceDefaults + includeSequenceDefaults) { List *tableDDLEventList = NIL; @@ -651,7 +654,8 @@ GetTableReplicaIdentityCommand(Oid relationId) * to facilitate faster data load. */ List * -GetPreLoadTableCreationCommands(Oid relationId, bool includeSequenceDefaults, +GetPreLoadTableCreationCommands(Oid relationId, + IncludeSequenceDefaults includeSequenceDefaults, char *accessMethod) { List *tableDDLEventList = NIL; diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 2784e6958..8a7a276f0 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -1453,7 +1453,7 @@ RecreateTableDDLCommandList(Oid relationId) StringInfo dropCommand = makeStringInfo(); char relationKind = get_rel_relkind(relationId); - bool includeSequenceDefaults = false; + IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS; /* build appropriate DROP command based on relation kind */ if (RegularTable(relationId)) diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index a6b9c04da..66ee281ba 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -455,7 +455,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId, int placementsCreated = 0; List *foreignConstraintCommandList = GetReferencingForeignConstaintCommands(relationId); - bool includeSequenceDefaults = false; + IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS; List *ddlCommandList = GetFullTableCreationCommands(relationId, includeSequenceDefaults); uint32 connectionFlag = FOR_DDL; @@ -568,7 +568,7 @@ void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, bool useExclusiveConnection, bool colocatedShard) { - bool includeSequenceDefaults = false; + IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS; List *ddlCommandList = GetFullTableCreationCommands(distributedRelationId, includeSequenceDefaults); List *foreignConstraintCommandList = diff --git a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql index 8c219ee68..507a141c5 100644 --- a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql +++ b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql @@ -19,6 +19,7 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi #include "udfs/create_time_partitions/10.2-1.sql" #include "udfs/drop_old_time_partitions/10.2-1.sql" #include "udfs/get_missing_time_partition_ranges/10.2-1.sql" +#include "udfs/worker_nextval/10.2-1.sql" DROP FUNCTION pg_catalog.citus_drop_all_shards(regclass, text, text); CREATE FUNCTION pg_catalog.citus_drop_all_shards(logicalrelid regclass, diff --git a/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql b/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql index 6fba9d8fd..1e47b42c4 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql @@ -19,6 +19,7 @@ DROP FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint); DROP FUNCTION pg_catalog.citus_internal_update_relation_colocation(oid, integer); DROP FUNCTION pg_catalog.create_time_partitions(regclass, interval, timestamp with time zone, timestamp with time zone); DROP FUNCTION pg_catalog.get_missing_time_partition_ranges(regclass, interval, timestamp with time zone, timestamp with time zone); +DROP FUNCTION pg_catalog.worker_nextval(regclass); DROP PROCEDURE pg_catalog.drop_old_time_partitions(regclass, timestamptz); diff --git a/src/backend/distributed/sql/udfs/worker_nextval/10.2-1.sql b/src/backend/distributed/sql/udfs/worker_nextval/10.2-1.sql new file mode 100644 index 000000000..47aa8101b --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_nextval/10.2-1.sql @@ -0,0 +1,6 @@ +CREATE FUNCTION pg_catalog.worker_nextval(sequence regclass) + RETURNS int + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_nextval$$; +COMMENT ON FUNCTION pg_catalog.worker_nextval(regclass) + IS 'calculates nextval() for column defaults of type int or smallint'; diff --git a/src/backend/distributed/sql/udfs/worker_nextval/latest.sql b/src/backend/distributed/sql/udfs/worker_nextval/latest.sql new file mode 100644 index 000000000..47aa8101b --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_nextval/latest.sql @@ -0,0 +1,6 @@ +CREATE FUNCTION pg_catalog.worker_nextval(sequence regclass) + RETURNS int + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_nextval$$; +COMMENT ON FUNCTION pg_catalog.worker_nextval(regclass) + IS 'calculates nextval() for column defaults of type int or smallint'; diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 187eeee05..efdc2da85 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -77,6 +77,7 @@ PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command); PG_FUNCTION_INFO_V1(worker_apply_inter_shard_ddl_command); PG_FUNCTION_INFO_V1(worker_apply_sequence_command); PG_FUNCTION_INFO_V1(worker_append_table_to_shard); +PG_FUNCTION_INFO_V1(worker_nextval); /* * Following UDFs are stub functions, you can check their comments for more @@ -700,6 +701,21 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) } +/* + * worker_nextval calculates nextval() in worker nodes + * for int and smallint column default types + * TODO: not error out but get the proper nextval() + */ +Datum +worker_nextval(PG_FUNCTION_ARGS) +{ + ereport(ERROR, (errmsg( + "nextval(sequence) calls in worker nodes are not supported" + " for column defaults of type int or smallint"))); + PG_RETURN_INT32(0); +} + + /* * check_log_statement is a copy of postgres' check_log_statement function and * returns whether a statement ought to be logged or not. @@ -755,14 +771,20 @@ AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName, int64 sequenceMinValue = sequenceData->seqmin; int valueBitLength = 48; - /* for smaller types, put the group ID into the first 4 bits */ - if (sequenceTypeId == INT4OID) + /* + * For int and smallint, we don't currently support insertion from workers + * Check issue #5126 and PR #5254 for details. + * https://github.com/citusdata/citus/issues/5126 + * So, no need to alter sequence min/max for now + * We call setval(sequence, maxvalue) such that manually using + * nextval(sequence) in the workers will error out as well. + */ + if (sequenceTypeId != INT8OID) { - valueBitLength = 28; - } - else if (sequenceTypeId == INT2OID) - { - valueBitLength = 12; + DirectFunctionCall2(setval_oid, + ObjectIdGetDatum(sequenceId), + Int64GetDatum(sequenceMaxValue)); + return; } /* calculate min/max values that the sequence can generate in this worker */ diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index fac19352d..946839181 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -15,6 +15,7 @@ #include "catalog/pg_sequence.h" #include "commands/sequence.h" +#include "distributed/coordinator_protocol.h" #include "lib/stringinfo.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" @@ -31,7 +32,8 @@ extern Oid get_extension_schema(Oid ext_oid); extern char * pg_get_serverdef_string(Oid tableRelationId); extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); -extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation, +extern char * pg_get_tableschemadef_string(Oid tableRelationId, + IncludeSequenceDefaults includeSequenceDefaults, char *accessMethod); extern void EnsureRelationKindSupported(Oid relationId); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index bb73e8764..7b0b2f5f2 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -422,6 +422,7 @@ extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMet extern ObjectAddress AlterTableSchemaStmtObjectAddress(Node *stmt, bool missing_ok); extern List * MakeNameListFromRangeVar(const RangeVar *rel); +extern Oid GetSequenceOid(Oid relationId, AttrNumber attnum); /* truncate.c - forward declarations */ diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 07f8e84d6..3c9bf8ad6 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -111,6 +111,25 @@ typedef enum IndexDefinitionDeparseFlags } IndexDefinitionDeparseFlags; +/* + * IncludeSequenceDefaults decides on inclusion of DEFAULT clauses for columns + * getting their default values from a sequence when creating the definition + * of a table. + */ +typedef enum IncludeSequenceDefaults +{ + NO_SEQUENCE_DEFAULTS = 0, /* don't include sequence defaults */ + NEXTVAL_SEQUENCE_DEFAULTS = 1, /* include sequence defaults */ + + /* + * Include sequence defaults, but use worker_nextval instead of nextval + * when the default will be called in worker node, and the column type is + * int or smallint. + */ + WORKER_NEXTVAL_SEQUENCE_DEFAULTS = 2, +} IncludeSequenceDefaults; + + struct TableDDLCommand; typedef struct TableDDLCommand TableDDLCommand; typedef char *(*TableDDLFunction)(void *context); @@ -193,11 +212,12 @@ extern bool CStoreTable(Oid relationId); extern uint64 GetNextShardId(void); extern uint64 GetNextPlacementId(void); extern Oid ResolveRelationId(text *relationName, bool missingOk); -extern List * GetFullTableCreationCommands(Oid relationId, bool includeSequenceDefaults); +extern List * GetFullTableCreationCommands(Oid relationId, + IncludeSequenceDefaults includeSequenceDefaults); extern List * GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes, bool includeReplicaIdentity); -extern List * GetPreLoadTableCreationCommands(Oid relationId, - bool includeSequenceDefaults, +extern List * GetPreLoadTableCreationCommands(Oid relationId, IncludeSequenceDefaults + includeSequenceDefaults, char *accessMethod); extern List * GetTableIndexAndConstraintCommands(Oid relationId, int indexFlags); extern List * GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(Oid relationId, diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index b91d64184..5b950084f 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -162,5 +162,7 @@ extern Datum worker_find_block_local_path(PG_FUNCTION_ARGS); /* Function declaration for calculating hashed value */ extern Datum worker_hash(PG_FUNCTION_ARGS); +/* Function declaration for calculating nextval() in worker */ +extern Datum worker_nextval(PG_FUNCTION_ARGS); #endif /* WORKER_PROTOCOL_H */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index d4e37b787..0a6ca29a6 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -810,7 +810,8 @@ SELECT * FROM multi_extension.print_extension_changes(); | function drop_old_time_partitions(regclass,timestamp with time zone) | function get_missing_time_partition_ranges(regclass,interval,timestamp with time zone,timestamp with time zone) TABLE(partition_name text, range_from_value text, range_to_value text) | function stop_metadata_sync_to_node(text,integer,boolean) void -(15 rows) + | function worker_nextval(regclass) integer +(16 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 97d5f1c9d..ad0369790 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -985,7 +985,9 @@ SELECT create_distributed_table('mx_table_with_small_sequence', 'a'); INSERT INTO mx_table_with_small_sequence VALUES (0); \c - - - :worker_1_port +-- Insert doesn't work because the defaults are of type int and smallint INSERT INTO mx_table_with_small_sequence VALUES (1), (3); +ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint \c - - - :master_port SET citus.shard_replication_factor TO 1; -- Create an MX table with (BIGSERIAL) sequences @@ -996,6 +998,7 @@ SELECT create_distributed_table('mx_table_with_sequence', 'a'); (1 row) +INSERT INTO mx_table_with_sequence VALUES (0); SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_sequence'::regclass; Column | Type | Modifiers --------------------------------------------------------------------- @@ -1042,6 +1045,8 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_ public | mx_table_with_sequence_c_seq | sequence | postgres (1 row) +-- Insert works because the defaults are of type bigint +INSERT INTO mx_table_with_sequence VALUES (1), (3); -- check that pg_depend records exist on the worker SELECT refobjsubid FROM pg_depend WHERE objid = 'mx_table_with_sequence_b_seq'::regclass AND refobjid = 'mx_table_with_sequence'::regclass; @@ -1061,13 +1066,13 @@ WHERE objid = 'mx_table_with_sequence_c_seq'::regclass AND refobjid = 'mx_table_ SELECT nextval('mx_table_with_sequence_b_seq'); nextval --------------------------------------------------------------------- - 281474976710657 + 281474976710659 (1 row) SELECT nextval('mx_table_with_sequence_c_seq'); nextval --------------------------------------------------------------------- - 281474976710657 + 281474976710659 (1 row) -- Check that adding a new metadata node sets the sequence space correctly @@ -1119,7 +1124,11 @@ SELECT nextval('mx_table_with_sequence_c_seq'); 562949953421313 (1 row) +-- Insert doesn't work because the defaults are of type int and smallint INSERT INTO mx_table_with_small_sequence VALUES (2), (4); +ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint +-- Insert works because the defaults are of type bigint +INSERT INTO mx_table_with_sequence VALUES (2), (4); -- Check that dropping the mx table with sequences works as expected \c - - - :master_port -- check our small sequence values @@ -1127,10 +1136,17 @@ SELECT a, b, c FROM mx_table_with_small_sequence ORDER BY a,b,c; a | b | c --------------------------------------------------------------------- 0 | 1 | 1 - 1 | 268435457 | 4097 - 2 | 536870913 | 8193 - 3 | 268435458 | 4098 - 4 | 536870914 | 8194 +(1 row) + +--check our bigint sequence values +SELECT a, b, c FROM mx_table_with_sequence ORDER BY a,b,c; + a | b | c +--------------------------------------------------------------------- + 0 | 1 | 1 + 1 | 281474976710657 | 281474976710657 + 2 | 562949953421314 | 562949953421314 + 3 | 281474976710658 | 281474976710658 + 4 | 562949953421315 | 562949953421315 (5 rows) -- Check that dropping the mx table with sequences works as expected @@ -1672,7 +1688,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1; CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass)) CREATE TABLE public.dist_table_1 (a integer) CREATE TABLE public.mx_ref (col_1 integer, col_2 text) - CREATE TABLE public.test_table (id integer DEFAULT nextval('public.mx_test_sequence_0'::regclass), id2 integer DEFAULT nextval('public.mx_test_sequence_1'::regclass)) + CREATE TABLE public.test_table (id integer DEFAULT worker_nextval('public.mx_test_sequence_0'::regclass), id2 integer DEFAULT worker_nextval('public.mx_test_sequence_1'::regclass)) INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster, shouldhaveshards) VALUES (4, 1, 'localhost', 8888, 'default', FALSE, FALSE, TRUE, 'secondary'::noderole, 'default', TRUE),(5, 1, 'localhost', 8889, 'default', FALSE, FALSE, TRUE, 'secondary'::noderole, 'second-cluster', TRUE),(1, 1, 'localhost', 57637, 'default', TRUE, TRUE, TRUE, 'primary'::noderole, 'default', TRUE),(7, 5, 'localhost', 57638, 'default', TRUE, TRUE, TRUE, 'primary'::noderole, 'default', TRUE) SELECT citus_internal_add_partition_metadata ('mx_test_schema_1.mx_table_1'::regclass, 'h', 'col1', 3, 's') SELECT citus_internal_add_partition_metadata ('mx_test_schema_2.mx_table_2'::regclass, 'h', 'col1', 3, 's') diff --git a/src/test/regress/expected/multi_sequence_default.out b/src/test/regress/expected/multi_sequence_default.out index 9ed0b0baf..903feaeeb 100644 --- a/src/test/regress/expected/multi_sequence_default.out +++ b/src/test/regress/expected/multi_sequence_default.out @@ -184,7 +184,7 @@ SELECT create_distributed_table('seq_test_4','x'); (1 row) CREATE SEQUENCE seq_4; -ALTER TABLE seq_test_4 ADD COLUMN a int DEFAULT nextval('seq_4'); +ALTER TABLE seq_test_4 ADD COLUMN a bigint DEFAULT nextval('seq_4'); SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node --------------------------------------------------------------------- @@ -195,28 +195,21 @@ DROP SEQUENCE seq_4 CASCADE; NOTICE: drop cascades to default value for column a of table seq_test_4 TRUNCATE seq_test_4; CREATE SEQUENCE seq_4; -ALTER TABLE seq_test_4 ADD COLUMN b int DEFAULT nextval('seq_4'); +ALTER TABLE seq_test_4 ADD COLUMN b bigint DEFAULT nextval('seq_4'); -- on worker it should generate high sequence number \c - - - :worker_1_port INSERT INTO sequence_default.seq_test_4 VALUES (1,2) RETURNING *; - x | y | a | b + x | y | a | b --------------------------------------------------------------------- - 1 | 2 | | 268435457 + 1 | 2 | | 281474976710657 (1 row) --- check that we have can properly insert to tables from before metadata sync +-- check that we have can't insert to tables from before metadata sync +-- seq_test_0 and seq_test_0_local_table have int and smallint column defaults INSERT INTO sequence_default.seq_test_0 VALUES (1,2) RETURNING *; - x | y | z | w00 | w01 | w10 | w11 | w20 | w21 ---------------------------------------------------------------------- - 1 | 2 | 268435457 | 4097 | 4097 | 268435457 | 268435457 | 281474976710657 | 281474976710657 -(1 row) - +ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint INSERT INTO sequence_default.seq_test_0_local_table VALUES (1,2) RETURNING *; - x | y | z | w00 | w01 | w10 | w11 | w20 | w21 ---------------------------------------------------------------------- - 1 | 2 | 268435457 | 4097 | 4097 | 268435457 | 268435457 | 281474976710657 | 281474976710657 -(1 row) - +ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint \c - - - :master_port SET citus.shard_replication_factor TO 1; SET search_path = sequence_default, public; @@ -250,14 +243,10 @@ ALTER TABLE seq_test_1 ADD COLUMN z int DEFAULT nextval('seq_1'); --------------------------------------------------------------------- integer | 1 | 1 | 2147483647 | 1 | no | 1 --- check insertion is within int bounds in the worker +-- check insertion doesn't work in the worker because type is int \c - - - :worker_1_port INSERT INTO sequence_default.seq_test_1 values (1, 2) RETURNING *; - x | y | z ---------------------------------------------------------------------- - 1 | 2 | 268435457 -(1 row) - +ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint \c - - - :master_port SET citus.shard_replication_factor TO 1; SET search_path = sequence_default, public; @@ -291,14 +280,10 @@ ALTER TABLE seq_test_1_local_table ADD COLUMN z int DEFAULT nextval('seq_1_local --------------------------------------------------------------------- integer | 1 | 1 | 2147483647 | 1 | no | 1 --- check insertion is within int bounds in the worker +-- check insertion doesn't work in the worker because type is int \c - - - :worker_1_port INSERT INTO sequence_default.seq_test_1_local_table values (1, 2) RETURNING *; - x | y | z ---------------------------------------------------------------------- - 1 | 2 | 268435457 -(1 row) - +ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint \c - - - :master_port SET citus.shard_replication_factor TO 1; SET search_path = sequence_default, public; @@ -436,7 +421,7 @@ SELECT create_distributed_table('seq_test_5','x'); CREATE SCHEMA sequence_default_1; CREATE SEQUENCE sequence_default_1.seq_5; -ALTER TABLE seq_test_5 ADD COLUMN a int DEFAULT nextval('sequence_default_1.seq_5'); +ALTER TABLE seq_test_5 ADD COLUMN a bigint DEFAULT nextval('sequence_default_1.seq_5'); DROP SCHEMA sequence_default_1 CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to sequence sequence_default_1.seq_5 @@ -451,9 +436,9 @@ INSERT INTO seq_test_5 VALUES (1, 2) RETURNING *; -- but is still present on worker \c - - - :worker_1_port INSERT INTO sequence_default.seq_test_5 VALUES (1, 2) RETURNING *; - x | y | a + x | y | a --------------------------------------------------------------------- - 1 | 2 | 268435457 + 1 | 2 | 281474976710657 (1 row) \c - - - :master_port @@ -593,10 +578,10 @@ ERROR: cannot alter OWNED BY option of a sequence already owned by a distribute ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8; \c - - - :worker_1_port \d sequence_default_8.seq_8 - Sequence "sequence_default_8.seq_8" - Type | Start | Minimum | Maximum | Increment | Cycles? | Cache + Sequence "sequence_default_8.seq_8" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache --------------------------------------------------------------------- - integer | 268435457 | 268435457 | 536870913 | 3 | yes | 10 + integer | 1 | 1 | 2147483647 | 3 | yes | 10 \c - - - :master_port SET citus.shard_replication_factor TO 1; @@ -611,10 +596,10 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port); ALTER TABLE sequence_default_8.seq_8 SET SCHEMA sequence_default; \c - - - :worker_1_port \d sequence_default.seq_8 - Sequence "sequence_default.seq_8" - Type | Start | Minimum | Maximum | Increment | Cycles? | Cache + Sequence "sequence_default.seq_8" + Type | Start | Minimum | Maximum | Increment | Cycles? | Cache --------------------------------------------------------------------- - integer | 268435457 | 268435457 | 536870913 | 3 | yes | 10 + integer | 1 | 1 | 2147483647 | 3 | yes | 10 \c - - - :master_port SET citus.shard_replication_factor TO 1; @@ -638,9 +623,9 @@ CREATE SEQUENCE seq_9; CREATE SEQUENCE seq_10; CREATE TABLE seq_test_9 (x int, y int DEFAULT nextval('seq_9') - nextval('seq_10')); SELECT create_distributed_table('seq_test_9', 'x'); -ERROR: More than one sequence in a column default is not supported for distribution +ERROR: More than one sequence in a column default is not supported for distribution or for adding local tables to metadata SELECT citus_add_local_table_to_metadata('seq_test_9'); -ERROR: More than one sequence in a column default is not supported for adding local tables to metadata +ERROR: More than one sequence in a column default is not supported for distribution or for adding local tables to metadata ALTER TABLE seq_test_9 ALTER COLUMN y SET DEFAULT nextval('seq_9'); SELECT create_distributed_table('seq_test_9', 'x'); create_distributed_table @@ -772,6 +757,200 @@ SELECT create_distributed_table('seq_test_11', 'col1'); INSERT INTO sequence_default.seq_test_10 VALUES (1); ERROR: relation "seq_11" does not exist \c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +NOTICE: dropping metadata on the node (localhost,57637) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +-- Check worker_nextval and setval precautions for int and smallint column defaults +-- For details see issue #5126 and PR #5254 +-- https://github.com/citusdata/citus/issues/5126 +CREATE SEQUENCE seq_12; +CREATE SEQUENCE seq_13; +CREATE SEQUENCE seq_14; +CREATE TABLE seq_test_12(col0 text, col1 smallint DEFAULT nextval('seq_12'), + col2 int DEFAULT nextval('seq_13'), + col3 bigint DEFAULT nextval('seq_14')); +SELECT create_distributed_table('seq_test_12', 'col0'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO seq_test_12 VALUES ('hello0') RETURNING *; + col0 | col1 | col2 | col3 +--------------------------------------------------------------------- + hello0 | 1 | 1 | 1 +(1 row) + +\c - - - :worker_1_port +SET search_path = sequence_default, public; +-- we should see worker_nextval for int and smallint columns +SELECT table_name, column_name, data_type, column_default FROM information_schema.columns +WHERE table_name = 'seq_test_12' ORDER BY column_name; + table_name | column_name | data_type | column_default +--------------------------------------------------------------------- + seq_test_12 | col0 | text | + seq_test_12 | col1 | smallint | worker_nextval('seq_12'::regclass) + seq_test_12 | col2 | integer | worker_nextval('seq_13'::regclass) + seq_test_12 | col3 | bigint | nextval('seq_14'::regclass) +(4 rows) + +-- insertion from worker should fail +INSERT INTO seq_test_12 VALUES ('hello1') RETURNING *; +ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint +-- nextval from worker should fail for int and smallint sequences +SELECT nextval('seq_12'); +ERROR: nextval: reached maximum value of sequence "seq_12" (32767) +SELECT nextval('seq_13'); +ERROR: nextval: reached maximum value of sequence "seq_13" (2147483647) +-- nextval from worker should work for bigint sequences +SELECT nextval('seq_14'); + nextval +--------------------------------------------------------------------- + 281474976710657 +(1 row) + +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +TRUNCATE seq_test_12; +ALTER TABLE seq_test_12 DROP COLUMN col1; +ALTER TABLE seq_test_12 DROP COLUMN col2; +ALTER TABLE seq_test_12 DROP COLUMN col3; +DROP SEQUENCE seq_12, seq_13, seq_14; +CREATE SEQUENCE seq_12; +CREATE SEQUENCE seq_13; +CREATE SEQUENCE seq_14; +ALTER TABLE seq_test_12 ADD COLUMN col1 smallint DEFAULT nextval('seq_14'); +ALTER TABLE seq_test_12 ADD COLUMN col2 int DEFAULT nextval('seq_13'); +ALTER TABLE seq_test_12 ADD COLUMN col3 bigint DEFAULT nextval('seq_12'); +ALTER TABLE seq_test_12 ADD COLUMN col4 smallint; +ALTER TABLE seq_test_12 ALTER COLUMN col4 SET DEFAULT nextval('seq_14'); +ALTER TABLE seq_test_12 ADD COLUMN col5 int; +ALTER TABLE seq_test_12 ALTER COLUMN col5 SET DEFAULT nextval('seq_13'); +ALTER TABLE seq_test_12 ADD COLUMN col6 bigint; +ALTER TABLE seq_test_12 ALTER COLUMN col6 SET DEFAULT nextval('seq_12'); +INSERT INTO seq_test_12 VALUES ('hello1') RETURNING *; + col0 | col1 | col2 | col3 | col4 | col5 | col6 +--------------------------------------------------------------------- + hello1 | 1 | 1 | 1 | 2 | 2 | 2 +(1 row) + +\c - - - :worker_1_port +SET search_path = sequence_default, public; +-- we should see worker_nextval for int and smallint columns +SELECT table_name, column_name, data_type, column_default FROM information_schema.columns +WHERE table_name = 'seq_test_12' ORDER BY column_name; + table_name | column_name | data_type | column_default +--------------------------------------------------------------------- + seq_test_12 | col0 | text | + seq_test_12 | col1 | smallint | worker_nextval('seq_14'::regclass) + seq_test_12 | col2 | integer | worker_nextval('seq_13'::regclass) + seq_test_12 | col3 | bigint | nextval('seq_12'::regclass) + seq_test_12 | col4 | smallint | worker_nextval('seq_14'::regclass) + seq_test_12 | col5 | integer | worker_nextval('seq_13'::regclass) + seq_test_12 | col6 | bigint | nextval('seq_12'::regclass) +(7 rows) + +-- insertion from worker should fail +INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *; +ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint +-- nextval from worker should work for bigint sequences +SELECT nextval('seq_12'); + nextval +--------------------------------------------------------------------- + 281474976710657 +(1 row) + +-- nextval from worker should fail for int and smallint sequences +SELECT nextval('seq_13'); +ERROR: nextval: reached maximum value of sequence "seq_13" (2147483647) +SELECT nextval('seq_14'); +ERROR: nextval: reached maximum value of sequence "seq_14" (32767) +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT undistribute_table('seq_test_12'); +NOTICE: creating a new table for sequence_default.seq_test_12 +NOTICE: moving the data of sequence_default.seq_test_12 +NOTICE: dropping the old sequence_default.seq_test_12 +NOTICE: renaming the new table to sequence_default.seq_test_12 + undistribute_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('seq_test_12', 'col0'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$sequence_default.seq_test_12$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *; + col0 | col1 | col2 | col3 | col4 | col5 | col6 +--------------------------------------------------------------------- + hello2 | 3 | 3 | 3 | 4 | 4 | 4 +(1 row) + +\c - - - :worker_1_port +SET search_path = sequence_default, public; +-- we should see worker_nextval for int and smallint columns +SELECT table_name, column_name, data_type, column_default FROM information_schema.columns +WHERE table_name = 'seq_test_12' ORDER BY column_name; + table_name | column_name | data_type | column_default +--------------------------------------------------------------------- + seq_test_12 | col0 | text | + seq_test_12 | col1 | smallint | worker_nextval('seq_14'::regclass) + seq_test_12 | col2 | integer | worker_nextval('seq_13'::regclass) + seq_test_12 | col3 | bigint | nextval('seq_12'::regclass) + seq_test_12 | col4 | smallint | worker_nextval('seq_14'::regclass) + seq_test_12 | col5 | integer | worker_nextval('seq_13'::regclass) + seq_test_12 | col6 | bigint | nextval('seq_12'::regclass) +(7 rows) + +-- insertion from worker should fail +INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *; +ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint +-- nextval from worker should work for bigint sequences +SELECT nextval('seq_12'); + nextval +--------------------------------------------------------------------- + 281474976710658 +(1 row) + +-- nextval from worker should fail for int and smallint sequences +SELECT nextval('seq_13'); +ERROR: nextval: reached maximum value of sequence "seq_13" (2147483647) +SELECT nextval('seq_14'); +ERROR: nextval: reached maximum value of sequence "seq_14" (32767) +\c - - - :master_port -- clean up DROP TABLE sequence_default.seq_test_7_par; SET client_min_messages TO error; -- suppress cascading objects dropping diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 9f48ba9e6..bb2efb550 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -207,6 +207,7 @@ ORDER BY 1; function worker_hash_partition_table(bigint,integer,text,text,oid,anyarray) function worker_last_saved_explain_analyze() function worker_merge_files_into_table(bigint,integer,text[],text[]) + function worker_nextval(regclass) function worker_partial_agg(oid,anyelement) function worker_partial_agg_ffunc(internal) function worker_partial_agg_sfunc(internal,oid,anyelement) @@ -257,5 +258,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(241 rows) +(242 rows) diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index ab5247dd4..a1e69b2a3 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -452,6 +452,7 @@ SELECT create_distributed_table('mx_table_with_small_sequence', 'a'); INSERT INTO mx_table_with_small_sequence VALUES (0); \c - - - :worker_1_port +-- Insert doesn't work because the defaults are of type int and smallint INSERT INTO mx_table_with_small_sequence VALUES (1), (3); \c - - - :master_port @@ -460,6 +461,7 @@ SET citus.shard_replication_factor TO 1; -- Create an MX table with (BIGSERIAL) sequences CREATE TABLE mx_table_with_sequence(a int, b BIGSERIAL, c BIGSERIAL); SELECT create_distributed_table('mx_table_with_sequence', 'a'); +INSERT INTO mx_table_with_sequence VALUES (0); SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_sequence'::regclass; \ds mx_table_with_sequence_b_seq \ds mx_table_with_sequence_c_seq @@ -470,6 +472,9 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_ \ds mx_table_with_sequence_b_seq \ds mx_table_with_sequence_c_seq +-- Insert works because the defaults are of type bigint +INSERT INTO mx_table_with_sequence VALUES (1), (3); + -- check that pg_depend records exist on the worker SELECT refobjsubid FROM pg_depend WHERE objid = 'mx_table_with_sequence_b_seq'::regclass AND refobjid = 'mx_table_with_sequence'::regclass; @@ -493,7 +498,10 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_ SELECT nextval('mx_table_with_sequence_b_seq'); SELECT nextval('mx_table_with_sequence_c_seq'); +-- Insert doesn't work because the defaults are of type int and smallint INSERT INTO mx_table_with_small_sequence VALUES (2), (4); +-- Insert works because the defaults are of type bigint +INSERT INTO mx_table_with_sequence VALUES (2), (4); -- Check that dropping the mx table with sequences works as expected \c - - - :master_port @@ -501,6 +509,9 @@ INSERT INTO mx_table_with_small_sequence VALUES (2), (4); -- check our small sequence values SELECT a, b, c FROM mx_table_with_small_sequence ORDER BY a,b,c; +--check our bigint sequence values +SELECT a, b, c FROM mx_table_with_sequence ORDER BY a,b,c; + -- Check that dropping the mx table with sequences works as expected DROP TABLE mx_table_with_small_sequence, mx_table_with_sequence; \d mx_table_with_sequence diff --git a/src/test/regress/sql/multi_sequence_default.sql b/src/test/regress/sql/multi_sequence_default.sql index 405ffac37..4b0ac1ca3 100644 --- a/src/test/regress/sql/multi_sequence_default.sql +++ b/src/test/regress/sql/multi_sequence_default.sql @@ -90,17 +90,18 @@ ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE smallint; CREATE TABLE seq_test_4 (x int, y int); SELECT create_distributed_table('seq_test_4','x'); CREATE SEQUENCE seq_4; -ALTER TABLE seq_test_4 ADD COLUMN a int DEFAULT nextval('seq_4'); +ALTER TABLE seq_test_4 ADD COLUMN a bigint DEFAULT nextval('seq_4'); SELECT start_metadata_sync_to_node('localhost', :worker_1_port); DROP SEQUENCE seq_4 CASCADE; TRUNCATE seq_test_4; CREATE SEQUENCE seq_4; -ALTER TABLE seq_test_4 ADD COLUMN b int DEFAULT nextval('seq_4'); +ALTER TABLE seq_test_4 ADD COLUMN b bigint DEFAULT nextval('seq_4'); -- on worker it should generate high sequence number \c - - - :worker_1_port INSERT INTO sequence_default.seq_test_4 VALUES (1,2) RETURNING *; --- check that we have can properly insert to tables from before metadata sync +-- check that we have can't insert to tables from before metadata sync +-- seq_test_0 and seq_test_0_local_table have int and smallint column defaults INSERT INTO sequence_default.seq_test_0 VALUES (1,2) RETURNING *; INSERT INTO sequence_default.seq_test_0_local_table VALUES (1,2) RETURNING *; @@ -119,7 +120,7 @@ SELECT create_distributed_table('seq_test_1','x'); ALTER TABLE seq_test_1 ADD COLUMN z int DEFAULT nextval('seq_1'); -- type is changed to int \d seq_1 --- check insertion is within int bounds in the worker +-- check insertion doesn't work in the worker because type is int \c - - - :worker_1_port INSERT INTO sequence_default.seq_test_1 values (1, 2) RETURNING *; \c - - - :master_port @@ -137,7 +138,7 @@ SELECT citus_add_local_table_to_metadata('seq_test_1_local_table'); ALTER TABLE seq_test_1_local_table ADD COLUMN z int DEFAULT nextval('seq_1_local_table'); -- type is changed to int \d seq_1_local_table --- check insertion is within int bounds in the worker +-- check insertion doesn't work in the worker because type is int \c - - - :worker_1_port INSERT INTO sequence_default.seq_test_1_local_table values (1, 2) RETURNING *; \c - - - :master_port @@ -218,7 +219,7 @@ CREATE TABLE seq_test_5 (x int, y int); SELECT create_distributed_table('seq_test_5','x'); CREATE SCHEMA sequence_default_1; CREATE SEQUENCE sequence_default_1.seq_5; -ALTER TABLE seq_test_5 ADD COLUMN a int DEFAULT nextval('sequence_default_1.seq_5'); +ALTER TABLE seq_test_5 ADD COLUMN a bigint DEFAULT nextval('sequence_default_1.seq_5'); DROP SCHEMA sequence_default_1 CASCADE; -- sequence is gone from coordinator INSERT INTO seq_test_5 VALUES (1, 2) RETURNING *; @@ -372,8 +373,96 @@ SELECT create_distributed_table('seq_test_11', 'col1'); \c - - - :worker_1_port INSERT INTO sequence_default.seq_test_10 VALUES (1); \c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +-- Check worker_nextval and setval precautions for int and smallint column defaults +-- For details see issue #5126 and PR #5254 +-- https://github.com/citusdata/citus/issues/5126 +CREATE SEQUENCE seq_12; +CREATE SEQUENCE seq_13; +CREATE SEQUENCE seq_14; +CREATE TABLE seq_test_12(col0 text, col1 smallint DEFAULT nextval('seq_12'), + col2 int DEFAULT nextval('seq_13'), + col3 bigint DEFAULT nextval('seq_14')); +SELECT create_distributed_table('seq_test_12', 'col0'); +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +INSERT INTO seq_test_12 VALUES ('hello0') RETURNING *; + +\c - - - :worker_1_port +SET search_path = sequence_default, public; +-- we should see worker_nextval for int and smallint columns +SELECT table_name, column_name, data_type, column_default FROM information_schema.columns +WHERE table_name = 'seq_test_12' ORDER BY column_name; +-- insertion from worker should fail +INSERT INTO seq_test_12 VALUES ('hello1') RETURNING *; +-- nextval from worker should fail for int and smallint sequences +SELECT nextval('seq_12'); +SELECT nextval('seq_13'); +-- nextval from worker should work for bigint sequences +SELECT nextval('seq_14'); + +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +TRUNCATE seq_test_12; +ALTER TABLE seq_test_12 DROP COLUMN col1; +ALTER TABLE seq_test_12 DROP COLUMN col2; +ALTER TABLE seq_test_12 DROP COLUMN col3; +DROP SEQUENCE seq_12, seq_13, seq_14; +CREATE SEQUENCE seq_12; +CREATE SEQUENCE seq_13; +CREATE SEQUENCE seq_14; +ALTER TABLE seq_test_12 ADD COLUMN col1 smallint DEFAULT nextval('seq_14'); +ALTER TABLE seq_test_12 ADD COLUMN col2 int DEFAULT nextval('seq_13'); +ALTER TABLE seq_test_12 ADD COLUMN col3 bigint DEFAULT nextval('seq_12'); +ALTER TABLE seq_test_12 ADD COLUMN col4 smallint; +ALTER TABLE seq_test_12 ALTER COLUMN col4 SET DEFAULT nextval('seq_14'); +ALTER TABLE seq_test_12 ADD COLUMN col5 int; +ALTER TABLE seq_test_12 ALTER COLUMN col5 SET DEFAULT nextval('seq_13'); +ALTER TABLE seq_test_12 ADD COLUMN col6 bigint; +ALTER TABLE seq_test_12 ALTER COLUMN col6 SET DEFAULT nextval('seq_12'); +INSERT INTO seq_test_12 VALUES ('hello1') RETURNING *; + +\c - - - :worker_1_port +SET search_path = sequence_default, public; +-- we should see worker_nextval for int and smallint columns +SELECT table_name, column_name, data_type, column_default FROM information_schema.columns +WHERE table_name = 'seq_test_12' ORDER BY column_name; +-- insertion from worker should fail +INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *; +-- nextval from worker should work for bigint sequences +SELECT nextval('seq_12'); +-- nextval from worker should fail for int and smallint sequences +SELECT nextval('seq_13'); +SELECT nextval('seq_14'); + +\c - - - :master_port +SET citus.shard_replication_factor TO 1; +SET search_path = sequence_default, public; +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT undistribute_table('seq_test_12'); +SELECT create_distributed_table('seq_test_12', 'col0'); +INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *; + +\c - - - :worker_1_port +SET search_path = sequence_default, public; +-- we should see worker_nextval for int and smallint columns +SELECT table_name, column_name, data_type, column_default FROM information_schema.columns +WHERE table_name = 'seq_test_12' ORDER BY column_name; +-- insertion from worker should fail +INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *; +-- nextval from worker should work for bigint sequences +SELECT nextval('seq_12'); +-- nextval from worker should fail for int and smallint sequences +SELECT nextval('seq_13'); +SELECT nextval('seq_14'); + +\c - - - :master_port + -- clean up DROP TABLE sequence_default.seq_test_7_par; SET client_min_messages TO error; -- suppress cascading objects dropping