From b686d9a02552eb645f02790129015043c6b87bb8 Mon Sep 17 00:00:00 2001 From: Eren Basak Date: Mon, 9 Jan 2017 13:41:32 +0300 Subject: [PATCH] Add Sequence Support for MX Tables This change adds support for serial columns to be used with MX tables. Prior to this change, sequences of serial columns were created in all workers (for being able to create shards) but never used. With MX, we need to set the sequences so that sequences in each worker create unique values. This is done by setting the MINVALUE, MAXVALUE and START values of the sequence. --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.1-15--6.1-16.sql | 12 + src/backend/distributed/citus.control | 2 +- .../master/master_delete_protocol.c | 9 +- .../distributed/master/master_node_protocol.c | 58 +--- .../distributed/metadata/metadata_sync.c | 205 +++++++++++++- .../distributed/utils/citus_ruleutils.c | 36 ++- .../worker/worker_data_fetch_protocol.c | 142 ++++++++++ src/include/distributed/citus_ruleutils.h | 2 + src/include/distributed/metadata_sync.h | 2 + src/test/regress/expected/multi_extension.out | 1 + .../regress/expected/multi_metadata_sync.out | 262 ++++++++++++++++-- .../multi_remove_node_reference_table.out | 44 +-- .../multi_replicate_reference_table.out | 50 ++-- .../multi_unsupported_worker_operations.out | 14 +- src/test/regress/sql/multi_extension.sql | 1 + src/test/regress/sql/multi_metadata_sync.sql | 105 ++++++- .../sql/multi_remove_node_reference_table.sql | 2 + .../sql/multi_replicate_reference_table.sql | 2 + .../multi_unsupported_worker_operations.sql | 2 + 20 files changed, 808 insertions(+), 147 deletions(-) create mode 100644 src/backend/distributed/citus--6.1-15--6.1-16.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 40dc62812..d7065afc8 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 
5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -125,6 +125,8 @@ $(EXTENSION)--6.1-14.sql: $(EXTENSION)--6.1-13.sql $(EXTENSION)--6.1-13--6.1-14. cat $^ > $@ $(EXTENSION)--6.1-15.sql: $(EXTENSION)--6.1-14.sql $(EXTENSION)--6.1-14--6.1-15.sql cat $^ > $@ +$(EXTENSION)--6.1-16.sql: $(EXTENSION)--6.1-15.sql $(EXTENSION)--6.1-15--6.1-16.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-15--6.1-16.sql b/src/backend/distributed/citus--6.1-15--6.1-16.sql new file mode 100644 index 000000000..53e19893d --- /dev/null +++ b/src/backend/distributed/citus--6.1-15--6.1-16.sql @@ -0,0 +1,12 @@ +/* citus--6.1-15--6.1-16.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION worker_apply_sequence_command(text) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_apply_sequence_command$$; +COMMENT ON FUNCTION worker_apply_sequence_command(text) + IS 'create a sequence which produces globally unique values'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 63dbb4c9e..f0e1fb685 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-15' +default_version = '6.1-16' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 
411f1f763..89e418ce7 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -268,8 +268,13 @@ master_drop_sequences(PG_FUNCTION_ARGS) appendStringInfo(dropSeqCommand, " %s", TextDatumGetCString(sequenceText)); } - SendCommandToWorkers(ALL_WORKERS, DISABLE_DDL_PROPAGATION); - SendCommandToWorkers(ALL_WORKERS, dropSeqCommand->data); + if (dropSeqCommand->len != 0) + { + appendStringInfoString(dropSeqCommand, " CASCADE"); + + SendCommandToWorkers(ALL_WORKERS, DISABLE_DDL_PROPAGATION); + SendCommandToWorkers(ALL_WORKERS, dropSeqCommand->data); + } PG_RETURN_VOID(); } diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 1f229261c..a7890d627 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -44,6 +44,7 @@ #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/pg_dist_shard.h" #include "distributed/worker_manager.h" #include "foreign/foreign.h" @@ -69,7 +70,6 @@ int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor); -static char * SchemaOwner(Oid schemaId); /* exports for SQL callable functions */ @@ -617,7 +617,7 @@ GetTableDDLEvents(Oid relationId) ListCell *sequenceIdCell; char *tableSchemaDef = NULL; char *tableColumnOptionsDef = NULL; - char *schemaName = NULL; + char *createSchemaCommand = NULL; Oid schemaId = InvalidOid; Relation pgIndex = NULL; @@ -652,14 +652,10 @@ GetTableDDLEvents(Oid relationId) /* create schema if the table is not in the default namespace (public) */ schemaId = get_rel_namespace(relationId); - schemaName = get_namespace_name(schemaId); - if (strncmp(schemaName, "public", NAMEDATALEN) != 0) + 
createSchemaCommand = CreateSchemaDDLCommand(schemaId); + if (createSchemaCommand != NULL) { - StringInfo schemaNameDef = makeStringInfo(); - char *ownerName = SchemaOwner(schemaId); - appendStringInfo(schemaNameDef, CREATE_SCHEMA_COMMAND, schemaName, ownerName); - - tableDDLEventList = lappend(tableDDLEventList, schemaNameDef->data); + tableDDLEventList = lappend(tableDDLEventList, createSchemaCommand); } /* create sequences if needed */ @@ -890,47 +886,3 @@ WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor) return workerNodeDatum; } - - -/* - * SchemaOwner returns the name of the owner of the specified schema. - */ -char * -SchemaOwner(Oid schemaId) -{ - const int scanKeyCount = 1; - - Relation namespaceRelation = heap_open(NamespaceRelationId, AccessShareLock); - ScanKeyData scanKeyData[scanKeyCount]; - SysScanDesc scanDescriptor = NULL; - HeapTuple tuple = NULL; - char *ownerName = NULL; - - /* start scan */ - ScanKeyInit(&scanKeyData[0], - ObjectIdAttributeNumber, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(schemaId)); - - scanDescriptor = systable_beginscan(namespaceRelation, NamespaceOidIndexId, true, - SnapshotSelf, 1, &scanKeyData[0]); - tuple = systable_getnext(scanDescriptor); - - if (HeapTupleIsValid(tuple)) - { - Form_pg_namespace nsptup = (Form_pg_namespace) GETSTRUCT(tuple); - Oid ownerId = nsptup->nspowner; - - ownerName = GetUserNameFromId(ownerId, false); - } - else - { - /* if the schema is not found, then return the name of current user */ - ownerName = GetUserNameFromId(GetUserId(), false); - } - - systable_endscan(scanDescriptor); - heap_close(namespaceRelation, NoLock); - - return ownerName; -} diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 156f48bff..34dc859a3 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -20,10 +20,13 @@ #include "access/genam.h" #include 
"access/heapam.h" #include "access/htup_details.h" +#include "access/sysattr.h" #include "access/xact.h" #include "catalog/dependency.h" #include "catalog/indexing.h" #include "catalog/pg_foreign_server.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_type.h" #include "distributed/citus_ruleutils.h" #include "distributed/distribution_column.h" #include "distributed/master_metadata_utility.h" @@ -40,11 +43,18 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/syscache.h" +#include "utils/tqual.h" static char * LocalGroupIdUpdateCommand(uint32 groupId); static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata); +static List * SequenceDDLCommandsForTable(Oid relationId); +static void EnsureSupportedSequenceColumnType(Oid sequenceOid); +static Oid TypeOfColumn(Oid tableId, int16 columnId); static char * TruncateTriggerCreateCommand(Oid relationId); +static char * OwnerName(Oid objectId); +static bool HasMetadataWorkers(void); PG_FUNCTION_INFO_V1(start_metadata_sync_to_node); @@ -88,6 +98,8 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) nodeNameString, nodePort))); } + MarkNodeHasMetadata(nodeNameString, nodePort, true); + /* generate and add the local group id's update query */ localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId); @@ -113,8 +125,6 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner, recreateMetadataSnapshotCommandList); - MarkNodeHasMetadata(nodeNameString, nodePort, true); - PG_RETURN_VOID(); } @@ -217,11 +227,14 @@ MetadataCreateCommands(void) (DistTableCacheEntry *) lfirst(distributedTableCell); Oid relationId = cacheEntry->relationId; - List *commandList = GetTableDDLEvents(relationId); + List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId); + List *ddlCommandList = GetTableDDLEvents(relationId); char *tableOwnerResetCommand = 
TableOwnerResetCommand(relationId); metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, - commandList); + workerSequenceDDLCommands); + metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, + ddlCommandList); metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, tableOwnerResetCommand); } @@ -288,12 +301,19 @@ GetDistributedTableDDLEvents(Oid relationId) List *commandList = NIL; List *foreignConstraintCommands = NIL; List *shardMetadataInsertCommandList = NIL; + List *sequenceDDLCommands = NIL; + List *tableDDLCommands = NIL; char *tableOwnerResetCommand = NULL; char *metadataCommand = NULL; char *truncateTriggerCreateCommand = NULL; + /* commands to create sequences */ + sequenceDDLCommands = SequenceDDLCommandsForTable(relationId); + commandList = list_concat(commandList, sequenceDDLCommands); + /* commands to create the table */ - commandList = GetTableDDLEvents(relationId); + tableDDLCommands = GetTableDDLEvents(relationId); + commandList = list_concat(commandList, tableDDLCommands); /* command to reset the table owner */ tableOwnerResetCommand = TableOwnerResetCommand(relationId); @@ -688,6 +708,131 @@ MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata) } +/* + * SequenceDDLCommandsForTable returns a list of commands which create sequences (and + * their schemas) to run on workers before creating the relation. The sequence creation + * commands are wrapped with a `worker_apply_sequence_command` call, which sets the + * sequence space uniquely for each worker. Notice that this function is relevant only + * during metadata propagation to workers and adds nothing to the list of sequence + * commands if none of the workers is marked as receiving metadata changes. 
+ */ +List * +SequenceDDLCommandsForTable(Oid relationId) +{ + List *sequenceDDLList = NIL; + List *ownedSequences = getOwnedSequences(relationId); + ListCell *listCell; + char *ownerName = TableOwner(relationId); + + foreach(listCell, ownedSequences) + { + Oid sequenceOid = (Oid) lfirst_oid(listCell); + char *sequenceDef = pg_get_sequencedef_string(sequenceOid); + char *escapedSequenceDef = quote_literal_cstr(sequenceDef); + StringInfo wrappedSequenceDef = makeStringInfo(); + StringInfo sequenceGrantStmt = makeStringInfo(); + Oid schemaId = InvalidOid; + char *createSchemaCommand = NULL; + char *sequenceName = generate_qualified_relation_name(sequenceOid); + + EnsureSupportedSequenceColumnType(sequenceOid); + + /* create schema if needed */ + schemaId = get_rel_namespace(sequenceOid); + createSchemaCommand = CreateSchemaDDLCommand(schemaId); + if (createSchemaCommand != NULL) + { + sequenceDDLList = lappend(sequenceDDLList, createSchemaCommand); + } + + appendStringInfo(wrappedSequenceDef, + WORKER_APPLY_SEQUENCE_COMMAND, + escapedSequenceDef); + + appendStringInfo(sequenceGrantStmt, + "ALTER SEQUENCE %s OWNER TO %s", sequenceName, + quote_identifier(ownerName)); + + sequenceDDLList = lappend(sequenceDDLList, wrappedSequenceDef->data); + sequenceDDLList = lappend(sequenceDDLList, sequenceGrantStmt->data); + } + + return sequenceDDLList; +} + + +/* + * CreateSchemaDDLCommand returns a "CREATE SCHEMA..." SQL string for creating the given + * schema if not exists and with proper authorization. 
+ */ +char * +CreateSchemaDDLCommand(Oid schemaId) +{ + char *schemaName = get_namespace_name(schemaId); + StringInfo schemaNameDef = NULL; + char *ownerName = NULL; + + if (strncmp(schemaName, "public", NAMEDATALEN) == 0) + { + return NULL; + } + + schemaNameDef = makeStringInfo(); + ownerName = OwnerName(schemaId); + appendStringInfo(schemaNameDef, CREATE_SCHEMA_COMMAND, schemaName, ownerName); + + return schemaNameDef->data; +} + + +/* + * EnsureSupportedSequenceColumnType looks at the column which depends on this sequence + * (which it asserts exists) and makes sure its type is suitable for use in a distributed + * manner. + * + * Any column which depends on a sequence (and will therefore be replicated) but which is + * not a BIGINT cannot be used for an mx table, because there aren't enough values to + * ensure that generated numbers are globally unique. + */ +static void +EnsureSupportedSequenceColumnType(Oid sequenceOid) +{ + Oid tableId = InvalidOid; + Oid columnType = InvalidOid; + int32 columnId = 0; + bool shouldSyncMetadata = false; + bool hasMetadataWorkers = HasMetadataWorkers(); + + /* call sequenceIsOwned in order to get the tableId and columnId */ + sequenceIsOwned(sequenceOid, &tableId, &columnId); + + shouldSyncMetadata = ShouldSyncTableMetadata(tableId); + + columnType = TypeOfColumn(tableId, (int16) columnId); + + if (columnType != INT8OID && shouldSyncMetadata && hasMetadataWorkers) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create an mx table with columns which use " + "sequences, but are not BIGINT"))); + } +} + + +/* + * TypeOfColumn returns the Oid of the type of the provided column of the provided table. 
+ */ +static Oid +TypeOfColumn(Oid tableId, int16 columnId) +{ + Relation tableRelation = relation_open(tableId, NoLock); + TupleDesc tupleDescriptor = RelationGetDescr(tableRelation); + Form_pg_attribute attrForm = tupleDescriptor->attrs[columnId - 1]; + relation_close(tableRelation, NoLock); + return attrForm->atttypid; +} + + /* * TruncateTriggerCreateCommand creates a SQL query calling worker_create_truncate_trigger * function, which creates the truncate trigger on the worker. @@ -704,3 +849,53 @@ TruncateTriggerCreateCommand(Oid relationId) return triggerCreateCommand->data; } + + +/* + * OwnerName returns the name of the owner of the specified object. + */ +static char * +OwnerName(Oid objectId) +{ + HeapTuple tuple = NULL; + Oid ownerId = InvalidOid; + char *ownerName = NULL; + + tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(objectId)); + if (HeapTupleIsValid(tuple)) + { + ownerId = ((Form_pg_class) GETSTRUCT(tuple))->relowner; + } + else + { + ownerId = GetUserId(); + } + + ownerName = GetUserNameFromId(ownerId, false); + + return ownerName; +} + + +/* + * HasMetadataWorkers returns true if any of the workers in the cluster has its + * hasmetadata column set to true, which happens when start_metadata_sync_to_node + * command is run. 
+ */ +static bool +HasMetadataWorkers(void) +{ + List *workerNodeList = WorkerNodeList(); + ListCell *workerNodeCell = NULL; + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + if (workerNode->hasMetadata) + { + return true; + } + } + + return false; +} diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index 2705080a2..739005a08 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -32,7 +32,6 @@ #include "catalog/pg_index.h" #include "commands/defrem.h" #include "commands/extension.h" -#include "commands/sequence.h" #include "distributed/citus_ruleutils.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" @@ -184,10 +183,32 @@ pg_get_sequencedef_string(Oid sequenceRelationId) char *qualifiedSequenceName = NULL; char *sequenceDef = NULL; Form_pg_sequence pgSequenceForm = NULL; - Relation sequenceRel = NULL; - AclResult permissionCheck = ACLCHECK_NO_PRIV; + + pgSequenceForm = pg_get_sequencedef(sequenceRelationId); + + /* build our DDL command */ + qualifiedSequenceName = generate_relation_name(sequenceRelationId, NIL); + sequenceDef = psprintf(CREATE_SEQUENCE_COMMAND, qualifiedSequenceName, + pgSequenceForm->increment_by, pgSequenceForm->min_value, + pgSequenceForm->max_value, pgSequenceForm->cache_value, + pgSequenceForm->is_cycled ? "" : "NO "); + + return sequenceDef; +} + + +/* + * pg_get_sequencedef returns the Form_pg_sequence data about the sequence with the given + * object id. 
+ */ +Form_pg_sequence +pg_get_sequencedef(Oid sequenceRelationId) +{ + Form_pg_sequence pgSequenceForm = NULL; SysScanDesc scanDescriptor = NULL; HeapTuple heapTuple = NULL; + Relation sequenceRel = NULL; + AclResult permissionCheck = ACLCHECK_NO_PRIV; /* open and lock sequence */ sequenceRel = heap_open(sequenceRelationId, AccessShareLock); @@ -212,18 +233,11 @@ pg_get_sequencedef_string(Oid sequenceRelationId) pgSequenceForm = (Form_pg_sequence) GETSTRUCT(heapTuple); - /* build our DDL command */ - qualifiedSequenceName = generate_relation_name(sequenceRelationId, NIL); - sequenceDef = psprintf(CREATE_SEQUENCE_COMMAND, qualifiedSequenceName, - pgSequenceForm->increment_by, pgSequenceForm->min_value, - pgSequenceForm->max_value, pgSequenceForm->cache_value, - pgSequenceForm->is_cycled ? "" : "NO "); - systable_endscan(scanDescriptor); heap_close(sequenceRel, AccessShareLock); - return sequenceDef; + return pgSequenceForm; } diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 55be5783b..f86b20726 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -24,8 +24,10 @@ #include "commands/copy.h" #include "commands/dbcommands.h" #include "commands/extension.h" +#include "commands/sequence.h" #include "distributed/citus_ruleutils.h" #include "distributed/master_protocol.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_server_executor.h" @@ -68,6 +70,8 @@ static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort, static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort, const char *tableName); static bool check_log_statement(List *stmt_list); +static void AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName); +static void 
SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg); /* exports for SQL callable functions */ @@ -75,6 +79,7 @@ PG_FUNCTION_INFO_V1(worker_fetch_partition_file); PG_FUNCTION_INFO_V1(worker_fetch_query_results_file); PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command); PG_FUNCTION_INFO_V1(worker_apply_inter_shard_ddl_command); +PG_FUNCTION_INFO_V1(worker_apply_sequence_command); PG_FUNCTION_INFO_V1(worker_fetch_regular_table); PG_FUNCTION_INFO_V1(worker_fetch_foreign_file); PG_FUNCTION_INFO_V1(worker_append_table_to_shard); @@ -449,6 +454,51 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) } +/* + * worker_apply_sequence_command takes a CREATE SEQUENCE command string, runs the + * CREATE SEQUENCE command then creates and runs an ALTER SEQUENCE statement + * which adjusts the minvalue and maxvalue of the sequence such that the sequence + * creates globally unique values. + */ +Datum +worker_apply_sequence_command(PG_FUNCTION_ARGS) +{ + text *commandText = PG_GETARG_TEXT_P(0); + const char *commandString = text_to_cstring(commandText); + Node *commandNode = ParseTreeNode(commandString); + CreateSeqStmt *createSequenceStatement = NULL; + char *sequenceName = NULL; + char *sequenceSchema = NULL; + Oid sequenceRelationId = InvalidOid; + + NodeTag nodeType = nodeTag(commandNode); + if (nodeType != T_CreateSeqStmt) + { + ereport(ERROR, + (errmsg("must call worker_apply_sequence_command with a CREATE" + " SEQUENCE command string"))); + } + + /* run the CREATE SEQUENCE command */ + ProcessUtility(commandNode, commandString, PROCESS_UTILITY_TOPLEVEL, + NULL, None_Receiver, NULL); + + createSequenceStatement = (CreateSeqStmt *) commandNode; + + sequenceName = createSequenceStatement->sequence->relname; + sequenceSchema = createSequenceStatement->sequence->schemaname; + createSequenceStatement = (CreateSeqStmt *) commandNode; + + sequenceRelationId = RangeVarGetRelid(createSequenceStatement->sequence, + AccessShareLock, false); + 
Assert(sequenceRelationId != InvalidOid); + + AlterSequenceMinMax(sequenceRelationId, sequenceSchema, sequenceName); + + PG_RETURN_VOID(); +} + + /* * worker_fetch_regular_table caches the given PostgreSQL table on the local * node. The function caches this table by trying the given list of node names @@ -1244,3 +1294,95 @@ check_log_statement(List *statementList) return false; } + + +/* + * AlterSequenceMinMax adjusts the min and max values of the given sequence. The function + * creates an ALTER SEQUENCE statement which sets the start, minvalue and maxvalue of + * the given sequence. + * + * The function provides the uniqueness by shifting the start of the sequence by + * GetLocalGroupId() << 48 + 1 and sets a maxvalue which stops it from passing out any + * values greater than: (GetLocalGroupID() + 1) << 48. + * + * This is to ensure every group of workers passes out values from a unique range, + * and therefore that all values generated for the sequence are globally unique. + */ +static void +AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName) +{ + Form_pg_sequence sequenceData = pg_get_sequencedef(sequenceId); + int64 startValue = 0; + int64 maxValue = 0; + + /* calculate min/max values that the sequence can generate in this worker */ + startValue = (((int64) GetLocalGroupId()) << 48) + 1; + maxValue = startValue + ((int64) 1 << 48); + + /* + * We alter the sequence if the previously set min and max values are not equal to + * their correct values. This happens when the sequence has been created + * during shard creation, before the current worker had the metadata. 
+ */ + if (sequenceData->min_value != startValue || sequenceData->max_value != maxValue) + { + StringInfo startNumericString = makeStringInfo(); + StringInfo maxNumericString = makeStringInfo(); + Node *startFloatArg = NULL; + Node *maxFloatArg = NULL; + AlterSeqStmt *alterSequenceStatement = makeNode(AlterSeqStmt); + const char *dummyString = "-"; + + alterSequenceStatement->sequence = makeRangeVar(schemaName, sequenceName, -1); + + /* + * DefElem->arg can only hold literal ints up to int4, in order to represent + * larger numbers we need to construct a float represented as a string. + */ + appendStringInfo(startNumericString, "%lu", startValue); + startFloatArg = (Node *) makeFloat(startNumericString->data); + + appendStringInfo(maxNumericString, "%lu", maxValue); + maxFloatArg = (Node *) makeFloat(maxNumericString->data); + + SetDefElemArg(alterSequenceStatement, "start", startFloatArg); + SetDefElemArg(alterSequenceStatement, "minvalue", startFloatArg); + SetDefElemArg(alterSequenceStatement, "maxvalue", maxFloatArg); + + SetDefElemArg(alterSequenceStatement, "restart", startFloatArg); + + /* since the command is an AlterSeqStmt, a dummy command string works fine */ + ProcessUtility((Node *) alterSequenceStatement, dummyString, + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + } +} + + +/* + * SetDefElemArg scans through all the DefElem's of an AlterSeqStmt + * and sets the arg of the one with a defname of name to arg. + * + * If a DefElem with the given defname does not exist it is created and + * added to the AlterSeqStmt. 
+ */ +static void +SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg) +{ + DefElem *defElem = NULL; + ListCell *optionCell = NULL; + + foreach(optionCell, statement->options) + { + defElem = (DefElem *) lfirst(optionCell); + + if (strcmp(defElem->defname, name) == 0) + { + pfree(defElem->arg); + defElem->arg = arg; + return; + } + } + + defElem = makeDefElem((char *) name, arg); + statement->options = lappend(statement->options, defElem); +} diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index e8327c30b..d6faf0a2a 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -14,6 +14,7 @@ #include "postgres.h" /* IWYU pragma: keep */ #include "c.h" +#include "commands/sequence.h" #include "lib/stringinfo.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" @@ -28,6 +29,7 @@ extern char * pg_get_extensiondef_string(Oid tableRelationId); extern Oid get_extension_schema(Oid ext_oid); extern char * pg_get_serverdef_string(Oid tableRelationId); extern char * pg_get_sequencedef_string(Oid sequenceRelid); +extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); extern char * pg_get_tableschemadef_string(Oid tableRelationId); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); extern char * pg_get_indexclusterdef_string(Oid indexRelationId); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 936c7b34e..56e91e5f8 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -30,12 +30,14 @@ extern char * NodeListInsertCommand(List *workerNodeList); extern List * ShardListInsertCommand(List *shardIntervalList); extern char * NodeDeleteCommand(uint32 nodeId); extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); +extern char * CreateSchemaDDLCommand(Oid schemaId); #define DELETE_ALL_NODES "TRUNCATE pg_dist_node" 
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \ "SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition" #define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'" +#define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s)" #endif /* METADATA_SYNC_H */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index ff4f90918..38b310c5d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -73,6 +73,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-12'; ALTER EXTENSION citus UPDATE TO '6.1-13'; ALTER EXTENSION citus UPDATE TO '6.1-14'; ALTER EXTENSION citus UPDATE TO '6.1-15'; +ALTER EXTENSION citus UPDATE TO '6.1-16'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 16651ca77..8522ddd0c 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -34,7 +34,7 @@ SELECT unnest(master_metadata_snapshot()); (3 rows) -- Create a test table with constraints and SERIAL -CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 SERIAL); +CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 BIGSERIAL); SELECT master_create_distributed_table('mx_test_table', 'col_1', 'hash'); master_create_distributed_table --------------------------------- @@ -57,15 +57,17 @@ SELECT unnest(master_metadata_snapshot()); TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS 
mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') + ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE - CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) + CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE public.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's') SELECT worker_create_truncate_trigger('public.mx_test_table') INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 
't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') -(11 rows) +(13 rows) -- Show that CREATE INDEX commands are included in the metadata snapshot CREATE INDEX mx_index ON mx_test_table(col_2); @@ -77,8 +79,10 @@ SELECT unnest(master_metadata_snapshot()); TRUNCATE pg_dist_node SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) + SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') + ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE - CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) + CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE public.mx_test_table OWNER TO postgres @@ -86,7 +90,7 @@ SELECT unnest(master_metadata_snapshot()); SELECT worker_create_truncate_trigger('public.mx_test_table') INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 
'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') -(12 rows) +(14 rows) -- Show that schema changes are included in the metadata snapshot CREATE SCHEMA mx_testing_schema; @@ -100,8 +104,11 @@ SELECT unnest(master_metadata_snapshot()); SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres + SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') + ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres + CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE - CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT 
nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) + CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres @@ -109,7 +116,7 @@ SELECT unnest(master_metadata_snapshot()); SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table') INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') -(13 rows) +(16 rows) -- Show that append distributed tables are not included in the metadata snapshot CREATE TABLE 
non_mx_test_table (col_1 int, col_2 text); @@ -127,8 +134,11 @@ SELECT unnest(master_metadata_snapshot()); SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres + SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') + ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres + CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE - CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) + CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres @@ -136,7 +146,7 @@ SELECT unnest(master_metadata_snapshot()); SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table') INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 
'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') -(13 rows) +(16 rows) -- Show that range distributed tables are not included in the metadata snapshot UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass; @@ -147,8 +157,11 @@ SELECT unnest(master_metadata_snapshot()); SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE) CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres + SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') + ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres + CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE - CREATE TABLE 
mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) + CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres @@ -156,7 +169,7 @@ SELECT unnest(master_metadata_snapshot()); SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table') INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 100000),(1310001, 1, 0, 'localhost', 57638, 100001),(1310002, 1, 0, 'localhost', 57637, 100002),(1310003, 1, 0, 'localhost', 57638, 100003),(1310004, 1, 0, 'localhost', 57637, 100004),(1310005, 1, 0, 'localhost', 57638, 100005),(1310006, 1, 0, 'localhost', 57637, 100006),(1310007, 1, 0, 'localhost', 57638, 100007) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') -(13 rows) +(16 rows) -- Test 
start_metadata_sync_to_node UDF -- Ensure that hasmetadata=false for all nodes @@ -190,7 +203,7 @@ SELECT * FROM pg_dist_local_group; SELECT * FROM pg_dist_node ORDER BY nodeid; nodeid | groupid | nodename | nodeport | noderack | hasmetadata --------+---------+-----------+----------+----------+------------- - 1 | 1 | localhost | 57637 | default | f + 1 | 1 | localhost | 57637 | default | t 2 | 2 | localhost | 57638 | default | f (2 rows) @@ -232,7 +245,7 @@ SELECT * FROM pg_dist_shard_placement ORDER BY shardid; --------+---------+--------------------------------------------------------------------------------- col_1 | integer | col_2 | text | not null - col_3 | integer | not null default nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) + col_3 | bigint | not null default nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) Indexes: "mx_test_table_col_1_key" UNIQUE CONSTRAINT, btree (col_1) "mx_index" btree (col_2) @@ -358,7 +371,7 @@ SELECT * FROM pg_dist_shard_placement ORDER BY shardid; --------+---------+--------------------------------------------------------------------------------- col_1 | integer | col_2 | text | not null - col_3 | integer | not null default nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) + col_3 | bigint | not null default nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) Indexes: "mx_test_table_col_1_key" UNIQUE CONSTRAINT, btree (col_1) "mx_index" btree (col_2) @@ -861,16 +874,45 @@ SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_te (1 row) DROP TABLE mx_temp_drop_test; --- Create an MX table with sequences +-- Check that MX tables can be created with SERIAL columns, but error out on metadata sync \c - - - :master_port SET citus.shard_count TO 3; SET citus.shard_replication_factor TO 1; +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +SELECT 
stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL); +SELECT create_distributed_table('mx_table_with_small_sequence', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +ERROR: cannot create an mx table with columns which use sequences, but are not BIGINT +DROP TABLE mx_table_with_small_sequence; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node ----------------------------- (1 row) +-- Show that create_distributed_table errors out if the table has a SERIAL column and +-- there are metadata workers +CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL); +SELECT create_distributed_table('mx_table_with_small_sequence', 'a'); +ERROR: cannot create an mx table with columns which use sequences, but are not BIGINT +DROP TABLE mx_table_with_small_sequence; +-- Create an MX table with (BIGSERIAL) sequences CREATE TABLE mx_table_with_sequence(a int, b BIGSERIAL, c BIGSERIAL); SELECT create_distributed_table('mx_table_with_sequence', 'a'); create_distributed_table @@ -900,7 +942,7 @@ SELECT create_distributed_table('mx_table_with_sequence', 'a'); public | mx_table_with_sequence_c_seq | sequence | postgres (1 row) --- Check that the sequences created on the worker as well +-- Check that the sequences created on the metadata worker as well \c - - - :worker_1_port \d mx_table_with_sequence Table "public.mx_table_with_sequence" @@ -924,8 +966,77 @@ SELECT create_distributed_table('mx_table_with_sequence', 'a'); public | mx_table_with_sequence_c_seq | sequence | postgres (1 row) --- Check that dropping the mx table with sequences works as expected +-- Check that the sequences on the worker have their own space +SELECT nextval('mx_table_with_sequence_b_seq'); + nextval +----------------- + 281474976710657 +(1 
row) + +SELECT nextval('mx_table_with_sequence_c_seq'); + nextval +----------------- + 281474976710657 +(1 row) + +-- Check that adding a new metadata node sets the sequence space correctly \c - - - :master_port +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +\c - - - :worker_2_port +SELECT groupid FROM pg_dist_local_group; + groupid +--------- + 2 +(1 row) + +\d mx_table_with_sequence + Table "public.mx_table_with_sequence" + Column | Type | Modifiers +--------+---------+-------------------------------------------------------------------- + a | integer | + b | bigint | not null default nextval('mx_table_with_sequence_b_seq'::regclass) + c | bigint | not null default nextval('mx_table_with_sequence_c_seq'::regclass) + +\ds mx_table_with_sequence_b_seq + List of relations + Schema | Name | Type | Owner +--------+------------------------------+----------+---------- + public | mx_table_with_sequence_b_seq | sequence | postgres +(1 row) + +\ds mx_table_with_sequence_c_seq + List of relations + Schema | Name | Type | Owner +--------+------------------------------+----------+---------- + public | mx_table_with_sequence_c_seq | sequence | postgres +(1 row) + +SELECT nextval('mx_table_with_sequence_b_seq'); + nextval +----------------- + 562949953421313 +(1 row) + +SELECT nextval('mx_table_with_sequence_c_seq'); + nextval +----------------- + 562949953421313 +(1 row) + +-- Check that dropping the mx table with sequences works as expected, even the metadata +-- syncing is stopped to one of the workers +\c - - - :master_port +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + DROP TABLE mx_table_with_sequence; \d mx_table_with_sequence \ds mx_table_with_sequence_b_seq @@ -940,7 +1051,7 @@ DROP TABLE mx_table_with_sequence; --------+------+------+------- (0 rows) --- Check that the sequences are 
dropped from the worker as well +-- Check that the sequences are dropped from the workers \c - - - :worker_1_port \d mx_table_with_sequence \ds mx_table_with_sequence_b_seq @@ -955,6 +1066,121 @@ DROP TABLE mx_table_with_sequence; --------+------+------+------- (0 rows) +-- Check that the sequences are dropped from the workers +\c - - - :worker_2_port +\ds mx_table_with_sequence_b_seq + List of relations + Schema | Name | Type | Owner +--------+------+------+------- +(0 rows) + +\ds mx_table_with_sequence_c_seq + List of relations + Schema | Name | Type | Owner +--------+------+------+------- +(0 rows) + +-- Check that MX sequences play well with non-super users +\c - - - :master_port +-- Remove a node so that shards and sequences won't be created on table creation. Therefore, +-- we can test that start_metadata_sync_to_node can actually create the sequence with proper +-- owner +CREATE TABLE pg_dist_shard_placement_temp AS SELECT * FROM pg_dist_shard_placement; +CREATE TABLE pg_dist_partition_temp AS SELECT * FROM pg_dist_partition; +DELETE FROM pg_dist_shard_placement; +DELETE FROM pg_dist_partition; +SELECT master_remove_node('localhost', :worker_2_port); + master_remove_node +-------------------- + +(1 row) + +CREATE USER mx_user; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +\c - - - :worker_1_port +CREATE USER mx_user; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +\c - - - :worker_2_port +CREATE USER mx_user; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. 
+\c - mx_user - :master_port +-- Create an mx table as a different user +CREATE TABLE mx_table (a int, b BIGSERIAL); +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('mx_table', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +\c - postgres - :master_port +SELECT master_add_node('localhost', :worker_2_port); + master_add_node +--------------------------------- + (3,3,localhost,57638,default,f) +(1 row) + +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +----------------------------- + +(1 row) + +\c - mx_user - :worker_1_port +SELECT nextval('mx_table_b_seq'); + nextval +----------------- + 281474976710657 +(1 row) + +INSERT INTO mx_table (a) VALUES (37); +INSERT INTO mx_table (a) VALUES (38); +SELECT * FROM mx_table ORDER BY a; + a | b +----+----------------- + 37 | 281474976710658 + 38 | 281474976710659 +(2 rows) + +\c - mx_user - :worker_2_port +SELECT nextval('mx_table_b_seq'); + nextval +----------------- + 844424930131969 +(1 row) + +INSERT INTO mx_table (a) VALUES (39); +INSERT INTO mx_table (a) VALUES (40); +SELECT * FROM mx_table ORDER BY a; + a | b +----+----------------- + 37 | 281474976710658 + 38 | 281474976710659 + 39 | 844424930131970 + 40 | 844424930131971 +(4 rows) + +\c - mx_user - :master_port +DROP TABLE mx_table; +\c - postgres - :master_port +INSERT INTO pg_dist_shard_placement SELECT * FROM pg_dist_shard_placement_temp; +INSERT INTO pg_dist_partition SELECT * FROM pg_dist_partition_temp; +DROP TABLE pg_dist_shard_placement_temp; +DROP TABLE pg_dist_partition_temp; +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); + stop_metadata_sync_to_node +---------------------------- + +(1 row) + +DROP USER mx_user; +\c - - - :worker_1_port +DROP USER mx_user; +\c - - - :worker_2_port +DROP USER mx_user; -- Cleanup \c - - - :master_port DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; diff --git 
a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out index 383b2850e..e10a80f06 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -5,6 +5,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1380000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1380000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1380000; -- create copy of pg_dist_shard_placement to reload after the test CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; @@ -34,9 +36,9 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); - master_add_node ------------------------------------ - (15,15,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1380000,1380000,localhost,57638,default,f) (1 row) -- remove a node with reference table @@ -116,9 +118,9 @@ ERROR: could not find valid entry for node "localhost:57638" -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ------------------------------------ - (16,16,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1380001,1380001,localhost,57638,default,f) (1 row) -- remove node in a transaction and ROLLBACK @@ -257,9 +259,9 @@ WHERE colocationid IN -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table 
"remove_node_reference_table" to all workers - master_add_node ------------------------------------ - (17,17,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1380002,1380002,localhost,57638,default,f) (1 row) -- test inserting a value then removing a node in a transaction @@ -339,9 +341,9 @@ SELECT * FROM remove_node_reference_table; -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ------------------------------------ - (18,18,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1380003,1380003,localhost,57638,default,f) (1 row) -- test executing DDL command then removing a node in a transaction @@ -424,9 +426,9 @@ Table "public.remove_node_reference_table" -- re-add the node for next tests SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers - master_add_node ------------------------------------ - (19,19,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1380004,1380004,localhost,57638,default,f) (1 row) -- test DROP table after removing a node in a transaction @@ -521,9 +523,9 @@ WHERE colocationid IN SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers NOTICE: Replicating reference table "table1" to all workers - master_add_node ------------------------------------ - (20,20,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1380005,1380005,localhost,57638,default,f) (1 row) -- test with master_disable_node @@ -597,9 +599,9 @@ WHERE colocationid IN SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "remove_node_reference_table" to all workers NOTICE: 
Replicating reference table "table1" to all workers - master_add_node ------------------------------------ - (21,21,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1380006,1380006,localhost,57638,default,f) (1 row) -- DROP tables to clean workspace diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index ef3586d57..e9d21e600 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -5,6 +5,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1370000; -- remove a node for testing purposes CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; @@ -23,9 +25,9 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; (1 row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (4,4,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1370000,1370000,localhost,57638,default,f) (1 row) -- verify node is added @@ -122,9 +124,9 @@ WHERE colocationid IN SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_valid" to all workers - master_add_node ---------------------------------- - (6,6,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1370002,1370002,localhost,57638,default,f) (1 row) -- status after master_add_node @@ -175,9 +177,9 @@ WHERE colocationid IN (1 
row) SELECT master_add_node('localhost', :worker_2_port); - master_add_node ---------------------------------- - (6,6,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1370002,1370002,localhost,57638,default,f) (1 row) -- status after master_add_node @@ -243,9 +245,9 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_rollback" to all workers - master_add_node ---------------------------------- - (7,7,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1370003,1370003,localhost,57638,default,f) (1 row) ROLLBACK; @@ -305,9 +307,9 @@ WHERE colocationid IN BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_commit" to all workers - master_add_node ---------------------------------- - (8,8,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1370004,1370004,localhost,57638,default,f) (1 row) COMMIT; @@ -396,9 +398,9 @@ WHERE BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_reference_one" to all workers - master_add_node ---------------------------------- - (9,9,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1370005,1370005,localhost,57638,default,f) (1 row) SELECT upgrade_to_reference_table('replicate_reference_table_hash'); @@ -521,9 +523,9 @@ SELECT create_reference_table('replicate_reference_table_drop'); BEGIN; SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "replicate_reference_table_drop" to all workers - master_add_node ------------------------------------ - (13,13,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1370009,1370009,localhost,57638,default,f) (1 row) 
DROP TABLE replicate_reference_table_drop; @@ -565,9 +567,9 @@ WHERE colocationid IN SELECT master_add_node('localhost', :worker_2_port); NOTICE: Replicating reference table "table1" to all workers - master_add_node ------------------------------------ - (14,14,localhost,57638,default,f) + master_add_node +--------------------------------------------- + (1370010,1370010,localhost,57638,default,f) (1 row) -- status after master_add_node diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index 10b27f766..1d4be75d6 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -4,6 +4,8 @@ -- Tests for ensuring unsupported functions on workers error out. ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1270000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1270000; +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1370000; -- Set the colocation id to a safe value so that -- it is not affected by future changes to colocation id sequence SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset @@ -179,9 +181,9 @@ SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; -- master_remove_node \c - - - :master_port SELECT master_add_node('localhost', 5432); - master_add_node --------------------------------- - (3,3,localhost,5432,default,f) + master_add_node +-------------------------------------------- + (1370000,1370000,localhost,5432,default,f) (1 row) \c - - - :worker_1_port @@ -189,9 +191,9 @@ SELECT master_remove_node('localhost', 5432); ERROR: operation is not allowed on this node HINT: Connect to the schema node and run it again. 
SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; - nodeid | groupid | nodename | nodeport | noderack | hasmetadata ---------+---------+-----------+----------+----------+------------- - 3 | 3 | localhost | 5432 | default | f + nodeid | groupid | nodename | nodeport | noderack | hasmetadata +---------+---------+-----------+----------+----------+------------- + 1370000 | 1370000 | localhost | 5432 | default | f (1 row) \c - - - :master_port diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index ae422001f..38a7560e1 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -73,6 +73,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-12'; ALTER EXTENSION citus UPDATE TO '6.1-13'; ALTER EXTENSION citus UPDATE TO '6.1-14'; ALTER EXTENSION citus UPDATE TO '6.1-15'; +ALTER EXTENSION citus UPDATE TO '6.1-16'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 36ff9d8d4..61df18fc3 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -30,7 +30,7 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; SELECT unnest(master_metadata_snapshot()); -- Create a test table with constraints and SERIAL -CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 SERIAL); +CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 BIGSERIAL); SELECT master_create_distributed_table('mx_test_table', 'col_1', 'hash'); SELECT master_create_worker_shards('mx_test_table', 8, 1); @@ -380,37 +380,132 @@ SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_te DROP TABLE mx_temp_drop_test; --- Create an MX table with sequences +-- Check that MX tables can be created with SERIAL columns, but error out on metadata sync \c - - - 
:master_port SET citus.shard_count TO 3; SET citus.shard_replication_factor TO 1; + +SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); + +CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL); +SELECT create_distributed_table('mx_table_with_small_sequence', 'a'); + +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +DROP TABLE mx_table_with_small_sequence; SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +-- Show that create_distributed_table errors out if the table has a SERIAL column and +-- there are metadata workers +CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL); +SELECT create_distributed_table('mx_table_with_small_sequence', 'a'); +DROP TABLE mx_table_with_small_sequence; + +-- Create an MX table with (BIGSERIAL) sequences CREATE TABLE mx_table_with_sequence(a int, b BIGSERIAL, c BIGSERIAL); SELECT create_distributed_table('mx_table_with_sequence', 'a'); \d mx_table_with_sequence \ds mx_table_with_sequence_b_seq \ds mx_table_with_sequence_c_seq --- Check that the sequences created on the worker as well +-- Check that the sequences created on the metadata worker as well \c - - - :worker_1_port \d mx_table_with_sequence \ds mx_table_with_sequence_b_seq \ds mx_table_with_sequence_c_seq --- Check that dropping the mx table with sequences works as expected +-- Check that the sequences on the worker have their own space +SELECT nextval('mx_table_with_sequence_b_seq'); +SELECT nextval('mx_table_with_sequence_c_seq'); + +-- Check that adding a new metadata node sets the sequence space correctly \c - - - :master_port +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + +\c - - - :worker_2_port +SELECT groupid FROM pg_dist_local_group; +\d mx_table_with_sequence +\ds mx_table_with_sequence_b_seq +\ds mx_table_with_sequence_c_seq +SELECT nextval('mx_table_with_sequence_b_seq'); +SELECT nextval('mx_table_with_sequence_c_seq'); + 
+-- Check that dropping the mx table with sequences works as expected, even the metadata +-- syncing is stopped to one of the workers +\c - - - :master_port +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); DROP TABLE mx_table_with_sequence; \d mx_table_with_sequence \ds mx_table_with_sequence_b_seq \ds mx_table_with_sequence_c_seq --- Check that the sequences are dropped from the worker as well +-- Check that the sequences are dropped from the workers \c - - - :worker_1_port \d mx_table_with_sequence \ds mx_table_with_sequence_b_seq \ds mx_table_with_sequence_c_seq +-- Check that the sequences are dropped from the workers +\c - - - :worker_2_port +\ds mx_table_with_sequence_b_seq +\ds mx_table_with_sequence_c_seq + +-- Check that MX sequences play well with non-super users +\c - - - :master_port + +-- Remove a node so that shards and sequences won't be created on table creation. Therefore, +-- we can test that start_metadata_sync_to_node can actually create the sequence with proper +-- owner +CREATE TABLE pg_dist_shard_placement_temp AS SELECT * FROM pg_dist_shard_placement; +CREATE TABLE pg_dist_partition_temp AS SELECT * FROM pg_dist_partition; +DELETE FROM pg_dist_shard_placement; +DELETE FROM pg_dist_partition; +SELECT master_remove_node('localhost', :worker_2_port); + +CREATE USER mx_user; +\c - - - :worker_1_port +CREATE USER mx_user; +\c - - - :worker_2_port +CREATE USER mx_user; + +\c - mx_user - :master_port +-- Create an mx table as a different user +CREATE TABLE mx_table (a int, b BIGSERIAL); +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('mx_table', 'a'); + +\c - postgres - :master_port +SELECT master_add_node('localhost', :worker_2_port); +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + +\c - mx_user - :worker_1_port +SELECT nextval('mx_table_b_seq'); +INSERT INTO mx_table (a) VALUES (37); +INSERT INTO mx_table (a) VALUES (38); +SELECT * FROM mx_table ORDER BY a; + +\c - mx_user - 
:worker_2_port +SELECT nextval('mx_table_b_seq'); +INSERT INTO mx_table (a) VALUES (39); +INSERT INTO mx_table (a) VALUES (40); +SELECT * FROM mx_table ORDER BY a; + +\c - mx_user - :master_port +DROP TABLE mx_table; + +\c - postgres - :master_port +INSERT INTO pg_dist_shard_placement SELECT * FROM pg_dist_shard_placement_temp; +INSERT INTO pg_dist_partition SELECT * FROM pg_dist_partition_temp; +DROP TABLE pg_dist_shard_placement_temp; +DROP TABLE pg_dist_partition_temp; +SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); + +DROP USER mx_user; +\c - - - :worker_1_port +DROP USER mx_user; +\c - - - :worker_2_port +DROP USER mx_user; + -- Cleanup \c - - - :master_port DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql index a5a08314a..5ab58e978 100644 --- a/src/test/regress/sql/multi_remove_node_reference_table.sql +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -7,6 +7,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1380000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1380000; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1380000; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1380000; -- create copy of pg_dist_shard_placement to reload after the test CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 9a58f7a90..330e62fee 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -7,6 +7,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1370000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1370000; ALTER SEQUENCE 
pg_catalog.pg_dist_colocationid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1370000; -- remove a node for testing purposes diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 1ab19c3e2..e629c2926 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -6,6 +6,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1270000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1270000; +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART 1370000; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1370000; -- Set the colocation id to a safe value so that -- it is not affected by future changes to colocation id sequence