Handle INSERT/UPDATE queries with comp.type field access properly

Planner combines multiple fields accesses to same composite type column
in a single FieldStore object and "processIndirection" function that we
use to deparse such queries does not know how to handle such FieldStore
objects..

For that, we split field references in FiedlStore objects into
individual ones.
pull/4234/head
Onur Tirtir 2020-10-12 13:35:05 +03:00
parent 93764a3782
commit a225652c29
7 changed files with 150 additions and 3 deletions

View File

@ -74,6 +74,9 @@ static void AppendStorageParametersToString(StringInfo stringBuffer,
static const char * convert_aclright_to_string(int aclright);
static void simple_quote_literal(StringInfo buf, const char *val);
static char * flatten_reloptions(Oid relid);
static List * ExtendFieldStoreTargetEntry(TargetEntry *targetEntry);
static TargetEntry * CreateFieldStoreTargetEntryForSubField(TargetEntry *targetEntry,
int subFieldIndex);
/*
* pg_get_extensiondef_string finds the foreign data wrapper that corresponds to
@ -1383,3 +1386,83 @@ RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier)
}
}
}
/*
* ExtendFieldStoreTargetEntries takes a list of target entries and retuns a
* new list by extending each target entry pointing to a FieldStore by
* executing ExtendFieldStoreTargetEntry.
*
* This is required for deparsing INSERT/UPDATE queries for remote execution
* as postgres planner combines field accesses to same column into in a single
* FieldStore object but processIndirection does not know how to handle such
* FieldStore objects.
*/
List *
ExtendFieldStoreTargetEntries(List *targetList)
{
List *extendedTargetList = NIL;
TargetEntry *targetEntry = NULL;
foreach_ptr(targetEntry, targetList)
{
Node *expr = (Node *) targetEntry->expr;
if (IsA(expr, FieldStore))
{
List *fieldStoreTargetEntries = ExtendFieldStoreTargetEntry(targetEntry);
extendedTargetList = list_concat(extendedTargetList,
fieldStoreTargetEntries);
}
else
{
extendedTargetList = lappend(extendedTargetList, targetEntry);
}
}
return extendedTargetList;
}
/*
* ExtendFieldStoreTargetEntry takes a target entry pointing to a FieldStore
* object and returns a list of TargetEntry's by seperating each element in
* newvals and fieldnums lists into separate TargetEntry objects.
*/
static List *
ExtendFieldStoreTargetEntry(TargetEntry *targetEntry)
{
Assert(IsA(targetEntry->expr, FieldStore));
List *extendedFieldStoreList = NIL;
FieldStore *fieldStore = (FieldStore *) targetEntry->expr;
int numberOfSubFields = list_length(fieldStore->newvals);
for (int subFieldIndex = 0; subFieldIndex < numberOfSubFields; subFieldIndex++)
{
TargetEntry *subFieldTargetEntry =
CreateFieldStoreTargetEntryForSubField(targetEntry, subFieldIndex);
extendedFieldStoreList = lappend(extendedFieldStoreList, subFieldTargetEntry);
}
return extendedFieldStoreList;
}
/*
* CreateFieldStoreTargetEntryForSubField takes a TargetEntry pointing to a
* FieldStore and returns a new TargetEntry for specified subFieldIndex.
*/
static TargetEntry *
CreateFieldStoreTargetEntryForSubField(TargetEntry *targetEntry, int subFieldIndex)
{
TargetEntry *newTargetEntry = copyObject(targetEntry);
FieldStore *fieldStore = (FieldStore *) newTargetEntry->expr;
int fieldNum = list_nth_int(fieldStore->fieldnums, subFieldIndex);
fieldStore->fieldnums = list_make1_int(fieldNum);
Node *fieldNewVal = list_nth(fieldStore->newvals, subFieldIndex);
fieldStore->newvals = list_make1(fieldNewVal);
return newTargetEntry;
}

View File

@ -3057,7 +3057,9 @@ get_insert_query_def(Query *query, deparse_context *context)
sep = "";
if (query->targetList)
appendStringInfoChar(buf, '(');
foreach(l, query->targetList)
List *targetList = ExtendFieldStoreTargetEntries(query->targetList);
foreach(l, targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);
@ -3333,6 +3335,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
/* Add the comma separated list of 'attname = value' */
sep = "";
targetList = ExtendFieldStoreTargetEntries(targetList);
foreach(l, targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);

View File

@ -3069,7 +3069,9 @@ get_insert_query_def(Query *query, deparse_context *context)
sep = "";
if (query->targetList)
appendStringInfoChar(buf, '(');
foreach(l, query->targetList)
List *targetList = ExtendFieldStoreTargetEntries(query->targetList);
foreach(l, targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);
@ -3345,6 +3347,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
/* Add the comma separated list of 'attname = value' */
sep = "";
targetList = ExtendFieldStoreTargetEntries(targetList);
foreach(l, targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);

View File

@ -2994,7 +2994,9 @@ get_insert_query_def(Query *query, deparse_context *context)
sep = "";
if (query->targetList)
appendStringInfoChar(buf, '(');
foreach(l, query->targetList)
List *targetList = ExtendFieldStoreTargetEntries(query->targetList);
foreach(l, targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);
@ -3270,6 +3272,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
/* Add the comma separated list of 'attname = value' */
sep = "";
targetList = ExtendFieldStoreTargetEntries(targetList);
foreach(l, targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);

View File

@ -42,6 +42,7 @@ extern List * pg_get_table_grants(Oid relationId);
extern bool contain_nextval_expression_walker(Node *node, void *context);
extern char * pg_get_replica_identity_command(Oid tableRelationId);
extern const char * RoleSpecString(RoleSpec *spec, bool withQuoteIdentifier);
extern List * ExtendFieldStoreTargetEntries(List *targetList);
/* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer);

View File

@ -380,6 +380,39 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_type where typname IN ('
(2 rows)
RESET citus.enable_create_type_propagation;
CREATE TYPE ct1 as (int_1 int, int_2 int);
CREATE TABLE field_indirection_test_1 (int_col int, ct1_col ct1);
SELECT create_distributed_table('field_indirection_test_1', 'int_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO field_indirection_test_1 (int_col, ct1_col.int_1, ct1_col.int_2) VALUES (0, 1, 2);
SELECT * FROM field_indirection_test_1 ORDER BY int_col, ct1_col;
int_col | ct1_col
---------------------------------------------------------------------
0 | (1,2)
(1 row)
CREATE TYPE ct2 as (int_2 int, text_1 text, int_1 int);
CREATE TABLE field_indirection_test_2 (int_col int, ct2_col ct2, ct1_col ct1);
SELECT create_distributed_table('field_indirection_test_2', 'int_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (0, 1, 'text1', 2);
INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (3, 4, 'text1', 5);
UPDATE field_indirection_test_2 SET (ct2_col.text_1, ct1_col.int_2) = ('text2', 10) WHERE int_col=4;
SELECT * FROM field_indirection_test_2 ORDER BY int_col, ct2_col, ct1_col;
int_col | ct2_col | ct1_col
---------------------------------------------------------------------
1 | (,text1,0) | (,2)
4 | (,text2,3) | (,10)
(2 rows)
-- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA type_tests CASCADE;

View File

@ -240,6 +240,24 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_type where typname IN ('
RESET citus.enable_create_type_propagation;
CREATE TYPE ct1 as (int_1 int, int_2 int);
CREATE TABLE field_indirection_test_1 (int_col int, ct1_col ct1);
SELECT create_distributed_table('field_indirection_test_1', 'int_col');
INSERT INTO field_indirection_test_1 (int_col, ct1_col.int_1, ct1_col.int_2) VALUES (0, 1, 2);
SELECT * FROM field_indirection_test_1 ORDER BY int_col, ct1_col;
CREATE TYPE ct2 as (int_2 int, text_1 text, int_1 int);
CREATE TABLE field_indirection_test_2 (int_col int, ct2_col ct2, ct1_col ct1);
SELECT create_distributed_table('field_indirection_test_2', 'int_col');
INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (0, 1, 'text1', 2);
INSERT INTO field_indirection_test_2 (ct2_col.int_1, int_col, ct2_col.text_1, ct1_col.int_2) VALUES (3, 4, 'text1', 5);
UPDATE field_indirection_test_2 SET (ct2_col.text_1, ct1_col.int_2) = ('text2', 10) WHERE int_col=4;
SELECT * FROM field_indirection_test_2 ORDER BY int_col, ct2_col, ct1_col;
-- clear objects
SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA type_tests CASCADE;