diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index 2dd90a722..b4f66b8d2 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -74,6 +74,9 @@ static void AppendStorageParametersToString(StringInfo stringBuffer, static const char * convert_aclright_to_string(int aclright); static void simple_quote_literal(StringInfo buf, const char *val); static char * flatten_reloptions(Oid relid); +static List * ExtendFieldStoreTargetEntry(TargetEntry *targetEntry); +static TargetEntry * CreateFieldStoreTargetEntryForSubField(TargetEntry *targetEntry, + int subFieldIndex); /* * pg_get_extensiondef_string finds the foreign data wrapper that corresponds to @@ -1383,3 +1386,83 @@ RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier) } } } + + +/* + * ExtendFieldStoreTargetEntries takes a list of target entries and retuns a + * new list by extending each target entry pointing to a FieldStore by + * executing ExtendFieldStoreTargetEntry. + * + * This is required for deparsing INSERT/UPDATE queries for remote execution + * as postgres planner combines field accesses to same column into in a single + * FieldStore object but processIndirection does not know how to handle such + * FieldStore objects. + */ +List * +ExtendFieldStoreTargetEntries(List *targetList) +{ + List *extendedTargetList = NIL; + + TargetEntry *targetEntry = NULL; + foreach_ptr(targetEntry, targetList) + { + Node *expr = (Node *) targetEntry->expr; + if (IsA(expr, FieldStore)) + { + List *fieldStoreTargetEntries = ExtendFieldStoreTargetEntry(targetEntry); + extendedTargetList = list_concat(extendedTargetList, + fieldStoreTargetEntries); + } + else + { + extendedTargetList = lappend(extendedTargetList, targetEntry); + } + } + + return extendedTargetList; +} + + +/* + * ExtendFieldStoreTargetEntry takes a target entry pointing to a FieldStore + * object and returns a list of TargetEntry's by seperating each element in + * newvals and fieldnums lists into separate TargetEntry objects. + */ +static List * +ExtendFieldStoreTargetEntry(TargetEntry *targetEntry) +{ + Assert(IsA(targetEntry->expr, FieldStore)); + + List *extendedFieldStoreList = NIL; + + FieldStore *fieldStore = (FieldStore *) targetEntry->expr; + int numberOfSubFields = list_length(fieldStore->newvals); + for (int subFieldIndex = 0; subFieldIndex < numberOfSubFields; subFieldIndex++) + { + TargetEntry *subFieldTargetEntry = + CreateFieldStoreTargetEntryForSubField(targetEntry, subFieldIndex); + extendedFieldStoreList = lappend(extendedFieldStoreList, subFieldTargetEntry); + } + + return extendedFieldStoreList; +} + + +/* + * CreateFieldStoreTargetEntryForSubField takes a TargetEntry pointing to a + * FieldStore and returns a new TargetEntry for specified subFieldIndex. + */ +static TargetEntry * +CreateFieldStoreTargetEntryForSubField(TargetEntry *targetEntry, int subFieldIndex) +{ + TargetEntry *newTargetEntry = copyObject(targetEntry); + FieldStore *fieldStore = (FieldStore *) newTargetEntry->expr; + + int fieldNum = list_nth_int(fieldStore->fieldnums, subFieldIndex); + fieldStore->fieldnums = list_make1_int(fieldNum); + + Node *fieldNewVal = list_nth(fieldStore->newvals, subFieldIndex); + fieldStore->newvals = list_make1(fieldNewVal); + + return newTargetEntry; +} diff --git a/src/backend/distributed/deparser/ruleutils_11.c b/src/backend/distributed/deparser/ruleutils_11.c index f854003e6..d84f65a55 100644 --- a/src/backend/distributed/deparser/ruleutils_11.c +++ b/src/backend/distributed/deparser/ruleutils_11.c @@ -3057,7 +3057,9 @@ get_insert_query_def(Query *query, deparse_context *context) sep = ""; if (query->targetList) appendStringInfoChar(buf, '('); - foreach(l, query->targetList) + + List *targetList = ExtendFieldStoreTargetEntries(query->targetList); + foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); @@ -3333,6 +3335,8 @@ get_update_query_targetlist_def(Query *query, List *targetList, /* Add the comma separated list of 'attname = value' */ sep = ""; + + targetList = ExtendFieldStoreTargetEntries(targetList); foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); diff --git a/src/backend/distributed/deparser/ruleutils_12.c b/src/backend/distributed/deparser/ruleutils_12.c index b3b7b2151..ca813f3e7 100644 --- a/src/backend/distributed/deparser/ruleutils_12.c +++ b/src/backend/distributed/deparser/ruleutils_12.c @@ -3069,7 +3069,9 @@ get_insert_query_def(Query *query, deparse_context *context) sep = ""; if (query->targetList) appendStringInfoChar(buf, '('); - foreach(l, query->targetList) + + List *targetList = ExtendFieldStoreTargetEntries(query->targetList); + foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); @@ -3345,6 +3347,8 @@ get_update_query_targetlist_def(Query *query, List *targetList, /* Add the comma separated list of 'attname = value' */ sep = ""; + + targetList = ExtendFieldStoreTargetEntries(targetList); foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); diff --git a/src/backend/distributed/deparser/ruleutils_13.c b/src/backend/distributed/deparser/ruleutils_13.c index 8af283569..af70d6236 100644 --- a/src/backend/distributed/deparser/ruleutils_13.c +++ b/src/backend/distributed/deparser/ruleutils_13.c @@ -2994,7 +2994,9 @@ get_insert_query_def(Query *query, deparse_context *context) sep = ""; if (query->targetList) appendStringInfoChar(buf, '('); - foreach(l, query->targetList) + + List *targetList = ExtendFieldStoreTargetEntries(query->targetList); + foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); @@ -3270,6 +3272,8 @@ get_update_query_targetlist_def(Query *query, List *targetList, /* Add the comma separated list of 'attname = value' */ sep = ""; + + targetList = ExtendFieldStoreTargetEntries(targetList); foreach(l, targetList) { TargetEntry *tle = (TargetEntry *) lfirst(l); diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 6de98c626..53bc042b3 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -42,6 +42,7 @@ extern List * pg_get_table_grants(Oid relationId); extern bool contain_nextval_expression_walker(Node *node, void *context); extern char * pg_get_replica_identity_command(Oid tableRelationId); extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier); +extern List * ExtendFieldStoreTargetEntries(List *targetList); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); diff --git a/src/test/regress/expected/distributed_types.out b/src/test/regress/expected/distributed_types.out index 242ebfa34..17dfa2a84 100644 --- a/src/test/regress/expected/distributed_types.out +++ b/src/test/regress/expected/distributed_types.out @@ -380,6 +380,39 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_type where typname IN (' (2 rows) RESET citus.enable_create_type_propagation; +CREATE TYPE ct1 as (int_1 int, int_2 int); +CREATE TABLE field_indirection_test_1 (int_col int, ct1_col ct1); +SELECT create_distributed_table('field_indirection_test_1', 'int_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO field_indirection_test_1 (int_col, ct1_col.int_1, ct1_col.int_2) VALUES (0, 1, 2); +SELECT * FROM field_indirection_test_1 ORDER BY int_col, ct1_col; + int_col | ct1_col +--------------------------------------------------------------------- + 0 | (1,2) +(1 row) + +CREATE TYPE ct2 as (int_2 int, text_1 text, int_1 int); +CREATE TABLE field_indirection_test_2 (int_col int, ct2_col ct2, ct1_col ct1); +SELECT create_distributed_table('field_indirection_test_2', 'int_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (0, 1, 'text1', 2); +INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (3, 4, 'text1', 5); +UPDATE field_indirection_test_2 SET (ct2_col.text_1, ct1_col.int_2) = ('text2', 10) WHERE int_col=4; +SELECT * FROM field_indirection_test_2 ORDER BY int_col, ct2_col, ct1_col; + int_col | ct2_col | ct1_col +--------------------------------------------------------------------- + 1 | (,text1,0) | (,2) + 4 | (,text2,3) | (,10) +(2 rows) + -- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA type_tests CASCADE; diff --git a/src/test/regress/sql/distributed_types.sql b/src/test/regress/sql/distributed_types.sql index 582b35e57..e4b5b4dcb 100644 --- a/src/test/regress/sql/distributed_types.sql +++ b/src/test/regress/sql/distributed_types.sql @@ -240,6 +240,24 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_type where typname IN (' RESET citus.enable_create_type_propagation; +CREATE TYPE ct1 as (int_1 int, int_2 int); +CREATE TABLE field_indirection_test_1 (int_col int, ct1_col ct1); +SELECT create_distributed_table('field_indirection_test_1', 'int_col'); + +INSERT INTO field_indirection_test_1 (int_col, ct1_col.int_1, ct1_col.int_2) VALUES (0, 1, 2); +SELECT * FROM field_indirection_test_1 ORDER BY int_col, ct1_col; + +CREATE TYPE ct2 as (int_2 int, text_1 text, int_1 int); +CREATE TABLE field_indirection_test_2 (int_col int, ct2_col ct2, ct1_col ct1); +SELECT create_distributed_table('field_indirection_test_2', 'int_col'); + +INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (0, 1, 'text1', 2); +INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (3, 4, 'text1', 5); + +UPDATE field_indirection_test_2 SET (ct2_col.text_1, ct1_col.int_2) = ('text2', 10) WHERE int_col=4; + +SELECT * FROM field_indirection_test_2 ORDER BY int_col, ct2_col, ct1_col; + -- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA type_tests CASCADE;