Improve table conversion logic in dist-local joins

pull/4358/head
Sait Talha Nisanci 2020-12-09 01:47:36 +03:00
parent 5618f3a3fc
commit 13c43d5744
9 changed files with 452 additions and 41 deletions

View File

@ -74,25 +74,29 @@ typedef struct ConversionCandidates
List *localTableList; /* local or citus local table */
}ConversionCandidates;
typedef struct IndexColumns
{
List *indexColumnNos;
}IndexColumns;
static bool HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
RelationRestriction *relationRestriction);
static List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
PlannerRestrictionContext *
plannerRestrictionContext);
static ConversionCandidates * CreateConversionCandidates(FromExpr *joinTree,
PlannerRestrictionContext *
static ConversionCandidates * CreateConversionCandidates(PlannerRestrictionContext *
plannerRestrictionContext,
List *rangeTableList,
Oid resultRelationId);
static void GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes);
static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(FromExpr *joinTree,
ConversionCandidates *
static RangeTableEntryDetails * GetNextRTEToConvertToSubquery(ConversionCandidates *
conversionCandidates,
PlannerRestrictionContext *
plannerRestrictionContext);
static void RemoveFromConversionCandidates(ConversionCandidates *conversionCandidates,
int rteIdentity);
static bool AllRangeTableEntriesHaveUniqueIndex(List *rangeTableEntryDetailsList);
static bool FirstIntListContainsSecondIntList(List *firstIntList, List *secondIntList);
/*
* RecursivelyPlanLocalTableJoins gets a query and the planner
@ -112,15 +116,14 @@ RecursivelyPlanLocalTableJoins(Query *query,
resultRelationId = ModifyQueryResultRelationId(query);
}
ConversionCandidates *conversionCandidates =
CreateConversionCandidates(query->jointree, plannerRestrictionContext,
CreateConversionCandidates(plannerRestrictionContext,
rangeTableList, resultRelationId);
while (ShouldConvertLocalTableJoinsToSubqueries(query, rangeTableList,
plannerRestrictionContext))
{
FromExpr *joinTree = query->jointree;
RangeTableEntryDetails *rangeTableEntryDetails =
GetNextRTEToConvertToSubquery(joinTree, conversionCandidates,
GetNextRTEToConvertToSubquery(conversionCandidates,
plannerRestrictionContext);
if (rangeTableEntryDetails == NULL)
{
@ -140,11 +143,10 @@ RecursivelyPlanLocalTableJoins(Query *query,
/*
* GetNextRTEToConvertToSubquery returns the range table entry
* which should be converted to a subquery. It considers the local join policy
* and result relation.
* for conversion priorities.
*/
static RangeTableEntryDetails *
GetNextRTEToConvertToSubquery(FromExpr *joinTree,
ConversionCandidates *conversionCandidates,
GetNextRTEToConvertToSubquery(ConversionCandidates *conversionCandidates,
PlannerRestrictionContext *plannerRestrictionContext)
{
RangeTableEntryDetails *localRTECandidate = NULL;
@ -169,6 +171,11 @@ GetNextRTEToConvertToSubquery(FromExpr *joinTree,
}
else
{
/*
* We want to convert distributed tables only if all the distributed tables
* have a constant filter on a unique index, otherwise we would be redundantly
* converting a distributed table as we will convert all the other local tables.
*/
bool allRangeTableEntriesHaveUniqueIndex = AllRangeTableEntriesHaveUniqueIndex(
conversionCandidates->distributedTableList);
@ -284,15 +291,22 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
}
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
List *rteEqualityQuals =
if (ContainsFalseClause(restrictClauseList))
{
/* If there is a WHERE FALSE, we consider it as a constant filter. */
return true;
}
List *rteEqualityColumnsNos =
FetchEqualityAttrNumsForRTE((Node *) restrictClauseList);
List *uniqueIndexAttrNumbers = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid,
List *uniqueIndexColumnsList = ExecuteFunctionOnEachTableIndex(rangeTableEntry->relid,
GetAllUniqueIndexes);
int columnNumber = 0;
foreach_int(columnNumber, uniqueIndexAttrNumbers)
IndexColumns *indexColumns = NULL;
foreach_ptr(indexColumns, uniqueIndexColumnsList)
{
if (list_member_int(rteEqualityQuals, columnNumber))
List *uniqueIndexColumnNos = indexColumns->indexColumnNos;
if (FirstIntListContainsSecondIntList(rteEqualityColumnsNos,
uniqueIndexColumnNos))
{
return true;
}
@ -301,6 +315,25 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
}
/*
* FirstIntListContainsSecondIntList returns true if the first int List
* contains every element of the second int List.
*/
static bool
FirstIntListContainsSecondIntList(List *firstIntList, List *secondIntList)
{
int curInt = 0;
foreach_int(curInt, secondIntList)
{
if (!list_member_int(firstIntList, curInt))
{
return false;
}
}
return true;
}
/*
* GetAllUniqueIndexes adds the given index's column numbers if it is a
* unique index.
@ -308,15 +341,23 @@ HasConstantFilterOnUniqueColumn(RangeTblEntry *rangeTableEntry,
* probably return true only if all the columns in the index exist in the filter.
*/
static void
GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexes)
GetAllUniqueIndexes(Form_pg_index indexForm, List **uniqueIndexGroups)
{
if (indexForm->indisunique || indexForm->indisprimary)
{
IndexColumns *indexColumns = palloc0(sizeof(IndexColumns));
List *uniqueIndexes = NIL;
for (int i = 0; i < indexForm->indkey.dim1; i++)
{
*uniqueIndexes = list_append_unique_int(*uniqueIndexes,
indexForm->indkey.values[i]);
uniqueIndexes = list_append_unique_int(uniqueIndexes,
indexForm->indkey.values[i]);
}
if (list_length(uniqueIndexes) == 0)
{
return;
}
indexColumns->indexColumnNos = uniqueIndexes;
*uniqueIndexGroups = lappend(*uniqueIndexGroups, indexColumns);
}
}
@ -367,8 +408,7 @@ RequiredAttrNumbersForRelation(RangeTblEntry *rangeTableEntry,
* be converted to a subquery so that citus planners can work.
*/
static ConversionCandidates *
CreateConversionCandidates(FromExpr *joinTree,
PlannerRestrictionContext *plannerRestrictionContext,
CreateConversionCandidates(PlannerRestrictionContext *plannerRestrictionContext,
List *rangeTableList, Oid resultRelationId)
{
ConversionCandidates *conversionCandidates =

View File

@ -3665,11 +3665,13 @@ FetchEqualityAttrNumsForRTE(Node *node)
return NIL;
}
/*
* FetchEqualityAttrNumsForList fetches the constant equality numbers
* from the given node list.
*/
static List *FetchEqualityAttrNumsForList(List *nodeList)
static List *
FetchEqualityAttrNumsForList(List *nodeList)
{
List *attributeNums = NIL;
Node *node = NULL;
@ -3682,7 +3684,7 @@ static List *FetchEqualityAttrNumsForList(List *nodeList)
attributeNums = list_concat(attributeNums, fetchedEqualityAttrNums);
}
/*
/*
* the given list is in the form of AND'ed expressions
* hence if we have one equality then it is enough.
* E.g: dist.a = 5 AND dist.a > 10

View File

@ -1769,6 +1769,7 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
subquery->targetList = lappend(subquery->targetList, targetEntry);
}
}
/*
* If tupleDesc is NULL we have 2 different cases:
*
@ -1818,6 +1819,7 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
columnType = list_nth_oid(rangeTblFunction->funccoltypes,
targetColumnIndex);
}
/* use the types in the function definition otherwise */
else
{

View File

@ -1,12 +1,7 @@
CREATE SCHEMA citus_local_dist_joins;
SET search_path TO citus_local_dist_joins;
SET client_min_messages to ERROR;
SELECT master_add_node('localhost', :master_port, groupId => 0);
master_add_node
---------------------------------------------------------------------
5
(1 row)
SELECT master_add_node('localhost', :master_port, groupId => 0) AS coordinator_nodeid \gset
CREATE TABLE citus_local(key int, value text);
SELECT create_citus_local_table('citus_local');
create_citus_local_table

View File

@ -756,14 +756,14 @@ SET b = 6
FROM distributed_table_cte
WHERE citus_local_table.a = distributed_table_cte.a;
NOTICE: executing the command locally: UPDATE citus_local_table_queries.citus_local_table_1509000 citus_local_table SET b = 6 FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) distributed_table_cte WHERE (citus_local_table.a OPERATOR(pg_catalog.=) distributed_table_cte.a)
SET citus.log_local_commands to off;
-- just works
WITH reference_table_cte AS (SELECT * FROM reference_table)
UPDATE citus_local_table
SET b = 6
FROM reference_table_cte
WHERE citus_local_table.a = reference_table_cte.a;
NOTICE: executing the command locally: SELECT a, b FROM citus_local_table_queries.reference_table_1509002 reference_table
NOTICE: executing the command locally: UPDATE citus_local_table_queries.citus_local_table_1509000 citus_local_table SET b = 6 FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table_cte WHERE (citus_local_table.a OPERATOR(pg_catalog.=) reference_table_cte.a)
set citus.log_local_commands to on;
---------------------------------------------------------------------
----- VIEW QUERIES -----
---------------------------------------------------------------------

View File

@ -397,11 +397,82 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
-- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key.
-- We will plan postgres table as the index is on key,value not just key
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;
DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_composite.key OPERATOR(pg_catalog.=) 10)
DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table_composite JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (distributed_table_composite.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10 OR distributed_table_composite.key = 20;
DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table_composite JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20))
count
---------------------------------------------------------------------
2
(1 row)
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key > 10 AND distributed_table_composite.value = 'text';
DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table_composite JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.>) 10) AND (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10 AND distributed_table_composite.value = 'text';
DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 10) AND (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 10) AND (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite JOIN local_table_join.postgres_table USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key)
WHERE (distributed_table_composite.key > 10 OR distributed_table_composite.key = 20)
AND (distributed_table_composite.value = 'text' OR distributed_table_composite.value = 'text');
DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table_composite JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (((distributed_table_composite.key OPERATOR(pg_catalog.>) 10) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20)) AND ((distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key)
WHERE (distributed_table_composite.key > 10 OR distributed_table_composite.value = 'text')
AND (distributed_table_composite.value = 'text' OR distributed_table_composite.key = 30);
DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (((key OPERATOR(pg_catalog.>) 10) OR (value OPERATOR(pg_catalog.=) 'text'::text)) AND ((value OPERATOR(pg_catalog.=) 'text'::text) OR (key OPERATOR(pg_catalog.=) 30)))
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (((key OPERATOR(pg_catalog.>) 10) OR (value OPERATOR(pg_catalog.=) 'text'::text)) AND ((value OPERATOR(pg_catalog.=) 'text'::text) OR (key OPERATOR(pg_catalog.=) 30)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite JOIN local_table_join.postgres_table USING (key)) WHERE (((distributed_table_composite.key OPERATOR(pg_catalog.>) 10) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) AND ((distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 30)))
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key)
WHERE (distributed_table_composite.key > 10 AND distributed_table_composite.value = 'text')
OR (distributed_table_composite.value = 'text' AND distributed_table_composite.key = 30);
DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.distributed_table_composite JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (((distributed_table_composite.key OPERATOR(pg_catalog.>) 10) AND (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)) OR ((distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text) AND (distributed_table_composite.key OPERATOR(pg_catalog.=) 30)))
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key)
WHERE (distributed_table_composite.key > 10 AND distributed_table_composite.key = 20)
OR (distributed_table_composite.value = 'text' AND distributed_table_composite.value = 'text');
DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 20)) OR ((value OPERATOR(pg_catalog.=) 'text'::text) AND (value OPERATOR(pg_catalog.=) 'text'::text)))
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE (((key OPERATOR(pg_catalog.>) 10) AND (key OPERATOR(pg_catalog.=) 20)) OR ((value OPERATOR(pg_catalog.=) 'text'::text) AND (value OPERATOR(pg_catalog.=) 'text'::text)))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite JOIN local_table_join.postgres_table USING (key)) WHERE (((distributed_table_composite.key OPERATOR(pg_catalog.>) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.=) 20)) OR ((distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text) AND (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text)))
count
---------------------------------------------------------------------
1
@ -764,8 +835,6 @@ ERROR: relation postgres_table is not distributed
UPDATE reference_table SET key = 1 FROM (SELECT * FROM postgres_table) l WHERE l.key = 10;
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, value_2 FROM local_table_join.postgres_table
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.reference_table SET key = 1 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) l WHERE (l.key OPERATOR(pg_catalog.=) 10)
-- TODO:: we should probably not wrap postgres_table here as there is a WHERE FALSE?
-- though then the planner could give an error
SELECT count(*) FROM postgres_table JOIN distributed_table USING(key) WHERE FALSE;
DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE false
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE false
@ -775,6 +844,244 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c
0
(1 row)
SELECT count(*) FROM (SELECT * FROM distributed_table JOIN postgres_table USING(key) WHERE false) foo JOIN local_partitioned_table USING(key);
DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE false
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE false
DEBUG: Wrapping relation "local_partitioned_table" to a subquery: SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE false
DEBUG: generating subplan XXX_2 for subquery SELECT key, NULL::text AS value FROM local_table_join.local_partitioned_table WHERE false
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table.key, distributed_table.value, distributed_table.value_2, postgres_table.value, postgres_table.value_2 FROM (local_table_join.distributed_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE false) foo(key, value, value_2, value_1, value_2_1) JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) local_partitioned_table USING (key))
count
---------------------------------------------------------------------
0
(1 row)
WITH dist_cte AS (SELECT * FROM distributed_table_pkey WHERE key = 5)
SELECT COUNT(*) FROM dist_cte JOIN postgres_table USING(key) WHERE dist_cte.key = 5;
DEBUG: CTE dist_cte is going to be inlined via distributed planning
DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2 FROM local_table_join.distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)) dist_cte JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table USING (key)) WHERE (dist_cte.key OPERATOR(pg_catalog.=) 5)
count
---------------------------------------------------------------------
1
(1 row)
SELECT COUNT(*) FROM postgres_table JOIN distributed_table_pkey USING(key)
WHERE (distributed_table_pkey.key IN (SELECT COUNT(*) AS count FROM postgres_table JOIN distributed_table USING(key)) );
DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table USING (key))
DEBUG: Wrapping relation "postgres_table" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: generating subplan XXX_3 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.postgres_table WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) postgres_table JOIN local_table_join.distributed_table_pkey USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.count FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)))
count
---------------------------------------------------------------------
1
(1 row)
-- PREPARED statements
PREPARE local_dist_table_join_select(int) AS SELECT COUNT(*) FROM distributed_table_pkey JOIN postgres_table USING(key) WHERE distributed_table_pkey.key = $1;
EXECUTE local_dist_table_join_select(10);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
1
(1 row)
EXECUTE local_dist_table_join_select(10);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
1
(1 row)
EXECUTE local_dist_table_join_select(10);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
1
(1 row)
EXECUTE local_dist_table_join_select(10);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
1
(1 row)
EXECUTE local_dist_table_join_select(10);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
1
(1 row)
EXECUTE local_dist_table_join_select(10);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 10)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.postgres_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 10)
count
---------------------------------------------------------------------
1
(1 row)
PREPARE local_dist_table_join_update(int) AS UPDATE postgres_table SET key = 5 FROM distributed_table_pkey WHERE distributed_table_pkey.key = $1;
EXECUTE local_dist_table_join_update(20);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20)
EXECUTE local_dist_table_join_update(20);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20)
EXECUTE local_dist_table_join_update(20);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20)
EXECUTE local_dist_table_join_update(20);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20)
EXECUTE local_dist_table_join_update(20);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20)
EXECUTE local_dist_table_join_update(20);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 20)
DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE local_table_join.postgres_table SET key = 5 FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 20)
PREPARE local_dist_table_join_subquery(int) AS SELECT COUNT(*) FROM postgres_table JOIN (SELECT * FROM distributed_table_pkey JOIN local_partitioned_table USING(key) WHERE distributed_table_pkey.key = $1) foo USING(key);
EXECUTE local_dist_table_join_subquery(5);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key))
count
---------------------------------------------------------------------
100
(1 row)
EXECUTE local_dist_table_join_subquery(5);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key))
count
---------------------------------------------------------------------
100
(1 row)
EXECUTE local_dist_table_join_subquery(5);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key))
count
---------------------------------------------------------------------
100
(1 row)
EXECUTE local_dist_table_join_subquery(5);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key))
count
---------------------------------------------------------------------
100
(1 row)
EXECUTE local_dist_table_join_subquery(5);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key))
count
---------------------------------------------------------------------
100
(1 row)
EXECUTE local_dist_table_join_subquery(5);
DEBUG: Wrapping relation "distributed_table_pkey" to a subquery: SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_1 for subquery SELECT key, NULL::text AS value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_pkey WHERE (key OPERATOR(pg_catalog.=) 5)
DEBUG: generating subplan XXX_2 for subquery SELECT distributed_table_pkey.key, distributed_table_pkey.value, distributed_table_pkey.value_2, local_partitioned_table.value FROM ((SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_pkey JOIN local_table_join.local_partitioned_table USING (key)) WHERE (distributed_table_pkey.key OPERATOR(pg_catalog.=) 5)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.postgres_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2, intermediate_result.value_1 AS value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb, value_1 text)) foo(key, value, value_2, value_1) USING (key))
count
---------------------------------------------------------------------
100
(1 row)
PREPARE local_dist_table_join_filters(int) AS SELECT COUNT(*) FROM local_partitioned_table JOIN distributed_table_composite USING(key)
WHERE(
distributed_table_composite.key = $1 OR
distributed_table_composite.key = 20 OR
(distributed_table_composite.key = 10 AND distributed_table_composite.key > 0) OR
distributed_table_composite.value = 'text'
);
EXECUTE local_dist_table_join_filters(20);
DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text))
count
---------------------------------------------------------------------
2
(1 row)
EXECUTE local_dist_table_join_filters(20);
DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text))
count
---------------------------------------------------------------------
2
(1 row)
EXECUTE local_dist_table_join_filters(20);
DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text))
count
---------------------------------------------------------------------
2
(1 row)
EXECUTE local_dist_table_join_filters(20);
DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text))
count
---------------------------------------------------------------------
2
(1 row)
EXECUTE local_dist_table_join_filters(20);
DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text))
count
---------------------------------------------------------------------
2
(1 row)
EXECUTE local_dist_table_join_filters(20);
DEBUG: Wrapping relation "distributed_table_composite" to a subquery: SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: generating subplan XXX_1 for subquery SELECT key, value, NULL::jsonb AS value_2 FROM local_table_join.distributed_table_composite WHERE ((key OPERATOR(pg_catalog.=) 20) OR (key OPERATOR(pg_catalog.=) 20) OR ((key OPERATOR(pg_catalog.=) 10) AND (key OPERATOR(pg_catalog.>) 0)) OR (value OPERATOR(pg_catalog.=) 'text'::text))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (local_table_join.local_partitioned_table JOIN (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.value_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, value_2 jsonb)) distributed_table_composite USING (key)) WHERE ((distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR (distributed_table_composite.key OPERATOR(pg_catalog.=) 20) OR ((distributed_table_composite.key OPERATOR(pg_catalog.=) 10) AND (distributed_table_composite.key OPERATOR(pg_catalog.>) 0)) OR (distributed_table_composite.value OPERATOR(pg_catalog.=) 'text'::text))
count
---------------------------------------------------------------------
2
(1 row)
RESET client_min_messages;
\set VERBOSITY terse
DROP SCHEMA local_table_join CASCADE;

View File

@ -2,7 +2,7 @@ CREATE SCHEMA citus_local_dist_joins;
SET search_path TO citus_local_dist_joins;
SET client_min_messages to ERROR;
SELECT master_add_node('localhost', :master_port, groupId => 0);
SELECT master_add_node('localhost', :master_port, groupId => 0) AS coordinator_nodeid \gset
CREATE TABLE citus_local(key int, value text);

View File

@ -413,13 +413,14 @@ SET b = 6
FROM distributed_table_cte
WHERE citus_local_table.a = distributed_table_cte.a;
SET citus.log_local_commands to off;
-- just works
WITH reference_table_cte AS (SELECT * FROM reference_table)
UPDATE citus_local_table
SET b = 6
FROM reference_table_cte
WHERE citus_local_table.a = reference_table_cte.a;
set citus.log_local_commands to on;
------------------------
----- VIEW QUERIES -----
------------------------

View File

@ -119,8 +119,23 @@ SELECT count(*) FROM (SELECT *, random() FROM distributed_table_pkey) as d1 JOI
SELECT count(*) FROM (SELECT *, random() FROM distributed_partitioned_table) as d1 JOIN postgres_table ON (postgres_table.key = d1.key AND d1.key < postgres_table.key) WHERE d1.key = 1 AND false;
SELECT count(*) FROM (SELECT *, random() FROM distributed_partitioned_table) as d1 JOIN postgres_table ON (postgres_table.key::int = d1.key::int AND d1.key < postgres_table.key) WHERE d1.key::int = 1 AND false;
-- TODO:: We should probably recursively plan postgres table here because primary key is on key,value not key.
-- We will plan postgres table as the index is on key,value not just key
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10;
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10 OR distributed_table_composite.key = 20;
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key > 10 AND distributed_table_composite.value = 'text';
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key) WHERE distributed_table_composite.key = 10 AND distributed_table_composite.value = 'text';
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key)
WHERE (distributed_table_composite.key > 10 OR distributed_table_composite.key = 20)
AND (distributed_table_composite.value = 'text' OR distributed_table_composite.value = 'text');
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key)
WHERE (distributed_table_composite.key > 10 OR distributed_table_composite.value = 'text')
AND (distributed_table_composite.value = 'text' OR distributed_table_composite.key = 30);
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key)
WHERE (distributed_table_composite.key > 10 AND distributed_table_composite.value = 'text')
OR (distributed_table_composite.value = 'text' AND distributed_table_composite.key = 30);
SELECT count(*) FROM distributed_table_composite JOIN postgres_table USING(key)
WHERE (distributed_table_composite.key > 10 AND distributed_table_composite.key = 20)
OR (distributed_table_composite.value = 'text' AND distributed_table_composite.value = 'text');
-- a unique index on key so dist table should be recursively planned
SELECT count(*) FROM postgres_table JOIN distributed_table_pkey USING(key);
@ -207,10 +222,59 @@ UPDATE reference_table SET key = 1 FROM postgres_table WHERE postgres_table.key
UPDATE reference_table SET key = 1 FROM (SELECT * FROM postgres_table) l WHERE l.key = 10;
-- TODO:: we should probably not wrap postgres_table here as there is a WHERE FALSE?
-- though then the planner could give an error
SELECT count(*) FROM postgres_table JOIN distributed_table USING(key) WHERE FALSE;
SELECT count(*) FROM (SELECT * FROM distributed_table JOIN postgres_table USING(key) WHERE false) foo JOIN local_partitioned_table USING(key);
WITH dist_cte AS (SELECT * FROM distributed_table_pkey WHERE key = 5)
SELECT COUNT(*) FROM dist_cte JOIN postgres_table USING(key) WHERE dist_cte.key = 5;
SELECT COUNT(*) FROM postgres_table JOIN distributed_table_pkey USING(key)
WHERE (distributed_table_pkey.key IN (SELECT COUNT(*) AS count FROM postgres_table JOIN distributed_table USING(key)) );
-- PREPARED statements
PREPARE local_dist_table_join_select(int) AS SELECT COUNT(*) FROM distributed_table_pkey JOIN postgres_table USING(key) WHERE distributed_table_pkey.key = $1;
EXECUTE local_dist_table_join_select(10);
EXECUTE local_dist_table_join_select(10);
EXECUTE local_dist_table_join_select(10);
EXECUTE local_dist_table_join_select(10);
EXECUTE local_dist_table_join_select(10);
EXECUTE local_dist_table_join_select(10);
PREPARE local_dist_table_join_update(int) AS UPDATE postgres_table SET key = 5 FROM distributed_table_pkey WHERE distributed_table_pkey.key = $1;
EXECUTE local_dist_table_join_update(20);
EXECUTE local_dist_table_join_update(20);
EXECUTE local_dist_table_join_update(20);
EXECUTE local_dist_table_join_update(20);
EXECUTE local_dist_table_join_update(20);
EXECUTE local_dist_table_join_update(20);
PREPARE local_dist_table_join_subquery(int) AS SELECT COUNT(*) FROM postgres_table JOIN (SELECT * FROM distributed_table_pkey JOIN local_partitioned_table USING(key) WHERE distributed_table_pkey.key = $1) foo USING(key);
EXECUTE local_dist_table_join_subquery(5);
EXECUTE local_dist_table_join_subquery(5);
EXECUTE local_dist_table_join_subquery(5);
EXECUTE local_dist_table_join_subquery(5);
EXECUTE local_dist_table_join_subquery(5);
EXECUTE local_dist_table_join_subquery(5);
PREPARE local_dist_table_join_filters(int) AS SELECT COUNT(*) FROM local_partitioned_table JOIN distributed_table_composite USING(key)
WHERE(
distributed_table_composite.key = $1 OR
distributed_table_composite.key = 20 OR
(distributed_table_composite.key = 10 AND distributed_table_composite.key > 0) OR
distributed_table_composite.value = 'text'
);
EXECUTE local_dist_table_join_filters(20);
EXECUTE local_dist_table_join_filters(20);
EXECUTE local_dist_table_join_filters(20);
EXECUTE local_dist_table_join_filters(20);
EXECUTE local_dist_table_join_filters(20);
EXECUTE local_dist_table_join_filters(20);
RESET client_min_messages;
\set VERBOSITY terse
DROP SCHEMA local_table_join CASCADE;