Support SERIAL/BIGSERIAL non-partition columns

This adds support for SERIAL/BIGSERIAL column types. Because we now can
evaluate functions on the master (during execution), adding this is a
matter of ensuring the table creation step works properly.

To accomplish this, I've added some logic to detect sequences owned by
a table (i.e. those related to its columns). Simply creating a sequence
and using it in a default value is insufficient; users who do so must
ensure the sequence is owned by the column using it.

Fortunately, this is exactly what SERIAL and BIGSERIAL do, which is the
use case we're targeting with this feature. While testing this, I found
that worker_apply_shard_ddl_command actually adds shard identifiers to
sequence names, though I found no places that use or test this path. I
removed that code so that sequence names are not mutated and will match
those used by a SERIAL default value expression.

Our use of the new-to-9.5 CREATE SEQUENCE IF NOT EXISTS syntax means we
are dropping support for 9.4 (which is being done regardless, but makes
this change simpler). I've removed 9.4 from the Travis build matrix.

Some edge cases are possible in ALTER SEQUENCE, COPY FROM (on workers),
and CREATE SEQUENCE OWNED BY. I've added errors for each so that users
understand when and why certain operations are prohibited.
pull/673/head
Jason Petersen 2016-07-25 22:47:20 -06:00
parent 30d8b74245
commit abe7304898
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
20 changed files with 679 additions and 112 deletions

View File

@ -12,3 +12,4 @@
# ignore latest install file # ignore latest install file
citus--5.0.sql citus--5.0.sql
citus--5.?-*.sql citus--5.?-*.sql
!citus--5.?-*--5.?-*.sql

View File

@ -6,7 +6,7 @@ citus_top_builddir = ../../..
MODULE_big = citus MODULE_big = citus
EXTENSION = citus EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -47,6 +47,8 @@ $(EXTENSION)--5.1-6.sql: $(EXTENSION)--5.1-5.sql $(EXTENSION)--5.1-5--5.1-6.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--5.1-7.sql: $(EXTENSION)--5.1-6.sql $(EXTENSION)--5.1-6--5.1-7.sql $(EXTENSION)--5.1-7.sql: $(EXTENSION)--5.1-6.sql $(EXTENSION)--5.1-6--5.1-7.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--5.1-8.sql: $(EXTENSION)--5.1-7.sql $(EXTENSION)--5.1-7--5.1-8.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -0,0 +1,72 @@
CREATE FUNCTION pg_catalog.master_drop_sequences(sequence_names text[],
node_name text,
node_port bigint)
RETURNS bool
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_drop_sequences$$;
COMMENT ON FUNCTION pg_catalog.master_drop_sequences(text[], text, bigint)
IS 'drop specified sequences from a node';
REVOKE ALL ON FUNCTION pg_catalog.master_drop_sequences(text[], text, bigint) FROM PUBLIC;
CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger()
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $cdbdt$
DECLARE
v_obj record;
sequence_names text[] := '{}';
node_names text[] := '{}';
node_ports bigint[] := '{}';
node_name text;
node_port bigint;
BEGIN
-- collect set of dropped sequences to drop on workers later
SELECT array_agg(object_identity) INTO sequence_names
FROM pg_event_trigger_dropped_objects()
WHERE object_type = 'sequence';
-- Must accumulate set of affected nodes before deleting placements, as
-- master_drop_all_shards will erase their rows, making it impossible for
-- us to know where to drop sequences (which must be dropped after shards,
-- since they have default value expressions which depend on sequences).
SELECT array_agg(sp.nodename), array_agg(sp.nodeport)
INTO node_names, node_ports
FROM pg_event_trigger_dropped_objects() AS dobj,
pg_dist_shard AS s,
pg_dist_shard_placement AS sp
WHERE dobj.object_type IN ('table', 'foreign table')
AND dobj.objid = s.logicalrelid
AND s.shardid = sp.shardid;
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP
IF v_obj.object_type NOT IN ('table', 'foreign table') THEN
CONTINUE;
END IF;
-- nothing to do if not a distributed table
IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE logicalrelid = v_obj.objid) THEN
CONTINUE;
END IF;
-- ensure all shards are dropped
PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
-- delete partition entry
DELETE FROM pg_dist_partition WHERE logicalrelid = v_obj.objid;
END LOOP;
IF cardinality(sequence_names) = 0 THEN
RETURN;
END IF;
FOR node_name, node_port IN
SELECT DISTINCT name, port
FROM unnest(node_names, node_ports) AS nodes(name, port)
LOOP
PERFORM master_drop_sequences(sequence_names, node_name, node_port);
END LOOP;
END;
$cdbdt$;

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '5.1-7' default_version = '5.1-8'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -7,41 +7,68 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "c.h"
#include "libpq-fe.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "port.h"
#include <string.h>
#include "access/attnum.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/sysattr.h" #include "access/sysattr.h"
#include "access/tupdesc.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_class.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "commands/tablecmds.h" #include "commands/tablecmds.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/commit_protocol.h" #include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h" #include "distributed/connection_cache.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h" #include "distributed/multi_copy.h"
#include "distributed/multi_utility.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/multi_utility.h" /* IWYU pragma: keep */
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/transmit.h" #include "distributed/transmit.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "foreign/foreign.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "parser/parser.h" #include "foreign/foreign.h"
#include "parser/parse_utilcmd.h" #include "lib/stringinfo.h"
#include "nodes/bitmapset.h"
#include "nodes/nodes.h"
#include "nodes/params.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "nodes/value.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "tcop/pquery.h" #include "storage/lock.h"
#include "tcop/dest.h"
#include "tcop/utility.h" #include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/inval.h" #include "utils/inval.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/palloc.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h" #include "utils/syscache.h"
@ -81,6 +108,9 @@ static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSch
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
static void ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement); static void ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement);
static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement); static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement);
static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement); static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
/* Local functions forward declarations for helper functions */ /* Local functions forward declarations for helper functions */
@ -158,6 +188,16 @@ multi_ProcessUtility(Node *parsetree,
} }
} }
if (IsA(parsetree, CreateSeqStmt))
{
ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree);
}
if (IsA(parsetree, AlterSeqStmt))
{
ErrorIfDistributedAlterSeqOwnedBy((AlterSeqStmt *) parsetree);
}
/* ddl commands are propagated to workers only if EnableDDLPropagation is set */ /* ddl commands are propagated to workers only if EnableDDLPropagation is set */
if (EnableDDLPropagation) if (EnableDDLPropagation)
{ {
@ -848,6 +888,33 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
{ {
case AT_AddColumn: case AT_AddColumn:
{ {
if (IsA(command->def, ColumnDef))
{
ColumnDef *column = (ColumnDef *) command->def;
/*
* Check for SERIAL pseudo-types. The structure of this
* check is copied from transformColumnDefinition.
*/
if (column->typeName && list_length(column->typeName->names) == 1 &&
!column->typeName->pct_type)
{
char *typeName = strVal(linitial(column->typeName->names));
if (strcmp(typeName, "smallserial") == 0 ||
strcmp(typeName, "serial2") == 0 ||
strcmp(typeName, "serial") == 0 ||
strcmp(typeName, "serial4") == 0 ||
strcmp(typeName, "bigserial") == 0 ||
strcmp(typeName, "serial8") == 0)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot execute ADD COLUMN commands "
"involving serial pseudotypes")));
}
}
}
break; break;
} }
@ -901,6 +968,126 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
} }
/*
* ErrorIfUnsupportedSeqStmt errors out if the provided create sequence
* statement specifies a distributed table in its OWNED BY clause.
*/
static void
ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt)
{
Oid ownedByTableId = InvalidOid;
/* create is easy: just prohibit any distributed OWNED BY */
if (OptionsSpecifyOwnedBy(createSeqStmt->options, &ownedByTableId))
{
if (IsDistributedTable(ownedByTableId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create sequences that specify a distributed "
"table in their OWNED BY option"),
errhint("Use a sequence in a distributed table by specifying "
"a serial column type before creating any shards.")));
}
}
}
/*
* ErrorIfDistributedAlterSeqOwnedBy errors out if the provided alter sequence
* statement attempts to change the owned by property of a distributed sequence
* or attempt to change a local sequence to be owned by a distributed table.
*/
static void
ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt)
{
Oid sequenceId = RangeVarGetRelid(alterSeqStmt->sequence, AccessShareLock,
alterSeqStmt->missing_ok);
Oid ownedByTableId = InvalidOid;
Oid newOwnedByTableId = InvalidOid;
int32 ownedByColumnId = 0;
bool hasDistributedOwner = false;
/* alter statement referenced nonexistent sequence; return */
if (sequenceId == InvalidOid)
{
return;
}
/* see whether the sequences is already owned by a distributed table */
if (sequenceIsOwned(sequenceId, &ownedByTableId, &ownedByColumnId))
{
hasDistributedOwner = IsDistributedTable(ownedByTableId);
}
if (OptionsSpecifyOwnedBy(alterSeqStmt->options, &newOwnedByTableId))
{
/* if a distributed sequence tries to change owner, error */
if (hasDistributedOwner && ownedByTableId != newOwnedByTableId)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter OWNED BY option of a sequence "
"already owned by a distributed table")));
}
else if (!hasDistributedOwner && IsDistributedTable(newOwnedByTableId))
{
/* and don't let local sequences get a distributed OWNED BY */
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot associate an existing sequence with a "
"distributed table"),
errhint("Use a sequence in a distributed table by specifying "
"a serial column type before creating any shards.")));
}
}
}
/*
* OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER
* SEQUENCE command, extracting the first OWNED BY option it encounters. The
* identifier for the specified table is placed in the Oid out parameter before
* returning true. Returns false if no such option is found. Still returns true
* for OWNED BY NONE, but leaves the out paramter set to InvalidOid.
*/
static bool
OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId)
{
ListCell *optionCell = NULL;
foreach(optionCell, optionList)
{
DefElem *defElem = (DefElem *) lfirst(optionCell);
if (strcmp(defElem->defname, "owned_by") == 0)
{
List *ownedByNames = defGetQualifiedName(defElem);
int nameCount = list_length(ownedByNames);
/* if only one name is present, this is OWNED BY NONE */
if (nameCount == 1)
{
*ownedByTableId = InvalidOid;
return true;
}
else
{
/*
* Otherwise, we have a list of schema, table, column, which we
* need to truncate to simply the schema and table to determine
* the relevant relation identifier.
*/
List *relNameList = list_truncate(list_copy(ownedByNames), nameCount - 1);
RangeVar *rangeVar = makeRangeVarFromNameList(relNameList);
bool failOK = true;
*ownedByTableId = RangeVarGetRelid(rangeVar, NoLock, failOK);
return true;
}
}
}
return false;
}
/* /*
* ErrorIfDistributedRenameStmt errors out if the corresponding rename statement * ErrorIfDistributedRenameStmt errors out if the corresponding rename statement
* operates on a distributed table or its objects. * operates on a distributed table or its objects.
@ -1002,6 +1189,13 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
{ {
applyDDLCommand = true; applyDDLCommand = true;
} }
else if ((IsA(ddlCommandNode, CreateSeqStmt)))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot copy to table with serial column from worker"),
errhint("Connect to the master node to COPY to tables which "
"use serial column types.")));
}
/* run only a selected set of DDL commands */ /* run only a selected set of DDL commands */
if (applyDDLCommand) if (applyDDLCommand)
@ -1449,7 +1643,6 @@ ReplicateGrantStmt(Node *parsetree)
StringInfoData ddlString; StringInfoData ddlString;
ListCell *granteeCell = NULL; ListCell *granteeCell = NULL;
ListCell *objectCell = NULL; ListCell *objectCell = NULL;
ListCell *privilegeCell = NULL;
bool isFirst = true; bool isFirst = true;
initStringInfo(&privsString); initStringInfo(&privsString);
@ -1474,6 +1667,8 @@ ReplicateGrantStmt(Node *parsetree)
} }
else else
{ {
ListCell *privilegeCell = NULL;
isFirst = true; isFirst = true;
foreach(privilegeCell, grantStmt->privileges) foreach(privilegeCell, grantStmt->privileges)
{ {

View File

@ -13,34 +13,43 @@
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
#include "funcapi.h" #include "c.h"
#include "fmgr.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "port.h"
#include <stddef.h>
#include "access/xact.h" #include "access/xact.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "commands/event_trigger.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/relay_utility.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "nodes/relation.h"
#include "optimizer/clauses.h" #include "optimizer/clauses.h"
#include "optimizer/predtest.h" #include "optimizer/predtest.h"
#include "optimizer/restrictinfo.h" #include "optimizer/restrictinfo.h"
#include "optimizer/var.h" #include "storage/lock.h"
#include "nodes/makefuncs.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "utils/array.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/datum.h" #include "utils/elog.h"
#include "utils/inval.h" #include "utils/errcodes.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
@ -59,6 +68,7 @@ static bool ExecuteRemoteCommand(const char *nodeName, uint32 nodePort,
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_apply_delete_command); PG_FUNCTION_INFO_V1(master_apply_delete_command);
PG_FUNCTION_INFO_V1(master_drop_all_shards); PG_FUNCTION_INFO_V1(master_drop_all_shards);
PG_FUNCTION_INFO_V1(master_drop_sequences);
/* /*
@ -229,6 +239,61 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
} }
/*
* master_drop_sequences attempts to drop a list of sequences on a specified
* node. The "IF EXISTS" clause is used to permit dropping sequences even if
* they may not exist. Returns true on success, false on failure.
*/
Datum
master_drop_sequences(PG_FUNCTION_ARGS)
{
ArrayType *sequenceNamesArray = PG_GETARG_ARRAYTYPE_P(0);
text *nodeText = PG_GETARG_TEXT_P(1);
int64 nodePort = PG_GETARG_INT64(2);
bool dropSuccessful = false;
char *nodeName = TextDatumGetCString(nodeText);
ArrayIterator sequenceIterator = NULL;
Datum sequenceText = 0;
bool isNull = false;
StringInfo dropSeqCommand = makeStringInfo();
/* iterate over sequence names to build single command to DROP them all */
sequenceIterator = array_create_iterator(sequenceNamesArray, 0, NULL);
while (array_iterate(sequenceIterator, &sequenceText, &isNull))
{
if (isNull)
{
ereport(ERROR, (errmsg("unexpected NULL sequence name"),
errcode(ERRCODE_INVALID_PARAMETER_VALUE)));
}
/* append command portion if we haven't added any sequence names yet */
if (dropSeqCommand->len == 0)
{
appendStringInfoString(dropSeqCommand, "DROP SEQUENCE IF EXISTS");
}
else
{
/* otherwise, add a comma to separate subsequent sequence names */
appendStringInfoChar(dropSeqCommand, ',');
}
appendStringInfo(dropSeqCommand, " %s", TextDatumGetCString(sequenceText));
}
dropSuccessful = ExecuteRemoteCommand(nodeName, nodePort, dropSeqCommand);
if (!dropSuccessful)
{
ereport(WARNING, (errmsg("could not delete sequences from node \"%s:" INT64_FORMAT
"\"", nodeName, nodePort)));
}
PG_RETURN_BOOL(dropSuccessful);
}
/* /*
* DropShards drops all given shards in a relation. The id, name and schema * DropShards drops all given shards in a relation. The id, name and schema
* for the relation are explicitly provided, since this function may be * for the relation are explicitly provided, since this function may be

View File

@ -12,15 +12,25 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "c.h"
#include "fmgr.h"
#include "funcapi.h" #include "funcapi.h"
#include "miscadmin.h" #include "miscadmin.h"
#include <string.h>
#include "access/attnum.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "catalog/catalog.h" #include "access/skey.h"
#include "access/stratnum.h"
#include "access/tupdesc.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "catalog/pg_index.h" #include "catalog/pg_index.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/sequence.h" #include "commands/sequence.h"
@ -28,21 +38,20 @@
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "foreign/foreign.h" #include "foreign/foreign.h"
#include "libpq/ip.h" #include "lib/stringinfo.h"
#include "libpq/libpq-be.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "storage/lock.h" #include "storage/lock.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/palloc.h"
#include "utils/relcache.h"
#include "utils/ruleutils.h" #include "utils/ruleutils.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
/* Shard related configuration */ /* Shard related configuration */
@ -544,6 +553,8 @@ GetTableDDLEvents(Oid relationId)
{ {
List *tableDDLEventList = NIL; List *tableDDLEventList = NIL;
char tableType = 0; char tableType = 0;
List *sequenceIdlist = getOwnedSequences(relationId);
ListCell *sequenceIdCell;
char *tableSchemaDef = NULL; char *tableSchemaDef = NULL;
char *tableColumnOptionsDef = NULL; char *tableColumnOptionsDef = NULL;
char *schemaName = NULL; char *schemaName = NULL;
@ -590,6 +601,15 @@ GetTableDDLEvents(Oid relationId)
tableDDLEventList = lappend(tableDDLEventList, schemaNameDef->data); tableDDLEventList = lappend(tableDDLEventList, schemaNameDef->data);
} }
/* create sequences if needed */
foreach(sequenceIdCell, sequenceIdlist)
{
Oid sequenceRelid = lfirst_oid(sequenceIdCell);
char *sequenceDef = pg_get_sequencedef_string(sequenceRelid);
tableDDLEventList = lappend(tableDDLEventList, sequenceDef);
}
/* fetch table schema and column option definitions */ /* fetch table schema and column option definitions */
tableSchemaDef = pg_get_tableschemadef_string(relationId); tableSchemaDef = pg_get_tableschemadef_string(relationId);
tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId); tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId);

View File

@ -13,24 +13,33 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "c.h"
#include <stdio.h>
#include <string.h>
#include "access/genam.h" #include "access/genam.h"
#include "access/heapam.h" #include "access/heapam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/htup.h"
#include "access/skey.h" #include "access/skey.h"
#include "access/xact.h" #include "access/stratnum.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_constraint.h" #include "catalog/pg_constraint.h"
#include "commands/defrem.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "parser/parse_utilcmd.h" #include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "nodes/value.h"
#include "storage/lock.h" #include "storage/lock.h"
#include "tcop/utility.h" #include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/lsyscache.h" #include "utils/palloc.h"
#include "utils/tqual.h" #include "utils/relcache.h"
/* Local functions forward declarations */ /* Local functions forward declarations */
@ -43,43 +52,31 @@ static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName);
/* /*
* RelayEventExtendNames extends relation names in the given parse tree for * RelayEventExtendNames extends relation names in the given parse tree for
* certain utility commands. The function more specifically extends table, * certain utility commands. The function more specifically extends table and
* sequence, and index names in the parse tree by appending the given shardId; * index names in the parse tree by appending the given shardId; thereby
* thereby avoiding name collisions in the database among sharded tables. This * avoiding name collisions in the database among sharded tables. This function
* function has the side effect of extending relation names in the parse tree. * has the side effect of extending relation names in the parse tree.
*/ */
void void
RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
{ {
/* we don't extend names in extension or schema commands */ /* we don't extend names in extension or schema commands */
NodeTag nodeType = nodeTag(parseTree); NodeTag nodeType = nodeTag(parseTree);
if (nodeType == T_CreateExtensionStmt || nodeType == T_CreateSchemaStmt) if (nodeType == T_CreateExtensionStmt || nodeType == T_CreateSchemaStmt ||
nodeType == T_CreateSeqStmt || nodeType == T_AlterSeqStmt)
{ {
return; return;
} }
switch (nodeType) switch (nodeType)
{ {
case T_AlterSeqStmt:
{
AlterSeqStmt *alterSeqStmt = (AlterSeqStmt *) parseTree;
char **sequenceName = &(alterSeqStmt->sequence->relname);
char **sequenceSchemaName = &(alterSeqStmt->sequence->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(sequenceSchemaName, schemaName);
AppendShardIdToName(sequenceName, shardId);
break;
}
case T_AlterTableStmt: case T_AlterTableStmt:
{ {
/* /*
* We append shardId to the very end of table, sequence and index * We append shardId to the very end of table and index names to
* names to avoid name collisions. We usually do not touch * avoid name collisions. We usually do not touch constraint names,
* constraint names, except for cases where they refer to index * except for cases where they refer to index names. In such cases,
* names. In those cases, we also append to constraint names. * we also append to constraint names.
*/ */
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parseTree; AlterTableStmt *alterTableStmt = (AlterTableStmt *) parseTree;
@ -144,19 +141,6 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
break; break;
} }
case T_CreateSeqStmt:
{
CreateSeqStmt *createSeqStmt = (CreateSeqStmt *) parseTree;
char **sequenceName = &(createSeqStmt->sequence->relname);
char **sequenceSchemaName = &(createSeqStmt->sequence->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(sequenceSchemaName, schemaName);
AppendShardIdToName(sequenceName, shardId);
break;
}
case T_CreateForeignServerStmt: case T_CreateForeignServerStmt:
{ {
CreateForeignServerStmt *serverStmt = (CreateForeignServerStmt *) parseTree; CreateForeignServerStmt *serverStmt = (CreateForeignServerStmt *) parseTree;
@ -198,9 +182,8 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
DropStmt *dropStmt = (DropStmt *) parseTree; DropStmt *dropStmt = (DropStmt *) parseTree;
ObjectType objectType = dropStmt->removeType; ObjectType objectType = dropStmt->removeType;
if (objectType == OBJECT_TABLE || objectType == OBJECT_SEQUENCE || if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX ||
objectType == OBJECT_INDEX || objectType == OBJECT_FOREIGN_TABLE || objectType == OBJECT_FOREIGN_TABLE || objectType == OBJECT_FOREIGN_SERVER)
objectType == OBJECT_FOREIGN_SERVER)
{ {
List *relationNameList = NULL; List *relationNameList = NULL;
int relationNameListLength = 0; int relationNameListLength = 0;
@ -216,11 +199,11 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
} }
/* /*
* We now need to extend a single relation, sequence or index * We now need to extend a single relation or index name. To be
* name. To be able to do this extension, we need to extract the * able to do this extension, we need to extract the names'
* names' addresses from the value objects they are stored in. * addresses from the value objects they are stored in. Other-
* Otherwise, the repalloc called in AppendShardIdToName() will * wise, the repalloc called in AppendShardIdToName() will not
* not have the correct memory address for the name. * have the correct memory address for the name.
*/ */
relationNameList = (List *) linitial(dropStmt->objects); relationNameList = (List *) linitial(dropStmt->objects);
@ -370,8 +353,7 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
RenameStmt *renameStmt = (RenameStmt *) parseTree; RenameStmt *renameStmt = (RenameStmt *) parseTree;
ObjectType objectType = renameStmt->renameType; ObjectType objectType = renameStmt->renameType;
if (objectType == OBJECT_TABLE || objectType == OBJECT_SEQUENCE || if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX)
objectType == OBJECT_INDEX)
{ {
char **oldRelationName = &(renameStmt->relation->relname); char **oldRelationName = &(renameStmt->relation->relname);
char **newRelationName = &(renameStmt->newname); char **newRelationName = &(renameStmt->newname);

View File

@ -8,48 +8,51 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "c.h"
#include "miscadmin.h"
#include <unistd.h> #include <stddef.h>
#include <fcntl.h>
#include "access/attnum.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/skey.h"
#include "access/stratnum.h"
#include "access/sysattr.h" #include "access/sysattr.h"
#include "access/tupdesc.h"
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/pg_aggregate.h" #include "catalog/pg_attribute.h"
#include "catalog/pg_authid.h" #include "catalog/pg_authid.h"
#include "catalog/pg_class.h"
#include "catalog/pg_extension.h" #include "catalog/pg_extension.h"
#include "catalog/pg_foreign_data_wrapper.h" #include "catalog/pg_foreign_data_wrapper.h"
#include "catalog/pg_opclass.h" #include "catalog/pg_index.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_ruleutils.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "commands/extension.h" #include "commands/extension.h"
#include "commands/sequence.h"
#include "distributed/citus_ruleutils.h"
#include "foreign/foreign.h" #include "foreign/foreign.h"
#include "funcapi.h" #include "lib/stringinfo.h"
#include "mb/pg_wchar.h" #include "nodes/nodes.h"
#include "miscadmin.h" #include "nodes/parsenodes.h"
#include "nodes/makefuncs.h" #include "nodes/pg_list.h"
#include "nodes/nodeFuncs.h" #include "storage/lock.h"
#include "optimizer/tlist.h" #include "utils/acl.h"
#include "parser/keywords.h" #include "utils/array.h"
#include "parser/parse_agg.h"
#include "parser/parse_func.h"
#include "parser/parse_oper.h"
#include "parser/parser.h"
#include "parser/parsetree.h"
#include "rewrite/rewriteHandler.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/palloc.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/ruleutils.h" #include "utils/ruleutils.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/typcache.h"
#include "utils/xml.h"
static void AppendOptionListToString(StringInfo stringData, List *options); static void AppendOptionListToString(StringInfo stringData, List *options);
static const char * convert_aclright_to_string(int aclright); static const char * convert_aclright_to_string(int aclright);
@ -206,6 +209,59 @@ AppendOptionListToString(StringInfo stringBuffer, List *optionList)
} }
/*
* pg_get_sequencedef_string returns the definition of a given sequence. This
* definition includes explicit values for all CREATE SEQUENCE options.
*/
char *
pg_get_sequencedef_string(Oid sequenceRelationId)
{
char *qualifiedSequenceName = NULL;
char *sequenceDef = NULL;
Form_pg_sequence pgSequenceForm = NULL;
Relation sequenceRel = NULL;
AclResult permissionCheck = ACLCHECK_NO_PRIV;
SysScanDesc scanDescriptor = NULL;
HeapTuple heapTuple = NULL;
/* open and lock sequence */
sequenceRel = heap_open(sequenceRelationId, AccessShareLock);
/* check permissions to read sequence attributes */
permissionCheck = pg_class_aclcheck(sequenceRelationId, GetUserId(),
ACL_SELECT | ACL_USAGE);
if (permissionCheck != ACLCHECK_OK)
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied for sequence %s",
RelationGetRelationName(sequenceRel))));
}
/* retrieve attributes from first tuple */
scanDescriptor = systable_beginscan(sequenceRel, InvalidOid, false, NULL, 0, NULL);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find specified sequence")));
}
pgSequenceForm = (Form_pg_sequence) GETSTRUCT(heapTuple);
/* build our DDL command */
qualifiedSequenceName = generate_relation_name(sequenceRelationId, NIL);
sequenceDef = psprintf(CREATE_SEQUENCE_COMMAND, qualifiedSequenceName,
pgSequenceForm->increment_by, pgSequenceForm->min_value,
pgSequenceForm->max_value, pgSequenceForm->cache_value,
pgSequenceForm->is_cycled ? "" : "NO ");
systable_endscan(scanDescriptor);
heap_close(sequenceRel, AccessShareLock);
return sequenceDef;
}
/* /*
* pg_get_tableschemadef_string returns the definition of a given table. This * pg_get_tableschemadef_string returns the definition of a given table. This
* definition includes table's schema, default column values, not null and check * definition includes table's schema, default column values, not null and check
@ -266,11 +322,12 @@ pg_get_tableschemadef_string(Oid tableRelationId)
for (attributeIndex = 0; attributeIndex < tupleDescriptor->natts; attributeIndex++) for (attributeIndex = 0; attributeIndex < tupleDescriptor->natts; attributeIndex++)
{ {
Form_pg_attribute attributeForm = tupleDescriptor->attrs[attributeIndex]; Form_pg_attribute attributeForm = tupleDescriptor->attrs[attributeIndex];
const char *attributeName = NULL;
const char *attributeTypeName = NULL;
if (!attributeForm->attisdropped && attributeForm->attinhcount == 0) if (!attributeForm->attisdropped && attributeForm->attinhcount == 0)
{ {
const char *attributeName = NULL;
const char *attributeTypeName = NULL;
if (firstAttributePrinted) if (firstAttributePrinted)
{ {
appendStringInfoString(&buffer, ", "); appendStringInfoString(&buffer, ", ");
@ -399,7 +456,6 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
char relationKind = 0; char relationKind = 0;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
AttrNumber attributeIndex = 0; AttrNumber attributeIndex = 0;
char *columnOptionStatement = NULL;
List *columnOptionList = NIL; List *columnOptionList = NIL;
ListCell *columnOptionCell = NULL; ListCell *columnOptionCell = NULL;
bool firstOptionPrinted = false; bool firstOptionPrinted = false;
@ -511,6 +567,8 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
*/ */
foreach(columnOptionCell, columnOptionList) foreach(columnOptionCell, columnOptionList)
{ {
char *columnOptionStatement = NULL;
if (!firstOptionPrinted) if (!firstOptionPrinted)
{ {
initStringInfo(&buffer); initStringInfo(&buffer);
@ -591,9 +649,7 @@ pg_get_table_grants(Oid relationId)
List *defs = NIL; List *defs = NIL;
HeapTuple classTuple = NULL; HeapTuple classTuple = NULL;
Datum aclDatum = 0; Datum aclDatum = 0;
Acl *acl = NULL;
bool isNull = false; bool isNull = false;
int offtype = 0;
relation = relation_open(relationId, AccessShareLock); relation = relation_open(relationId, AccessShareLock);
relationName = generate_relation_name(relationId, NIL); relationName = generate_relation_name(relationId, NIL);
@ -619,7 +675,8 @@ pg_get_table_grants(Oid relationId)
{ {
int i = 0; int i = 0;
AclItem *aidat = NULL; AclItem *aidat = NULL;
Acl *acl = NULL;
int offtype = 0;
/* /*
* First revoke all default permissions, so we can start adding the * First revoke all default permissions, so we can start adding the

View File

@ -11,18 +11,27 @@
#ifndef CITUS_RULEUTILS_H #ifndef CITUS_RULEUTILS_H
#define CITUS_RULEUTILS_H #define CITUS_RULEUTILS_H
#include "postgres.h" /* IWYU pragma: keep */
#include "c.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#define CREATE_SEQUENCE_COMMAND \
"CREATE SEQUENCE IF NOT EXISTS %s INCREMENT BY " INT64_FORMAT " MINVALUE " \
INT64_FORMAT " MAXVALUE " INT64_FORMAT " START WITH " INT64_FORMAT " %sCYCLE"
/* Function declarations for version independent Citus ruleutils wrapper functions */ /* Function declarations for version independent Citus ruleutils wrapper functions */
extern char * pg_get_extensiondef_string(Oid tableRelationId); extern char * pg_get_extensiondef_string(Oid tableRelationId);
extern Oid get_extension_schema(Oid ext_oid);
extern char * pg_get_serverdef_string(Oid tableRelationId); extern char * pg_get_serverdef_string(Oid tableRelationId);
extern char * pg_get_sequencedef_string(Oid sequenceRelid);
extern char * pg_get_tableschemadef_string(Oid tableRelationId); extern char * pg_get_tableschemadef_string(Oid tableRelationId);
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);
extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern List * pg_get_table_grants(Oid relationId); extern List * pg_get_table_grants(Oid relationId);
extern Oid get_extension_schema(Oid ext_oid);
/* Function declarations for version dependent PostgreSQL ruleutils functions */ /* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer); extern void pg_get_query_def(Query *query, StringInfo buffer);

View File

@ -14,7 +14,10 @@
#ifndef MASTER_PROTOCOL_H #ifndef MASTER_PROTOCOL_H
#define MASTER_PROTOCOL_H #define MASTER_PROTOCOL_H
#include "postgres.h"
#include "c.h"
#include "fmgr.h" #include "fmgr.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
@ -108,6 +111,7 @@ extern Datum master_create_empty_shard(PG_FUNCTION_ARGS);
extern Datum master_append_table_to_shard(PG_FUNCTION_ARGS); extern Datum master_append_table_to_shard(PG_FUNCTION_ARGS);
extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS); extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS);
extern Datum master_apply_delete_command(PG_FUNCTION_ARGS); extern Datum master_apply_delete_command(PG_FUNCTION_ARGS);
extern Datum master_drop_sequences(PG_FUNCTION_ARGS);
extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS); extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS);
extern Datum master_drop_all_shards(PG_FUNCTION_ARGS); extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);

View File

@ -390,7 +390,7 @@ ORDER BY
customer_keys.o_custkey DESC customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20; LIMIT 10 OFFSET 20;
DEBUG: push down of limit count: 30 DEBUG: push down of limit count: 30
DEBUG: building index "pg_toast_16977_index" on table "pg_toast_16977" DEBUG: building index "pg_toast_16992_index" on table "pg_toast_16992"
o_custkey | total_order_count o_custkey | total_order_count
-----------+------------------- -----------+-------------------
1466 | 1 1466 | 1

View File

@ -21,6 +21,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-4';
ALTER EXTENSION citus UPDATE TO '5.1-5'; ALTER EXTENSION citus UPDATE TO '5.1-5';
ALTER EXTENSION citus UPDATE TO '5.1-6'; ALTER EXTENSION citus UPDATE TO '5.1-6';
ALTER EXTENSION citus UPDATE TO '5.1-7'; ALTER EXTENSION citus UPDATE TO '5.1-7';
ALTER EXTENSION citus UPDATE TO '5.1-8';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;
\c \c

View File

@ -586,3 +586,36 @@ SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data;
----------+------ ----------+------
(0 rows) (0 rows)
-- verify interaction of default values, SERIAL, and RETURNING
\set QUIET on
CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
SELECT master_create_distributed_table('app_analytics_events', 'app_id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('app_analytics_events', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
id
----
1
(1 row)
INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
id
----
2
(1 row)
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
id | app_id | name
----+--------+------
3 | 103 | Mynt
(1 row)

View File

@ -23,12 +23,12 @@ ERROR: cannot execute ALTER TABLE command involving partition column
ALTER TABLE testtableddl DROP COLUMN distributecol; ALTER TABLE testtableddl DROP COLUMN distributecol;
ERROR: cannot execute ALTER TABLE command involving partition column ERROR: cannot execute ALTER TABLE command involving partition column
-- verify that the table cannot be dropped in a transaction block -- verify that the table cannot be dropped in a transaction block
\set VERBOSITY terse
BEGIN; BEGIN;
DROP TABLE testtableddl; DROP TABLE testtableddl;
ERROR: DROP distributed table cannot run inside a transaction block ERROR: DROP distributed table cannot run inside a transaction block
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
PL/pgSQL function citus_drop_trigger() line 15 at PERFORM
ROLLBACK; ROLLBACK;
\set VERBOSITY default
-- verify that the table can be dropped -- verify that the table can be dropped
DROP TABLE testtableddl; DROP TABLE testtableddl;
-- verify that the table can dropped even if shards exist -- verify that the table can dropped even if shards exist
@ -65,3 +65,54 @@ SELECT * FROM pg_dist_shard_placement;
-- check that the extension now can be dropped (and recreated) -- check that the extension now can be dropped (and recreated)
DROP EXTENSION citus; DROP EXTENSION citus;
CREATE EXTENSION citus; CREATE EXTENSION citus;
-- create a table with a SERIAL column
CREATE TABLE testserialtable(id serial, group_id integer);
SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('testserialtable', 2, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- should not be able to add additional serial columns
ALTER TABLE testserialtable ADD COLUMN other_id serial;
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes
-- and we shouldn't be able to change a distributed sequence's owner
ALTER SEQUENCE testserialtable_id_seq OWNED BY NONE;
ERROR: cannot alter OWNED BY option of a sequence already owned by a distributed table
-- or create a sequence with a distributed owner
CREATE SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
ERROR: cannot create sequences that specify a distributed table in their OWNED BY option
HINT: Use a sequence in a distributed table by specifying a serial column type before creating any shards.
-- or even change a manual sequence to be owned by a distributed table
CREATE SEQUENCE standalone_sequence;
ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
ERROR: cannot associate an existing sequence with a distributed table
HINT: Use a sequence in a distributed table by specifying a serial column type before creating any shards.
-- an edge case, but it's OK to change an owner to the same distributed table
ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id;
-- verify sequence was created on worker
\c - - - :worker_1_port
\ds
List of relations
Schema | Name | Type | Owner
--------+------------------------+----------+----------
public | testserialtable_id_seq | sequence | postgres
(1 row)
-- drop distributed table
\c - - - :master_port
DROP TABLE testserialtable;
-- verify owned sequence is dropped
\c - - - :worker_1_port
\ds
List of relations
Schema | Name | Type | Owner
--------+------+------+-------
(0 rows)

View File

@ -319,6 +319,19 @@ ORDER BY
LIMIT LIMIT
5; 5;
-- Ensure that copy from worker node of table with serial column fails
CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial);
SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append');
-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
-- Create customer table for the worker copy with constraint and index -- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append ( CREATE TABLE customer_worker_copy_append (
c_custkey integer , c_custkey integer ,

View File

@ -427,6 +427,22 @@ LIMIT
560137 | 57637 560137 | 57637
(5 rows) (5 rows)
-- Ensure that copy from worker node of table with serial column fails
CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial);
SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
ERROR: cannot copy to table with serial column from worker
HINT: Connect to the master node to COPY to tables which use serial column types.
-- Connect back to the master node
\c - - - 57636
-- Create customer table for the worker copy with constraint and index -- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append ( CREATE TABLE customer_worker_copy_append (
c_custkey integer , c_custkey integer ,

View File

@ -26,6 +26,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-4';
ALTER EXTENSION citus UPDATE TO '5.1-5'; ALTER EXTENSION citus UPDATE TO '5.1-5';
ALTER EXTENSION citus UPDATE TO '5.1-6'; ALTER EXTENSION citus UPDATE TO '5.1-6';
ALTER EXTENSION citus UPDATE TO '5.1-7'; ALTER EXTENSION citus UPDATE TO '5.1-7';
ALTER EXTENSION citus UPDATE TO '5.1-8';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;

View File

@ -401,3 +401,13 @@ DELETE FROM multiple_hash WHERE category = '1' RETURNING category;
-- check -- check
SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data; SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data;
SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data; SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data;
-- verify interaction of default values, SERIAL, and RETURNING
\set QUIET on
CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
SELECT master_create_distributed_table('app_analytics_events', 'app_id', 'hash');
SELECT master_create_worker_shards('app_analytics_events', 4, 1);
INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;

View File

@ -21,9 +21,11 @@ ALTER TABLE testtableddl ALTER COLUMN distributecol TYPE text;
ALTER TABLE testtableddl DROP COLUMN distributecol; ALTER TABLE testtableddl DROP COLUMN distributecol;
-- verify that the table cannot be dropped in a transaction block -- verify that the table cannot be dropped in a transaction block
\set VERBOSITY terse
BEGIN; BEGIN;
DROP TABLE testtableddl; DROP TABLE testtableddl;
ROLLBACK; ROLLBACK;
\set VERBOSITY default
-- verify that the table can be dropped -- verify that the table can be dropped
DROP TABLE testtableddl; DROP TABLE testtableddl;
@ -42,3 +44,36 @@ SELECT * FROM pg_dist_shard_placement;
-- check that the extension now can be dropped (and recreated) -- check that the extension now can be dropped (and recreated)
DROP EXTENSION citus; DROP EXTENSION citus;
CREATE EXTENSION citus; CREATE EXTENSION citus;
-- create a table with a SERIAL column
CREATE TABLE testserialtable(id serial, group_id integer);
SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash');
SELECT master_create_worker_shards('testserialtable', 2, 1);
-- should not be able to add additional serial columns
ALTER TABLE testserialtable ADD COLUMN other_id serial;
-- and we shouldn't be able to change a distributed sequence's owner
ALTER SEQUENCE testserialtable_id_seq OWNED BY NONE;
-- or create a sequence with a distributed owner
CREATE SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
-- or even change a manual sequence to be owned by a distributed table
CREATE SEQUENCE standalone_sequence;
ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
-- an edge case, but it's OK to change an owner to the same distributed table
ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id;
-- verify sequence was created on worker
\c - - - :worker_1_port
\ds
-- drop distributed table
\c - - - :master_port
DROP TABLE testserialtable;
-- verify owned sequence is dropped
\c - - - :worker_1_port
\ds