Add syscols in queries; extend relnames in indexes

To permit use with ZomboDB (https://github.com/zombodb/zombodb), two
changes were necessary:

  1. Permit use of `tableoid` system column in queries
  2. Extend relation names appearing in index expressions

The first is accomplished by simply changing the deparse logic to allow
system columns in queries destined for distributed tables. The latter
was slightly more complex, given that DDL extension currently occurs on
workers. But since indexes cannot reference tables other than the one
being indexed, it is safe to look for any relation reference ending in
a '*' character and extend their penultimate segments with a shard id.

This change also adds an error to prevent users from distributing any
relations using the WITH (OIDS) feature, which is unsupported.
pull/773/head
Eric B. Ridge 2016-08-25 13:22:30 -06:00 committed by Jason Petersen
parent 1f15d6b162
commit e80f1612a6
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
9 changed files with 132 additions and 5 deletions

View File

@ -75,6 +75,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
Oid distributionMethodOid = PG_GETARG_OID(2); Oid distributionMethodOid = PG_GETARG_OID(2);
Relation distributedRelation = NULL; Relation distributedRelation = NULL;
TupleDesc relationDesc = NULL;
char *distributedRelationName = NULL; char *distributedRelationName = NULL;
char relationKind = '\0'; char relationKind = '\0';
@ -98,6 +99,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
* multiple backends manipulating this relation. * multiple backends manipulating this relation.
*/ */
distributedRelation = relation_open(distributedRelationId, AccessExclusiveLock); distributedRelation = relation_open(distributedRelationId, AccessExclusiveLock);
relationDesc = RelationGetDescr(distributedRelation);
distributedRelationName = RelationGetRelationName(distributedRelation); distributedRelationName = RelationGetRelationName(distributedRelation);
EnsureTableOwner(distributedRelationId); EnsureTableOwner(distributedRelationId);
@ -113,6 +115,15 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
distributedRelationName))); distributedRelationName)));
} }
/* verify target relation does not use WITH (OIDS) PostgreSQL feature */
if (relationDesc->tdhasoid)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation: %s", distributedRelationName),
errdetail("Distributed relations must not specify the WITH "
"(OIDS) option in their definitions.")));
}
/* verify target relation is either regular or foreign table */ /* verify target relation is either regular or foreign table */
relationKind = distributedRelation->rd_rel->relkind; relationKind = distributedRelation->rd_rel->relkind;
if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE)

View File

@ -30,6 +30,7 @@
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/nodes.h" #include "nodes/nodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "nodes/primnodes.h" #include "nodes/primnodes.h"
@ -41,14 +42,13 @@
#include "utils/palloc.h" #include "utils/palloc.h"
#include "utils/relcache.h" #include "utils/relcache.h"
/* Local functions forward declarations */ /* Local functions forward declarations */
static bool TypeAddIndexConstraint(const AlterTableCmd *command); static bool TypeAddIndexConstraint(const AlterTableCmd *command);
static bool TypeDropIndexConstraint(const AlterTableCmd *command, static bool TypeDropIndexConstraint(const AlterTableCmd *command,
const RangeVar *relation, uint64 shardId); const RangeVar *relation, uint64 shardId);
static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId); static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId);
static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName); static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName);
static bool UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId);
/* /*
* RelayEventExtendNames extends relation names in the given parse tree for * RelayEventExtendNames extends relation names in the given parse tree for
@ -312,6 +312,10 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
ereport(ERROR, (errmsg("cannot extend name for null index name"))); ereport(ERROR, (errmsg("cannot extend name for null index name")));
} }
/* extend ColumnRef nodes in the IndexStmt with the shardId */
UpdateWholeRowColumnReferencesWalker((Node *) indexStmt->indexParams,
&shardId);
/* prefix with schema name if it is not added already */ /* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName); SetSchemaNameIfNotExist(relationSchemaName, schemaName);
@ -536,6 +540,64 @@ AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId)
} }
/*
* UpdateWholeRowColumnReferencesWalker extends ColumnRef nodes that end with A_Star
* with the given shardId.
*
* ColumnRefs that don't reference A_Star are not extended as catalog access isn't
* allowed here and we don't otherwise have enough context to disambiguate a
* field name that is identical to the table name.
*/
static bool
UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId)
{
bool walkIsComplete = false;
if (node == NULL)
{
return false;
}
if (IsA(node, IndexElem))
{
IndexElem *indexElem = (IndexElem *) node;
walkIsComplete = raw_expression_tree_walker(indexElem->expr,
UpdateWholeRowColumnReferencesWalker,
shardId);
}
else if (IsA(node, ColumnRef))
{
ColumnRef *columnRef = (ColumnRef *) node;
Node *lastField = llast(columnRef->fields);
if (IsA(lastField, A_Star))
{
/*
* ColumnRef fields list ends with an A_Star, so we can blindly
* extend the penultimate element with the shardId.
*/
int colrefFieldCount = list_length(columnRef->fields);
Value *relnameValue = list_nth(columnRef->fields, colrefFieldCount - 2);
Assert(IsA(relnameValue, String));
AppendShardIdToName(&relnameValue->val.str, *shardId);
}
/* might be more than one ColumnRef to visit */
walkIsComplete = false;
}
else
{
walkIsComplete = raw_expression_tree_walker(node,
UpdateWholeRowColumnReferencesWalker,
shardId);
}
return walkIsComplete;
}
/* /*
* SetSchemaNameIfNotExist function checks whether schemaName is set and if it is not set * SetSchemaNameIfNotExist function checks whether schemaName is set and if it is not set
* it sets its value to given newSchemaName. * it sets its value to given newSchemaName.

View File

@ -3621,6 +3621,11 @@ get_variable(Var *var, int levelsup, bool istoplevel, deparse_context *context)
attname = colinfo->colnames[attnum - 1]; attname = colinfo->colnames[attnum - 1];
Assert(attname != NULL); Assert(attname != NULL);
} }
else if (GetRangeTblKind(rte) == CITUS_RTE_SHARD)
{
/* System column on a Citus shard */
attname = get_relid_attribute_name(rte->relid, attnum);
}
else else
{ {
/* System column - name is fixed, get it from the catalog */ /* System column - name is fixed, get it from the catalog */

View File

@ -40,6 +40,13 @@ CREATE TABLE table_to_distribute (
json_data json, json_data json,
test_type_data dummy_type test_type_data dummy_type
); );
-- use the table WITH (OIDS) set
ALTER TABLE table_to_distribute SET WITH OIDS;
SELECT master_create_distributed_table('table_to_distribute', 'id', 'hash');
ERROR: cannot distribute relation: table_to_distribute
DETAIL: Distributed relations must not specify the WITH (OIDS) option in their definitions.
-- revert WITH (OIDS) from above
ALTER TABLE table_to_distribute SET WITHOUT OIDS;
-- use an index instead of table name -- use an index instead of table name
SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash'); SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash');
ERROR: cannot distribute relation: table_to_distribute_pkey ERROR: cannot distribute relation: table_to_distribute_pkey

View File

@ -70,6 +70,7 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
CREATE INDEX lineitem_partkey_desc_index ON lineitem (l_partkey DESC); CREATE INDEX lineitem_partkey_desc_index ON lineitem (l_partkey DESC);
CREATE INDEX lineitem_partial_index ON lineitem (l_shipdate) CREATE INDEX lineitem_partial_index ON lineitem (l_shipdate)
WHERE l_shipdate < '1995-01-01'; WHERE l_shipdate < '1995-01-01';
CREATE INDEX lineitem_colref_index ON lineitem (record_ne(lineitem.*, NULL));
SET client_min_messages = ERROR; -- avoid version dependant warning about WAL SET client_min_messages = ERROR; -- avoid version dependant warning about WAL
CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey); CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey);
CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range(a); CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range(a);
@ -85,19 +86,20 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
public | index_test_hash | index_test_hash_index_a_b | | CREATE UNIQUE INDEX index_test_hash_index_a_b ON index_test_hash USING btree (a, b) public | index_test_hash | index_test_hash_index_a_b | | CREATE UNIQUE INDEX index_test_hash_index_a_b ON index_test_hash USING btree (a, b)
public | index_test_range | index_test_range_index_a | | CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range USING btree (a) public | index_test_range | index_test_range_index_a | | CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range USING btree (a)
public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b) public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b)
public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record))
public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey) public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey)
public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey)
public | lineitem | lineitem_partial_index | | CREATE INDEX lineitem_partial_index ON lineitem USING btree (l_shipdate) WHERE (l_shipdate < '01-01-1995'::date) public | lineitem | lineitem_partial_index | | CREATE INDEX lineitem_partial_index ON lineitem USING btree (l_shipdate) WHERE (l_shipdate < '01-01-1995'::date)
public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC) public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC)
public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber) public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber)
public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate) public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate)
(10 rows) (11 rows)
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT count(*) FROM pg_indexes WHERE tablename = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1); SELECT count(*) FROM pg_indexes WHERE tablename = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1);
count count
------- -------
6 7
(1 row) (1 row)
SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_hash%'; SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_hash%';
@ -161,13 +163,14 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
public | index_test_hash | index_test_hash_index_a_b | | CREATE UNIQUE INDEX index_test_hash_index_a_b ON index_test_hash USING btree (a, b) public | index_test_hash | index_test_hash_index_a_b | | CREATE UNIQUE INDEX index_test_hash_index_a_b ON index_test_hash USING btree (a, b)
public | index_test_range | index_test_range_index_a | | CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range USING btree (a) public | index_test_range | index_test_range_index_a | | CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range USING btree (a)
public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b) public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b)
public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record))
public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey) public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey)
public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey)
public | lineitem | lineitem_partial_index | | CREATE INDEX lineitem_partial_index ON lineitem USING btree (l_shipdate) WHERE (l_shipdate < '01-01-1995'::date) public | lineitem | lineitem_partial_index | | CREATE INDEX lineitem_partial_index ON lineitem USING btree (l_shipdate) WHERE (l_shipdate < '01-01-1995'::date)
public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC) public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC)
public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber) public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber)
public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate) public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate)
(10 rows) (11 rows)
-- --
-- DROP INDEX -- DROP INDEX
@ -183,6 +186,7 @@ ERROR: dropping indexes concurrently on distributed tables is currently unsuppo
DROP INDEX lineitem_orderkey_index; DROP INDEX lineitem_orderkey_index;
DROP INDEX lineitem_partkey_desc_index; DROP INDEX lineitem_partkey_desc_index;
DROP INDEX lineitem_partial_index; DROP INDEX lineitem_partial_index;
DROP INDEX lineitem_colref_index;
-- Verify that we handle if exists statements correctly -- Verify that we handle if exists statements correctly
DROP INDEX non_existent_index; DROP INDEX non_existent_index;
ERROR: index "non_existent_index" does not exist ERROR: index "non_existent_index" does not exist

View File

@ -502,4 +502,20 @@ DEBUG: pruning merge fetch taskId 11
DETAIL: Creating dependency on merge taskId 14 DETAIL: Creating dependency on merge taskId 14
ERROR: cannot use real time executor with repartition jobs ERROR: cannot use real time executor with repartition jobs
HINT: Set citus.task_executor_type to "task-tracker". HINT: Set citus.task_executor_type to "task-tracker".
-- system columns from shard tables can be queried and retrieved
SELECT count(*) FROM (
SELECT tableoid, ctid, cmin, cmax, xmin, xmax
FROM articles
WHERE tableoid IS NOT NULL OR
ctid IS NOT NULL OR
cmin IS NOT NULL OR
cmax IS NOT NULL OR
xmin IS NOT NULL OR
xmax IS NOT NULL
) x;
count
-------
50
(1 row)
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';

View File

@ -52,6 +52,13 @@ CREATE TABLE table_to_distribute (
test_type_data dummy_type test_type_data dummy_type
); );
-- use the table WITH (OIDS) set
ALTER TABLE table_to_distribute SET WITH OIDS;
SELECT master_create_distributed_table('table_to_distribute', 'id', 'hash');
-- revert WITH (OIDS) from above
ALTER TABLE table_to_distribute SET WITHOUT OIDS;
-- use an index instead of table name -- use an index instead of table name
SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash'); SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash');

View File

@ -43,6 +43,8 @@ CREATE INDEX lineitem_partkey_desc_index ON lineitem (l_partkey DESC);
CREATE INDEX lineitem_partial_index ON lineitem (l_shipdate) CREATE INDEX lineitem_partial_index ON lineitem (l_shipdate)
WHERE l_shipdate < '1995-01-01'; WHERE l_shipdate < '1995-01-01';
CREATE INDEX lineitem_colref_index ON lineitem (record_ne(lineitem.*, NULL));
SET client_min_messages = ERROR; -- avoid version dependant warning about WAL SET client_min_messages = ERROR; -- avoid version dependant warning about WAL
CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey); CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey);
CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range(a); CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range(a);
@ -97,6 +99,7 @@ DROP INDEX CONCURRENTLY lineitem_orderkey_index;
DROP INDEX lineitem_orderkey_index; DROP INDEX lineitem_orderkey_index;
DROP INDEX lineitem_partkey_desc_index; DROP INDEX lineitem_partkey_desc_index;
DROP INDEX lineitem_partial_index; DROP INDEX lineitem_partial_index;
DROP INDEX lineitem_colref_index;
-- Verify that we handle if exists statements correctly -- Verify that we handle if exists statements correctly

View File

@ -260,4 +260,16 @@ SELECT *
FROM articles a, articles b FROM articles a, articles b
WHERE a.id = b.id AND a.author_id = 1; WHERE a.id = b.id AND a.author_id = 1;
-- system columns from shard tables can be queried and retrieved
SELECT count(*) FROM (
SELECT tableoid, ctid, cmin, cmax, xmin, xmax
FROM articles
WHERE tableoid IS NOT NULL OR
ctid IS NOT NULL OR
cmin IS NOT NULL OR
cmax IS NOT NULL OR
xmin IS NOT NULL OR
xmax IS NOT NULL
) x;
SET client_min_messages to 'NOTICE'; SET client_min_messages to 'NOTICE';