From a225652c29848079d5d51306bc0c244e0705fb5d Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Mon, 12 Oct 2020 13:35:05 +0300 Subject: [PATCH] Handle INSERT/UPDATE queries with comp.type field access properly Planner combines multiple fields accesses to same composite type column in a single FieldStore object and "processIndirection" function that we use to deparse such queries does not know how to handle such FieldStore objects.. For that, we split field references in FiedlStore objects into individual ones. --- .../distributed/deparser/citus_ruleutils.c | 83 +++++++++++++++++++ .../distributed/deparser/ruleutils_11.c | 6 +- .../distributed/deparser/ruleutils_12.c | 6 +- .../distributed/deparser/ruleutils_13.c | 6 +- src/include/distributed/citus_ruleutils.h | 1 + .../regress/expected/distributed_types.out | 33 ++++++++ src/test/regress/sql/distributed_types.sql | 18 ++++ 7 files changed, 150 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 2dd90a722..b4f66b8d2 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -74,6 +74,9 @@ static void AppendStorageParametersToString(StringInfo stringBuffer, static const char * convert_aclright_to_string(int aclright); static void simple_quote_literal(StringInfo buf, const char *val); static char * flatten_reloptions(Oid relid); +static List * ExtendFieldStoreTargetEntry(TargetEntry *targetEntry); +static TargetEntry * CreateFieldStoreTargetEntryForSubField(TargetEntry *targetEntry, + int subFieldIndex); /* * pg_get_extensiondef_string finds the foreign data wrapper that corresponds to @@ -1383,3 +1386,83 @@ RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier) } } } + + +/* + * ExtendFieldStoreTargetEntries takes a list of target entries and retuns a + * new list by extending each target entry pointing to a FieldStore by + * executing ExtendFieldStoreTargetEntry. + * + * This is required for deparsing INSERT/UPDATE queries for remote execution + * as postgres planner combines field accesses to same column into in a single + * FieldStore object but processIndirection does not know how to handle such + * FieldStore objects. + */ +List * +ExtendFieldStoreTargetEntries(List *targetList) +{ + List *extendedTargetList = NIL; + + TargetEntry *targetEntry = NULL; + foreach_ptr(targetEntry, targetList) + { + Node *expr = (Node *) targetEntry->expr; + if (IsA(expr, FieldStore)) + { + List *fieldStoreTargetEntries = ExtendFieldStoreTargetEntry(targetEntry); + extendedTargetList = list_concat(extendedTargetList, + fieldStoreTargetEntries); + } + else + { + extendedTargetList = lappend(extendedTargetList, targetEntry); + } + } + + return extendedTargetList; +} + + +/* + * ExtendFieldStoreTargetEntry takes a target entry pointing to a FieldStore + * object and returns a list of TargetEntry's by seperating each element in + * newvals and fieldnums lists into separate TargetEntry objects. + */ +static List * +ExtendFieldStoreTargetEntry(TargetEntry *targetEntry) +{ + Assert(IsA(targetEntry->expr, FieldStore)); + + List *extendedFieldStoreList = NIL; + + FieldStore *fieldStore = (FieldStore *) targetEntry->expr; + int numberOfSubFields = list_length(fieldStore->newvals); + for (int subFieldIndex = 0; subFieldIndex < numberOfSubFields; subFieldIndex++) + { + TargetEntry *subFieldTargetEntry = + CreateFieldStoreTargetEntryForSubField(targetEntry, subFieldIndex); + extendedFieldStoreList = lappend(extendedFieldStoreList, subFieldTargetEntry); + } + + return extendedFieldStoreList; +} + + +/* + * CreateFieldStoreTargetEntryForSubField takes a TargetEntry pointing to a + * FieldStore and returns a new TargetEntry for specified subFieldIndex. + */ +static TargetEntry * +CreateFieldStoreTargetEntryForSubField(TargetEntry *targetEntry, int subFieldIndex) +{ + TargetEntry *newTargetEntry = copyObject(targetEntry); + FieldStore *fieldStore = (FieldStore *) newTargetEntry->expr; + + int fieldNum = list_nth_int(fieldStore->fieldnums, subFieldIndex); + fieldStore->fieldnums = list_make1_int(fieldNum); + + Node *fieldNewVal = list_nth(fieldStore->newvals, subFieldIndex); + fieldStore->newvals = list_make1(fieldNewVal); + + return newTargetEntry; +} diff --git a/src/backend/distributed/deparser/ruleutils_11.c b/src/backend/distributed/deparser/ruleutils_11.c index f854003e6..d84f65a55 100644 --- a/src/backend/distributed/deparser/ruleutils_11.c +++ b/src/backend/distributed/deparser/ruleutils_11.c @@ -3057,7 +3057,9 @@ get_insert_query_def(Query *query, deparse_context *context) sep = ""; if (query->targetList) appendStringInfoChar(buf, '('); - foreach(l, query->targetList) + + List *targetList = ExtendFieldStoreTargetEntries(query->targetList); + foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); @@ -3333,6 +3335,8 @@ get_update_query_targetlist_def(Query *query, List *targetList, /* Add the comma separated list of 'attname = value' */ sep = ""; + + targetList = ExtendFieldStoreTargetEntries(targetList); foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); diff --git a/src/backend/distributed/deparser/ruleutils_12.c b/src/backend/distributed/deparser/ruleutils_12.c index b3b7b2151..ca813f3e7 100644 --- a/src/backend/distributed/deparser/ruleutils_12.c +++ b/src/backend/distributed/deparser/ruleutils_12.c @@ -3069,7 +3069,9 @@ get_insert_query_def(Query *query, deparse_context *context) sep = ""; if (query->targetList) appendStringInfoChar(buf, '('); - foreach(l, query->targetList) + + List *targetList = ExtendFieldStoreTargetEntries(query->targetList); + foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); @@ -3345,6 +3347,8 @@ get_update_query_targetlist_def(Query *query, List *targetList, /* Add the comma separated list of 'attname = value' */ sep = ""; + + targetList = ExtendFieldStoreTargetEntries(targetList); foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); diff --git a/src/backend/distributed/deparser/ruleutils_13.c b/src/backend/distributed/deparser/ruleutils_13.c index 8af283569..af70d6236 100644 --- a/src/backend/distributed/deparser/ruleutils_13.c +++ b/src/backend/distributed/deparser/ruleutils_13.c @@ -2994,7 +2994,9 @@ get_insert_query_def(Query *query, deparse_context *context) sep = ""; if (query->targetList) appendStringInfoChar(buf, '('); - foreach(l, query->targetList) + + List *targetList = ExtendFieldStoreTargetEntries(query->targetList); + foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); @@ -3270,6 +3272,8 @@ get_update_query_targetlist_def(Query *query, List *targetList, /* Add the comma separated list of 'attname = value' */ sep = ""; + + targetList = ExtendFieldStoreTargetEntries(targetList); foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 6de98c626..53bc042b3 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -42,6 +42,7 @@ extern List * pg_get_table_grants(Oid relationId); extern bool contain_nextval_expression_walker(Node *node, void *context); extern char * pg_get_replica_identity_command(Oid tableRelationId); extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier); +extern List * ExtendFieldStoreTargetEntries(List *targetList); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); diff --git a/src/test/regress/expected/distributed_types.out b/src/test/regress/expected/distributed_types.out index 242ebfa34..17dfa2a84 100644 --- a/src/test/regress/expected/distributed_types.out +++ b/src/test/regress/expected/distributed_types.out @@ -380,6 +380,39 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_type where typname IN (' (2 rows) RESET citus.enable_create_type_propagation; +CREATE TYPE ct1 as (int_1 int, int_2 int); +CREATE TABLE field_indirection_test_1 (int_col int, ct1_col ct1); +SELECT create_distributed_table('field_indirection_test_1', 'int_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO field_indirection_test_1 (int_col, ct1_col.int_1, ct1_col.int_2) VALUES (0, 1, 2); +SELECT * FROM field_indirection_test_1 ORDER BY int_col, ct1_col; + int_col | ct1_col +--------------------------------------------------------------------- + 0 | (1,2) +(1 row) + +CREATE TYPE ct2 as (int_2 int, text_1 text, int_1 int); +CREATE TABLE field_indirection_test_2 (int_col int, ct2_col ct2, ct1_col ct1); +SELECT create_distributed_table('field_indirection_test_2', 'int_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (0, 1, 'text1', 2); +INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (3, 4, 'text1', 5); +UPDATE field_indirection_test_2 SET (ct2_col.text_1, ct1_col.int_2) = ('text2', 10) WHERE int_col=4; +SELECT * FROM field_indirection_test_2 ORDER BY int_col, ct2_col, ct1_col; + int_col | ct2_col | ct1_col +--------------------------------------------------------------------- + 1 | (,text1,0) | (,2) + 4 | (,text2,3) | (,10) +(2 rows) + -- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA type_tests CASCADE; diff --git a/src/test/regress/sql/distributed_types.sql b/src/test/regress/sql/distributed_types.sql index 582b35e57..e4b5b4dcb 100644 --- a/src/test/regress/sql/distributed_types.sql +++ b/src/test/regress/sql/distributed_types.sql @@ -240,6 +240,24 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_type where typname IN (' RESET citus.enable_create_type_propagation; +CREATE TYPE ct1 as (int_1 int, int_2 int); +CREATE TABLE field_indirection_test_1 (int_col int, ct1_col ct1); +SELECT create_distributed_table('field_indirection_test_1', 'int_col'); + +INSERT INTO field_indirection_test_1 (int_col, ct1_col.int_1, ct1_col.int_2) VALUES (0, 1, 2); +SELECT * FROM field_indirection_test_1 ORDER BY int_col, ct1_col; + +CREATE TYPE ct2 as (int_2 int, text_1 text, int_1 int); +CREATE TABLE field_indirection_test_2 (int_col int, ct2_col ct2, ct1_col ct1); +SELECT create_distributed_table('field_indirection_test_2', 'int_col'); + +INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (0, 1, 'text1', 2); +INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (3, 4, 'text1', 5); + +UPDATE field_indirection_test_2 SET (ct2_col.text_1, ct1_col.int_2) = ('text2', 10) WHERE int_col=4; + +SELECT * FROM field_indirection_test_2 ORDER BY int_col, ct2_col, ct1_col; + -- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA type_tests CASCADE;