diff --git a/src/backend/distributed/.gitignore b/src/backend/distributed/.gitignore index 5d871b7d7..7d81b0cbe 100644 --- a/src/backend/distributed/.gitignore +++ b/src/backend/distributed/.gitignore @@ -12,3 +12,4 @@ # ignore latest install file citus--5.0.sql citus--5.?-*.sql +!citus--5.?-*--5.?-*.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 9005365cb..c3b96342b 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -6,7 +6,7 @@ citus_top_builddir = ../../.. MODULE_big = citus EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ - 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 + 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -47,6 +47,8 @@ $(EXTENSION)--5.1-6.sql: $(EXTENSION)--5.1-5.sql $(EXTENSION)--5.1-5--5.1-6.sql cat $^ > $@ $(EXTENSION)--5.1-7.sql: $(EXTENSION)--5.1-6.sql $(EXTENSION)--5.1-6--5.1-7.sql cat $^ > $@ +$(EXTENSION)--5.1-8.sql: $(EXTENSION)--5.1-7.sql $(EXTENSION)--5.1-7--5.1-8.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--5.1-7--5.1-8.sql b/src/backend/distributed/citus--5.1-7--5.1-8.sql new file mode 100644 index 000000000..9aabce0ef --- /dev/null +++ b/src/backend/distributed/citus--5.1-7--5.1-8.sql @@ -0,0 +1,72 @@ +CREATE FUNCTION pg_catalog.master_drop_sequences(sequence_names text[], + node_name text, + node_port bigint) + RETURNS bool + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_drop_sequences$$; +COMMENT ON FUNCTION pg_catalog.master_drop_sequences(text[], text, bigint) + IS 'drop specified sequences from a node'; + +REVOKE ALL ON FUNCTION pg_catalog.master_drop_sequences(text[], text, bigint) FROM PUBLIC; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger() + RETURNS event_trigger + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = pg_catalog + AS $cdbdt$ +DECLARE + v_obj record; + sequence_names text[] := '{}'; + node_names text[] := '{}'; + node_ports bigint[] := '{}'; + node_name text; + node_port bigint; +BEGIN + -- collect set of dropped sequences to drop on workers later + SELECT array_agg(object_identity) INTO sequence_names + FROM pg_event_trigger_dropped_objects() + WHERE object_type = 'sequence'; + + -- Must accumulate set of affected nodes before deleting placements, as + -- master_drop_all_shards will erase their rows, making it impossible for + -- us to know where to drop sequences (which must be dropped after shards, + -- since they have default value expressions which depend on sequences). + SELECT array_agg(sp.nodename), array_agg(sp.nodeport) + INTO node_names, node_ports + FROM pg_event_trigger_dropped_objects() AS dobj, + pg_dist_shard AS s, + pg_dist_shard_placement AS sp + WHERE dobj.object_type IN ('table', 'foreign table') + AND dobj.objid = s.logicalrelid + AND s.shardid = sp.shardid; + + FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP + IF v_obj.object_type NOT IN ('table', 'foreign table') THEN + CONTINUE; + END IF; + + -- nothing to do if not a distributed table + IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE logicalrelid = v_obj.objid) THEN + CONTINUE; + END IF; + + -- ensure all shards are dropped + PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name); + + -- delete partition entry + DELETE FROM pg_dist_partition WHERE logicalrelid = v_obj.objid; + END LOOP; + + IF cardinality(sequence_names) = 0 THEN + RETURN; + END IF; + + FOR node_name, node_port IN + SELECT DISTINCT name, port + FROM unnest(node_names, node_ports) AS nodes(name, port) + LOOP + PERFORM master_drop_sequences(sequence_names, node_name, node_port); + END LOOP; +END; +$cdbdt$; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index f34725785..2f2de87d7 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '5.1-7' +default_version = '5.1-8' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 2d32f464e..38ec7ae19 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -7,41 +7,68 @@ */ #include "postgres.h" +#include "c.h" +#include "libpq-fe.h" #include "miscadmin.h" +#include "port.h" +#include + +#include "access/attnum.h" +#include "access/heapam.h" +#include "access/htup.h" #include "access/htup_details.h" #include "access/sysattr.h" +#include "access/tupdesc.h" #include "access/xact.h" #include "catalog/catalog.h" +#include "catalog/dependency.h" #include "catalog/index.h" #include "catalog/namespace.h" +#include "catalog/pg_attribute.h" +#include "catalog/pg_class.h" #include "commands/defrem.h" #include "commands/tablecmds.h" #include "distributed/citus_ruleutils.h" #include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" +#include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_copy.h" -#include "distributed/multi_utility.h" #include "distributed/multi_join_order.h" #include "distributed/multi_shard_transaction.h" +#include "distributed/multi_utility.h" /* IWYU pragma: keep */ +#include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" #include "distributed/transmit.h" -#include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" -#include "foreign/foreign.h" #include "executor/executor.h" -#include "parser/parser.h" -#include "parser/parse_utilcmd.h" +#include "foreign/foreign.h" +#include "lib/stringinfo.h" +#include "nodes/bitmapset.h" +#include "nodes/nodes.h" +#include "nodes/params.h" +#include "nodes/parsenodes.h" +#include "nodes/pg_list.h" +#include "nodes/primnodes.h" +#include "nodes/value.h" #include "storage/lmgr.h" -#include "tcop/pquery.h" +#include "storage/lock.h" +#include "tcop/dest.h" #include "tcop/utility.h" +#include "utils/acl.h" #include "utils/builtins.h" +#include "utils/elog.h" +#include "utils/errcodes.h" +#include "utils/guc.h" +#include "utils/hsearch.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/palloc.h" #include "utils/rel.h" +#include "utils/relcache.h" #include "utils/syscache.h" @@ -81,6 +108,9 @@ static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSch static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); static void ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement); static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement); +static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); +static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt); +static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId); static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement); /* Local functions forward declarations for helper functions */ @@ -158,6 +188,16 @@ multi_ProcessUtility(Node *parsetree, } } + if (IsA(parsetree, CreateSeqStmt)) + { + ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree); + } + + if (IsA(parsetree, AlterSeqStmt)) + { + ErrorIfDistributedAlterSeqOwnedBy((AlterSeqStmt *) parsetree); + } + /* ddl commands are propagated to workers only if EnableDDLPropagation is set */ if (EnableDDLPropagation) { @@ -848,6 +888,33 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) { case AT_AddColumn: { + if (IsA(command->def, ColumnDef)) + { + ColumnDef *column = (ColumnDef *) command->def; + + /* + * Check for SERIAL pseudo-types. The structure of this + * check is copied from transformColumnDefinition. + */ + if (column->typeName && list_length(column->typeName->names) == 1 && + !column->typeName->pct_type) + { + char *typeName = strVal(linitial(column->typeName->names)); + + if (strcmp(typeName, "smallserial") == 0 || + strcmp(typeName, "serial2") == 0 || + strcmp(typeName, "serial") == 0 || + strcmp(typeName, "serial4") == 0 || + strcmp(typeName, "bigserial") == 0 || + strcmp(typeName, "serial8") == 0) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot execute ADD COLUMN commands " + "involving serial pseudotypes"))); + } + } + } + break; } @@ -901,6 +968,126 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) } +/* + * ErrorIfUnsupportedSeqStmt errors out if the provided create sequence + * statement specifies a distributed table in its OWNED BY clause. + */ +static void +ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt) +{ + Oid ownedByTableId = InvalidOid; + + /* create is easy: just prohibit any distributed OWNED BY */ + if (OptionsSpecifyOwnedBy(createSeqStmt->options, &ownedByTableId)) + { + if (IsDistributedTable(ownedByTableId)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create sequences that specify a distributed " + "table in their OWNED BY option"), + errhint("Use a sequence in a distributed table by specifying " + "a serial column type before creating any shards."))); + } + } +} + + +/* + * ErrorIfDistributedAlterSeqOwnedBy errors out if the provided alter sequence + * statement attempts to change the owned by property of a distributed sequence + * or attempt to change a local sequence to be owned by a distributed table. + */ +static void +ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt) +{ + Oid sequenceId = RangeVarGetRelid(alterSeqStmt->sequence, AccessShareLock, + alterSeqStmt->missing_ok); + Oid ownedByTableId = InvalidOid; + Oid newOwnedByTableId = InvalidOid; + int32 ownedByColumnId = 0; + bool hasDistributedOwner = false; + + /* alter statement referenced nonexistent sequence; return */ + if (sequenceId == InvalidOid) + { + return; + } + + /* see whether the sequences is already owned by a distributed table */ + if (sequenceIsOwned(sequenceId, &ownedByTableId, &ownedByColumnId)) + { + hasDistributedOwner = IsDistributedTable(ownedByTableId); + } + + if (OptionsSpecifyOwnedBy(alterSeqStmt->options, &newOwnedByTableId)) + { + /* if a distributed sequence tries to change owner, error */ + if (hasDistributedOwner && ownedByTableId != newOwnedByTableId) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot alter OWNED BY option of a sequence " + "already owned by a distributed table"))); + } + else if (!hasDistributedOwner && IsDistributedTable(newOwnedByTableId)) + { + /* and don't let local sequences get a distributed OWNED BY */ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot associate an existing sequence with a " + "distributed table"), + errhint("Use a sequence in a distributed table by specifying " + "a serial column type before creating any shards."))); + } + } +} + + +/* + * OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER + * SEQUENCE command, extracting the first OWNED BY option it encounters. The + * identifier for the specified table is placed in the Oid out parameter before + * returning true. Returns false if no such option is found. Still returns true + * for OWNED BY NONE, but leaves the out paramter set to InvalidOid. + */ +static bool +OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId) +{ + ListCell *optionCell = NULL; + + foreach(optionCell, optionList) + { + DefElem *defElem = (DefElem *) lfirst(optionCell); + if (strcmp(defElem->defname, "owned_by") == 0) + { + List *ownedByNames = defGetQualifiedName(defElem); + int nameCount = list_length(ownedByNames); + + /* if only one name is present, this is OWNED BY NONE */ + if (nameCount == 1) + { + *ownedByTableId = InvalidOid; + return true; + } + else + { + /* + * Otherwise, we have a list of schema, table, column, which we + * need to truncate to simply the schema and table to determine + * the relevant relation identifier. + */ + List *relNameList = list_truncate(list_copy(ownedByNames), nameCount - 1); + RangeVar *rangeVar = makeRangeVarFromNameList(relNameList); + bool failOK = true; + + *ownedByTableId = RangeVarGetRelid(rangeVar, NoLock, failOK); + return true; + } + } + } + + return false; +} + + /* * ErrorIfDistributedRenameStmt errors out if the corresponding rename statement * operates on a distributed table or its objects. @@ -1002,6 +1189,13 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort) { applyDDLCommand = true; } + else if ((IsA(ddlCommandNode, CreateSeqStmt))) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot copy to table with serial column from worker"), + errhint("Connect to the master node to COPY to tables which " + "use serial column types."))); + } /* run only a selected set of DDL commands */ if (applyDDLCommand) @@ -1449,7 +1643,6 @@ ReplicateGrantStmt(Node *parsetree) StringInfoData ddlString; ListCell *granteeCell = NULL; ListCell *objectCell = NULL; - ListCell *privilegeCell = NULL; bool isFirst = true; initStringInfo(&privsString); @@ -1474,6 +1667,8 @@ ReplicateGrantStmt(Node *parsetree) } else { + ListCell *privilegeCell = NULL; + isFirst = true; foreach(privilegeCell, grantStmt->privileges) { diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 5b46fa763..967be29fe 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -13,34 +13,43 @@ *------------------------------------------------------------------------- */ - #include "postgres.h" -#include "funcapi.h" +#include "c.h" +#include "fmgr.h" #include "miscadmin.h" +#include "port.h" + +#include #include "access/xact.h" #include "catalog/namespace.h" -#include "catalog/pg_class.h" #include "commands/dbcommands.h" -#include "commands/event_trigger.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" -#include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" +#include "distributed/multi_join_order.h" +#include "distributed/multi_logical_planner.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" -#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_partition.h" +#include "distributed/pg_dist_shard.h" +#include "distributed/relay_utility.h" #include "distributed/worker_protocol.h" +#include "lib/stringinfo.h" +#include "nodes/nodes.h" +#include "nodes/parsenodes.h" +#include "nodes/pg_list.h" +#include "nodes/primnodes.h" +#include "nodes/relation.h" #include "optimizer/clauses.h" #include "optimizer/predtest.h" #include "optimizer/restrictinfo.h" -#include "optimizer/var.h" -#include "nodes/makefuncs.h" +#include "storage/lock.h" #include "tcop/tcopprot.h" +#include "utils/array.h" #include "utils/builtins.h" -#include "utils/datum.h" -#include "utils/inval.h" +#include "utils/elog.h" +#include "utils/errcodes.h" #include "utils/lsyscache.h" @@ -59,6 +68,7 @@ static bool ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_apply_delete_command); PG_FUNCTION_INFO_V1(master_drop_all_shards); +PG_FUNCTION_INFO_V1(master_drop_sequences); /* @@ -229,6 +239,61 @@ master_drop_all_shards(PG_FUNCTION_ARGS) } +/* + * master_drop_sequences attempts to drop a list of sequences on a specified + * node. The "IF EXISTS" clause is used to permit dropping sequences even if + * they may not exist. Returns true on success, false on failure. + */ +Datum +master_drop_sequences(PG_FUNCTION_ARGS) +{ + ArrayType *sequenceNamesArray = PG_GETARG_ARRAYTYPE_P(0); + text *nodeText = PG_GETARG_TEXT_P(1); + int64 nodePort = PG_GETARG_INT64(2); + bool dropSuccessful = false; + char *nodeName = TextDatumGetCString(nodeText); + + ArrayIterator sequenceIterator = NULL; + Datum sequenceText = 0; + bool isNull = false; + + StringInfo dropSeqCommand = makeStringInfo(); + + /* iterate over sequence names to build single command to DROP them all */ + sequenceIterator = array_create_iterator(sequenceNamesArray, 0, NULL); + while (array_iterate(sequenceIterator, &sequenceText, &isNull)) + { + if (isNull) + { + ereport(ERROR, (errmsg("unexpected NULL sequence name"), + errcode(ERRCODE_INVALID_PARAMETER_VALUE))); + } + + /* append command portion if we haven't added any sequence names yet */ + if (dropSeqCommand->len == 0) + { + appendStringInfoString(dropSeqCommand, "DROP SEQUENCE IF EXISTS"); + } + else + { + /* otherwise, add a comma to separate subsequent sequence names */ + appendStringInfoChar(dropSeqCommand, ','); + } + + appendStringInfo(dropSeqCommand, " %s", TextDatumGetCString(sequenceText)); + } + + dropSuccessful = ExecuteRemoteCommand(nodeName, nodePort, dropSeqCommand); + if (!dropSuccessful) + { + ereport(WARNING, (errmsg("could not delete sequences from node \"%s:" INT64_FORMAT + "\"", nodeName, nodePort))); + } + + PG_RETURN_BOOL(dropSuccessful); +} + + /* * DropShards drops all given shards in a relation. The id, name and schema * for the relation are explicitly provided, since this function may be diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 5cbef6eb8..e3835b20c 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -12,15 +12,25 @@ */ #include "postgres.h" - +#include "c.h" +#include "fmgr.h" #include "funcapi.h" #include "miscadmin.h" +#include + +#include "access/attnum.h" +#include "access/genam.h" +#include "access/heapam.h" +#include "access/htup.h" #include "access/htup_details.h" -#include "catalog/catalog.h" +#include "access/skey.h" +#include "access/stratnum.h" +#include "access/tupdesc.h" #include "catalog/dependency.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/pg_class.h" #include "catalog/pg_index.h" #include "catalog/pg_type.h" #include "commands/sequence.h" @@ -28,21 +38,20 @@ #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" -#include "distributed/multi_physical_planner.h" #include "distributed/pg_dist_shard.h" -#include "distributed/pg_dist_partition.h" #include "distributed/worker_manager.h" #include "foreign/foreign.h" -#include "libpq/ip.h" -#include "libpq/libpq-be.h" +#include "lib/stringinfo.h" #include "nodes/pg_list.h" +#include "nodes/primnodes.h" #include "storage/lock.h" #include "utils/builtins.h" +#include "utils/elog.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/palloc.h" +#include "utils/relcache.h" #include "utils/ruleutils.h" -#include "utils/syscache.h" -#include "utils/tqual.h" /* Shard related configuration */ @@ -544,6 +553,8 @@ GetTableDDLEvents(Oid relationId) { List *tableDDLEventList = NIL; char tableType = 0; + List *sequenceIdlist = getOwnedSequences(relationId); + ListCell *sequenceIdCell; char *tableSchemaDef = NULL; char *tableColumnOptionsDef = NULL; char *schemaName = NULL; @@ -590,6 +601,15 @@ GetTableDDLEvents(Oid relationId) tableDDLEventList = lappend(tableDDLEventList, schemaNameDef->data); } + /* create sequences if needed */ + foreach(sequenceIdCell, sequenceIdlist) + { + Oid sequenceRelid = lfirst_oid(sequenceIdCell); + char *sequenceDef = pg_get_sequencedef_string(sequenceRelid); + + tableDDLEventList = lappend(tableDDLEventList, sequenceDef); + } + /* fetch table schema and column option definitions */ tableSchemaDef = pg_get_tableschemadef_string(relationId); tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId); diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index e5805f3eb..66dd80fa3 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -13,24 +13,33 @@ */ #include "postgres.h" +#include "c.h" + +#include +#include #include "access/genam.h" #include "access/heapam.h" #include "access/htup_details.h" +#include "access/htup.h" #include "access/skey.h" -#include "access/xact.h" +#include "access/stratnum.h" #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_constraint.h" -#include "commands/defrem.h" #include "distributed/relay_utility.h" +#include "lib/stringinfo.h" +#include "nodes/nodes.h" #include "nodes/parsenodes.h" -#include "parser/parse_utilcmd.h" +#include "nodes/pg_list.h" +#include "nodes/primnodes.h" +#include "nodes/value.h" #include "storage/lock.h" -#include "tcop/utility.h" +#include "utils/elog.h" +#include "utils/errcodes.h" #include "utils/fmgroids.h" -#include "utils/lsyscache.h" -#include "utils/tqual.h" +#include "utils/palloc.h" +#include "utils/relcache.h" /* Local functions forward declarations */ @@ -43,43 +52,31 @@ static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName); /* * RelayEventExtendNames extends relation names in the given parse tree for - * certain utility commands. The function more specifically extends table, - * sequence, and index names in the parse tree by appending the given shardId; - * thereby avoiding name collisions in the database among sharded tables. This - * function has the side effect of extending relation names in the parse tree. + * certain utility commands. The function more specifically extends table and + * index names in the parse tree by appending the given shardId; thereby + * avoiding name collisions in the database among sharded tables. This function + * has the side effect of extending relation names in the parse tree. */ void RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) { /* we don't extend names in extension or schema commands */ NodeTag nodeType = nodeTag(parseTree); - if (nodeType == T_CreateExtensionStmt || nodeType == T_CreateSchemaStmt) + if (nodeType == T_CreateExtensionStmt || nodeType == T_CreateSchemaStmt || + nodeType == T_CreateSeqStmt || nodeType == T_AlterSeqStmt) { return; } switch (nodeType) { - case T_AlterSeqStmt: - { - AlterSeqStmt *alterSeqStmt = (AlterSeqStmt *) parseTree; - char **sequenceName = &(alterSeqStmt->sequence->relname); - char **sequenceSchemaName = &(alterSeqStmt->sequence->schemaname); - - /* prefix with schema name if it is not added already */ - SetSchemaNameIfNotExist(sequenceSchemaName, schemaName); - - AppendShardIdToName(sequenceName, shardId); - break; - } - case T_AlterTableStmt: { /* - * We append shardId to the very end of table, sequence and index - * names to avoid name collisions. We usually do not touch - * constraint names, except for cases where they refer to index - * names. In those cases, we also append to constraint names. + * We append shardId to the very end of table and index names to + * avoid name collisions. We usually do not touch constraint names, + * except for cases where they refer to index names. In such cases, + * we also append to constraint names. */ AlterTableStmt *alterTableStmt = (AlterTableStmt *) parseTree; @@ -144,19 +141,6 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) break; } - case T_CreateSeqStmt: - { - CreateSeqStmt *createSeqStmt = (CreateSeqStmt *) parseTree; - char **sequenceName = &(createSeqStmt->sequence->relname); - char **sequenceSchemaName = &(createSeqStmt->sequence->schemaname); - - /* prefix with schema name if it is not added already */ - SetSchemaNameIfNotExist(sequenceSchemaName, schemaName); - - AppendShardIdToName(sequenceName, shardId); - break; - } - case T_CreateForeignServerStmt: { CreateForeignServerStmt *serverStmt = (CreateForeignServerStmt *) parseTree; @@ -198,9 +182,8 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) DropStmt *dropStmt = (DropStmt *) parseTree; ObjectType objectType = dropStmt->removeType; - if (objectType == OBJECT_TABLE || objectType == OBJECT_SEQUENCE || - objectType == OBJECT_INDEX || objectType == OBJECT_FOREIGN_TABLE || - objectType == OBJECT_FOREIGN_SERVER) + if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX || + objectType == OBJECT_FOREIGN_TABLE || objectType == OBJECT_FOREIGN_SERVER) { List *relationNameList = NULL; int relationNameListLength = 0; @@ -216,11 +199,11 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) } /* - * We now need to extend a single relation, sequence or index - * name. To be able to do this extension, we need to extract the - * names' addresses from the value objects they are stored in. - * Otherwise, the repalloc called in AppendShardIdToName() will - * not have the correct memory address for the name. + * We now need to extend a single relation or index name. To be + * able to do this extension, we need to extract the names' + * addresses from the value objects they are stored in. Other- + * wise, the repalloc called in AppendShardIdToName() will not + * have the correct memory address for the name. */ relationNameList = (List *) linitial(dropStmt->objects); @@ -370,8 +353,7 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) RenameStmt *renameStmt = (RenameStmt *) parseTree; ObjectType objectType = renameStmt->renameType; - if (objectType == OBJECT_TABLE || objectType == OBJECT_SEQUENCE || - objectType == OBJECT_INDEX) + if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX) { char **oldRelationName = &(renameStmt->relation->relname); char **newRelationName = &(renameStmt->newname); diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index ccf490885..337aa3d03 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -8,48 +8,51 @@ */ #include "postgres.h" +#include "c.h" +#include "miscadmin.h" -#include -#include +#include +#include "access/attnum.h" +#include "access/genam.h" +#include "access/heapam.h" +#include "access/htup.h" #include "access/htup_details.h" +#include "access/skey.h" +#include "access/stratnum.h" #include "access/sysattr.h" +#include "access/tupdesc.h" #include "catalog/dependency.h" #include "catalog/indexing.h" -#include "catalog/pg_aggregate.h" +#include "catalog/pg_attribute.h" #include "catalog/pg_authid.h" +#include "catalog/pg_class.h" #include "catalog/pg_extension.h" #include "catalog/pg_foreign_data_wrapper.h" -#include "catalog/pg_opclass.h" -#include "catalog/pg_operator.h" -#include "catalog/pg_proc.h" -#include "catalog/pg_type.h" -#include "distributed/citus_nodefuncs.h" -#include "distributed/citus_ruleutils.h" +#include "catalog/pg_index.h" #include "commands/defrem.h" #include "commands/extension.h" +#include "commands/sequence.h" +#include "distributed/citus_ruleutils.h" #include "foreign/foreign.h" -#include "funcapi.h" -#include "mb/pg_wchar.h" -#include "miscadmin.h" -#include "nodes/makefuncs.h" -#include "nodes/nodeFuncs.h" -#include "optimizer/tlist.h" -#include "parser/keywords.h" -#include "parser/parse_agg.h" -#include "parser/parse_func.h" -#include "parser/parse_oper.h" -#include "parser/parser.h" -#include "parser/parsetree.h" -#include "rewrite/rewriteHandler.h" +#include "lib/stringinfo.h" +#include "nodes/nodes.h" +#include "nodes/parsenodes.h" +#include "nodes/pg_list.h" +#include "storage/lock.h" +#include "utils/acl.h" +#include "utils/array.h" #include "utils/builtins.h" +#include "utils/elog.h" +#include "utils/errcodes.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/palloc.h" #include "utils/rel.h" +#include "utils/relcache.h" #include "utils/ruleutils.h" #include "utils/syscache.h" -#include "utils/typcache.h" -#include "utils/xml.h" + static void AppendOptionListToString(StringInfo stringData, List *options); static const char * convert_aclright_to_string(int aclright); @@ -206,6 +209,59 @@ AppendOptionListToString(StringInfo stringBuffer, List *optionList) } +/* + * pg_get_sequencedef_string returns the definition of a given sequence. This + * definition includes explicit values for all CREATE SEQUENCE options. + */ +char * +pg_get_sequencedef_string(Oid sequenceRelationId) +{ + char *qualifiedSequenceName = NULL; + char *sequenceDef = NULL; + Form_pg_sequence pgSequenceForm = NULL; + Relation sequenceRel = NULL; + AclResult permissionCheck = ACLCHECK_NO_PRIV; + SysScanDesc scanDescriptor = NULL; + HeapTuple heapTuple = NULL; + + /* open and lock sequence */ + sequenceRel = heap_open(sequenceRelationId, AccessShareLock); + + /* check permissions to read sequence attributes */ + permissionCheck = pg_class_aclcheck(sequenceRelationId, GetUserId(), + ACL_SELECT | ACL_USAGE); + if (permissionCheck != ACLCHECK_OK) + { + ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied for sequence %s", + RelationGetRelationName(sequenceRel)))); + } + + /* retrieve attributes from first tuple */ + scanDescriptor = systable_beginscan(sequenceRel, InvalidOid, false, NULL, 0, NULL); + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find specified sequence"))); + } + + pgSequenceForm = (Form_pg_sequence) GETSTRUCT(heapTuple); + + /* build our DDL command */ + qualifiedSequenceName = generate_relation_name(sequenceRelationId, NIL); + sequenceDef = psprintf(CREATE_SEQUENCE_COMMAND, qualifiedSequenceName, + pgSequenceForm->increment_by, pgSequenceForm->min_value, + pgSequenceForm->max_value, pgSequenceForm->cache_value, + pgSequenceForm->is_cycled ? "" : "NO "); + + systable_endscan(scanDescriptor); + + heap_close(sequenceRel, AccessShareLock); + + return sequenceDef; +} + + /* * pg_get_tableschemadef_string returns the definition of a given table. This * definition includes table's schema, default column values, not null and check @@ -266,11 +322,12 @@ pg_get_tableschemadef_string(Oid tableRelationId) for (attributeIndex = 0; attributeIndex < tupleDescriptor->natts; attributeIndex++) { Form_pg_attribute attributeForm = tupleDescriptor->attrs[attributeIndex]; - const char *attributeName = NULL; - const char *attributeTypeName = NULL; if (!attributeForm->attisdropped && attributeForm->attinhcount == 0) { + const char *attributeName = NULL; + const char *attributeTypeName = NULL; + if (firstAttributePrinted) { appendStringInfoString(&buffer, ", "); @@ -399,7 +456,6 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) char relationKind = 0; TupleDesc tupleDescriptor = NULL; AttrNumber attributeIndex = 0; - char *columnOptionStatement = NULL; List *columnOptionList = NIL; ListCell *columnOptionCell = NULL; bool firstOptionPrinted = false; @@ -511,6 +567,8 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) */ foreach(columnOptionCell, columnOptionList) { + char *columnOptionStatement = NULL; + if (!firstOptionPrinted) { initStringInfo(&buffer); @@ -591,9 +649,7 @@ pg_get_table_grants(Oid relationId) List *defs = NIL; HeapTuple classTuple = NULL; Datum aclDatum = 0; - Acl *acl = NULL; bool isNull = false; - int offtype = 0; relation = relation_open(relationId, AccessShareLock); relationName = generate_relation_name(relationId, NIL); @@ -619,7 +675,8 @@ pg_get_table_grants(Oid relationId) { int i = 0; AclItem *aidat = NULL; - + Acl *acl = NULL; + int offtype = 0; /* * First revoke all default permissions, so we can start adding the diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 861cb56d7..57640378d 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -11,18 +11,27 @@ #ifndef CITUS_RULEUTILS_H #define CITUS_RULEUTILS_H +#include "postgres.h" /* IWYU pragma: keep */ +#include "c.h" + #include "lib/stringinfo.h" #include "nodes/parsenodes.h" +#include "nodes/pg_list.h" +#define CREATE_SEQUENCE_COMMAND \ + "CREATE SEQUENCE IF NOT EXISTS %s INCREMENT BY " INT64_FORMAT " MINVALUE " \ + INT64_FORMAT " MAXVALUE " INT64_FORMAT " START WITH " INT64_FORMAT " %sCYCLE" + /* Function declarations for version independent Citus ruleutils wrapper functions */ extern char * pg_get_extensiondef_string(Oid tableRelationId); +extern Oid get_extension_schema(Oid ext_oid); extern char * pg_get_serverdef_string(Oid tableRelationId); +extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern char * pg_get_tableschemadef_string(Oid tableRelationId); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern List * pg_get_table_grants(Oid relationId); -extern Oid get_extension_schema(Oid ext_oid); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index c0ee27ee1..4b4e5a8fb 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -14,7 +14,10 @@ #ifndef MASTER_PROTOCOL_H #define MASTER_PROTOCOL_H +#include "postgres.h" +#include "c.h" #include "fmgr.h" + #include "nodes/pg_list.h" @@ -108,6 +111,7 @@ extern Datum master_create_empty_shard(PG_FUNCTION_ARGS); extern Datum master_append_table_to_shard(PG_FUNCTION_ARGS); extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS); extern Datum master_apply_delete_command(PG_FUNCTION_ARGS); +extern Datum master_drop_sequences(PG_FUNCTION_ARGS); extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS); extern Datum master_drop_all_shards(PG_FUNCTION_ARGS); diff --git a/src/test/regress/expected/multi_complex_expressions.out b/src/test/regress/expected/multi_complex_expressions.out index 1354f4c00..fd6bea47a 100644 --- a/src/test/regress/expected/multi_complex_expressions.out +++ b/src/test/regress/expected/multi_complex_expressions.out @@ -390,7 +390,7 @@ ORDER BY customer_keys.o_custkey DESC LIMIT 10 OFFSET 20; DEBUG: push down of limit count: 30 -DEBUG: building index "pg_toast_16977_index" on table "pg_toast_16977" +DEBUG: building index "pg_toast_16992_index" on table "pg_toast_16992" o_custkey | total_order_count -----------+------------------- 1466 | 1 diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index d4d8b1b27..a2a2fbb68 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -21,6 +21,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-4'; ALTER EXTENSION citus UPDATE TO '5.1-5'; ALTER EXTENSION citus UPDATE TO '5.1-6'; ALTER EXTENSION citus UPDATE TO '5.1-7'; +ALTER EXTENSION citus UPDATE TO '5.1-8'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/expected/multi_modifications.out b/src/test/regress/expected/multi_modifications.out index f6a9dd0b2..3838f76e2 100644 --- a/src/test/regress/expected/multi_modifications.out +++ b/src/test/regress/expected/multi_modifications.out @@ -586,3 +586,36 @@ SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data; ----------+------ (0 rows) +-- verify interaction of default values, SERIAL, and RETURNING +\set QUIET on +CREATE TABLE app_analytics_events (id serial, app_id integer, name text); +SELECT master_create_distributed_table('app_analytics_events', 'app_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('app_analytics_events', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id; + id +---- + 1 +(1 row) + +INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id; + id +---- + 2 +(1 row) + +INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; + id | app_id | name +----+--------+------ + 3 | 103 | Mynt +(1 row) + diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 27400cf5e..59f398bb4 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -23,12 +23,12 @@ ERROR: cannot execute ALTER TABLE command involving partition column ALTER TABLE testtableddl DROP COLUMN distributecol; ERROR: cannot execute ALTER TABLE command involving partition column -- verify that the table cannot be dropped in a transaction block +\set VERBOSITY terse BEGIN; DROP TABLE testtableddl; ERROR: DROP distributed table cannot run inside a transaction block -CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" -PL/pgSQL function citus_drop_trigger() line 15 at PERFORM ROLLBACK; +\set VERBOSITY default -- verify that the table can be dropped DROP TABLE testtableddl; -- verify that the table can dropped even if shards exist @@ -65,3 +65,54 @@ SELECT * FROM pg_dist_shard_placement; -- check that the extension now can be dropped (and recreated) DROP EXTENSION citus; CREATE EXTENSION citus; +-- create a table with a SERIAL column +CREATE TABLE testserialtable(id serial, group_id integer); +SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('testserialtable', 2, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +-- should not be able to add additional serial columns +ALTER TABLE testserialtable ADD COLUMN other_id serial; +ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes +-- and we shouldn't be able to change a distributed sequence's owner +ALTER SEQUENCE testserialtable_id_seq OWNED BY NONE; +ERROR: cannot alter OWNED BY option of a sequence already owned by a distributed table +-- or create a sequence with a distributed owner +CREATE SEQUENCE standalone_sequence OWNED BY testserialtable.group_id; +ERROR: cannot create sequences that specify a distributed table in their OWNED BY option +HINT: Use a sequence in a distributed table by specifying a serial column type before creating any shards. +-- or even change a manual sequence to be owned by a distributed table +CREATE SEQUENCE standalone_sequence; +ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id; +ERROR: cannot associate an existing sequence with a distributed table +HINT: Use a sequence in a distributed table by specifying a serial column type before creating any shards. +-- an edge case, but it's OK to change an owner to the same distributed table +ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id; +-- verify sequence was created on worker +\c - - - :worker_1_port +\ds + List of relations + Schema | Name | Type | Owner +--------+------------------------+----------+---------- + public | testserialtable_id_seq | sequence | postgres +(1 row) + +-- drop distributed table +\c - - - :master_port +DROP TABLE testserialtable; +-- verify owned sequence is dropped +\c - - - :worker_1_port +\ds + List of relations + Schema | Name | Type | Owner +--------+------+------+------- +(0 rows) + diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index c5454061e..da6507b44 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -319,6 +319,19 @@ ORDER BY LIMIT 5; +-- Ensure that copy from worker node of table with serial column fails +CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial); +SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append'); + +-- Connect to the first worker node +\c - - - 57637 + +-- Test copy from the worker node +COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); + +-- Connect back to the master node +\c - - - 57636 + -- Create customer table for the worker copy with constraint and index CREATE TABLE customer_worker_copy_append ( c_custkey integer , diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index e00bdd16f..116795614 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -427,6 +427,22 @@ LIMIT 560137 | 57637 (5 rows) +-- Ensure that copy from worker node of table with serial column fails +CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial); +SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +-- Connect to the first worker node +\c - - - 57637 +-- Test copy from the worker node +COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); +ERROR: cannot copy to table with serial column from worker +HINT: Connect to the master node to COPY to tables which use serial column types. +-- Connect back to the master node +\c - - - 57636 -- Create customer table for the worker copy with constraint and index CREATE TABLE customer_worker_copy_append ( c_custkey integer , diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 14cafa7d8..3fd2deca7 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -26,6 +26,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-4'; ALTER EXTENSION citus UPDATE TO '5.1-5'; ALTER EXTENSION citus UPDATE TO '5.1-6'; ALTER EXTENSION citus UPDATE TO '5.1-7'; +ALTER EXTENSION citus UPDATE TO '5.1-8'; -- drop extension an re-create in newest version DROP EXTENSION citus; diff --git a/src/test/regress/sql/multi_modifications.sql b/src/test/regress/sql/multi_modifications.sql index 5839f4188..03decd887 100644 --- a/src/test/regress/sql/multi_modifications.sql +++ b/src/test/regress/sql/multi_modifications.sql @@ -401,3 +401,13 @@ DELETE FROM multiple_hash WHERE category = '1' RETURNING category; -- check SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data; + +-- verify interaction of default values, SERIAL, and RETURNING +\set QUIET on +CREATE TABLE app_analytics_events (id serial, app_id integer, name text); +SELECT master_create_distributed_table('app_analytics_events', 'app_id', 'hash'); +SELECT master_create_worker_shards('app_analytics_events', 4, 1); + +INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id; +INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id; +INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *; diff --git a/src/test/regress/sql/multi_table_ddl.sql b/src/test/regress/sql/multi_table_ddl.sql index c79703777..31ded4bc9 100644 --- a/src/test/regress/sql/multi_table_ddl.sql +++ b/src/test/regress/sql/multi_table_ddl.sql @@ -21,9 +21,11 @@ ALTER TABLE testtableddl ALTER COLUMN distributecol TYPE text; ALTER TABLE testtableddl DROP COLUMN distributecol; -- verify that the table cannot be dropped in a transaction block +\set VERBOSITY terse BEGIN; DROP TABLE testtableddl; ROLLBACK; +\set VERBOSITY default -- verify that the table can be dropped DROP TABLE testtableddl; @@ -42,3 +44,36 @@ SELECT * FROM pg_dist_shard_placement; -- check that the extension now can be dropped (and recreated) DROP EXTENSION citus; CREATE EXTENSION citus; + +-- create a table with a SERIAL column +CREATE TABLE testserialtable(id serial, group_id integer); +SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash'); +SELECT master_create_worker_shards('testserialtable', 2, 1); + +-- should not be able to add additional serial columns +ALTER TABLE testserialtable ADD COLUMN other_id serial; + +-- and we shouldn't be able to change a distributed sequence's owner +ALTER SEQUENCE testserialtable_id_seq OWNED BY NONE; + +-- or create a sequence with a distributed owner +CREATE SEQUENCE standalone_sequence OWNED BY testserialtable.group_id; + +-- or even change a manual sequence to be owned by a distributed table +CREATE SEQUENCE standalone_sequence; +ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id; + +-- an edge case, but it's OK to change an owner to the same distributed table +ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id; + +-- verify sequence was created on worker +\c - - - :worker_1_port +\ds + +-- drop distributed table +\c - - - :master_port +DROP TABLE testserialtable; + +-- verify owned sequence is dropped +\c - - - :worker_1_port +\ds