mirror of https://github.com/citusdata/citus.git
Fixes bug about int and smallint sequences on MX (#5254)
* Introduce worker_nextval udf for int&smallint column defaults * Fix current tests and add new ones for worker_nextvalpull/5263/head
parent
80a44a7b93
commit
a69abe3be0
|
@ -539,7 +539,9 @@ ConvertTable(TableConversionState *con)
|
|||
}
|
||||
char *newAccessMethod = con->accessMethod ? con->accessMethod :
|
||||
con->originalAccessMethod;
|
||||
List *preLoadCommands = GetPreLoadTableCreationCommands(con->relationId, true,
|
||||
IncludeSequenceDefaults includeSequenceDefaults = NEXTVAL_SEQUENCE_DEFAULTS;
|
||||
List *preLoadCommands = GetPreLoadTableCreationCommands(con->relationId,
|
||||
includeSequenceDefaults,
|
||||
newAccessMethod);
|
||||
|
||||
if (con->accessMethod && strcmp(con->accessMethod, "columnar") == 0)
|
||||
|
|
|
@ -463,7 +463,7 @@ GetShellTableDDLEventsForCitusLocalTable(Oid relationId)
|
|||
* Include DEFAULT clauses for columns getting their default values from
|
||||
* a sequence.
|
||||
*/
|
||||
bool includeSequenceDefaults = true;
|
||||
IncludeSequenceDefaults includeSequenceDefaults = NEXTVAL_SEQUENCE_DEFAULTS;
|
||||
|
||||
List *tableDDLCommands = GetFullTableCreationCommands(relationId,
|
||||
includeSequenceDefaults);
|
||||
|
|
|
@ -42,6 +42,7 @@
|
|||
#include "lib/stringinfo.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
#include "parser/parse_expr.h"
|
||||
#include "parser/parse_type.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
|
@ -95,8 +96,11 @@ static void SetInterShardDDLTaskPlacementList(Task *task,
|
|||
static void SetInterShardDDLTaskRelationShardList(Task *task,
|
||||
ShardInterval *leftShardInterval,
|
||||
ShardInterval *rightShardInterval);
|
||||
static Oid GetSequenceOid(Oid relationId, AttrNumber attnum);
|
||||
static Oid get_attrdef_oid(Oid relationId, AttrNumber attnum);
|
||||
static char * GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId,
|
||||
char *colname);
|
||||
static char * GetAddColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId,
|
||||
char *colname, TypeName *typeName);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -654,6 +658,14 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
|||
*/
|
||||
bool deparseAT = false;
|
||||
bool propagateCommandToWorkers = true;
|
||||
|
||||
/*
|
||||
* Sometimes we want to run a different DDL Command string in MX workers
|
||||
* For example, in cases where worker_nextval should be used instead
|
||||
* of nextval() in column defaults with type int and smallint
|
||||
*/
|
||||
bool useInitialDDLCommandString = true;
|
||||
|
||||
AlterTableStmt *newStmt = copyObject(alterTableStatement);
|
||||
|
||||
AlterTableCmd *newCmd = makeNode(AlterTableCmd);
|
||||
|
@ -763,6 +775,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
|||
if (contain_nextval_expression_walker(expr, NULL))
|
||||
{
|
||||
deparseAT = true;
|
||||
useInitialDDLCommandString = false;
|
||||
|
||||
/* the new column definition will have no constraint */
|
||||
ColumnDef *newColDef = copyObject(columnDefinition);
|
||||
|
@ -832,6 +845,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
|||
if (contain_nextval_expression_walker(expr, NULL))
|
||||
{
|
||||
propagateCommandToWorkers = false;
|
||||
useInitialDDLCommandString = false;
|
||||
}
|
||||
}
|
||||
else if (alterTableType == AT_AttachPartition)
|
||||
|
@ -928,7 +942,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
|||
sqlForTaskList = DeparseTreeNode((Node *) newStmt);
|
||||
}
|
||||
|
||||
ddlJob->commandString = alterTableCommand;
|
||||
ddlJob->commandString = useInitialDDLCommandString ? alterTableCommand : NULL;
|
||||
|
||||
if (OidIsValid(rightRelationId))
|
||||
{
|
||||
|
@ -1640,6 +1654,11 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
||||
}
|
||||
|
||||
/* for the new sequences coming with this ALTER TABLE statement */
|
||||
bool needMetadataSyncForNewSequences = false;
|
||||
|
||||
char *alterTableDefaultNextvalCmd = NULL;
|
||||
|
||||
List *commandList = alterTableStatement->cmds;
|
||||
AlterTableCmd *command = NULL;
|
||||
foreach_ptr(command, commandList)
|
||||
|
@ -1728,8 +1747,16 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
if (ShouldSyncTableMetadata(relationId) &&
|
||||
ClusterHasKnownMetadataWorkers())
|
||||
{
|
||||
needMetadataSyncForNewSequences = true;
|
||||
MarkSequenceDistributedAndPropagateDependencies(
|
||||
seqOid);
|
||||
alterTableDefaultNextvalCmd =
|
||||
GetAddColumnWithNextvalDefaultCmd(seqOid,
|
||||
relationId,
|
||||
columnDefinition
|
||||
->colname,
|
||||
columnDefinition
|
||||
->typeName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1761,15 +1788,17 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
if (ShouldSyncTableMetadata(relationId) &&
|
||||
ClusterHasKnownMetadataWorkers())
|
||||
{
|
||||
needMetadataSyncForNewSequences = true;
|
||||
MarkSequenceDistributedAndPropagateDependencies(seqOid);
|
||||
alterTableDefaultNextvalCmd = GetAlterColumnWithNextvalDefaultCmd(
|
||||
seqOid, relationId, command->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* for the new sequences coming with this ALTER TABLE statement */
|
||||
if (ShouldSyncTableMetadata(relationId) && ClusterHasKnownMetadataWorkers())
|
||||
if (needMetadataSyncForNewSequences)
|
||||
{
|
||||
List *sequenceCommandList = NIL;
|
||||
|
||||
|
@ -1787,6 +1816,16 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
SendCommandToWorkersWithMetadata(sequenceCommand);
|
||||
}
|
||||
|
||||
/*
|
||||
* It's easy to retrieve the sequence id to create the proper commands
|
||||
* in postprocess, after the dependency between the sequence and the table
|
||||
* has been created. We already return ddlJobs in PreprocessAlterTableStmt,
|
||||
* hence we can't return ddlJobs in PostprocessAlterTableStmt.
|
||||
* That's why we execute the following here instead of
|
||||
* in ExecuteDistributedDDLJob
|
||||
*/
|
||||
SendCommandToWorkersWithMetadata(alterTableDefaultNextvalCmd);
|
||||
|
||||
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
|
||||
}
|
||||
}
|
||||
|
@ -1797,7 +1836,7 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
* of the attribute with given attnum of the given table relationId
|
||||
* If there is no sequence used it returns InvalidOid.
|
||||
*/
|
||||
static Oid
|
||||
Oid
|
||||
GetSequenceOid(Oid relationId, AttrNumber attnum)
|
||||
{
|
||||
/* get attrdefoid from the given relationId and attnum */
|
||||
|
@ -1821,15 +1860,10 @@ GetSequenceOid(Oid relationId, AttrNumber attnum)
|
|||
if (list_length(sequencesFromAttrDef) > 1)
|
||||
{
|
||||
/* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */
|
||||
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
ereport(ERROR, (errmsg(
|
||||
"More than one sequence in a column default"
|
||||
" is not supported for adding local tables to metadata")));
|
||||
}
|
||||
ereport(ERROR, (errmsg(
|
||||
"More than one sequence in a column default"
|
||||
" is not supported for distribution")));
|
||||
" is not supported for distribution "
|
||||
"or for adding local tables to metadata")));
|
||||
}
|
||||
|
||||
return lfirst_oid(list_head(sequencesFromAttrDef));
|
||||
|
@ -1886,6 +1920,83 @@ get_attrdef_oid(Oid relationId, AttrNumber attnum)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetAlterColumnWithNextvalDefaultCmd returns a string representing:
|
||||
* ALTER TABLE ALTER COLUMN .. SET DEFAULT nextval()
|
||||
* If sequence type is not bigint, we use worker_nextval() instead of nextval().
|
||||
*/
|
||||
static char *
|
||||
GetAlterColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, char *colname)
|
||||
{
|
||||
char *qualifiedSequenceName = generate_qualified_relation_name(sequenceOid);
|
||||
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
||||
|
||||
char *nextvalFunctionName = "nextval";
|
||||
bool useWorkerNextval = (pg_get_sequencedef(sequenceOid)->seqtypid != INT8OID);
|
||||
if (useWorkerNextval)
|
||||
{
|
||||
/*
|
||||
* We use worker_nextval for int and smallint types.
|
||||
* Check issue #5126 and PR #5254 for details.
|
||||
* https://github.com/citusdata/citus/issues/5126
|
||||
*/
|
||||
nextvalFunctionName = "worker_nextval";
|
||||
}
|
||||
|
||||
StringInfoData str = { 0 };
|
||||
initStringInfo(&str);
|
||||
appendStringInfo(&str, "ALTER TABLE %s ALTER COLUMN %s "
|
||||
"SET DEFAULT %s(%s::regclass)",
|
||||
qualifiedRelationName, colname,
|
||||
quote_qualified_identifier("pg_catalog", nextvalFunctionName),
|
||||
quote_literal_cstr(qualifiedSequenceName));
|
||||
|
||||
return str.data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetAddColumnWithNextvalDefaultCmd returns a string representing:
|
||||
* ALTER TABLE ADD COLUMN .. DEFAULT nextval()
|
||||
* If sequence type is not bigint, we use worker_nextval() instead of nextval().
|
||||
*/
|
||||
static char *
|
||||
GetAddColumnWithNextvalDefaultCmd(Oid sequenceOid, Oid relationId, char *colname,
|
||||
TypeName *typeName)
|
||||
{
|
||||
char *qualifiedSequenceName = generate_qualified_relation_name(sequenceOid);
|
||||
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
||||
|
||||
char *nextvalFunctionName = "nextval";
|
||||
bool useWorkerNextval = (pg_get_sequencedef(sequenceOid)->seqtypid != INT8OID);
|
||||
if (useWorkerNextval)
|
||||
{
|
||||
/*
|
||||
* We use worker_nextval for int and smallint types.
|
||||
* Check issue #5126 and PR #5254 for details.
|
||||
* https://github.com/citusdata/citus/issues/5126
|
||||
*/
|
||||
nextvalFunctionName = "worker_nextval";
|
||||
}
|
||||
|
||||
int32 typmod = 0;
|
||||
Oid typeOid = InvalidOid;
|
||||
bits16 formatFlags = FORMAT_TYPE_TYPEMOD_GIVEN | FORMAT_TYPE_FORCE_QUALIFY;
|
||||
typenameTypeIdAndMod(NULL, typeName, &typeOid, &typmod);
|
||||
|
||||
StringInfoData str = { 0 };
|
||||
initStringInfo(&str);
|
||||
appendStringInfo(&str,
|
||||
"ALTER TABLE %s ADD COLUMN %s %s "
|
||||
"DEFAULT %s(%s::regclass)", qualifiedRelationName, colname,
|
||||
format_type_extended(typeOid, typmod, formatFlags),
|
||||
quote_qualified_identifier("pg_catalog", nextvalFunctionName),
|
||||
quote_literal_cstr(qualifiedSequenceName));
|
||||
|
||||
return str.data;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ErrorUnsupportedAlterTableAddColumn(Oid relationId, AlterTableCmd *command,
|
||||
Constraint *constraint)
|
||||
|
|
|
@ -891,7 +891,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
|||
SendCommandToWorkersWithMetadata(setSearchPathCommand);
|
||||
}
|
||||
|
||||
SendCommandToWorkersWithMetadata((char *) ddlJob->commandString);
|
||||
if (ddlJob->commandString != NULL)
|
||||
{
|
||||
SendCommandToWorkersWithMetadata((char *) ddlJob->commandString);
|
||||
}
|
||||
}
|
||||
|
||||
ExecuteUtilityTaskList(ddlJob->taskList, localExecutionSupported);
|
||||
|
|
|
@ -251,12 +251,14 @@ pg_get_sequencedef(Oid sequenceRelationId)
|
|||
* definition includes table's schema, default column values, not null and check
|
||||
* constraints. The definition does not include constraints that trigger index
|
||||
* creations; specifically, unique and primary key constraints are excluded.
|
||||
* When the flag includeSequenceDefaults is set, the function also creates
|
||||
* When includeSequenceDefaults is NEXTVAL_SEQUENCE_DEFAULTS, the function also creates
|
||||
* DEFAULT clauses for columns getting their default values from a sequence.
|
||||
* When it's WORKER_NEXTVAL_SEQUENCE_DEFAULTS, the function creates the DEFAULT
|
||||
* clause using worker_nextval('sequence') and not nextval('sequence')
|
||||
*/
|
||||
char *
|
||||
pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults,
|
||||
char *accessMethod)
|
||||
pg_get_tableschemadef_string(Oid tableRelationId, IncludeSequenceDefaults
|
||||
includeSequenceDefaults, char *accessMethod)
|
||||
{
|
||||
bool firstAttributePrinted = false;
|
||||
AttrNumber defaultValueIndex = 0;
|
||||
|
@ -374,7 +376,26 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults,
|
|||
}
|
||||
else
|
||||
{
|
||||
appendStringInfo(&buffer, " DEFAULT %s", defaultString);
|
||||
Oid seqOid = GetSequenceOid(tableRelationId, defaultValue->adnum);
|
||||
if (includeSequenceDefaults == WORKER_NEXTVAL_SEQUENCE_DEFAULTS &&
|
||||
seqOid != InvalidOid &&
|
||||
pg_get_sequencedef(seqOid)->seqtypid != INT8OID)
|
||||
{
|
||||
/*
|
||||
* We use worker_nextval for int and smallint types.
|
||||
* Check issue #5126 and PR #5254 for details.
|
||||
* https://github.com/citusdata/citus/issues/5126
|
||||
*/
|
||||
char *sequenceName = generate_qualified_relation_name(
|
||||
seqOid);
|
||||
appendStringInfo(&buffer,
|
||||
" DEFAULT worker_nextval(%s::regclass)",
|
||||
quote_literal_cstr(sequenceName));
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfo(&buffer, " DEFAULT %s", defaultString);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -497,7 +497,7 @@ MetadataCreateCommands(void)
|
|||
List *propagatedTableList = NIL;
|
||||
bool includeNodesFromOtherClusters = true;
|
||||
List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters);
|
||||
bool includeSequenceDefaults = true;
|
||||
IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;
|
||||
|
||||
/* make sure we have deterministic output for our tests */
|
||||
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
||||
|
@ -651,7 +651,7 @@ GetDistributedTableDDLEvents(Oid relationId)
|
|||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||
|
||||
List *commandList = NIL;
|
||||
bool includeSequenceDefaults = true;
|
||||
IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;
|
||||
|
||||
/* if the table is owned by an extension we only propagate pg_dist_* records */
|
||||
bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
|
||||
|
@ -1287,14 +1287,10 @@ GetDependentSequencesWithRelation(Oid relationId, List **attnumList,
|
|||
/* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */
|
||||
if (list_length(sequencesFromAttrDef) > 1)
|
||||
{
|
||||
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
ereport(ERROR, (errmsg(
|
||||
"More than one sequence in a column default"
|
||||
" is not supported for adding local tables to metadata")));
|
||||
}
|
||||
ereport(ERROR, (errmsg("More than one sequence in a column default"
|
||||
" is not supported for distribution")));
|
||||
ereport(ERROR, (errmsg(
|
||||
"More than one sequence in a column default"
|
||||
" is not supported for distribution "
|
||||
"or for adding local tables to metadata")));
|
||||
}
|
||||
|
||||
if (list_length(sequencesFromAttrDef) == 1)
|
||||
|
|
|
@ -216,7 +216,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
|
|||
{
|
||||
text *relationName = PG_GETARG_TEXT_P(0);
|
||||
Oid relationId = ResolveRelationId(relationName, false);
|
||||
bool includeSequenceDefaults = true;
|
||||
IncludeSequenceDefaults includeSequenceDefaults = NEXTVAL_SEQUENCE_DEFAULTS;
|
||||
|
||||
|
||||
/* create a function context for cross-call persistence */
|
||||
|
@ -533,16 +533,19 @@ ResolveRelationId(text *relationName, bool missingOk)
|
|||
|
||||
|
||||
/*
|
||||
* GetFullTableCreationCommands takes in a relationId, includeSequenceDefaults flag,
|
||||
* GetFullTableCreationCommands takes in a relationId, includeSequenceDefaults,
|
||||
* and returns the list of DDL commands needed to reconstruct the relation.
|
||||
* When the flag includeSequenceDefaults is set, the function also creates
|
||||
* When includeSequenceDefaults is NEXTVAL_SEQUENCE_DEFAULTS, the function also creates
|
||||
* DEFAULT clauses for columns getting their default values from a sequence.
|
||||
* When it's WORKER_NEXTVAL_SEQUENCE_DEFAULTS, the function creates the DEFAULT
|
||||
* clause using worker_nextval('sequence') and not nextval('sequence')
|
||||
* These DDL commands are all palloced; and include the table's schema
|
||||
* definition, optional column storage and statistics definitions, and index
|
||||
* constraint and trigger definitions.
|
||||
*/
|
||||
List *
|
||||
GetFullTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
|
||||
GetFullTableCreationCommands(Oid relationId, IncludeSequenceDefaults
|
||||
includeSequenceDefaults)
|
||||
{
|
||||
List *tableDDLEventList = NIL;
|
||||
|
||||
|
@ -651,7 +654,8 @@ GetTableReplicaIdentityCommand(Oid relationId)
|
|||
* to facilitate faster data load.
|
||||
*/
|
||||
List *
|
||||
GetPreLoadTableCreationCommands(Oid relationId, bool includeSequenceDefaults,
|
||||
GetPreLoadTableCreationCommands(Oid relationId,
|
||||
IncludeSequenceDefaults includeSequenceDefaults,
|
||||
char *accessMethod)
|
||||
{
|
||||
List *tableDDLEventList = NIL;
|
||||
|
|
|
@ -1453,7 +1453,7 @@ RecreateTableDDLCommandList(Oid relationId)
|
|||
|
||||
StringInfo dropCommand = makeStringInfo();
|
||||
char relationKind = get_rel_relkind(relationId);
|
||||
bool includeSequenceDefaults = false;
|
||||
IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
|
||||
|
||||
/* build appropriate DROP command based on relation kind */
|
||||
if (RegularTable(relationId))
|
||||
|
|
|
@ -455,7 +455,7 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
|
|||
int placementsCreated = 0;
|
||||
List *foreignConstraintCommandList =
|
||||
GetReferencingForeignConstaintCommands(relationId);
|
||||
bool includeSequenceDefaults = false;
|
||||
IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
|
||||
List *ddlCommandList = GetFullTableCreationCommands(relationId,
|
||||
includeSequenceDefaults);
|
||||
uint32 connectionFlag = FOR_DDL;
|
||||
|
@ -568,7 +568,7 @@ void
|
|||
CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
|
||||
bool useExclusiveConnection, bool colocatedShard)
|
||||
{
|
||||
bool includeSequenceDefaults = false;
|
||||
IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
|
||||
List *ddlCommandList = GetFullTableCreationCommands(distributedRelationId,
|
||||
includeSequenceDefaults);
|
||||
List *foreignConstraintCommandList =
|
||||
|
|
|
@ -19,6 +19,7 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi
|
|||
#include "udfs/create_time_partitions/10.2-1.sql"
|
||||
#include "udfs/drop_old_time_partitions/10.2-1.sql"
|
||||
#include "udfs/get_missing_time_partition_ranges/10.2-1.sql"
|
||||
#include "udfs/worker_nextval/10.2-1.sql"
|
||||
|
||||
DROP FUNCTION pg_catalog.citus_drop_all_shards(regclass, text, text);
|
||||
CREATE FUNCTION pg_catalog.citus_drop_all_shards(logicalrelid regclass,
|
||||
|
|
|
@ -19,6 +19,7 @@ DROP FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint);
|
|||
DROP FUNCTION pg_catalog.citus_internal_update_relation_colocation(oid, integer);
|
||||
DROP FUNCTION pg_catalog.create_time_partitions(regclass, interval, timestamp with time zone, timestamp with time zone);
|
||||
DROP FUNCTION pg_catalog.get_missing_time_partition_ranges(regclass, interval, timestamp with time zone, timestamp with time zone);
|
||||
DROP FUNCTION pg_catalog.worker_nextval(regclass);
|
||||
|
||||
DROP PROCEDURE pg_catalog.drop_old_time_partitions(regclass, timestamptz);
|
||||
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
CREATE FUNCTION pg_catalog.worker_nextval(sequence regclass)
|
||||
RETURNS int
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$worker_nextval$$;
|
||||
COMMENT ON FUNCTION pg_catalog.worker_nextval(regclass)
|
||||
IS 'calculates nextval() for column defaults of type int or smallint';
|
|
@ -0,0 +1,6 @@
|
|||
CREATE FUNCTION pg_catalog.worker_nextval(sequence regclass)
|
||||
RETURNS int
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$worker_nextval$$;
|
||||
COMMENT ON FUNCTION pg_catalog.worker_nextval(regclass)
|
||||
IS 'calculates nextval() for column defaults of type int or smallint';
|
|
@ -77,6 +77,7 @@ PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command);
|
|||
PG_FUNCTION_INFO_V1(worker_apply_inter_shard_ddl_command);
|
||||
PG_FUNCTION_INFO_V1(worker_apply_sequence_command);
|
||||
PG_FUNCTION_INFO_V1(worker_append_table_to_shard);
|
||||
PG_FUNCTION_INFO_V1(worker_nextval);
|
||||
|
||||
/*
|
||||
* Following UDFs are stub functions, you can check their comments for more
|
||||
|
@ -700,6 +701,21 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* worker_nextval calculates nextval() in worker nodes
|
||||
* for int and smallint column default types
|
||||
* TODO: not error out but get the proper nextval()
|
||||
*/
|
||||
Datum
|
||||
worker_nextval(PG_FUNCTION_ARGS)
|
||||
{
|
||||
ereport(ERROR, (errmsg(
|
||||
"nextval(sequence) calls in worker nodes are not supported"
|
||||
" for column defaults of type int or smallint")));
|
||||
PG_RETURN_INT32(0);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* check_log_statement is a copy of postgres' check_log_statement function and
|
||||
* returns whether a statement ought to be logged or not.
|
||||
|
@ -755,14 +771,20 @@ AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName,
|
|||
int64 sequenceMinValue = sequenceData->seqmin;
|
||||
int valueBitLength = 48;
|
||||
|
||||
/* for smaller types, put the group ID into the first 4 bits */
|
||||
if (sequenceTypeId == INT4OID)
|
||||
/*
|
||||
* For int and smallint, we don't currently support insertion from workers
|
||||
* Check issue #5126 and PR #5254 for details.
|
||||
* https://github.com/citusdata/citus/issues/5126
|
||||
* So, no need to alter sequence min/max for now
|
||||
* We call setval(sequence, maxvalue) such that manually using
|
||||
* nextval(sequence) in the workers will error out as well.
|
||||
*/
|
||||
if (sequenceTypeId != INT8OID)
|
||||
{
|
||||
valueBitLength = 28;
|
||||
}
|
||||
else if (sequenceTypeId == INT2OID)
|
||||
{
|
||||
valueBitLength = 12;
|
||||
DirectFunctionCall2(setval_oid,
|
||||
ObjectIdGetDatum(sequenceId),
|
||||
Int64GetDatum(sequenceMaxValue));
|
||||
return;
|
||||
}
|
||||
|
||||
/* calculate min/max values that the sequence can generate in this worker */
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "catalog/pg_sequence.h"
|
||||
#include "commands/sequence.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "nodes/parsenodes.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
@ -31,7 +32,8 @@ extern Oid get_extension_schema(Oid ext_oid);
|
|||
extern char * pg_get_serverdef_string(Oid tableRelationId);
|
||||
extern char * pg_get_sequencedef_string(Oid sequenceRelid);
|
||||
extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId);
|
||||
extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation,
|
||||
extern char * pg_get_tableschemadef_string(Oid tableRelationId,
|
||||
IncludeSequenceDefaults includeSequenceDefaults,
|
||||
char *accessMethod);
|
||||
extern void EnsureRelationKindSupported(Oid relationId);
|
||||
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);
|
||||
|
|
|
@ -422,6 +422,7 @@ extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMet
|
|||
extern ObjectAddress AlterTableSchemaStmtObjectAddress(Node *stmt,
|
||||
bool missing_ok);
|
||||
extern List * MakeNameListFromRangeVar(const RangeVar *rel);
|
||||
extern Oid GetSequenceOid(Oid relationId, AttrNumber attnum);
|
||||
|
||||
|
||||
/* truncate.c - forward declarations */
|
||||
|
|
|
@ -111,6 +111,25 @@ typedef enum IndexDefinitionDeparseFlags
|
|||
} IndexDefinitionDeparseFlags;
|
||||
|
||||
|
||||
/*
|
||||
* IncludeSequenceDefaults decides on inclusion of DEFAULT clauses for columns
|
||||
* getting their default values from a sequence when creating the definition
|
||||
* of a table.
|
||||
*/
|
||||
typedef enum IncludeSequenceDefaults
|
||||
{
|
||||
NO_SEQUENCE_DEFAULTS = 0, /* don't include sequence defaults */
|
||||
NEXTVAL_SEQUENCE_DEFAULTS = 1, /* include sequence defaults */
|
||||
|
||||
/*
|
||||
* Include sequence defaults, but use worker_nextval instead of nextval
|
||||
* when the default will be called in worker node, and the column type is
|
||||
* int or smallint.
|
||||
*/
|
||||
WORKER_NEXTVAL_SEQUENCE_DEFAULTS = 2,
|
||||
} IncludeSequenceDefaults;
|
||||
|
||||
|
||||
struct TableDDLCommand;
|
||||
typedef struct TableDDLCommand TableDDLCommand;
|
||||
typedef char *(*TableDDLFunction)(void *context);
|
||||
|
@ -193,11 +212,12 @@ extern bool CStoreTable(Oid relationId);
|
|||
extern uint64 GetNextShardId(void);
|
||||
extern uint64 GetNextPlacementId(void);
|
||||
extern Oid ResolveRelationId(text *relationName, bool missingOk);
|
||||
extern List * GetFullTableCreationCommands(Oid relationId, bool includeSequenceDefaults);
|
||||
extern List * GetFullTableCreationCommands(Oid relationId,
|
||||
IncludeSequenceDefaults includeSequenceDefaults);
|
||||
extern List * GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes,
|
||||
bool includeReplicaIdentity);
|
||||
extern List * GetPreLoadTableCreationCommands(Oid relationId,
|
||||
bool includeSequenceDefaults,
|
||||
extern List * GetPreLoadTableCreationCommands(Oid relationId, IncludeSequenceDefaults
|
||||
includeSequenceDefaults,
|
||||
char *accessMethod);
|
||||
extern List * GetTableIndexAndConstraintCommands(Oid relationId, int indexFlags);
|
||||
extern List * GetTableIndexAndConstraintCommandsExcludingReplicaIdentity(Oid relationId,
|
||||
|
|
|
@ -162,5 +162,7 @@ extern Datum worker_find_block_local_path(PG_FUNCTION_ARGS);
|
|||
/* Function declaration for calculating hashed value */
|
||||
extern Datum worker_hash(PG_FUNCTION_ARGS);
|
||||
|
||||
/* Function declaration for calculating nextval() in worker */
|
||||
extern Datum worker_nextval(PG_FUNCTION_ARGS);
|
||||
|
||||
#endif /* WORKER_PROTOCOL_H */
|
||||
|
|
|
@ -810,7 +810,8 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
| function drop_old_time_partitions(regclass,timestamp with time zone)
|
||||
| function get_missing_time_partition_ranges(regclass,interval,timestamp with time zone,timestamp with time zone) TABLE(partition_name text, range_from_value text, range_to_value text)
|
||||
| function stop_metadata_sync_to_node(text,integer,boolean) void
|
||||
(15 rows)
|
||||
| function worker_nextval(regclass) integer
|
||||
(16 rows)
|
||||
|
||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -985,7 +985,9 @@ SELECT create_distributed_table('mx_table_with_small_sequence', 'a');
|
|||
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (0);
|
||||
\c - - - :worker_1_port
|
||||
-- Insert doesn't work because the defaults are of type int and smallint
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (1), (3);
|
||||
ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
-- Create an MX table with (BIGSERIAL) sequences
|
||||
|
@ -996,6 +998,7 @@ SELECT create_distributed_table('mx_table_with_sequence', 'a');
|
|||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO mx_table_with_sequence VALUES (0);
|
||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_sequence'::regclass;
|
||||
Column | Type | Modifiers
|
||||
---------------------------------------------------------------------
|
||||
|
@ -1042,6 +1045,8 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_
|
|||
public | mx_table_with_sequence_c_seq | sequence | postgres
|
||||
(1 row)
|
||||
|
||||
-- Insert works because the defaults are of type bigint
|
||||
INSERT INTO mx_table_with_sequence VALUES (1), (3);
|
||||
-- check that pg_depend records exist on the worker
|
||||
SELECT refobjsubid FROM pg_depend
|
||||
WHERE objid = 'mx_table_with_sequence_b_seq'::regclass AND refobjid = 'mx_table_with_sequence'::regclass;
|
||||
|
@ -1061,13 +1066,13 @@ WHERE objid = 'mx_table_with_sequence_c_seq'::regclass AND refobjid = 'mx_table_
|
|||
SELECT nextval('mx_table_with_sequence_b_seq');
|
||||
nextval
|
||||
---------------------------------------------------------------------
|
||||
281474976710657
|
||||
281474976710659
|
||||
(1 row)
|
||||
|
||||
SELECT nextval('mx_table_with_sequence_c_seq');
|
||||
nextval
|
||||
---------------------------------------------------------------------
|
||||
281474976710657
|
||||
281474976710659
|
||||
(1 row)
|
||||
|
||||
-- Check that adding a new metadata node sets the sequence space correctly
|
||||
|
@ -1119,7 +1124,11 @@ SELECT nextval('mx_table_with_sequence_c_seq');
|
|||
562949953421313
|
||||
(1 row)
|
||||
|
||||
-- Insert doesn't work because the defaults are of type int and smallint
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (2), (4);
|
||||
ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint
|
||||
-- Insert works because the defaults are of type bigint
|
||||
INSERT INTO mx_table_with_sequence VALUES (2), (4);
|
||||
-- Check that dropping the mx table with sequences works as expected
|
||||
\c - - - :master_port
|
||||
-- check our small sequence values
|
||||
|
@ -1127,10 +1136,17 @@ SELECT a, b, c FROM mx_table_with_small_sequence ORDER BY a,b,c;
|
|||
a | b | c
|
||||
---------------------------------------------------------------------
|
||||
0 | 1 | 1
|
||||
1 | 268435457 | 4097
|
||||
2 | 536870913 | 8193
|
||||
3 | 268435458 | 4098
|
||||
4 | 536870914 | 8194
|
||||
(1 row)
|
||||
|
||||
--check our bigint sequence values
|
||||
SELECT a, b, c FROM mx_table_with_sequence ORDER BY a,b,c;
|
||||
a | b | c
|
||||
---------------------------------------------------------------------
|
||||
0 | 1 | 1
|
||||
1 | 281474976710657 | 281474976710657
|
||||
2 | 562949953421314 | 562949953421314
|
||||
3 | 281474976710658 | 281474976710658
|
||||
4 | 562949953421315 | 562949953421315
|
||||
(5 rows)
|
||||
|
||||
-- Check that dropping the mx table with sequences works as expected
|
||||
|
@ -1672,7 +1688,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
|
|||
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
|
||||
CREATE TABLE public.dist_table_1 (a integer)
|
||||
CREATE TABLE public.mx_ref (col_1 integer, col_2 text)
|
||||
CREATE TABLE public.test_table (id integer DEFAULT nextval('public.mx_test_sequence_0'::regclass), id2 integer DEFAULT nextval('public.mx_test_sequence_1'::regclass))
|
||||
CREATE TABLE public.test_table (id integer DEFAULT worker_nextval('public.mx_test_sequence_0'::regclass), id2 integer DEFAULT worker_nextval('public.mx_test_sequence_1'::regclass))
|
||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster, shouldhaveshards) VALUES (4, 1, 'localhost', 8888, 'default', FALSE, FALSE, TRUE, 'secondary'::noderole, 'default', TRUE),(5, 1, 'localhost', 8889, 'default', FALSE, FALSE, TRUE, 'secondary'::noderole, 'second-cluster', TRUE),(1, 1, 'localhost', 57637, 'default', TRUE, TRUE, TRUE, 'primary'::noderole, 'default', TRUE),(7, 5, 'localhost', 57638, 'default', TRUE, TRUE, TRUE, 'primary'::noderole, 'default', TRUE)
|
||||
SELECT citus_internal_add_partition_metadata ('mx_test_schema_1.mx_table_1'::regclass, 'h', 'col1', 3, 's')
|
||||
SELECT citus_internal_add_partition_metadata ('mx_test_schema_2.mx_table_2'::regclass, 'h', 'col1', 3, 's')
|
||||
|
|
|
@ -184,7 +184,7 @@ SELECT create_distributed_table('seq_test_4','x');
|
|||
(1 row)
|
||||
|
||||
CREATE SEQUENCE seq_4;
|
||||
ALTER TABLE seq_test_4 ADD COLUMN a int DEFAULT nextval('seq_4');
|
||||
ALTER TABLE seq_test_4 ADD COLUMN a bigint DEFAULT nextval('seq_4');
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
@ -195,28 +195,21 @@ DROP SEQUENCE seq_4 CASCADE;
|
|||
NOTICE: drop cascades to default value for column a of table seq_test_4
|
||||
TRUNCATE seq_test_4;
|
||||
CREATE SEQUENCE seq_4;
|
||||
ALTER TABLE seq_test_4 ADD COLUMN b int DEFAULT nextval('seq_4');
|
||||
ALTER TABLE seq_test_4 ADD COLUMN b bigint DEFAULT nextval('seq_4');
|
||||
-- on worker it should generate high sequence number
|
||||
\c - - - :worker_1_port
|
||||
INSERT INTO sequence_default.seq_test_4 VALUES (1,2) RETURNING *;
|
||||
x | y | a | b
|
||||
x | y | a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 2 | | 268435457
|
||||
1 | 2 | | 281474976710657
|
||||
(1 row)
|
||||
|
||||
-- check that we have can properly insert to tables from before metadata sync
|
||||
-- check that we have can't insert to tables from before metadata sync
|
||||
-- seq_test_0 and seq_test_0_local_table have int and smallint column defaults
|
||||
INSERT INTO sequence_default.seq_test_0 VALUES (1,2) RETURNING *;
|
||||
x | y | z | w00 | w01 | w10 | w11 | w20 | w21
|
||||
---------------------------------------------------------------------
|
||||
1 | 2 | 268435457 | 4097 | 4097 | 268435457 | 268435457 | 281474976710657 | 281474976710657
|
||||
(1 row)
|
||||
|
||||
ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint
|
||||
INSERT INTO sequence_default.seq_test_0_local_table VALUES (1,2) RETURNING *;
|
||||
x | y | z | w00 | w01 | w10 | w11 | w20 | w21
|
||||
---------------------------------------------------------------------
|
||||
1 | 2 | 268435457 | 4097 | 4097 | 268435457 | 268435457 | 281474976710657 | 281474976710657
|
||||
(1 row)
|
||||
|
||||
ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET search_path = sequence_default, public;
|
||||
|
@ -250,14 +243,10 @@ ALTER TABLE seq_test_1 ADD COLUMN z int DEFAULT nextval('seq_1');
|
|||
---------------------------------------------------------------------
|
||||
integer | 1 | 1 | 2147483647 | 1 | no | 1
|
||||
|
||||
-- check insertion is within int bounds in the worker
|
||||
-- check insertion doesn't work in the worker because type is int
|
||||
\c - - - :worker_1_port
|
||||
INSERT INTO sequence_default.seq_test_1 values (1, 2) RETURNING *;
|
||||
x | y | z
|
||||
---------------------------------------------------------------------
|
||||
1 | 2 | 268435457
|
||||
(1 row)
|
||||
|
||||
ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET search_path = sequence_default, public;
|
||||
|
@ -291,14 +280,10 @@ ALTER TABLE seq_test_1_local_table ADD COLUMN z int DEFAULT nextval('seq_1_local
|
|||
---------------------------------------------------------------------
|
||||
integer | 1 | 1 | 2147483647 | 1 | no | 1
|
||||
|
||||
-- check insertion is within int bounds in the worker
|
||||
-- check insertion doesn't work in the worker because type is int
|
||||
\c - - - :worker_1_port
|
||||
INSERT INTO sequence_default.seq_test_1_local_table values (1, 2) RETURNING *;
|
||||
x | y | z
|
||||
---------------------------------------------------------------------
|
||||
1 | 2 | 268435457
|
||||
(1 row)
|
||||
|
||||
ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET search_path = sequence_default, public;
|
||||
|
@ -436,7 +421,7 @@ SELECT create_distributed_table('seq_test_5','x');
|
|||
|
||||
CREATE SCHEMA sequence_default_1;
|
||||
CREATE SEQUENCE sequence_default_1.seq_5;
|
||||
ALTER TABLE seq_test_5 ADD COLUMN a int DEFAULT nextval('sequence_default_1.seq_5');
|
||||
ALTER TABLE seq_test_5 ADD COLUMN a bigint DEFAULT nextval('sequence_default_1.seq_5');
|
||||
DROP SCHEMA sequence_default_1 CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
DETAIL: drop cascades to sequence sequence_default_1.seq_5
|
||||
|
@ -451,9 +436,9 @@ INSERT INTO seq_test_5 VALUES (1, 2) RETURNING *;
|
|||
-- but is still present on worker
|
||||
\c - - - :worker_1_port
|
||||
INSERT INTO sequence_default.seq_test_5 VALUES (1, 2) RETURNING *;
|
||||
x | y | a
|
||||
x | y | a
|
||||
---------------------------------------------------------------------
|
||||
1 | 2 | 268435457
|
||||
1 | 2 | 281474976710657
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
|
@ -593,10 +578,10 @@ ERROR: cannot alter OWNED BY option of a sequence already owned by a distribute
|
|||
ALTER SEQUENCE seq_8 SET SCHEMA sequence_default_8;
|
||||
\c - - - :worker_1_port
|
||||
\d sequence_default_8.seq_8
|
||||
Sequence "sequence_default_8.seq_8"
|
||||
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
|
||||
Sequence "sequence_default_8.seq_8"
|
||||
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
|
||||
---------------------------------------------------------------------
|
||||
integer | 268435457 | 268435457 | 536870913 | 3 | yes | 10
|
||||
integer | 1 | 1 | 2147483647 | 3 | yes | 10
|
||||
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
@ -611,10 +596,10 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
|||
ALTER TABLE sequence_default_8.seq_8 SET SCHEMA sequence_default;
|
||||
\c - - - :worker_1_port
|
||||
\d sequence_default.seq_8
|
||||
Sequence "sequence_default.seq_8"
|
||||
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
|
||||
Sequence "sequence_default.seq_8"
|
||||
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
|
||||
---------------------------------------------------------------------
|
||||
integer | 268435457 | 268435457 | 536870913 | 3 | yes | 10
|
||||
integer | 1 | 1 | 2147483647 | 3 | yes | 10
|
||||
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
@ -638,9 +623,9 @@ CREATE SEQUENCE seq_9;
|
|||
CREATE SEQUENCE seq_10;
|
||||
CREATE TABLE seq_test_9 (x int, y int DEFAULT nextval('seq_9') - nextval('seq_10'));
|
||||
SELECT create_distributed_table('seq_test_9', 'x');
|
||||
ERROR: More than one sequence in a column default is not supported for distribution
|
||||
ERROR: More than one sequence in a column default is not supported for distribution or for adding local tables to metadata
|
||||
SELECT citus_add_local_table_to_metadata('seq_test_9');
|
||||
ERROR: More than one sequence in a column default is not supported for adding local tables to metadata
|
||||
ERROR: More than one sequence in a column default is not supported for distribution or for adding local tables to metadata
|
||||
ALTER TABLE seq_test_9 ALTER COLUMN y SET DEFAULT nextval('seq_9');
|
||||
SELECT create_distributed_table('seq_test_9', 'x');
|
||||
create_distributed_table
|
||||
|
@ -772,6 +757,200 @@ SELECT create_distributed_table('seq_test_11', 'col1');
|
|||
INSERT INTO sequence_default.seq_test_10 VALUES (1);
|
||||
ERROR: relation "seq_11" does not exist
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET search_path = sequence_default, public;
|
||||
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
NOTICE: dropping metadata on the node (localhost,57637)
|
||||
stop_metadata_sync_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Check worker_nextval and setval precautions for int and smallint column defaults
|
||||
-- For details see issue #5126 and PR #5254
|
||||
-- https://github.com/citusdata/citus/issues/5126
|
||||
CREATE SEQUENCE seq_12;
|
||||
CREATE SEQUENCE seq_13;
|
||||
CREATE SEQUENCE seq_14;
|
||||
CREATE TABLE seq_test_12(col0 text, col1 smallint DEFAULT nextval('seq_12'),
|
||||
col2 int DEFAULT nextval('seq_13'),
|
||||
col3 bigint DEFAULT nextval('seq_14'));
|
||||
SELECT create_distributed_table('seq_test_12', 'col0');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO seq_test_12 VALUES ('hello0') RETURNING *;
|
||||
col0 | col1 | col2 | col3
|
||||
---------------------------------------------------------------------
|
||||
hello0 | 1 | 1 | 1
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path = sequence_default, public;
|
||||
-- we should see worker_nextval for int and smallint columns
|
||||
SELECT table_name, column_name, data_type, column_default FROM information_schema.columns
|
||||
WHERE table_name = 'seq_test_12' ORDER BY column_name;
|
||||
table_name | column_name | data_type | column_default
|
||||
---------------------------------------------------------------------
|
||||
seq_test_12 | col0 | text |
|
||||
seq_test_12 | col1 | smallint | worker_nextval('seq_12'::regclass)
|
||||
seq_test_12 | col2 | integer | worker_nextval('seq_13'::regclass)
|
||||
seq_test_12 | col3 | bigint | nextval('seq_14'::regclass)
|
||||
(4 rows)
|
||||
|
||||
-- insertion from worker should fail
|
||||
INSERT INTO seq_test_12 VALUES ('hello1') RETURNING *;
|
||||
ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint
|
||||
-- nextval from worker should fail for int and smallint sequences
|
||||
SELECT nextval('seq_12');
|
||||
ERROR: nextval: reached maximum value of sequence "seq_12" (32767)
|
||||
SELECT nextval('seq_13');
|
||||
ERROR: nextval: reached maximum value of sequence "seq_13" (2147483647)
|
||||
-- nextval from worker should work for bigint sequences
|
||||
SELECT nextval('seq_14');
|
||||
nextval
|
||||
---------------------------------------------------------------------
|
||||
281474976710657
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET search_path = sequence_default, public;
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
TRUNCATE seq_test_12;
|
||||
ALTER TABLE seq_test_12 DROP COLUMN col1;
|
||||
ALTER TABLE seq_test_12 DROP COLUMN col2;
|
||||
ALTER TABLE seq_test_12 DROP COLUMN col3;
|
||||
DROP SEQUENCE seq_12, seq_13, seq_14;
|
||||
CREATE SEQUENCE seq_12;
|
||||
CREATE SEQUENCE seq_13;
|
||||
CREATE SEQUENCE seq_14;
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col1 smallint DEFAULT nextval('seq_14');
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col2 int DEFAULT nextval('seq_13');
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col3 bigint DEFAULT nextval('seq_12');
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col4 smallint;
|
||||
ALTER TABLE seq_test_12 ALTER COLUMN col4 SET DEFAULT nextval('seq_14');
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col5 int;
|
||||
ALTER TABLE seq_test_12 ALTER COLUMN col5 SET DEFAULT nextval('seq_13');
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col6 bigint;
|
||||
ALTER TABLE seq_test_12 ALTER COLUMN col6 SET DEFAULT nextval('seq_12');
|
||||
INSERT INTO seq_test_12 VALUES ('hello1') RETURNING *;
|
||||
col0 | col1 | col2 | col3 | col4 | col5 | col6
|
||||
---------------------------------------------------------------------
|
||||
hello1 | 1 | 1 | 1 | 2 | 2 | 2
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path = sequence_default, public;
|
||||
-- we should see worker_nextval for int and smallint columns
|
||||
SELECT table_name, column_name, data_type, column_default FROM information_schema.columns
|
||||
WHERE table_name = 'seq_test_12' ORDER BY column_name;
|
||||
table_name | column_name | data_type | column_default
|
||||
---------------------------------------------------------------------
|
||||
seq_test_12 | col0 | text |
|
||||
seq_test_12 | col1 | smallint | worker_nextval('seq_14'::regclass)
|
||||
seq_test_12 | col2 | integer | worker_nextval('seq_13'::regclass)
|
||||
seq_test_12 | col3 | bigint | nextval('seq_12'::regclass)
|
||||
seq_test_12 | col4 | smallint | worker_nextval('seq_14'::regclass)
|
||||
seq_test_12 | col5 | integer | worker_nextval('seq_13'::regclass)
|
||||
seq_test_12 | col6 | bigint | nextval('seq_12'::regclass)
|
||||
(7 rows)
|
||||
|
||||
-- insertion from worker should fail
|
||||
INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *;
|
||||
ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint
|
||||
-- nextval from worker should work for bigint sequences
|
||||
SELECT nextval('seq_12');
|
||||
nextval
|
||||
---------------------------------------------------------------------
|
||||
281474976710657
|
||||
(1 row)
|
||||
|
||||
-- nextval from worker should fail for int and smallint sequences
|
||||
SELECT nextval('seq_13');
|
||||
ERROR: nextval: reached maximum value of sequence "seq_13" (2147483647)
|
||||
SELECT nextval('seq_14');
|
||||
ERROR: nextval: reached maximum value of sequence "seq_14" (32767)
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET search_path = sequence_default, public;
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT undistribute_table('seq_test_12');
|
||||
NOTICE: creating a new table for sequence_default.seq_test_12
|
||||
NOTICE: moving the data of sequence_default.seq_test_12
|
||||
NOTICE: dropping the old sequence_default.seq_test_12
|
||||
NOTICE: renaming the new table to sequence_default.seq_test_12
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('seq_test_12', 'col0');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$sequence_default.seq_test_12$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *;
|
||||
col0 | col1 | col2 | col3 | col4 | col5 | col6
|
||||
---------------------------------------------------------------------
|
||||
hello2 | 3 | 3 | 3 | 4 | 4 | 4
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path = sequence_default, public;
|
||||
-- we should see worker_nextval for int and smallint columns
|
||||
SELECT table_name, column_name, data_type, column_default FROM information_schema.columns
|
||||
WHERE table_name = 'seq_test_12' ORDER BY column_name;
|
||||
table_name | column_name | data_type | column_default
|
||||
---------------------------------------------------------------------
|
||||
seq_test_12 | col0 | text |
|
||||
seq_test_12 | col1 | smallint | worker_nextval('seq_14'::regclass)
|
||||
seq_test_12 | col2 | integer | worker_nextval('seq_13'::regclass)
|
||||
seq_test_12 | col3 | bigint | nextval('seq_12'::regclass)
|
||||
seq_test_12 | col4 | smallint | worker_nextval('seq_14'::regclass)
|
||||
seq_test_12 | col5 | integer | worker_nextval('seq_13'::regclass)
|
||||
seq_test_12 | col6 | bigint | nextval('seq_12'::regclass)
|
||||
(7 rows)
|
||||
|
||||
-- insertion from worker should fail
|
||||
INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *;
|
||||
ERROR: nextval(sequence) calls in worker nodes are not supported for column defaults of type int or smallint
|
||||
-- nextval from worker should work for bigint sequences
|
||||
SELECT nextval('seq_12');
|
||||
nextval
|
||||
---------------------------------------------------------------------
|
||||
281474976710658
|
||||
(1 row)
|
||||
|
||||
-- nextval from worker should fail for int and smallint sequences
|
||||
SELECT nextval('seq_13');
|
||||
ERROR: nextval: reached maximum value of sequence "seq_13" (2147483647)
|
||||
SELECT nextval('seq_14');
|
||||
ERROR: nextval: reached maximum value of sequence "seq_14" (32767)
|
||||
\c - - - :master_port
|
||||
-- clean up
|
||||
DROP TABLE sequence_default.seq_test_7_par;
|
||||
SET client_min_messages TO error; -- suppress cascading objects dropping
|
||||
|
|
|
@ -207,6 +207,7 @@ ORDER BY 1;
|
|||
function worker_hash_partition_table(bigint,integer,text,text,oid,anyarray)
|
||||
function worker_last_saved_explain_analyze()
|
||||
function worker_merge_files_into_table(bigint,integer,text[],text[])
|
||||
function worker_nextval(regclass)
|
||||
function worker_partial_agg(oid,anyelement)
|
||||
function worker_partial_agg_ffunc(internal)
|
||||
function worker_partial_agg_sfunc(internal,oid,anyelement)
|
||||
|
@ -257,5 +258,5 @@ ORDER BY 1;
|
|||
view citus_worker_stat_activity
|
||||
view pg_dist_shard_placement
|
||||
view time_partitions
|
||||
(241 rows)
|
||||
(242 rows)
|
||||
|
||||
|
|
|
@ -452,6 +452,7 @@ SELECT create_distributed_table('mx_table_with_small_sequence', 'a');
|
|||
INSERT INTO mx_table_with_small_sequence VALUES (0);
|
||||
|
||||
\c - - - :worker_1_port
|
||||
-- Insert doesn't work because the defaults are of type int and smallint
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (1), (3);
|
||||
|
||||
\c - - - :master_port
|
||||
|
@ -460,6 +461,7 @@ SET citus.shard_replication_factor TO 1;
|
|||
-- Create an MX table with (BIGSERIAL) sequences
|
||||
CREATE TABLE mx_table_with_sequence(a int, b BIGSERIAL, c BIGSERIAL);
|
||||
SELECT create_distributed_table('mx_table_with_sequence', 'a');
|
||||
INSERT INTO mx_table_with_sequence VALUES (0);
|
||||
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_sequence'::regclass;
|
||||
\ds mx_table_with_sequence_b_seq
|
||||
\ds mx_table_with_sequence_c_seq
|
||||
|
@ -470,6 +472,9 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_
|
|||
\ds mx_table_with_sequence_b_seq
|
||||
\ds mx_table_with_sequence_c_seq
|
||||
|
||||
-- Insert works because the defaults are of type bigint
|
||||
INSERT INTO mx_table_with_sequence VALUES (1), (3);
|
||||
|
||||
-- check that pg_depend records exist on the worker
|
||||
SELECT refobjsubid FROM pg_depend
|
||||
WHERE objid = 'mx_table_with_sequence_b_seq'::regclass AND refobjid = 'mx_table_with_sequence'::regclass;
|
||||
|
@ -493,7 +498,10 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_
|
|||
SELECT nextval('mx_table_with_sequence_b_seq');
|
||||
SELECT nextval('mx_table_with_sequence_c_seq');
|
||||
|
||||
-- Insert doesn't work because the defaults are of type int and smallint
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (2), (4);
|
||||
-- Insert works because the defaults are of type bigint
|
||||
INSERT INTO mx_table_with_sequence VALUES (2), (4);
|
||||
|
||||
-- Check that dropping the mx table with sequences works as expected
|
||||
\c - - - :master_port
|
||||
|
@ -501,6 +509,9 @@ INSERT INTO mx_table_with_small_sequence VALUES (2), (4);
|
|||
-- check our small sequence values
|
||||
SELECT a, b, c FROM mx_table_with_small_sequence ORDER BY a,b,c;
|
||||
|
||||
--check our bigint sequence values
|
||||
SELECT a, b, c FROM mx_table_with_sequence ORDER BY a,b,c;
|
||||
|
||||
-- Check that dropping the mx table with sequences works as expected
|
||||
DROP TABLE mx_table_with_small_sequence, mx_table_with_sequence;
|
||||
\d mx_table_with_sequence
|
||||
|
|
|
@ -90,17 +90,18 @@ ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE smallint;
|
|||
CREATE TABLE seq_test_4 (x int, y int);
|
||||
SELECT create_distributed_table('seq_test_4','x');
|
||||
CREATE SEQUENCE seq_4;
|
||||
ALTER TABLE seq_test_4 ADD COLUMN a int DEFAULT nextval('seq_4');
|
||||
ALTER TABLE seq_test_4 ADD COLUMN a bigint DEFAULT nextval('seq_4');
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
DROP SEQUENCE seq_4 CASCADE;
|
||||
TRUNCATE seq_test_4;
|
||||
CREATE SEQUENCE seq_4;
|
||||
ALTER TABLE seq_test_4 ADD COLUMN b int DEFAULT nextval('seq_4');
|
||||
ALTER TABLE seq_test_4 ADD COLUMN b bigint DEFAULT nextval('seq_4');
|
||||
-- on worker it should generate high sequence number
|
||||
\c - - - :worker_1_port
|
||||
INSERT INTO sequence_default.seq_test_4 VALUES (1,2) RETURNING *;
|
||||
|
||||
-- check that we have can properly insert to tables from before metadata sync
|
||||
-- check that we have can't insert to tables from before metadata sync
|
||||
-- seq_test_0 and seq_test_0_local_table have int and smallint column defaults
|
||||
INSERT INTO sequence_default.seq_test_0 VALUES (1,2) RETURNING *;
|
||||
INSERT INTO sequence_default.seq_test_0_local_table VALUES (1,2) RETURNING *;
|
||||
|
||||
|
@ -119,7 +120,7 @@ SELECT create_distributed_table('seq_test_1','x');
|
|||
ALTER TABLE seq_test_1 ADD COLUMN z int DEFAULT nextval('seq_1');
|
||||
-- type is changed to int
|
||||
\d seq_1
|
||||
-- check insertion is within int bounds in the worker
|
||||
-- check insertion doesn't work in the worker because type is int
|
||||
\c - - - :worker_1_port
|
||||
INSERT INTO sequence_default.seq_test_1 values (1, 2) RETURNING *;
|
||||
\c - - - :master_port
|
||||
|
@ -137,7 +138,7 @@ SELECT citus_add_local_table_to_metadata('seq_test_1_local_table');
|
|||
ALTER TABLE seq_test_1_local_table ADD COLUMN z int DEFAULT nextval('seq_1_local_table');
|
||||
-- type is changed to int
|
||||
\d seq_1_local_table
|
||||
-- check insertion is within int bounds in the worker
|
||||
-- check insertion doesn't work in the worker because type is int
|
||||
\c - - - :worker_1_port
|
||||
INSERT INTO sequence_default.seq_test_1_local_table values (1, 2) RETURNING *;
|
||||
\c - - - :master_port
|
||||
|
@ -218,7 +219,7 @@ CREATE TABLE seq_test_5 (x int, y int);
|
|||
SELECT create_distributed_table('seq_test_5','x');
|
||||
CREATE SCHEMA sequence_default_1;
|
||||
CREATE SEQUENCE sequence_default_1.seq_5;
|
||||
ALTER TABLE seq_test_5 ADD COLUMN a int DEFAULT nextval('sequence_default_1.seq_5');
|
||||
ALTER TABLE seq_test_5 ADD COLUMN a bigint DEFAULT nextval('sequence_default_1.seq_5');
|
||||
DROP SCHEMA sequence_default_1 CASCADE;
|
||||
-- sequence is gone from coordinator
|
||||
INSERT INTO seq_test_5 VALUES (1, 2) RETURNING *;
|
||||
|
@ -372,8 +373,96 @@ SELECT create_distributed_table('seq_test_11', 'col1');
|
|||
\c - - - :worker_1_port
|
||||
INSERT INTO sequence_default.seq_test_10 VALUES (1);
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET search_path = sequence_default, public;
|
||||
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
|
||||
|
||||
-- Check worker_nextval and setval precautions for int and smallint column defaults
|
||||
-- For details see issue #5126 and PR #5254
|
||||
-- https://github.com/citusdata/citus/issues/5126
|
||||
CREATE SEQUENCE seq_12;
|
||||
CREATE SEQUENCE seq_13;
|
||||
CREATE SEQUENCE seq_14;
|
||||
CREATE TABLE seq_test_12(col0 text, col1 smallint DEFAULT nextval('seq_12'),
|
||||
col2 int DEFAULT nextval('seq_13'),
|
||||
col3 bigint DEFAULT nextval('seq_14'));
|
||||
SELECT create_distributed_table('seq_test_12', 'col0');
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
INSERT INTO seq_test_12 VALUES ('hello0') RETURNING *;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path = sequence_default, public;
|
||||
-- we should see worker_nextval for int and smallint columns
|
||||
SELECT table_name, column_name, data_type, column_default FROM information_schema.columns
|
||||
WHERE table_name = 'seq_test_12' ORDER BY column_name;
|
||||
-- insertion from worker should fail
|
||||
INSERT INTO seq_test_12 VALUES ('hello1') RETURNING *;
|
||||
-- nextval from worker should fail for int and smallint sequences
|
||||
SELECT nextval('seq_12');
|
||||
SELECT nextval('seq_13');
|
||||
-- nextval from worker should work for bigint sequences
|
||||
SELECT nextval('seq_14');
|
||||
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET search_path = sequence_default, public;
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
TRUNCATE seq_test_12;
|
||||
ALTER TABLE seq_test_12 DROP COLUMN col1;
|
||||
ALTER TABLE seq_test_12 DROP COLUMN col2;
|
||||
ALTER TABLE seq_test_12 DROP COLUMN col3;
|
||||
DROP SEQUENCE seq_12, seq_13, seq_14;
|
||||
CREATE SEQUENCE seq_12;
|
||||
CREATE SEQUENCE seq_13;
|
||||
CREATE SEQUENCE seq_14;
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col1 smallint DEFAULT nextval('seq_14');
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col2 int DEFAULT nextval('seq_13');
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col3 bigint DEFAULT nextval('seq_12');
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col4 smallint;
|
||||
ALTER TABLE seq_test_12 ALTER COLUMN col4 SET DEFAULT nextval('seq_14');
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col5 int;
|
||||
ALTER TABLE seq_test_12 ALTER COLUMN col5 SET DEFAULT nextval('seq_13');
|
||||
ALTER TABLE seq_test_12 ADD COLUMN col6 bigint;
|
||||
ALTER TABLE seq_test_12 ALTER COLUMN col6 SET DEFAULT nextval('seq_12');
|
||||
INSERT INTO seq_test_12 VALUES ('hello1') RETURNING *;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path = sequence_default, public;
|
||||
-- we should see worker_nextval for int and smallint columns
|
||||
SELECT table_name, column_name, data_type, column_default FROM information_schema.columns
|
||||
WHERE table_name = 'seq_test_12' ORDER BY column_name;
|
||||
-- insertion from worker should fail
|
||||
INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *;
|
||||
-- nextval from worker should work for bigint sequences
|
||||
SELECT nextval('seq_12');
|
||||
-- nextval from worker should fail for int and smallint sequences
|
||||
SELECT nextval('seq_13');
|
||||
SELECT nextval('seq_14');
|
||||
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET search_path = sequence_default, public;
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
SELECT undistribute_table('seq_test_12');
|
||||
SELECT create_distributed_table('seq_test_12', 'col0');
|
||||
INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path = sequence_default, public;
|
||||
-- we should see worker_nextval for int and smallint columns
|
||||
SELECT table_name, column_name, data_type, column_default FROM information_schema.columns
|
||||
WHERE table_name = 'seq_test_12' ORDER BY column_name;
|
||||
-- insertion from worker should fail
|
||||
INSERT INTO seq_test_12 VALUES ('hello2') RETURNING *;
|
||||
-- nextval from worker should work for bigint sequences
|
||||
SELECT nextval('seq_12');
|
||||
-- nextval from worker should fail for int and smallint sequences
|
||||
SELECT nextval('seq_13');
|
||||
SELECT nextval('seq_14');
|
||||
|
||||
\c - - - :master_port
|
||||
|
||||
-- clean up
|
||||
DROP TABLE sequence_default.seq_test_7_par;
|
||||
SET client_min_messages TO error; -- suppress cascading objects dropping
|
||||
|
|
Loading…
Reference in New Issue