From e80f1612a6766f457ecfafb4db3ae00cdc0bb03c Mon Sep 17 00:00:00 2001 From: "Eric B. Ridge" Date: Thu, 25 Aug 2016 13:22:30 -0600 Subject: [PATCH] Add syscols in queries; extend relnames in indexes To permit use with ZomboDB (https://github.com/zombodb/zombodb), two changes were necessary: 1. Permit use of `tableoid` system column in queries 2. Extend relation names appearing in index expressions The first is accomplished by simply changing the deparse logic to allow system columns in queries destined for distributed tables. The latter was slightly more complex, given that DDL extension currently occurs on workers. But since indexes cannot reference tables other than the one being indexed, it is safe to look for any relation reference ending in a '*' character and extend their penultimate segments with a shard id. This change also adds an error to prevent users from distributing any relations using the WITH (OIDS) feature, which is unsupported. --- .../commands/create_distributed_table.c | 11 ++++ .../distributed/relay/relay_event_utility.c | 66 ++++++++++++++++++- src/backend/distributed/utils/ruleutils_95.c | 5 ++ .../regress/expected/multi_create_shards.out | 7 ++ .../expected/multi_index_statements.out | 10 ++- .../regress/expected/multi_simple_queries.out | 16 +++++ src/test/regress/sql/multi_create_shards.sql | 7 ++ .../regress/sql/multi_index_statements.sql | 3 + src/test/regress/sql/multi_simple_queries.sql | 12 ++++ 9 files changed, 132 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 22402b9af..8ce82d0bb 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -75,6 +75,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) Oid distributionMethodOid = PG_GETARG_OID(2); Relation distributedRelation = NULL; + TupleDesc relationDesc = NULL; char 
*distributedRelationName = NULL; char relationKind = '\0'; @@ -98,6 +99,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) * multiple backends manipulating this relation. */ distributedRelation = relation_open(distributedRelationId, AccessExclusiveLock); + relationDesc = RelationGetDescr(distributedRelation); distributedRelationName = RelationGetRelationName(distributedRelation); EnsureTableOwner(distributedRelationId); @@ -113,6 +115,15 @@ master_create_distributed_table(PG_FUNCTION_ARGS) distributedRelationName))); } + /* verify target relation does not use WITH (OIDS) PostgreSQL feature */ + if (relationDesc->tdhasoid) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute relation: %s", distributedRelationName), + errdetail("Distributed relations must not specify the WITH " + "(OIDS) option in their definitions."))); + } + /* verify target relation is either regular or foreign table */ relationKind = distributedRelation->rd_rel->relkind; if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 66dd80fa3..76b5f0177 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -30,6 +30,7 @@ #include "distributed/relay_utility.h" #include "lib/stringinfo.h" #include "nodes/nodes.h" +#include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" #include "nodes/primnodes.h" @@ -41,14 +42,13 @@ #include "utils/palloc.h" #include "utils/relcache.h" - /* Local functions forward declarations */ static bool TypeAddIndexConstraint(const AlterTableCmd *command); static bool TypeDropIndexConstraint(const AlterTableCmd *command, const RangeVar *relation, uint64 shardId); static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId); static void SetSchemaNameIfNotExist(char 
**schemaName, char *newSchemaName); - +static bool UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId); /* * RelayEventExtendNames extends relation names in the given parse tree for @@ -312,6 +312,10 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) ereport(ERROR, (errmsg("cannot extend name for null index name"))); } + /* extend ColumnRef nodes in the IndexStmt with the shardId */ + UpdateWholeRowColumnReferencesWalker((Node *) indexStmt->indexParams, + &shardId); + /* prefix with schema name if it is not added already */ SetSchemaNameIfNotExist(relationSchemaName, schemaName);
@@ -536,6 +540,64 @@ AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId)
 }
 
 
+/*
+ * UpdateWholeRowColumnReferencesWalker appends the given shardId to the relation
+ * name of each qualified whole-row reference ("rel.*") found beneath node. The
+ * walker never asks to stop early, so every ColumnRef in the tree is visited.
+ *
+ * ColumnRefs that don't end in A_Star are not extended as catalog access isn't
+ * allowed here and we don't otherwise have enough context to disambiguate a
+ * field name that is identical to the table name.
+ */
+static bool
+UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId)
+{
+	bool walkIsComplete = false;
+
+	if (node == NULL)
+	{
+		return false;
+	}
+
+	if (IsA(node, IndexElem))
+	{
+		IndexElem *indexElem = (IndexElem *) node;
+
+		walkIsComplete = raw_expression_tree_walker(indexElem->expr,
+													UpdateWholeRowColumnReferencesWalker,
+													shardId);
+	}
+	else if (IsA(node, ColumnRef))
+	{
+		ColumnRef *columnRef = (ColumnRef *) node;
+		int colrefFieldCount = list_length(columnRef->fields);
+
+		/* a bare "*" has no relation name field, so require at least two fields */
+		if (colrefFieldCount >= 2 && IsA(llast(columnRef->fields), A_Star))
+		{
+			Node *relnameNode = list_nth(columnRef->fields, colrefFieldCount - 2);
+
+			/* checked at runtime: an Assert would vanish in production builds */
+			if (IsA(relnameNode, String))
+			{
+				AppendShardIdToName(&((Value *) relnameNode)->val.str, *shardId);
+			}
+		}
+
+		/* might be more than one ColumnRef to visit */
+		walkIsComplete = false;
+	}
+	else
+	{
+		walkIsComplete = raw_expression_tree_walker(node,
+													UpdateWholeRowColumnReferencesWalker,
+													shardId);
+	}
+
+	return walkIsComplete;
+}
+
+
 /*
  * SetSchemaNameIfNotExist function checks whether schemaName is set and if it is not set
  * it sets its value to given newSchemaName.
diff --git a/src/backend/distributed/utils/ruleutils_95.c b/src/backend/distributed/utils/ruleutils_95.c index 5e5cf5e13..ffc8e79b0 100644 --- a/src/backend/distributed/utils/ruleutils_95.c +++ b/src/backend/distributed/utils/ruleutils_95.c @@ -3621,6 +3621,11 @@ get_variable(Var *var, int levelsup, bool istoplevel, deparse_context *context) attname = colinfo->colnames[attnum - 1]; Assert(attname != NULL); } + else if (GetRangeTblKind(rte) == CITUS_RTE_SHARD) + { + /* System column on a Citus shard */ + attname = get_relid_attribute_name(rte->relid, attnum); + } else { /* System column - name is fixed, get it from the catalog */ diff --git a/src/test/regress/expected/multi_create_shards.out b/src/test/regress/expected/multi_create_shards.out index cf3dfc3cd..deda65b43 100644 --- a/src/test/regress/expected/multi_create_shards.out +++ b/src/test/regress/expected/multi_create_shards.out @@ -40,6 +40,13 @@ CREATE TABLE table_to_distribute ( json_data json, test_type_data dummy_type ); +-- use the table WITH (OIDS) set +ALTER TABLE table_to_distribute SET WITH OIDS; +SELECT master_create_distributed_table('table_to_distribute', 'id', 'hash'); +ERROR: cannot distribute relation: table_to_distribute +DETAIL: Distributed relations must not specify the WITH (OIDS) option in their definitions.
+-- revert WITH (OIDS) from above +ALTER TABLE table_to_distribute SET WITHOUT OIDS; -- use an index instead of table name SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash'); ERROR: cannot distribute relation: table_to_distribute_pkey diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out index 2a77a0e83..e1ed574ac 100644 --- a/src/test/regress/expected/multi_index_statements.out +++ b/src/test/regress/expected/multi_index_statements.out @@ -70,6 +70,7 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh CREATE INDEX lineitem_partkey_desc_index ON lineitem (l_partkey DESC); CREATE INDEX lineitem_partial_index ON lineitem (l_shipdate) WHERE l_shipdate < '1995-01-01'; +CREATE INDEX lineitem_colref_index ON lineitem (record_ne(lineitem.*, NULL)); SET client_min_messages = ERROR; -- avoid version dependant warning about WAL CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey); CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range(a); @@ -85,19 +86,20 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t public | index_test_hash | index_test_hash_index_a_b | | CREATE UNIQUE INDEX index_test_hash_index_a_b ON index_test_hash USING btree (a, b) public | index_test_range | index_test_range_index_a | | CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range USING btree (a) public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b) + public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record)) public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey) public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON 
lineitem USING btree (l_orderkey) public | lineitem | lineitem_partial_index | | CREATE INDEX lineitem_partial_index ON lineitem USING btree (l_shipdate) WHERE (l_shipdate < '01-01-1995'::date) public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC) public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber) public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate) -(10 rows) +(11 rows) \c - - - :worker_1_port SELECT count(*) FROM pg_indexes WHERE tablename = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1); count ------- - 6 + 7 (1 row) SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_hash%'; @@ -161,13 +163,14 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t public | index_test_hash | index_test_hash_index_a_b | | CREATE UNIQUE INDEX index_test_hash_index_a_b ON index_test_hash USING btree (a, b) public | index_test_range | index_test_range_index_a | | CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range USING btree (a) public | index_test_range | index_test_range_index_a_b | | CREATE UNIQUE INDEX index_test_range_index_a_b ON index_test_range USING btree (a, b) + public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON lineitem USING btree (record_ne(lineitem.*, NULL::record)) public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey) public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON lineitem USING btree (l_orderkey) public | lineitem | lineitem_partial_index | | CREATE INDEX lineitem_partial_index ON lineitem USING btree (l_shipdate) WHERE (l_shipdate < '01-01-1995'::date) public | lineitem | lineitem_partkey_desc_index | | CREATE 
INDEX lineitem_partkey_desc_index ON lineitem USING btree (l_partkey DESC) public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON lineitem USING btree (l_orderkey, l_linenumber) public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON lineitem USING btree (l_shipdate) -(10 rows) +(11 rows) -- -- DROP INDEX @@ -183,6 +186,7 @@ ERROR: dropping indexes concurrently on distributed tables is currently unsuppo DROP INDEX lineitem_orderkey_index; DROP INDEX lineitem_partkey_desc_index; DROP INDEX lineitem_partial_index; +DROP INDEX lineitem_colref_index; -- Verify that we handle if exists statements correctly DROP INDEX non_existent_index; ERROR: index "non_existent_index" does not exist diff --git a/src/test/regress/expected/multi_simple_queries.out b/src/test/regress/expected/multi_simple_queries.out index 8ea598c34..29af5caf4 100644 --- a/src/test/regress/expected/multi_simple_queries.out +++ b/src/test/regress/expected/multi_simple_queries.out @@ -502,4 +502,20 @@ DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 14 ERROR: cannot use real time executor with repartition jobs HINT: Set citus.task_executor_type to "task-tracker". 
+-- system columns from shard tables can be queried and retrieved +SELECT count(*) FROM ( + SELECT tableoid, ctid, cmin, cmax, xmin, xmax + FROM articles + WHERE tableoid IS NOT NULL OR + ctid IS NOT NULL OR + cmin IS NOT NULL OR + cmax IS NOT NULL OR + xmin IS NOT NULL OR + xmax IS NOT NULL +) x; + count +------- + 50 +(1 row) + SET client_min_messages to 'NOTICE'; diff --git a/src/test/regress/sql/multi_create_shards.sql b/src/test/regress/sql/multi_create_shards.sql index c5e38fa63..b49f0c0f4 100644 --- a/src/test/regress/sql/multi_create_shards.sql +++ b/src/test/regress/sql/multi_create_shards.sql @@ -52,6 +52,13 @@ CREATE TABLE table_to_distribute ( test_type_data dummy_type ); +-- use the table WITH (OIDS) set +ALTER TABLE table_to_distribute SET WITH OIDS; +SELECT master_create_distributed_table('table_to_distribute', 'id', 'hash'); + +-- revert WITH (OIDS) from above +ALTER TABLE table_to_distribute SET WITHOUT OIDS; + -- use an index instead of table name SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash'); diff --git a/src/test/regress/sql/multi_index_statements.sql b/src/test/regress/sql/multi_index_statements.sql index a71d5b82d..268fd90fb 100644 --- a/src/test/regress/sql/multi_index_statements.sql +++ b/src/test/regress/sql/multi_index_statements.sql @@ -43,6 +43,8 @@ CREATE INDEX lineitem_partkey_desc_index ON lineitem (l_partkey DESC); CREATE INDEX lineitem_partial_index ON lineitem (l_shipdate) WHERE l_shipdate < '1995-01-01'; +CREATE INDEX lineitem_colref_index ON lineitem (record_ne(lineitem.*, NULL)); + SET client_min_messages = ERROR; -- avoid version dependant warning about WAL CREATE INDEX lineitem_orderkey_hash_index ON lineitem USING hash (l_partkey); CREATE UNIQUE INDEX index_test_range_index_a ON index_test_range(a); @@ -97,6 +99,7 @@ DROP INDEX CONCURRENTLY lineitem_orderkey_index; DROP INDEX lineitem_orderkey_index; DROP INDEX lineitem_partkey_desc_index; DROP INDEX lineitem_partial_index; +DROP INDEX 
lineitem_colref_index; -- Verify that we handle if exists statements correctly diff --git a/src/test/regress/sql/multi_simple_queries.sql b/src/test/regress/sql/multi_simple_queries.sql index 7e3cf9f3c..98f73cec4 100644 --- a/src/test/regress/sql/multi_simple_queries.sql +++ b/src/test/regress/sql/multi_simple_queries.sql @@ -260,4 +260,16 @@ SELECT * FROM articles a, articles b WHERE a.id = b.id AND a.author_id = 1; +-- system columns from shard tables can be queried and retrieved +SELECT count(*) FROM ( + SELECT tableoid, ctid, cmin, cmax, xmin, xmax + FROM articles + WHERE tableoid IS NOT NULL OR + ctid IS NOT NULL OR + cmin IS NOT NULL OR + cmax IS NOT NULL OR + xmin IS NOT NULL OR + xmax IS NOT NULL +) x; + SET client_min_messages to 'NOTICE';