Merge pull request #673 from citusdata/feature/improve_sequence_support

Support SERIAL/BIGSERIAL non-partition columns

cr: @anarazel
pull/675/head
Jason Petersen 2016-07-29 00:11:33 -06:00 committed by GitHub
commit f1ed052c2b
20 changed files with 679 additions and 112 deletions

View File

@ -12,3 +12,4 @@
# ignore latest install file
citus--5.0.sql
citus--5.?-*.sql
!citus--5.?-*--5.?-*.sql

View File

@ -6,7 +6,7 @@ citus_top_builddir = ../../..
MODULE_big = citus
EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -47,6 +47,8 @@ $(EXTENSION)--5.1-6.sql: $(EXTENSION)--5.1-5.sql $(EXTENSION)--5.1-5--5.1-6.sql
cat $^ > $@
$(EXTENSION)--5.1-7.sql: $(EXTENSION)--5.1-6.sql $(EXTENSION)--5.1-6--5.1-7.sql
cat $^ > $@
$(EXTENSION)--5.1-8.sql: $(EXTENSION)--5.1-7.sql $(EXTENSION)--5.1-7--5.1-8.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,72 @@
CREATE FUNCTION pg_catalog.master_drop_sequences(sequence_names text[],
node_name text,
node_port bigint)
RETURNS bool
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_drop_sequences$$;
COMMENT ON FUNCTION pg_catalog.master_drop_sequences(text[], text, bigint)
IS 'drop specified sequences from a node';
REVOKE ALL ON FUNCTION pg_catalog.master_drop_sequences(text[], text, bigint) FROM PUBLIC;
CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger()
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $cdbdt$
DECLARE
v_obj record;
sequence_names text[] := '{}';
node_names text[] := '{}';
node_ports bigint[] := '{}';
node_name text;
node_port bigint;
BEGIN
-- collect set of dropped sequences to drop on workers later
SELECT array_agg(object_identity) INTO sequence_names
FROM pg_event_trigger_dropped_objects()
WHERE object_type = 'sequence';
-- Must accumulate set of affected nodes before deleting placements, as
-- master_drop_all_shards will erase their rows, making it impossible for
-- us to know where to drop sequences (which must be dropped after shards,
-- since they have default value expressions which depend on sequences).
SELECT array_agg(sp.nodename), array_agg(sp.nodeport)
INTO node_names, node_ports
FROM pg_event_trigger_dropped_objects() AS dobj,
pg_dist_shard AS s,
pg_dist_shard_placement AS sp
WHERE dobj.object_type IN ('table', 'foreign table')
AND dobj.objid = s.logicalrelid
AND s.shardid = sp.shardid;
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP
IF v_obj.object_type NOT IN ('table', 'foreign table') THEN
CONTINUE;
END IF;
-- nothing to do if not a distributed table
IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE logicalrelid = v_obj.objid) THEN
CONTINUE;
END IF;
-- ensure all shards are dropped
PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
-- delete partition entry
DELETE FROM pg_dist_partition WHERE logicalrelid = v_obj.objid;
END LOOP;
IF cardinality(sequence_names) = 0 THEN
RETURN;
END IF;
FOR node_name, node_port IN
SELECT DISTINCT name, port
FROM unnest(node_names, node_ports) AS nodes(name, port)
LOOP
PERFORM master_drop_sequences(sequence_names, node_name, node_port);
END LOOP;
END;
$cdbdt$;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '5.1-7'
default_version = '5.1-8'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -7,41 +7,68 @@
*/
#include "postgres.h"
#include "c.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "port.h"
#include <string.h>
#include "access/attnum.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/tupdesc.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_class.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_utility.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_shard_transaction.h"
#include "distributed/multi_utility.h" /* IWYU pragma: keep */
#include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h"
#include "distributed/transmit.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "foreign/foreign.h"
#include "executor/executor.h"
#include "parser/parser.h"
#include "parser/parse_utilcmd.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "nodes/bitmapset.h"
#include "nodes/nodes.h"
#include "nodes/params.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "nodes/value.h"
#include "storage/lmgr.h"
#include "tcop/pquery.h"
#include "storage/lock.h"
#include "tcop/dest.h"
#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/palloc.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
@ -81,6 +108,9 @@ static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSch
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
static void ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement);
static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement);
static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
/* Local functions forward declarations for helper functions */
@ -158,6 +188,16 @@ multi_ProcessUtility(Node *parsetree,
}
}
if (IsA(parsetree, CreateSeqStmt))
{
ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree);
}
if (IsA(parsetree, AlterSeqStmt))
{
ErrorIfDistributedAlterSeqOwnedBy((AlterSeqStmt *) parsetree);
}
/* ddl commands are propagated to workers only if EnableDDLPropagation is set */
if (EnableDDLPropagation)
{
@ -848,6 +888,33 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
{
case AT_AddColumn:
{
if (IsA(command->def, ColumnDef))
{
ColumnDef *column = (ColumnDef *) command->def;
/*
* Check for SERIAL pseudo-types. The structure of this
* check is copied from transformColumnDefinition.
*/
if (column->typeName && list_length(column->typeName->names) == 1 &&
!column->typeName->pct_type)
{
char *typeName = strVal(linitial(column->typeName->names));
if (strcmp(typeName, "smallserial") == 0 ||
strcmp(typeName, "serial2") == 0 ||
strcmp(typeName, "serial") == 0 ||
strcmp(typeName, "serial4") == 0 ||
strcmp(typeName, "bigserial") == 0 ||
strcmp(typeName, "serial8") == 0)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot execute ADD COLUMN commands "
"involving serial pseudotypes")));
}
}
}
break;
}
@ -901,6 +968,126 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
}
/*
* ErrorIfUnsupportedSeqStmt errors out if the provided create sequence
* statement specifies a distributed table in its OWNED BY clause.
*/
static void
ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt)
{
Oid ownedByTableId = InvalidOid;
/* create is easy: just prohibit any distributed OWNED BY */
if (OptionsSpecifyOwnedBy(createSeqStmt->options, &ownedByTableId))
{
if (IsDistributedTable(ownedByTableId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create sequences that specify a distributed "
"table in their OWNED BY option"),
errhint("Use a sequence in a distributed table by specifying "
"a serial column type before creating any shards.")));
}
}
}
/*
* ErrorIfDistributedAlterSeqOwnedBy errors out if the provided alter sequence
* statement attempts to change the owned by property of a distributed sequence
* or attempt to change a local sequence to be owned by a distributed table.
*/
static void
ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt)
{
Oid sequenceId = RangeVarGetRelid(alterSeqStmt->sequence, AccessShareLock,
alterSeqStmt->missing_ok);
Oid ownedByTableId = InvalidOid;
Oid newOwnedByTableId = InvalidOid;
int32 ownedByColumnId = 0;
bool hasDistributedOwner = false;
/* alter statement referenced nonexistent sequence; return */
if (sequenceId == InvalidOid)
{
return;
}
/* see whether the sequences is already owned by a distributed table */
if (sequenceIsOwned(sequenceId, &ownedByTableId, &ownedByColumnId))
{
hasDistributedOwner = IsDistributedTable(ownedByTableId);
}
if (OptionsSpecifyOwnedBy(alterSeqStmt->options, &newOwnedByTableId))
{
/* if a distributed sequence tries to change owner, error */
if (hasDistributedOwner && ownedByTableId != newOwnedByTableId)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter OWNED BY option of a sequence "
"already owned by a distributed table")));
}
else if (!hasDistributedOwner && IsDistributedTable(newOwnedByTableId))
{
/* and don't let local sequences get a distributed OWNED BY */
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot associate an existing sequence with a "
"distributed table"),
errhint("Use a sequence in a distributed table by specifying "
"a serial column type before creating any shards.")));
}
}
}
/*
* OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER
* SEQUENCE command, extracting the first OWNED BY option it encounters. The
* identifier for the specified table is placed in the Oid out parameter before
* returning true. Returns false if no such option is found. Still returns true
* for OWNED BY NONE, but leaves the out paramter set to InvalidOid.
*/
static bool
OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId)
{
ListCell *optionCell = NULL;
foreach(optionCell, optionList)
{
DefElem *defElem = (DefElem *) lfirst(optionCell);
if (strcmp(defElem->defname, "owned_by") == 0)
{
List *ownedByNames = defGetQualifiedName(defElem);
int nameCount = list_length(ownedByNames);
/* if only one name is present, this is OWNED BY NONE */
if (nameCount == 1)
{
*ownedByTableId = InvalidOid;
return true;
}
else
{
/*
* Otherwise, we have a list of schema, table, column, which we
* need to truncate to simply the schema and table to determine
* the relevant relation identifier.
*/
List *relNameList = list_truncate(list_copy(ownedByNames), nameCount - 1);
RangeVar *rangeVar = makeRangeVarFromNameList(relNameList);
bool failOK = true;
*ownedByTableId = RangeVarGetRelid(rangeVar, NoLock, failOK);
return true;
}
}
}
return false;
}
/*
* ErrorIfDistributedRenameStmt errors out if the corresponding rename statement
* operates on a distributed table or its objects.
@ -1002,6 +1189,13 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
{
applyDDLCommand = true;
}
else if ((IsA(ddlCommandNode, CreateSeqStmt)))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot copy to table with serial column from worker"),
errhint("Connect to the master node to COPY to tables which "
"use serial column types.")));
}
/* run only a selected set of DDL commands */
if (applyDDLCommand)
@ -1449,7 +1643,6 @@ ReplicateGrantStmt(Node *parsetree)
StringInfoData ddlString;
ListCell *granteeCell = NULL;
ListCell *objectCell = NULL;
ListCell *privilegeCell = NULL;
bool isFirst = true;
initStringInfo(&privsString);
@ -1474,6 +1667,8 @@ ReplicateGrantStmt(Node *parsetree)
}
else
{
ListCell *privilegeCell = NULL;
isFirst = true;
foreach(privilegeCell, grantStmt->privileges)
{

View File

@ -13,34 +13,43 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "c.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "port.h"
#include <stddef.h>
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "commands/dbcommands.h"
#include "commands/event_trigger.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/relay_utility.h"
#include "distributed/worker_protocol.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "nodes/relation.h"
#include "optimizer/clauses.h"
#include "optimizer/predtest.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
#include "nodes/makefuncs.h"
#include "storage/lock.h"
#include "tcop/tcopprot.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/inval.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/lsyscache.h"
@ -59,6 +68,7 @@ static bool ExecuteRemoteCommand(const char *nodeName, uint32 nodePort,
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_apply_delete_command);
PG_FUNCTION_INFO_V1(master_drop_all_shards);
PG_FUNCTION_INFO_V1(master_drop_sequences);
/*
@ -229,6 +239,61 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
}
/*
* master_drop_sequences attempts to drop a list of sequences on a specified
* node. The "IF EXISTS" clause is used to permit dropping sequences even if
* they may not exist. Returns true on success, false on failure.
*/
Datum
master_drop_sequences(PG_FUNCTION_ARGS)
{
ArrayType *sequenceNamesArray = PG_GETARG_ARRAYTYPE_P(0);
text *nodeText = PG_GETARG_TEXT_P(1);
int64 nodePort = PG_GETARG_INT64(2);
bool dropSuccessful = false;
char *nodeName = TextDatumGetCString(nodeText);
ArrayIterator sequenceIterator = NULL;
Datum sequenceText = 0;
bool isNull = false;
StringInfo dropSeqCommand = makeStringInfo();
/* iterate over sequence names to build single command to DROP them all */
sequenceIterator = array_create_iterator(sequenceNamesArray, 0, NULL);
while (array_iterate(sequenceIterator, &sequenceText, &isNull))
{
if (isNull)
{
ereport(ERROR, (errmsg("unexpected NULL sequence name"),
errcode(ERRCODE_INVALID_PARAMETER_VALUE)));
}
/* append command portion if we haven't added any sequence names yet */
if (dropSeqCommand->len == 0)
{
appendStringInfoString(dropSeqCommand, "DROP SEQUENCE IF EXISTS");
}
else
{
/* otherwise, add a comma to separate subsequent sequence names */
appendStringInfoChar(dropSeqCommand, ',');
}
appendStringInfo(dropSeqCommand, " %s", TextDatumGetCString(sequenceText));
}
dropSuccessful = ExecuteRemoteCommand(nodeName, nodePort, dropSeqCommand);
if (!dropSuccessful)
{
ereport(WARNING, (errmsg("could not delete sequences from node \"%s:" INT64_FORMAT
"\"", nodeName, nodePort)));
}
PG_RETURN_BOOL(dropSuccessful);
}
/*
* DropShards drops all given shards in a relation. The id, name and schema
* for the relation are explicitly provided, since this function may be

View File

@ -12,15 +12,25 @@
*/
#include "postgres.h"
#include "c.h"
#include "fmgr.h"
#include "funcapi.h"
#include "miscadmin.h"
#include <string.h>
#include "access/attnum.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "catalog/catalog.h"
#include "access/skey.h"
#include "access/stratnum.h"
#include "access/tupdesc.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "catalog/pg_index.h"
#include "catalog/pg_type.h"
#include "commands/sequence.h"
@ -28,21 +38,20 @@
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/worker_manager.h"
#include "foreign/foreign.h"
#include "libpq/ip.h"
#include "libpq/libpq-be.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "storage/lock.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/palloc.h"
#include "utils/relcache.h"
#include "utils/ruleutils.h"
#include "utils/syscache.h"
#include "utils/tqual.h"
/* Shard related configuration */
@ -544,6 +553,8 @@ GetTableDDLEvents(Oid relationId)
{
List *tableDDLEventList = NIL;
char tableType = 0;
List *sequenceIdlist = getOwnedSequences(relationId);
ListCell *sequenceIdCell;
char *tableSchemaDef = NULL;
char *tableColumnOptionsDef = NULL;
char *schemaName = NULL;
@ -590,6 +601,15 @@ GetTableDDLEvents(Oid relationId)
tableDDLEventList = lappend(tableDDLEventList, schemaNameDef->data);
}
/* create sequences if needed */
foreach(sequenceIdCell, sequenceIdlist)
{
Oid sequenceRelid = lfirst_oid(sequenceIdCell);
char *sequenceDef = pg_get_sequencedef_string(sequenceRelid);
tableDDLEventList = lappend(tableDDLEventList, sequenceDef);
}
/* fetch table schema and column option definitions */
tableSchemaDef = pg_get_tableschemadef_string(relationId);
tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId);

View File

@ -13,24 +13,33 @@
*/
#include "postgres.h"
#include "c.h"
#include <stdio.h>
#include <string.h>
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/htup.h"
#include "access/skey.h"
#include "access/xact.h"
#include "access/stratnum.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_constraint.h"
#include "commands/defrem.h"
#include "distributed/relay_utility.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "parser/parse_utilcmd.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "nodes/value.h"
#include "storage/lock.h"
#include "tcop/utility.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/tqual.h"
#include "utils/palloc.h"
#include "utils/relcache.h"
/* Local functions forward declarations */
@ -43,43 +52,31 @@ static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName);
/*
* RelayEventExtendNames extends relation names in the given parse tree for
* certain utility commands. The function more specifically extends table,
* sequence, and index names in the parse tree by appending the given shardId;
* thereby avoiding name collisions in the database among sharded tables. This
* function has the side effect of extending relation names in the parse tree.
* certain utility commands. The function more specifically extends table and
* index names in the parse tree by appending the given shardId; thereby
* avoiding name collisions in the database among sharded tables. This function
* has the side effect of extending relation names in the parse tree.
*/
void
RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
{
/* we don't extend names in extension or schema commands */
NodeTag nodeType = nodeTag(parseTree);
if (nodeType == T_CreateExtensionStmt || nodeType == T_CreateSchemaStmt)
if (nodeType == T_CreateExtensionStmt || nodeType == T_CreateSchemaStmt ||
nodeType == T_CreateSeqStmt || nodeType == T_AlterSeqStmt)
{
return;
}
switch (nodeType)
{
case T_AlterSeqStmt:
{
AlterSeqStmt *alterSeqStmt = (AlterSeqStmt *) parseTree;
char **sequenceName = &(alterSeqStmt->sequence->relname);
char **sequenceSchemaName = &(alterSeqStmt->sequence->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(sequenceSchemaName, schemaName);
AppendShardIdToName(sequenceName, shardId);
break;
}
case T_AlterTableStmt:
{
/*
* We append shardId to the very end of table, sequence and index
* names to avoid name collisions. We usually do not touch
* constraint names, except for cases where they refer to index
* names. In those cases, we also append to constraint names.
* We append shardId to the very end of table and index names to
* avoid name collisions. We usually do not touch constraint names,
* except for cases where they refer to index names. In such cases,
* we also append to constraint names.
*/
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parseTree;
@ -144,19 +141,6 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
break;
}
case T_CreateSeqStmt:
{
CreateSeqStmt *createSeqStmt = (CreateSeqStmt *) parseTree;
char **sequenceName = &(createSeqStmt->sequence->relname);
char **sequenceSchemaName = &(createSeqStmt->sequence->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(sequenceSchemaName, schemaName);
AppendShardIdToName(sequenceName, shardId);
break;
}
case T_CreateForeignServerStmt:
{
CreateForeignServerStmt *serverStmt = (CreateForeignServerStmt *) parseTree;
@ -198,9 +182,8 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
DropStmt *dropStmt = (DropStmt *) parseTree;
ObjectType objectType = dropStmt->removeType;
if (objectType == OBJECT_TABLE || objectType == OBJECT_SEQUENCE ||
objectType == OBJECT_INDEX || objectType == OBJECT_FOREIGN_TABLE ||
objectType == OBJECT_FOREIGN_SERVER)
if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX ||
objectType == OBJECT_FOREIGN_TABLE || objectType == OBJECT_FOREIGN_SERVER)
{
List *relationNameList = NULL;
int relationNameListLength = 0;
@ -216,11 +199,11 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
}
/*
* We now need to extend a single relation, sequence or index
* name. To be able to do this extension, we need to extract the
* names' addresses from the value objects they are stored in.
* Otherwise, the repalloc called in AppendShardIdToName() will
* not have the correct memory address for the name.
* We now need to extend a single relation or index name. To be
* able to do this extension, we need to extract the names'
* addresses from the value objects they are stored in. Other-
* wise, the repalloc called in AppendShardIdToName() will not
* have the correct memory address for the name.
*/
relationNameList = (List *) linitial(dropStmt->objects);
@ -370,8 +353,7 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
RenameStmt *renameStmt = (RenameStmt *) parseTree;
ObjectType objectType = renameStmt->renameType;
if (objectType == OBJECT_TABLE || objectType == OBJECT_SEQUENCE ||
objectType == OBJECT_INDEX)
if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX)
{
char **oldRelationName = &(renameStmt->relation->relname);
char **newRelationName = &(renameStmt->newname);

View File

@ -8,48 +8,51 @@
*/
#include "postgres.h"
#include "c.h"
#include "miscadmin.h"
#include <unistd.h>
#include <fcntl.h>
#include <stddef.h>
#include "access/attnum.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/skey.h"
#include "access/stratnum.h"
#include "access/sysattr.h"
#include "access/tupdesc.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_class.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_foreign_data_wrapper.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/citus_ruleutils.h"
#include "catalog/pg_index.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "distributed/citus_ruleutils.h"
#include "foreign/foreign.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/tlist.h"
#include "parser/keywords.h"
#include "parser/parse_agg.h"
#include "parser/parse_func.h"
#include "parser/parse_oper.h"
#include "parser/parser.h"
#include "parser/parsetree.h"
#include "rewrite/rewriteHandler.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "storage/lock.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/palloc.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/ruleutils.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
#include "utils/xml.h"
static void AppendOptionListToString(StringInfo stringData, List *options);
static const char * convert_aclright_to_string(int aclright);
@ -206,6 +209,59 @@ AppendOptionListToString(StringInfo stringBuffer, List *optionList)
}
/*
* pg_get_sequencedef_string returns the definition of a given sequence. This
* definition includes explicit values for all CREATE SEQUENCE options.
*/
char *
pg_get_sequencedef_string(Oid sequenceRelationId)
{
char *qualifiedSequenceName = NULL;
char *sequenceDef = NULL;
Form_pg_sequence pgSequenceForm = NULL;
Relation sequenceRel = NULL;
AclResult permissionCheck = ACLCHECK_NO_PRIV;
SysScanDesc scanDescriptor = NULL;
HeapTuple heapTuple = NULL;
/* open and lock sequence */
sequenceRel = heap_open(sequenceRelationId, AccessShareLock);
/* check permissions to read sequence attributes */
permissionCheck = pg_class_aclcheck(sequenceRelationId, GetUserId(),
ACL_SELECT | ACL_USAGE);
if (permissionCheck != ACLCHECK_OK)
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied for sequence %s",
RelationGetRelationName(sequenceRel))));
}
/* retrieve attributes from first tuple */
scanDescriptor = systable_beginscan(sequenceRel, InvalidOid, false, NULL, 0, NULL);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find specified sequence")));
}
pgSequenceForm = (Form_pg_sequence) GETSTRUCT(heapTuple);
/* build our DDL command */
qualifiedSequenceName = generate_relation_name(sequenceRelationId, NIL);
sequenceDef = psprintf(CREATE_SEQUENCE_COMMAND, qualifiedSequenceName,
pgSequenceForm->increment_by, pgSequenceForm->min_value,
pgSequenceForm->max_value, pgSequenceForm->cache_value,
pgSequenceForm->is_cycled ? "" : "NO ");
systable_endscan(scanDescriptor);
heap_close(sequenceRel, AccessShareLock);
return sequenceDef;
}
/*
* pg_get_tableschemadef_string returns the definition of a given table. This
* definition includes table's schema, default column values, not null and check
@ -266,11 +322,12 @@ pg_get_tableschemadef_string(Oid tableRelationId)
for (attributeIndex = 0; attributeIndex < tupleDescriptor->natts; attributeIndex++)
{
Form_pg_attribute attributeForm = tupleDescriptor->attrs[attributeIndex];
const char *attributeName = NULL;
const char *attributeTypeName = NULL;
if (!attributeForm->attisdropped && attributeForm->attinhcount == 0)
{
const char *attributeName = NULL;
const char *attributeTypeName = NULL;
if (firstAttributePrinted)
{
appendStringInfoString(&buffer, ", ");
@ -399,7 +456,6 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
char relationKind = 0;
TupleDesc tupleDescriptor = NULL;
AttrNumber attributeIndex = 0;
char *columnOptionStatement = NULL;
List *columnOptionList = NIL;
ListCell *columnOptionCell = NULL;
bool firstOptionPrinted = false;
@ -511,6 +567,8 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
*/
foreach(columnOptionCell, columnOptionList)
{
char *columnOptionStatement = NULL;
if (!firstOptionPrinted)
{
initStringInfo(&buffer);
@ -591,9 +649,7 @@ pg_get_table_grants(Oid relationId)
List *defs = NIL;
HeapTuple classTuple = NULL;
Datum aclDatum = 0;
Acl *acl = NULL;
bool isNull = false;
int offtype = 0;
relation = relation_open(relationId, AccessShareLock);
relationName = generate_relation_name(relationId, NIL);
@ -619,7 +675,8 @@ pg_get_table_grants(Oid relationId)
{
int i = 0;
AclItem *aidat = NULL;
Acl *acl = NULL;
int offtype = 0;
/*
* First revoke all default permissions, so we can start adding the

View File

@ -11,18 +11,27 @@
#ifndef CITUS_RULEUTILS_H
#define CITUS_RULEUTILS_H
#include "postgres.h" /* IWYU pragma: keep */
#include "c.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#define CREATE_SEQUENCE_COMMAND \
"CREATE SEQUENCE IF NOT EXISTS %s INCREMENT BY " INT64_FORMAT " MINVALUE " \
INT64_FORMAT " MAXVALUE " INT64_FORMAT " START WITH " INT64_FORMAT " %sCYCLE"
/* Function declarations for version independent Citus ruleutils wrapper functions */
extern char * pg_get_extensiondef_string(Oid tableRelationId);
extern Oid get_extension_schema(Oid ext_oid);
extern char * pg_get_serverdef_string(Oid tableRelationId);
extern char * pg_get_sequencedef_string(Oid sequenceRelid);
extern char * pg_get_tableschemadef_string(Oid tableRelationId);
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);
extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern List * pg_get_table_grants(Oid relationId);
extern Oid get_extension_schema(Oid ext_oid);
/* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer);

View File

@ -14,7 +14,10 @@
#ifndef MASTER_PROTOCOL_H
#define MASTER_PROTOCOL_H
#include "postgres.h"
#include "c.h"
#include "fmgr.h"
#include "nodes/pg_list.h"
@ -108,6 +111,7 @@ extern Datum master_create_empty_shard(PG_FUNCTION_ARGS);
extern Datum master_append_table_to_shard(PG_FUNCTION_ARGS);
extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS);
extern Datum master_apply_delete_command(PG_FUNCTION_ARGS);
extern Datum master_drop_sequences(PG_FUNCTION_ARGS);
extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS);
extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);

View File

@ -390,7 +390,7 @@ ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
DEBUG: push down of limit count: 30
DEBUG: building index "pg_toast_16977_index" on table "pg_toast_16977"
DEBUG: building index "pg_toast_16992_index" on table "pg_toast_16992"
o_custkey | total_order_count
-----------+-------------------
1466 | 1

View File

@ -21,6 +21,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-4';
ALTER EXTENSION citus UPDATE TO '5.1-5';
ALTER EXTENSION citus UPDATE TO '5.1-6';
ALTER EXTENSION citus UPDATE TO '5.1-7';
ALTER EXTENSION citus UPDATE TO '5.1-8';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c

View File

@ -586,3 +586,36 @@ SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data;
----------+------
(0 rows)
-- verify interaction of default values, SERIAL, and RETURNING
\set QUIET on
CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
SELECT master_create_distributed_table('app_analytics_events', 'app_id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('app_analytics_events', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
id
----
1
(1 row)
INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
id
----
2
(1 row)
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;
id | app_id | name
----+--------+------
3 | 103 | Mynt
(1 row)

View File

@ -23,12 +23,12 @@ ERROR: cannot execute ALTER TABLE command involving partition column
ALTER TABLE testtableddl DROP COLUMN distributecol;
ERROR: cannot execute ALTER TABLE command involving partition column
-- verify that the table cannot be dropped in a transaction block
\set VERBOSITY terse
BEGIN;
DROP TABLE testtableddl;
ERROR: DROP distributed table cannot run inside a transaction block
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
PL/pgSQL function citus_drop_trigger() line 15 at PERFORM
ROLLBACK;
\set VERBOSITY default
-- verify that the table can be dropped
DROP TABLE testtableddl;
-- verify that the table can dropped even if shards exist
@ -65,3 +65,54 @@ SELECT * FROM pg_dist_shard_placement;
-- check that the extension now can be dropped (and recreated)
DROP EXTENSION citus;
CREATE EXTENSION citus;
-- create a table with a SERIAL column
CREATE TABLE testserialtable(id serial, group_id integer);
SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('testserialtable', 2, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- should not be able to add additional serial columns
ALTER TABLE testserialtable ADD COLUMN other_id serial;
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes
-- and we shouldn't be able to change a distributed sequence's owner
ALTER SEQUENCE testserialtable_id_seq OWNED BY NONE;
ERROR: cannot alter OWNED BY option of a sequence already owned by a distributed table
-- or create a sequence with a distributed owner
CREATE SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
ERROR: cannot create sequences that specify a distributed table in their OWNED BY option
HINT: Use a sequence in a distributed table by specifying a serial column type before creating any shards.
-- or even change a manual sequence to be owned by a distributed table
CREATE SEQUENCE standalone_sequence;
ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
ERROR: cannot associate an existing sequence with a distributed table
HINT: Use a sequence in a distributed table by specifying a serial column type before creating any shards.
-- an edge case, but it's OK to change an owner to the same distributed table
ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id;
-- verify sequence was created on worker
\c - - - :worker_1_port
\ds
List of relations
Schema | Name | Type | Owner
--------+------------------------+----------+----------
public | testserialtable_id_seq | sequence | postgres
(1 row)
-- drop distributed table
\c - - - :master_port
DROP TABLE testserialtable;
-- verify owned sequence is dropped
\c - - - :worker_1_port
\ds
List of relations
Schema | Name | Type | Owner
--------+------+------+-------
(0 rows)

View File

@ -319,6 +319,19 @@ ORDER BY
LIMIT
5;
-- Ensure that copy from worker node of table with serial column fails
CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial);
SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append');
-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
c_custkey integer ,

View File

@ -427,6 +427,22 @@ LIMIT
560137 | 57637
(5 rows)
-- Ensure that copy from worker node of table with serial column fails
CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial);
SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
ERROR: cannot copy to table with serial column from worker
HINT: Connect to the master node to COPY to tables which use serial column types.
-- Connect back to the master node
\c - - - 57636
-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
c_custkey integer ,

View File

@ -26,6 +26,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-4';
ALTER EXTENSION citus UPDATE TO '5.1-5';
ALTER EXTENSION citus UPDATE TO '5.1-6';
ALTER EXTENSION citus UPDATE TO '5.1-7';
ALTER EXTENSION citus UPDATE TO '5.1-8';
-- drop extension an re-create in newest version
DROP EXTENSION citus;

View File

@ -401,3 +401,13 @@ DELETE FROM multiple_hash WHERE category = '1' RETURNING category;
-- check
SELECT * FROM multiple_hash WHERE category = '1' ORDER BY category, data;
SELECT * FROM multiple_hash WHERE category = '2' ORDER BY category, data;
-- verify interaction of default values, SERIAL, and RETURNING
\set QUIET on
CREATE TABLE app_analytics_events (id serial, app_id integer, name text);
SELECT master_create_distributed_table('app_analytics_events', 'app_id', 'hash');
SELECT master_create_worker_shards('app_analytics_events', 4, 1);
INSERT INTO app_analytics_events VALUES (DEFAULT, 101, 'Fauxkemon Geaux') RETURNING id;
INSERT INTO app_analytics_events (app_id, name) VALUES (102, 'Wayz') RETURNING id;
INSERT INTO app_analytics_events (app_id, name) VALUES (103, 'Mynt') RETURNING *;

View File

@ -21,9 +21,11 @@ ALTER TABLE testtableddl ALTER COLUMN distributecol TYPE text;
ALTER TABLE testtableddl DROP COLUMN distributecol;
-- verify that the table cannot be dropped in a transaction block
\set VERBOSITY terse
BEGIN;
DROP TABLE testtableddl;
ROLLBACK;
\set VERBOSITY default
-- verify that the table can be dropped
DROP TABLE testtableddl;
@ -42,3 +44,36 @@ SELECT * FROM pg_dist_shard_placement;
-- check that the extension now can be dropped (and recreated)
DROP EXTENSION citus;
CREATE EXTENSION citus;
-- create a table with a SERIAL column
CREATE TABLE testserialtable(id serial, group_id integer);
SELECT master_create_distributed_table('testserialtable', 'group_id', 'hash');
SELECT master_create_worker_shards('testserialtable', 2, 1);
-- should not be able to add additional serial columns
ALTER TABLE testserialtable ADD COLUMN other_id serial;
-- and we shouldn't be able to change a distributed sequence's owner
ALTER SEQUENCE testserialtable_id_seq OWNED BY NONE;
-- or create a sequence with a distributed owner
CREATE SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
-- or even change a manual sequence to be owned by a distributed table
CREATE SEQUENCE standalone_sequence;
ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
-- an edge case, but it's OK to change an owner to the same distributed table
ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id;
-- verify sequence was created on worker
\c - - - :worker_1_port
\ds
-- drop distributed table
\c - - - :master_port
DROP TABLE testserialtable;
-- verify owned sequence is dropped
\c - - - :worker_1_port
\ds