Apply planner changes for citus local tables

pull/4143/head
Onur Tirtir 2020-08-31 11:52:21 +03:00
parent 0b1cc118a9
commit 3a73fba810
33 changed files with 4100 additions and 139 deletions

View File

@ -775,7 +775,7 @@ TableReferencing(Oid relationId)
/*
* ConstraintIsAForeignKey is a wrapper around GetForeignKeyOidByName that
* returns true if the given constraint name identifies a foreign key
* contraint defined on relation with relationId.
* constraint defined on relation with relationId.
*/
bool
ConstraintIsAForeignKey(char *inputConstaintName, Oid relationId)

View File

@ -211,7 +211,6 @@ static ScanKeyData DistObjectScanKey[3];
/* local function forward declarations */
static bool IsCitusTableViaCatalog(Oid relationId);
static ShardIdCacheEntry * LookupShardIdCacheEntry(int64 shardId);
static CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
static CitusTableCacheEntry * BuildCitusTableCacheEntry(Oid relationId);
static void BuildCachedShardList(CitusTableCacheEntry *cacheEntry);
static void PrepareWorkerNodeCache(void);
@ -954,7 +953,7 @@ GetCitusTableCacheEntry(Oid distributedRelationId)
* passed relationId. For efficiency it caches lookups. This function returns
* NULL if the relation isn't a distributed table.
*/
static CitusTableCacheEntry *
CitusTableCacheEntry *
LookupCitusTableCacheEntry(Oid relationId)
{
bool foundInCache = false;

View File

@ -123,6 +123,7 @@ static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *pla
Node *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter);
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
/* Distributed planner hook */
@ -2262,3 +2263,69 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
boundParams);
}
}
/*
* GetRTEListPropertiesForQuery is a wrapper around GetRTEListProperties that
* returns RTEListProperties for the rte list retrieved from query.
*/
RTEListProperties *
GetRTEListPropertiesForQuery(Query *query)
{
List *rteList = ExtractRangeTableEntryList(query);
return GetRTEListProperties(rteList);
}
/*
* GetRTEListProperties returns RTEListProperties struct processing the given
* rangeTableList.
*/
static RTEListProperties *
GetRTEListProperties(List *rangeTableList)
{
RTEListProperties *rteListProperties = palloc0(sizeof(RTEListProperties));
RangeTblEntry *rangeTableEntry = NULL;
foreach_ptr(rangeTableEntry, rangeTableList)
{
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
{
continue;
}
Oid relationId = rangeTableEntry->relid;
CitusTableCacheEntry *cacheEntry = LookupCitusTableCacheEntry(relationId);
if (!cacheEntry)
{
rteListProperties->hasPostgresLocalTable = true;
}
else if (IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE))
{
rteListProperties->hasReferenceTable = true;
}
else if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_TABLE))
{
rteListProperties->hasCitusLocalTable = true;
}
else if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE))
{
rteListProperties->hasDistributedTable = true;
}
else
{
/* it's not expected, but let's do a bug catch here */
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("encountered with an unexpected citus "
"table type while processing range table "
"entries of query")));
}
}
rteListProperties->hasCitusTable = (rteListProperties->hasDistributedTable ||
rteListProperties->hasReferenceTable ||
rteListProperties->hasCitusLocalTable);
return rteListProperties;
}

View File

@ -556,6 +556,40 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
NULL, NULL);
}
RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(subquery);
if (subqueryRteListProperties->hasDistributedTable &&
(subqueryRteListProperties->hasCitusLocalTable ||
subqueryRteListProperties->hasPostgresLocalTable))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot select from "
"distributed tables and local tables at the same time",
NULL, NULL);
}
if (subqueryRteListProperties->hasDistributedTable &&
IsCitusTableType(targetRelationId, CITUS_LOCAL_TABLE))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot insert into a "
"citus local table",
NULL, NULL);
}
/*
* In some cases, it might be possible to allow postgres local tables
* in distributed insert select. However, we want to behave consistent
* on all cases including Citus MX, and let insert select via coordinator
* to kick-in.
*/
if (subqueryRteListProperties->hasPostgresLocalTable)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot select from "
"a local table", NULL, NULL);
return NULL;
}
/* we do not expect to see a view in modify target */
foreach(rangeTableCell, queryTree->rtable)
{
@ -584,12 +618,19 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
return error;
}
/*
* If we're inserting into a reference table, all participating tables
* should be reference tables as well.
*/
if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
if (IsCitusTableType(targetRelationId, CITUS_LOCAL_TABLE))
{
/*
* If we're inserting into a citus local table, it is ok because we've
* checked the non-existence of distributed tables in the subquery.
*/
}
else if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
{
/*
* If we're inserting into a reference table, all participating tables
* should be reference tables as well.
*/
if (!allReferenceTables)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
@ -726,7 +767,9 @@ RouterModifyTaskForShardInterval(Query *originalQuery,
* prevent shard pruning logic (i.e, namely UpdateRelationNames())
* modifies range table entries, which makes hard to add the quals.
*/
if (!allReferenceTables)
RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(
copiedSubquery);
if (subqueryRteListProperties->hasDistributedTable)
{
AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval);
}

View File

@ -311,7 +311,9 @@ NodeTryGetRteRelid(Node *node)
}
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
if (rangeTableEntry->rtekind != RTE_RELATION)
if (!(rangeTableEntry->rtekind == RTE_RELATION &&
rangeTableEntry->relkind == RELKIND_RELATION))
{
return InvalidOid;
}
@ -332,6 +334,18 @@ IsCitusTableRTE(Node *node)
}
/*
* IsPostgresLocalTableRte gets a node and returns true if the node is a
* range table relation entry that points to a postgres local table.
*/
bool
IsPostgresLocalTableRte(Node *node)
{
Oid relationId = NodeTryGetRteRelid(node);
return OidIsValid(relationId) && !IsCitusTable(relationId);
}
/*
* IsDistributedTableRTE gets a node and returns true if the node
* is a range table relation entry that points to a distributed relation,
@ -357,6 +371,18 @@ IsReferenceTableRTE(Node *node)
}
/*
* IsCitusLocalTableRTE gets a node and returns true if the node
* is a range table relation entry that points to a citus local table.
*/
bool
IsCitusLocalTableRTE(Node *node)
{
Oid relationId = NodeTryGetRteRelid(node);
return OidIsValid(relationId) && IsCitusTableType(relationId, CITUS_LOCAL_TABLE);
}
/*
* FullCompositeFieldList gets a composite field list, and checks if all fields
* of composite type are used in the list.
@ -926,6 +952,16 @@ DeferErrorIfQueryNotSupported(Query *queryTree)
errorMessage = "subquery in OFFSET is not supported in multi-shard queries";
}
RTEListProperties *queryRteListProperties = GetRTEListPropertiesForQuery(queryTree);
if (queryRteListProperties->hasCitusLocalTable ||
queryRteListProperties->hasPostgresLocalTable)
{
preconditionsSatisfied = false;
errorMessage = "direct joins between distributed and local tables are "
"not supported";
errorHint = LOCAL_TABLE_SUBQUERY_CTE_HINT;
}
/* finally check and error out if not satisfied */
if (!preconditionsSatisfied)
{

View File

@ -130,6 +130,12 @@ static bool IsTidColumn(Node *node);
static DeferredErrorMessage * ModifyPartialQuerySupported(Query *queryTree, bool
multiShardQuery,
Oid *distributedTableId);
static DeferredErrorMessage * DeferErrorIfUnsupportedModifyQueryWithLocalTable(
Query *query);
static DeferredErrorMessage * DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable(
RTEListProperties *rteListProperties, Oid targetRelationId);
static DeferredErrorMessage * DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable(
RTEListProperties *rteListProperties, Oid targetRelationId);
static DeferredErrorMessage * MultiShardModifyQuerySupported(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
@ -570,22 +576,14 @@ static DeferredErrorMessage *
ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
Oid *distributedTableIdOutput)
{
uint32 rangeTableId = 1;
CmdType commandType = queryTree->commandType;
Oid distributedTableId = ModifyQueryResultRelationId(queryTree);
*distributedTableIdOutput = distributedTableId;
if (!IsCitusTable(distributedTableId))
DeferredErrorMessage *deferredError =
DeferErrorIfUnsupportedModifyQueryWithLocalTable(queryTree);
if (deferredError != NULL)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot plan modifications of local tables involving "
"distributed tables",
NULL, NULL);
return deferredError;
}
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
DeferredErrorMessage *deferredError = DeferErrorIfModifyView(queryTree);
deferredError = DeferErrorIfModifyView(queryTree);
if (deferredError != NULL)
{
return deferredError;
@ -675,6 +673,11 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
}
}
Oid distributedTableId = ModifyQueryResultRelationId(queryTree);
uint32 rangeTableId = 1;
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
CmdType commandType = queryTree->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
@ -788,6 +791,97 @@ ModifyPartialQuerySupported(Query *queryTree, bool multiShardQuery,
return deferredError;
}
/* set it for caller to use when we don't return any errors */
*distributedTableIdOutput = distributedTableId;
return NULL;
}
/*
* DeferErrorIfUnsupportedModifyQueryWithLocalTable returns DeferredErrorMessage
* for unsupported modify queries that cannot be planned by router planner due to
* unsupported usage of postgres local or citus local tables.
*/
static DeferredErrorMessage *
DeferErrorIfUnsupportedModifyQueryWithLocalTable(Query *query)
{
RTEListProperties *rteListProperties = GetRTEListPropertiesForQuery(query);
Oid targetRelationId = ModifyQueryResultRelationId(query);
DeferredErrorMessage *deferredErrorMessage =
DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable(rteListProperties,
targetRelationId);
if (deferredErrorMessage)
{
return deferredErrorMessage;
}
deferredErrorMessage = DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable(
rteListProperties,
targetRelationId);
return deferredErrorMessage;
}
/*
* DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable is a helper function
* that takes RTEListProperties & targetRelationId and returns deferred error
* if query is not supported due to unsupported usage of citus local tables.
*/
static DeferredErrorMessage *
DeferErrorIfUnsupportedModifyQueryWithCitusLocalTable(
RTEListProperties *rteListProperties, Oid targetRelationId)
{
if (rteListProperties->hasDistributedTable && rteListProperties->hasCitusLocalTable)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot plan modifications with citus local tables and "
"distributed tables", NULL,
LOCAL_TABLE_SUBQUERY_CTE_HINT);
}
if (IsCitusTableType(targetRelationId, REFERENCE_TABLE) &&
rteListProperties->hasCitusLocalTable)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot plan modifications of reference tables with citus "
"local tables", NULL,
LOCAL_TABLE_SUBQUERY_CTE_HINT);
}
return NULL;
}
/*
* DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable is a helper
* function that takes RTEListProperties & targetRelationId and returns
* deferred error if query is not supported due to unsupported usage of
* postgres local tables.
*/
static DeferredErrorMessage *
DeferErrorIfUnsupportedModifyQueryWithPostgresLocalTable(
RTEListProperties *rteListProperties, Oid targetRelationId)
{
if (rteListProperties->hasPostgresLocalTable &&
rteListProperties->hasCitusTable)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot plan modifications with local tables involving "
"citus tables", NULL,
LOCAL_TABLE_SUBQUERY_CTE_HINT);
}
if (!IsCitusTable(targetRelationId))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot plan modifications of local tables involving "
"distributed tables",
NULL, NULL);
}
return NULL;
}
@ -3322,7 +3416,7 @@ MultiRouterPlannableQuery(Query *query)
NULL, NULL);
}
bool hasLocalTable = false;
bool hasPostgresOrCitusLocalTable = false;
bool hasDistributedTable = false;
ExtractRangeTableRelationWalker((Node *) query, &rangeTableRelationList);
@ -3336,7 +3430,13 @@ MultiRouterPlannableQuery(Query *query)
/* local tables are allowed if there are no distributed tables */
if (!IsCitusTable(distributedTableId))
{
hasLocalTable = true;
hasPostgresOrCitusLocalTable = true;
continue;
}
else if (IsCitusTableType(distributedTableId, CITUS_LOCAL_TABLE))
{
hasPostgresOrCitusLocalTable = true;
elog(DEBUG4, "Router planner finds a citus local table");
continue;
}
@ -3376,7 +3476,7 @@ MultiRouterPlannableQuery(Query *query)
}
/* local tables are not allowed if there are distributed tables */
if (hasLocalTable && hasDistributedTable)
if (hasPostgresOrCitusLocalTable && hasDistributedTable)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Local tables cannot be used in distributed queries.",

View File

@ -165,7 +165,7 @@ static bool ShouldRecursivelyPlanSetOperation(Query *query,
RecursivePlanningContext *context);
static void RecursivelyPlanSetOperations(Query *query, Node *node,
RecursivePlanningContext *context);
static bool IsLocalTableRTE(Node *node);
static bool IsLocalTableRteOrMatView(Node *node);
static void RecursivelyPlanSubquery(Query *subquery,
RecursivePlanningContext *planningContext);
static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId,
@ -376,7 +376,8 @@ ShouldRecursivelyPlanNonColocatedSubqueries(Query *subquery,
}
/* direct joins with local tables are not supported by any of Citus planners */
if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable, IsLocalTableRTE))
if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable,
IsLocalTableRteOrMatView))
{
return false;
}
@ -877,7 +878,8 @@ RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context)
static bool
ShouldRecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *context)
{
if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable, IsLocalTableRTE))
if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable,
IsLocalTableRteOrMatView))
{
/*
* Postgres can always plan queries that don't require distributed planning.
@ -1045,12 +1047,12 @@ RecursivelyPlanSetOperations(Query *query, Node *node,
/*
* IsLocalTableRTE gets a node and returns true if the node
* is a range table relation entry that points to a local
* relation (i.e., not a distributed relation).
* IsLocalTableRteOrMatView gets a node and returns true if the node is a range
* table entry that points to a postgres local or citus local table or to a
* materialized view.
*/
static bool
IsLocalTableRTE(Node *node)
IsLocalTableRteOrMatView(Node *node)
{
if (node == NULL)
{
@ -1074,13 +1076,18 @@ IsLocalTableRTE(Node *node)
}
Oid relationId = rangeTableEntry->relid;
if (IsCitusTable(relationId))
if (!IsCitusTable(relationId))
{
return false;
/* postgres local table or a materialized view */
return true;
}
else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
return true;
}
/* local table found */
return true;
/* no local table found */
return false;
}

View File

@ -734,8 +734,8 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access
*/
if (relationName == NULL)
{
ereport(ERROR, (errmsg("cannot execute %s on reference table because "
"there was a parallel %s access to distributed table "
ereport(ERROR, (errmsg("cannot execute %s on table because there was "
"a parallel %s access to distributed table "
"\"%s\" in the same transaction",
accessTypeText, conflictingAccessTypeText,
conflictingRelationName),
@ -745,13 +745,12 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access
}
else
{
ereport(ERROR, (errmsg(
"cannot execute %s on reference table \"%s\" because "
"there was a parallel %s access to distributed table "
"\"%s\" in the same transaction",
accessTypeText, relationName,
conflictingAccessTypeText,
conflictingRelationName),
ereport(ERROR, (errmsg("cannot execute %s on table \"%s\" because "
"there was a parallel %s access to distributed "
"table \"%s\" in the same transaction",
accessTypeText, relationName,
conflictingAccessTypeText,
conflictingRelationName),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
@ -769,13 +768,13 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access
* would still use the already opened parallel connections to the workers,
* thus contradicting our purpose of using sequential mode.
*/
ereport(ERROR, (errmsg("cannot modify reference table \"%s\" because there "
"was a parallel operation on a distributed table",
ereport(ERROR, (errmsg("cannot modify table \"%s\" because there was "
"a parallel operation on a distributed table",
relationName),
errdetail("When there is a foreign key to a reference "
"table, Citus needs to perform all operations "
"over a single connection per node to ensure "
"consistency."),
"table or to a citus local table, Citus needs "
"to perform all operations over a single "
"connection per node to ensure consistency."),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
@ -788,8 +787,8 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access
*/
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail(
"Reference table \"%s\" is modified, which might lead "
"to data inconsistencies or distributed deadlocks via "
"Table \"%s\" is modified, which might lead to data "
"inconsistencies or distributed deadlocks via "
"parallel accesses to hash distributed tables due to "
"foreign keys. Any parallel modification to "
"those hash distributed tables in the same "

View File

@ -135,6 +135,24 @@ typedef struct RelationRowLock
} RelationRowLock;
/*
* Parameters to be set according to range table entries of a query.
*/
typedef struct RTEListProperties
{
bool hasPostgresLocalTable;
bool hasReferenceTable;
bool hasCitusLocalTable;
/* includes hash, append and range partitioned tables */
bool hasDistributedTable;
/* union of above three */
bool hasCitusTable;
} RTEListProperties;
typedef struct DistributedPlanningContext
{
/* The parsed query that is given to the planner. It is a slightly modified
@ -195,6 +213,14 @@ extern PlannedStmt * distributed_planner(Query *parse,
#endif
/*
* Common hint message to workaround using postgres local and citus local tables
* in distributed queries
*/
#define LOCAL_TABLE_SUBQUERY_CTE_HINT \
"Use CTE's or subqueries to select from local tables and use them in joins"
extern List * ExtractRangeTableEntryList(Query *query);
extern bool NeedsDistributedPlanning(Query *query);
extern struct DistributedPlan * GetDistributedPlan(CustomScan *node);
@ -218,6 +244,7 @@ extern int32 BlessRecordExpression(Expr *expr);
extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan);
extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
struct DistributedPlan *distributedPlan);
extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query);
extern struct DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,

View File

@ -147,6 +147,7 @@ extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId)
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern CitusTableCacheEntry * GetCitusTableCacheEntry(Oid distributedRelationId);
extern CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32
objsubid);
extern int32 GetLocalGroupId(void);

View File

@ -192,8 +192,10 @@ extern bool TargetListOnPartitionColumn(Query *query, List *targetEntryList);
extern bool FindNodeMatchingCheckFunctionInRangeTableList(List *rtable, bool (*check)(
Node *));
extern bool IsCitusTableRTE(Node *node);
extern bool IsPostgresLocalTableRte(Node *node);
extern bool IsDistributedTableRTE(Node *node);
extern bool IsReferenceTableRTE(Node *node);
extern bool IsCitusLocalTableRTE(Node *node);
extern bool QueryContainsDistributedTableRTE(Query *query);
extern bool IsCitusExtraDataContainerRelation(RangeTblEntry *rte);
extern bool ContainsReadIntermediateResultFunction(Node *node);

View File

@ -482,6 +482,9 @@ TRUNCATE citus_local_table_1, citus_local_table_2, distributed_table, local_tabl
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_tables_test_schema.citus_local_table_1_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_tables_test_schema.citus_local_table_2_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE citus_local_tables_test_schema.reference_table_xxxxx CASCADE
-- test vacuum
VACUUM citus_local_table_1;
VACUUM citus_local_table_1, distributed_table, local_table, reference_table;
DROP TABLE citus_local_table_1, citus_local_table_2, distributed_table, local_table, reference_table;
NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.reference_table_xxxxx CASCADE
NOTICE: executing the command locally: DROP TABLE IF EXISTS citus_local_tables_test_schema.citus_local_table_2_xxxxx CASCADE

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -418,9 +418,11 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut
INSERT INTO dist_table VALUES(1);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.dist_table_1503017 (a) VALUES (1)
SELECT * FROM local JOIN dist_table ON (a = x);
ERROR: relation local is not distributed
ERROR: direct joins between distributed and local tables are not supported
HINT: Use CTE's or subqueries to select from local tables and use them in joins
SELECT * FROM local JOIN dist_table ON (a = x) WHERE a = 1;;
ERROR: relation local is not distributed
ERROR: direct joins between distributed and local tables are not supported
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- intermediate results are allowed
WITH cte_1 AS (SELECT * FROM dist_table LIMIT 1)
SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a);

View File

@ -350,7 +350,8 @@ FROM
distributed_table
WHERE
distributed_table.tenant_id = local_table.id;
ERROR: cannot plan modifications of local tables involving distributed tables
ERROR: cannot plan modifications with local tables involving citus tables
HINT: Use CTE's or subqueries to select from local tables and use them in joins
RESET client_min_messages;
DROP SCHEMA recursive_dml_queries CASCADE;
NOTICE: drop cascades to 5 other objects

View File

@ -351,7 +351,7 @@ ROLLBACK;
BEGIN;
UPDATE reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 99;
count
---------------------------------------------------------------------
@ -368,7 +368,7 @@ ROLLBACK;
BEGIN;
UPDATE transitive_reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 99;
count
---------------------------------------------------------------------
@ -386,7 +386,7 @@ ROLLBACK;
BEGIN;
UPDATE reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 101 AND id = 99;
count
---------------------------------------------------------------------
@ -415,7 +415,7 @@ ROLLBACK;
BEGIN;
UPDATE transitive_reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 101 AND id = 99;
count
---------------------------------------------------------------------
@ -445,20 +445,20 @@ ROLLBACK;
BEGIN;
UPDATE reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
UPDATE on_update_fkey_table SET value_1 = 15;
ROLLBACK;
BEGIN;
UPDATE transitive_reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
UPDATE on_update_fkey_table SET value_1 = 15;
ROLLBACK;
-- case 2.4: UPDATE to a reference table is followed by multiple router UPDATEs
BEGIN;
UPDATE reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
UPDATE on_update_fkey_table SET value_1 = 101 WHERE id = 1;
UPDATE on_update_fkey_table SET value_1 = 101 WHERE id = 2;
UPDATE on_update_fkey_table SET value_1 = 101 WHERE id = 3;
@ -467,7 +467,7 @@ ROLLBACK;
BEGIN;
UPDATE transitive_reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
UPDATE on_update_fkey_table SET value_1 = 101 WHERE id = 1;
ERROR: insert or update on table "on_update_fkey_table_xxxxxxx" violates foreign key constraint "fkey_xxxxxxx"
DETAIL: Key (value_1)=(101) is not present in table "reference_table_2380001".
@ -483,7 +483,7 @@ ROLLBACK;
BEGIN;
UPDATE reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
ALTER TABLE on_update_fkey_table ALTER COLUMN value_1 SET DATA TYPE bigint;
DEBUG: rewriting table "on_update_fkey_table"
DEBUG: validating foreign key constraint "fkey"
@ -491,7 +491,7 @@ ROLLBACK;
BEGIN;
UPDATE transitive_reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
ALTER TABLE on_update_fkey_table ALTER COLUMN value_1 SET DATA TYPE bigint;
DEBUG: rewriting table "on_update_fkey_table"
DEBUG: validating foreign key constraint "fkey"
@ -500,26 +500,26 @@ ROLLBACK;
BEGIN;
UPDATE reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
ALTER TABLE on_update_fkey_table ADD COLUMN value_1_X INT;
ROLLBACK;
BEGIN;
UPDATE transitive_reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
ALTER TABLE on_update_fkey_table ADD COLUMN value_1_X INT;
ROLLBACK;
-- case 2.7: UPDATE to a reference table is followed by COPY
BEGIN;
UPDATE reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
COPY on_update_fkey_table FROM STDIN WITH CSV;
ROLLBACK;
BEGIN;
UPDATE transitive_reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
COPY on_update_fkey_table FROM STDIN WITH CSV;
ERROR: insert or update on table "on_update_fkey_table_xxxxxxx" violates foreign key constraint "fkey_xxxxxxx"
DETAIL: Key (value_1)=(101) is not present in table "reference_table_2380001".
@ -528,20 +528,20 @@ ROLLBACK;
BEGIN;
UPDATE reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
TRUNCATE on_update_fkey_table;
ROLLBACK;
BEGIN;
UPDATE transitive_reference_table SET id = 101 WHERE id = 99;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- case 3.1: an unrelated DDL to a reference table is followed by a real-time SELECT
BEGIN;
ALTER TABLE reference_table ALTER COLUMN id SET DEFAULT 1001;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
SELECT count(*) FROM on_update_fkey_table;
count
---------------------------------------------------------------------
@ -552,7 +552,7 @@ ROLLBACK;
BEGIN;
ALTER TABLE transitive_reference_table ALTER COLUMN id SET DEFAULT 1001;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
SELECT count(*) FROM on_update_fkey_table;
count
---------------------------------------------------------------------
@ -583,20 +583,20 @@ ROLLBACK;
BEGIN;
ALTER TABLE reference_table ALTER COLUMN id SET DEFAULT 1001;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
ROLLBACK;
BEGIN;
ALTER TABLE transitive_reference_table ALTER COLUMN id SET DEFAULT 1001;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
ROLLBACK;
-- case 3.4: DDL to a reference table followed by multiple router UPDATEs
BEGIN;
ALTER TABLE reference_table ALTER COLUMN id SET DEFAULT 1001;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
UPDATE on_update_fkey_table SET value_1 = 98 WHERE id = 1;
UPDATE on_update_fkey_table SET value_1 = 98 WHERE id = 2;
UPDATE on_update_fkey_table SET value_1 = 98 WHERE id = 3;
@ -605,7 +605,7 @@ ROLLBACK;
BEGIN;
ALTER TABLE transitive_reference_table ALTER COLUMN id SET DEFAULT 1001;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
UPDATE on_update_fkey_table SET value_1 = 98 WHERE id = 1;
UPDATE on_update_fkey_table SET value_1 = 98 WHERE id = 2;
UPDATE on_update_fkey_table SET value_1 = 98 WHERE id = 3;
@ -645,26 +645,26 @@ ROLLBACK;
BEGIN;
ALTER TABLE reference_table ADD COLUMN X int;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
COPY on_update_fkey_table FROM STDIN WITH CSV;
ROLLBACK;
BEGIN;
ALTER TABLE transitive_reference_table ADD COLUMN X int;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
COPY on_update_fkey_table FROM STDIN WITH CSV;
ROLLBACK;
-- case 3.8: DDL to a reference table is followed by TRUNCATE
BEGIN;
ALTER TABLE reference_table ADD COLUMN X int;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
TRUNCATE on_update_fkey_table;
ROLLBACK;
BEGIN;
ALTER TABLE transitive_reference_table ADD COLUMN X int;
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "transitive_reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- case 3.9: DDL to a reference table is followed by TRUNCATE
@ -721,8 +721,8 @@ BEGIN;
(1 row)
UPDATE reference_table SET id = 101 WHERE id = 99;
ERROR: cannot modify reference table "reference_table" because there was a parallel operation on a distributed table
DETAIL: When there is a foreign key to a reference table, Citus needs to perform all operations over a single connection per node to ensure consistency.
ERROR: cannot modify table "reference_table" because there was a parallel operation on a distributed table
DETAIL: When there is a foreign key to a reference table or to a citus local table, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
@ -733,8 +733,8 @@ BEGIN;
(1 row)
UPDATE transitive_reference_table SET id = 101 WHERE id = 99;
ERROR: cannot modify reference table "transitive_reference_table" because there was a parallel operation on a distributed table
DETAIL: When there is a foreign key to a reference table, Citus needs to perform all operations over a single connection per node to ensure consistency.
ERROR: cannot modify table "transitive_reference_table" because there was a parallel operation on a distributed table
DETAIL: When there is a foreign key to a reference table or to a citus local table, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- case 4.3: SELECT to a dist table is follwed by an unrelated DDL to a reference table
@ -746,7 +746,7 @@ BEGIN;
(1 row)
ALTER TABLE reference_table ADD COLUMN X INT;
ERROR: cannot execute DDL on reference table "reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
@ -757,7 +757,7 @@ BEGIN;
(1 row)
ALTER TABLE transitive_reference_table ADD COLUMN X INT;
ERROR: cannot execute DDL on reference table "transitive_reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "transitive_reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- case 4.4: SELECT to a dist table is follwed by a DDL to a reference table
@ -771,7 +771,7 @@ BEGIN;
ALTER TABLE reference_table ALTER COLUMN id SET DATA TYPE smallint;
DEBUG: rewriting table "reference_table"
DEBUG: validating foreign key constraint "fkey"
ERROR: cannot execute DDL on reference table "reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
@ -784,7 +784,7 @@ BEGIN;
ALTER TABLE transitive_reference_table ALTER COLUMN id SET DATA TYPE smallint;
DEBUG: rewriting table "transitive_reference_table"
DEBUG: validating foreign key constraint "fkey"
ERROR: cannot execute DDL on reference table "transitive_reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "transitive_reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- case 4.5: SELECT to a dist table is follwed by a TRUNCATE
@ -799,7 +799,7 @@ BEGIN;
TRUNCATE reference_table CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
ERROR: cannot execute DDL on reference table "reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
ROLLBACK;
BEGIN;
SELECT count(*) FROM on_update_fkey_table WHERE value_1 = 99;
@ -811,7 +811,7 @@ BEGIN;
TRUNCATE transitive_reference_table CASCADE;
NOTICE: truncate cascades to table "reference_table"
NOTICE: truncate cascades to table "on_update_fkey_table"
ERROR: cannot execute DDL on reference table "transitive_reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "transitive_reference_table" because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
ROLLBACK;
-- case 4.6: Router SELECT to a dist table is followed by a TRUNCATE
BEGIN;
@ -849,7 +849,7 @@ BEGIN;
DROP TABLE reference_table CASCADE;
NOTICE: drop cascades to constraint fkey on table on_update_fkey_table
ERROR: cannot execute DDL on reference table because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table because there was a parallel SELECT access to distributed table "on_update_fkey_table" in the same transaction
ROLLBACK;
-- case 4.8: Router SELECT to a dist table is followed by a TRUNCATE
-- No errors expected from below block as SELECT there is a router
@ -889,13 +889,13 @@ ROLLBACK;
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
UPDATE reference_table SET id = 160 WHERE id = 15;
ERROR: cannot execute DML on reference table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DML on table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
UPDATE transitive_reference_table SET id = 160 WHERE id = 15;
ERROR: cannot execute DML on reference table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DML on table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
@ -916,7 +916,7 @@ BEGIN;
(10 rows)
UPDATE reference_table SET id = 160 WHERE id = 15;
ERROR: cannot execute DML on reference table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DML on table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
@ -937,46 +937,46 @@ BEGIN;
(10 rows)
UPDATE transitive_reference_table SET id = 160 WHERE id = 15;
ERROR: cannot execute DML on reference table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DML on table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- case 5.3: Parallel UPDATE on distributed table follow by an unrelated DDL on reference table
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
ALTER TABLE reference_table ADD COLUMN X INT;
ERROR: cannot execute DDL on reference table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
ALTER TABLE transitive_reference_table ADD COLUMN X INT;
ERROR: cannot execute DDL on reference table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- case 5.4: Parallel UPDATE on distributed table follow by a related DDL on reference table
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
ALTER TABLE reference_table ALTER COLUMN id SET DATA TYPE smallint;
ERROR: cannot execute DDL on reference table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 16 WHERE value_1 = 15;
ALTER TABLE transitive_reference_table ALTER COLUMN id SET DATA TYPE smallint;
ERROR: cannot execute DDL on reference table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- case 6:1: Unrelated parallel DDL on distributed table followed by SELECT on ref. table
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
SELECT count(*) FROM reference_table;
ERROR: cannot execute SELECT on reference table "reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute SELECT on table "reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
SELECT count(*) FROM transitive_reference_table;
ERROR: cannot execute SELECT on reference table "transitive_reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute SELECT on table "transitive_reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- case 6:2: Related parallel DDL on distributed table followed by SELECT on ref. table
@ -992,39 +992,39 @@ ROLLBACK;
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
SELECT count(*) FROM reference_table;
ERROR: cannot execute SELECT on reference table "reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute SELECT on table "reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
SELECT count(*) FROM transitive_reference_table;
ERROR: cannot execute SELECT on reference table "transitive_reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute SELECT on table "transitive_reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- case 6:4: Related parallel DDL on distributed table followed by SELECT on ref. table
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
UPDATE reference_table SET id = 160 WHERE id = 15;
ERROR: cannot execute DML on reference table "reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DML on table "reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
UPDATE transitive_reference_table SET id = 160 WHERE id = 15;
ERROR: cannot execute DML on reference table "transitive_reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DML on table "transitive_reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- case 6:5: Unrelated parallel DDL on distributed table followed by unrelated DDL on ref. table
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
ALTER TABLE reference_table ADD COLUMN X int;
ERROR: cannot execute DDL on reference table "reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
ALTER TABLE on_update_fkey_table ADD COLUMN X int;
ALTER TABLE transitive_reference_table ADD COLUMN X int;
ERROR: cannot execute DDL on reference table "transitive_reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DDL on table "transitive_reference_table" because there was a parallel DDL access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- case 6:6: Unrelated parallel DDL on distributed table followed by related DDL on ref. table
@ -1040,13 +1040,13 @@ ROLLBACK;
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
DELETE FROM reference_table WHERE id = 99;
ERROR: cannot execute DML on reference table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DML on table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
DELETE FROM transitive_reference_table WHERE id = 99;
ERROR: cannot execute DML on reference table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DML on table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- an unrelated update followed by update on dist table and update
@ -1055,14 +1055,14 @@ BEGIN;
UPDATE unrelated_dist_table SET value_1 = 15;
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
UPDATE reference_table SET id = 101 WHERE id = 99;
ERROR: cannot execute DML on reference table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DML on table "reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
BEGIN;
UPDATE unrelated_dist_table SET value_1 = 15;
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
UPDATE transitive_reference_table SET id = 101 WHERE id = 99;
ERROR: cannot execute DML on reference table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
ERROR: cannot execute DML on table "transitive_reference_table" because there was a parallel DML access to distributed table "on_update_fkey_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- an unrelated update followed by update on the reference table and update
@ -1073,8 +1073,8 @@ ROLLBACK;
BEGIN;
UPDATE unrelated_dist_table SET value_1 = 15;
UPDATE reference_table SET id = 101 WHERE id = 99;
ERROR: cannot modify reference table "reference_table" because there was a parallel operation on a distributed table
DETAIL: When there is a foreign key to a reference table, Citus needs to perform all operations over a single connection per node to ensure consistency.
ERROR: cannot modify table "reference_table" because there was a parallel operation on a distributed table
DETAIL: When there is a foreign key to a reference table or to a citus local table, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
UPDATE on_update_fkey_table SET value_1 = 5 WHERE id != 11;
ERROR: current transaction is aborted, commands ignored until end of transaction block
@ -1403,7 +1403,7 @@ INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
INSERT INTO distributed_table SELECT i, i % 10 FROM generate_series(0, 100) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
@ -1414,7 +1414,7 @@ WITH t1 AS (DELETE FROM reference_table RETURNING id)
DEBUG: generating subplan XXX_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM test_fkey_to_ref_in_tx.distributed_table USING (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1 WHERE (distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id) RETURNING distributed_table.id, distributed_table.value_1, t1.id
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
id | value_1 | id
---------------------------------------------------------------------
(0 rows)
@ -1424,7 +1424,7 @@ INSERT INTO reference_table SELECT i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
INSERT INTO distributed_table SELECT i, i % 10 FROM generate_series(0, 100) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
@ -1435,7 +1435,7 @@ WITH t1 AS (DELETE FROM reference_table RETURNING id)
DEBUG: generating subplan XXX_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM test_fkey_to_ref_in_tx.distributed_table, (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1 WHERE (distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id)
DEBUG: switching to sequential query execution mode
DETAIL: Reference table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
DETAIL: Table "reference_table" is modified, which might lead to data inconsistencies or distributed deadlocks via parallel accesses to hash distributed tables due to foreign keys. Any parallel modification to those hash distributed tables in the same transaction can only be executed in sequential query execution mode
count
---------------------------------------------------------------------
0
@ -1449,7 +1449,7 @@ WITH t1 AS (DELETE FROM distributed_table RETURNING id),
DEBUG: generating subplan XXX_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.distributed_table RETURNING id
DEBUG: generating subplan XXX_2 for CTE t2: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM test_fkey_to_ref_in_tx.distributed_table, (SELECT intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t1, (SELECT intermediate_result.id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) t2 WHERE ((distributed_table.value_1 OPERATOR(pg_catalog.=) t1.id) AND (distributed_table.value_1 OPERATOR(pg_catalog.=) t2.id))
ERROR: cannot execute DML on reference table "reference_table" because there was a parallel DML access to distributed table "distributed_table" in the same transaction
ERROR: cannot execute DML on table "reference_table" because there was a parallel DML access to distributed table "distributed_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
-- similarly this should fail since we first access to a distributed
-- table via t1, and then access to the reference table in the main query
@ -1457,7 +1457,7 @@ WITH t1 AS (DELETE FROM distributed_table RETURNING id)
DELETE FROM reference_table RETURNING id;
DEBUG: generating subplan XXX_1 for CTE t1: DELETE FROM test_fkey_to_ref_in_tx.distributed_table RETURNING id
DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM test_fkey_to_ref_in_tx.reference_table RETURNING id
ERROR: cannot execute DML on reference table "reference_table" because there was a parallel DML access to distributed table "distributed_table" in the same transaction
ERROR: cannot execute DML on table "reference_table" because there was a parallel DML access to distributed table "distributed_table" in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
-- finally, make sure that we can execute the same queries
-- in the sequential mode

View File

@ -241,7 +241,7 @@ NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
NOTICE: truncate cascades to table "dist_table_xxxxx"
ERROR: cannot execute DDL on reference table "ref_table" because there was a parallel SELECT access to distributed table "dist_table" in the same transaction
ERROR: cannot execute DDL on table "ref_table" because there was a parallel SELECT access to distributed table "dist_table" in the same transaction
COMMIT;
-- as we do not support local ANALYZE execution yet, below block would error out
BEGIN;

View File

@ -6,13 +6,43 @@ SELECT create_distributed_table('the_table', 'a');
(1 row)
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE TABLE reference_table (a int, b int, z bigserial);
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE citus_local_table (a int, b int, z bigserial);
SELECT create_citus_local_table('citus_local_table');
create_citus_local_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE local (a int, b int);
\c - - - :follower_master_port
-- inserts normally do not work on a standby coordinator
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is read-only
-- we can allow DML on a writable standby coordinator
INSERT INTO reference_table (a, b, z) VALUES (1, 2, 2);
ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is read-only
INSERT INTO citus_local_table (a, b, z) VALUES (1, 2, 2);
ERROR: writing to worker nodes is not currently allowed
DETAIL: the database is read-only
-- We can allow DML on a writable standby coordinator.
-- Note that it doesn't help to enable writes for citus local tables
-- and coordinator replicated reference tables. This is because, the
-- data is in the coordinator and will hit read-only tranaction checks
-- on Postgres
SET citus.writable_standby_coordinator TO on;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
SELECT * FROM the_table;
@ -21,28 +51,82 @@ SELECT * FROM the_table;
1 | 2 | 2
(1 row)
INSERT INTO reference_table (a, b, z) VALUES (1, 2, 2);
ERROR: cannot execute INSERT in a read-only transaction
SELECT * FROM reference_table;
a | b | z
---------------------------------------------------------------------
(0 rows)
INSERT INTO citus_local_table (a, b, z) VALUES (1, 2, 2);
ERROR: cannot execute INSERT in a read-only transaction
SELECT * FROM citus_local_table;
a | b | z
---------------------------------------------------------------------
(0 rows)
UPDATE the_table SET z = 3 WHERE a = 1;
UPDATE reference_table SET z = 3 WHERE a = 1;
ERROR: cannot execute UPDATE in a read-only transaction
UPDATE citus_local_table SET z = 3 WHERE a = 1;
ERROR: cannot execute UPDATE in a read-only transaction
SELECT * FROM the_table;
a | b | z
---------------------------------------------------------------------
1 | 2 | 3
(1 row)
SELECT * FROM reference_table;
a | b | z
---------------------------------------------------------------------
(0 rows)
SELECT * FROM citus_local_table;
a | b | z
---------------------------------------------------------------------
(0 rows)
DELETE FROM the_table WHERE a = 1;
DELETE FROM reference_table WHERE a = 1;
ERROR: cannot execute DELETE in a read-only transaction
DELETE FROM citus_local_table WHERE a = 1;
ERROR: cannot execute DELETE in a read-only transaction
SELECT * FROM the_table;
a | b | z
---------------------------------------------------------------------
(0 rows)
SELECT * FROM reference_table;
a | b | z
---------------------------------------------------------------------
(0 rows)
SELECT * FROM citus_local_table;
a | b | z
---------------------------------------------------------------------
(0 rows)
-- drawing from a sequence is not possible
INSERT INTO the_table (a, b) VALUES (1, 2);
ERROR: cannot assign TransactionIds during recovery
INSERT INTO reference_table (a, b) VALUES (1, 2);
ERROR: cannot assign TransactionIds during recovery
INSERT INTO citus_local_table (a, b) VALUES (1, 2);
ERROR: cannot assign TransactionIds during recovery
-- 2PC is not possible
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
ERROR: cannot assign TransactionIds during recovery
INSERT INTO reference_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
ERROR: cannot execute INSERT in a read-only transaction
INSERT INTO citus_local_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
ERROR: cannot execute INSERT in a read-only transaction
-- COPY is not possible in 2PC mode
COPY the_table (a, b, z) FROM STDIN WITH CSV;
ERROR: cannot assign TransactionIds during recovery
COPY reference_table (a, b, z) FROM STDIN WITH CSV;
ERROR: cannot assign TransactionIds during recovery
COPY citus_local_table (a, b, z) FROM STDIN WITH CSV;
ERROR: cannot assign TransactionIds during recovery
-- 1PC is possible
SET citus.multi_shard_commit_protocol TO '1pc';
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
@ -53,6 +137,20 @@ SELECT * FROM the_table ORDER BY a;
5 | 6 | 7
(2 rows)
INSERT INTO reference_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
ERROR: cannot execute INSERT in a read-only transaction
SELECT * FROM reference_table ORDER BY a;
a | b | z
---------------------------------------------------------------------
(0 rows)
INSERT INTO citus_local_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
ERROR: cannot execute INSERT in a read-only transaction
SELECT * FROM citus_local_table ORDER BY a;
a | b | z
---------------------------------------------------------------------
(0 rows)
-- modifying CTEs are possible
WITH del AS (DELETE FROM the_table RETURNING *)
SELECT * FROM del ORDER BY a;
@ -62,8 +160,17 @@ SELECT * FROM del ORDER BY a;
5 | 6 | 7
(2 rows)
WITH del AS (DELETE FROM reference_table RETURNING *)
SELECT * FROM del ORDER BY a;
ERROR: cannot execute DELETE in a read-only transaction
WITH del AS (DELETE FROM citus_local_table RETURNING *)
SELECT * FROM del ORDER BY a;
ERROR: cannot execute DELETE in a read-only transaction
-- COPY is possible in 1PC mode
COPY the_table (a, b, z) FROM STDIN WITH CSV;
COPY reference_table (a, b, z) FROM STDIN WITH CSV;
ERROR: cannot assign TransactionIds during recovery
COPY citus_local_table (a, b, z) FROM STDIN WITH CSV;
SELECT * FROM the_table ORDER BY a;
a | b | z
---------------------------------------------------------------------
@ -71,22 +178,68 @@ SELECT * FROM the_table ORDER BY a;
11 | 11 | 11
(2 rows)
SELECT * FROM reference_table ORDER BY a;
a | b | z
---------------------------------------------------------------------
(0 rows)
SELECT * FROM citus_local_table ORDER BY a;
a | b | z
---------------------------------------------------------------------
10 | 10 | 10
11 | 11 | 11
(2 rows)
DELETE FROM the_table;
DELETE FROM reference_table;
ERROR: cannot execute DELETE in a read-only transaction
DELETE FROM citus_local_table;
ERROR: cannot execute DELETE in a read-only transaction
-- DDL is not possible
TRUNCATE the_table;
ERROR: cannot execute TRUNCATE TABLE in a read-only transaction
TRUNCATE reference_table;
ERROR: cannot execute TRUNCATE TABLE in a read-only transaction
TRUNCATE citus_local_table;
ERROR: cannot execute TRUNCATE TABLE in a read-only transaction
ALTER TABLE the_table ADD COLUMN c int;
ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
ALTER TABLE reference_table ADD COLUMN c int;
ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
ALTER TABLE citus_local_table ADD COLUMN c int;
ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
-- rollback is possible
BEGIN;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
ROLLBACK;
BEGIN;
INSERT INTO reference_table (a, b, z) VALUES (1, 2, 2);
ERROR: cannot execute INSERT in a read-only transaction
ROLLBACK;
BEGIN;
INSERT INTO citus_local_table (a, b, z) VALUES (1, 2, 2);
ERROR: cannot execute INSERT in a read-only transaction
ROLLBACK;
SELECT * FROM the_table ORDER BY a;
a | b | z
---------------------------------------------------------------------
(0 rows)
SELECT * FROM reference_table ORDER BY a;
a | b | z
---------------------------------------------------------------------
(0 rows)
SELECT * FROM citus_local_table ORDER BY a;
a | b | z
---------------------------------------------------------------------
10 | 10 | 10
11 | 11 | 11
(2 rows)
-- we should still disallow writes to local tables
INSERT INTO local VALUES (1, 1);
ERROR: cannot execute INSERT in a read-only transaction
@ -106,5 +259,23 @@ SELECT * FROM the_table ORDER BY a;
---------------------------------------------------------------------
(0 rows)
INSERT INTO reference_table (a, b, z) VALUES (1, 2, 3);
ERROR: writing to worker nodes is not currently allowed
DETAIL: citus.use_secondary_nodes is set to 'always'
SELECT * FROM reference_table ORDER BY a;
ERROR: there is a shard placement in node group 0 but there are no nodes in that group
INSERT INTO citus_local_table (a, b, z) VALUES (1, 2, 3);
ERROR: writing to worker nodes is not currently allowed
DETAIL: citus.use_secondary_nodes is set to 'always'
SELECT * FROM citus_local_table ORDER BY a;
ERROR: there is a shard placement in node group 0 but there are no nodes in that group
\c - - - :master_port
DROP TABLE the_table;
DROP TABLE reference_table;
DROP TABLE citus_local_table;
SELECT master_remove_node('localhost', :master_port);
master_remove_node
---------------------------------------------------------------------
(1 row)

View File

@ -270,7 +270,8 @@ CREATE TABLE bidders ( name text, id bigint );
DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND
limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff';
ERROR: relation bidders is not distributed
ERROR: cannot plan modifications with local tables involving citus tables
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- commands containing a CTE are supported
WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66))
DELETE FROM limit_orders WHERE id < 0;
@ -433,7 +434,8 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders
WHERE limit_orders.id = 246 AND
limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff';
ERROR: relation bidders is not distributed
ERROR: cannot plan modifications with local tables involving citus tables
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- should succeed with a CTE
WITH deleted_orders AS (INSERT INTO limit_orders VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43))
UPDATE limit_orders SET symbol = 'GM';

View File

@ -158,7 +158,7 @@ CREATE TABLE bidders ( name text, id bigint );
DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND
limit_orders_mx.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff';
ERROR: relation bidders is not distributed
ERROR: cannot plan modifications with local tables involving citus tables
-- commands containing a CTE are supported
WITH new_orders AS (INSERT INTO limit_orders_mx VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66))
DELETE FROM limit_orders_mx WHERE id < 0;
@ -225,7 +225,7 @@ UPDATE limit_orders_mx SET limit_price = 0.00 FROM bidders
WHERE limit_orders_mx.id = 246 AND
limit_orders_mx.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff';
ERROR: relation bidders is not distributed
ERROR: cannot plan modifications with local tables involving citus tables
-- commands containing a CTE are supported
WITH deleted_orders AS (INSERT INTO limit_orders_mx VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43))
UPDATE limit_orders_mx SET symbol = 'GM';

View File

@ -72,7 +72,8 @@ CREATE TABLE temp_nations(name text, key integer);
SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test USING temp_nations WHERE multi_shard_modify_test.t_value = temp_nations.key AND temp_nations.name = ''foobar'' ');
WARNING: master_modify_multiple_shards is deprecated and will be removed in a future release.
HINT: Run the command directly
ERROR: relation temp_nations is not distributed
ERROR: cannot plan modifications with local tables involving citus tables
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- commands with a USING clause are unsupported
SELECT create_distributed_table('temp_nations', 'name', 'hash');
create_distributed_table

View File

@ -729,12 +729,14 @@ UPDATE users_test_table
SET value_2 = 5
FROM events_test_table_local
WHERE users_test_table.user_id = events_test_table_local.user_id;
ERROR: relation events_test_table_local is not distributed
ERROR: cannot plan modifications with local tables involving citus tables
HINT: Use CTE's or subqueries to select from local tables and use them in joins
UPDATE events_test_table_local
SET value_2 = 5
FROM users_test_table
WHERE events_test_table_local.user_id = users_test_table.user_id;
ERROR: cannot plan modifications of local tables involving distributed tables
ERROR: cannot plan modifications with local tables involving citus tables
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- Local tables in a subquery are supported through recursive planning
UPDATE users_test_table
SET value_2 = 5

View File

@ -284,10 +284,12 @@ ERROR: could not run distributed query with subquery outside the FROM, WHERE an
HINT: Consider using an equality filter on the distributed table's partition column.
-- joins are not supported between local and distributed tables
SELECT title, authors.name FROM authors, articles WHERE authors.id = articles.author_id;
ERROR: relation authors is not distributed
ERROR: direct joins between distributed and local tables are not supported
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- inner joins are not supported (I think)
SELECT * FROM (articles INNER JOIN authors ON articles.id = authors.id);
ERROR: relation authors is not distributed
ERROR: direct joins between distributed and local tables are not supported
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- test use of EXECUTE statements within plpgsql
DO $sharded_execute$
BEGIN

View File

@ -17,7 +17,8 @@ FROM
users_table_local, (SELECT user_id FROM events_table) as evs
WHERE users_table_local.user_id = evs.user_id
) as foo;
ERROR: relation users_table_local is not distributed
ERROR: direct joins between distributed and local tables are not supported
HINT: Use CTE's or subqueries to select from local tables and use them in joins
RESET client_min_messages;
-- we don't support subqueries with local tables when they are not leaf queries
SELECT user_id FROM users_table WHERE user_id IN

View File

@ -307,7 +307,8 @@ WITH cte AS (
SELECT user_id FROM users_table
)
SELECT min(user_id) FROM cte JOIN local_table ON (user_id = id) JOIN events_table USING (user_id);
ERROR: relation local_table is not distributed
ERROR: direct joins between distributed and local tables are not supported
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- unless the distributed table is part of a recursively planned subquery
WITH cte AS (
SELECT user_id FROM users_table
@ -325,7 +326,8 @@ WITH cte AS (
)
SELECT count(*) FROM local_table JOIN ref_table USING (id)
WHERE id IN (SELECT * FROM cte);
ERROR: relation local_table is not distributed
ERROR: direct joins between distributed and local tables are not supported
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- CTEs should be able to terminate a router query
WITH cte AS (
WITH cte_1 AS (

View File

@ -671,14 +671,16 @@ SELECT * FROM raw_data ORDER BY val;
-- Test that local tables are barred
UPDATE local_table lt SET val = mt.val
FROM modify_table mt WHERE mt.id = lt.id;
ERROR: cannot plan modifications of local tables involving distributed tables
ERROR: cannot plan modifications with local tables involving citus tables
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- Including inside CTEs
WITH cte AS (
UPDATE local_table lt SET val = mt.val
FROM modify_table mt WHERE mt.id = lt.id
RETURNING lt.id, lt.val
) SELECT * FROM cte JOIN modify_table mt ON mt.id = cte.id ORDER BY 1,2;
ERROR: cannot plan modifications of local tables involving distributed tables
ERROR: cannot plan modifications with local tables involving citus tables
HINT: Use CTE's or subqueries to select from local tables and use them in joins
-- Make sure checks for volatile functions apply to CTEs too
WITH cte AS (UPDATE modify_table SET val = random() WHERE id = 3 RETURNING *)
SELECT * FROM cte JOIN modify_table mt ON mt.id = 3 AND mt.id = cte.id ORDER BY 1,2;

View File

@ -43,6 +43,7 @@ test: multi_mx_modifications local_shard_execution
test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: local_shard_copy
test: citus_local_tables_mx
test: citus_local_tables_queries_mx
test: multi_mx_transaction_recovery
test: multi_mx_modifying_xacts
test: multi_mx_explain

View File

@ -300,7 +300,7 @@ test: multi_remove_node_reference_table
test: add_coordinator
test: multi_upgrade_reference_table
test: multi_replicate_reference_table
test: multi_reference_table
test: multi_reference_table citus_local_tables_queries
test: foreign_key_to_reference_table citus_local_table_triggers
test: replicate_reference_tables_to_coordinator
test: coordinator_shouldhaveshards

View File

@ -357,6 +357,11 @@ ORDER BY 1;
-- execute truncate & drop commands for multiple relations to see that we don't break local execution
TRUNCATE citus_local_table_1, citus_local_table_2, distributed_table, local_table, reference_table;
-- test vacuum
VACUUM citus_local_table_1;
VACUUM citus_local_table_1, distributed_table, local_table, reference_table;
DROP TABLE citus_local_table_1, citus_local_table_2, distributed_table, local_table, reference_table;
-- cleanup at exit

View File

@ -0,0 +1,635 @@
\set VERBOSITY terse
SET citus.next_shard_id TO 1509000;
SET citus.shard_replication_factor TO 1;
SET citus.enable_local_execution TO ON;
SET citus.log_local_commands TO ON;
CREATE SCHEMA citus_local_table_queries;
SET search_path TO citus_local_table_queries;
-- ensure that coordinator is added to pg_dist_node
SET client_min_messages to ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
RESET client_min_messages;
CREATE TABLE citus_local_table(a int, b int);
SELECT create_citus_local_table('citus_local_table');
CREATE TABLE citus_local_table_2(a int, b int);
SELECT create_citus_local_table('citus_local_table_2');
CREATE TABLE reference_table(a int, b int);
SELECT create_reference_table('reference_table');
CREATE TABLE distributed_table(a int, b int);
SELECT create_distributed_table('distributed_table', 'a');
CREATE TABLE postgres_local_table(a int, b int);
-- Define a helper function to truncate & insert some data into our test tables
-- We should call this function at some places in this test file to prevent
-- test to take a long time.
-- We shouldn't use LIMIT in INSERT SELECT queries to make the test faster as
-- LIMIT would force planner to wrap SELECT query in an intermediate result and
-- this might reduce the coverage of the test cases.
CREATE FUNCTION clear_and_init_test_tables() RETURNS void AS $$
BEGIN
SET client_min_messages to ERROR;
TRUNCATE postgres_local_table, citus_local_table, reference_table, distributed_table;
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO citus_local_table_2 SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO distributed_table SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
RESET client_min_messages;
END;
$$ LANGUAGE plpgsql;
----------------
---- SELECT ----
----------------
SELECT clear_and_init_test_tables();
-- join between citus local tables and reference tables would succeed
SELECT count(*) FROM citus_local_table, reference_table WHERE citus_local_table.a = reference_table.a;
SELECT * FROM citus_local_table, reference_table WHERE citus_local_table.a = reference_table.a ORDER BY 1,2,3,4 FOR UPDATE;
-- should work
WITH cte_1 AS
(SELECT * FROM citus_local_table, reference_table WHERE citus_local_table.a = reference_table.a ORDER BY 1,2,3,4 FOR UPDATE)
SELECT count(*) FROM cte_1;
-- should work as joins are between ctes
WITH cte_citus_local_table AS
(SELECT * FROM citus_local_table),
cte_postgres_local_table AS
(SELECT * FROM postgres_local_table),
cte_distributed_table AS
(SELECT * FROM distributed_table)
SELECT count(*) FROM cte_distributed_table, cte_citus_local_table, cte_postgres_local_table
WHERE cte_citus_local_table.a = 1 AND cte_distributed_table.a = 1;
-- should fail as we don't support direct joins between distributed/local tables
SELECT count(*) FROM distributed_table d1, distributed_table d2, citus_local_table;
-- local table inside subquery should just work
SELECT count(*) FROM
(
SELECT * FROM (SELECT * FROM citus_local_table) as subquery_inner
) as subquery_top;
SELECT clear_and_init_test_tables();
-- join between citus/postgres local tables should just work
SELECT count(*) FROM
(
SELECT * FROM (SELECT count(*) FROM citus_local_table, postgres_local_table) as subquery_inner
) as subquery_top;
-- should fail as we don't support direct joins between distributed/local tables
SELECT count(*) FROM
(
SELECT *, random() FROM (SELECT *, random() FROM citus_local_table, distributed_table) as subquery_inner
) as subquery_top;
-- should fail as we don't support direct joins between distributed/local tables
SELECT count(*) FROM
(
SELECT *, random()
FROM (
WITH cte_1 AS (SELECT *, random() FROM citus_local_table, distributed_table) SELECT * FROM cte_1) as subquery_inner
) as subquery_top;
-- should be fine
SELECT count(*) FROM
(
SELECT *, random()
FROM (
WITH cte_1 AS (SELECT *, random() FROM citus_local_table), cte_2 AS (SELECT * FROM distributed_table) SELECT count(*) FROM cte_1, cte_2
) as subquery_inner
) as subquery_top;
SELECT clear_and_init_test_tables();
-- prepared statement
PREPARE citus_local_only AS SELECT count(*) FROM citus_local_table;
-- execute 6 times, local tables without params
EXECUTE citus_local_only;
EXECUTE citus_local_only;
EXECUTE citus_local_only;
EXECUTE citus_local_only;
EXECUTE citus_local_only;
EXECUTE citus_local_only;
-- execute 6 times, with param
PREPARE citus_local_only_p(int) AS SELECT count(*) FROM citus_local_table WHERE a = $1;
EXECUTE citus_local_only_p(1);
EXECUTE citus_local_only_p(1);
EXECUTE citus_local_only_p(1);
EXECUTE citus_local_only_p(1);
EXECUTE citus_local_only_p(1);
EXECUTE citus_local_only_p(1);
-- do not evalute the function
-- show the logs
EXECUTE citus_local_only_p(random());
EXECUTE citus_local_only_p(random());
PREPARE mixed_query(int, int, int) AS
WITH cte_citus_local_table AS
(SELECT * FROM citus_local_table WHERE a = $1),
cte_postgres_local_table AS
(SELECT * FROM postgres_local_table WHERE a = $2),
cte_distributed_table AS
(SELECT * FROM distributed_table WHERE a = $3),
cte_mixes AS (SELECT * FROM cte_distributed_table, cte_citus_local_table, cte_postgres_local_table)
SELECT count(*) FROM cte_mixes;
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
SELECT clear_and_init_test_tables();
-- anonymous columns
WITH a AS (SELECT a, '' FROM citus_local_table GROUP BY a) SELECT a.a FROM a ORDER BY 1 LIMIT 5;
WITH a AS (SELECT b, '' FROM citus_local_table WHERE a = 1) SELECT * FROM a, a b ORDER BY 1 LIMIT 5;
-- weird expression on citus/pg table joins should be fine
SELECT * FROM citus_local_table, postgres_local_table
WHERE citus_local_table.a - postgres_local_table.a = 0
ORDER BY 1,2,3,4
LIMIT 10;
-- set operations should just work
SELECT * FROM citus_local_table UNION SELECT * FROM postgres_local_table UNION SELECT * FROM distributed_table ORDER BY 1,2;
(SELECT * FROM citus_local_table ORDER BY 1,2 LIMIT 5) INTERSECT (SELECT i, i FROM generate_series(0, 100) i) ORDER BY 1, 2;
-- should just work as recursive planner kicks in
SELECT count(*) FROM distributed_table WHERE a IN (SELECT a FROM citus_local_table);
SELECT count(*) FROM citus_local_table WHERE a IN (SELECT a FROM distributed_table);
SELECT count(*) FROM reference_table WHERE a IN (SELECT a FROM citus_local_table);
SELECT count(*) FROM citus_local_table WHERE a IN (SELECT a FROM reference_table);
-- nested recursive queries should just work
SELECT count(*) FROM citus_local_table
WHERE a IN
(SELECT a FROM distributed_table WHERE a IN
(SELECT b FROM citus_local_table WHERE b IN (SELECT b FROM postgres_local_table)));
-- local outer joins
SELECT count(*) FROM citus_local_table LEFT JOIN reference_table ON (true);
SELECT count(*) FROM reference_table
LEFT JOIN citus_local_table ON (true)
LEFT JOIN postgres_local_table ON (true)
LEFT JOIN reference_table r2 ON (true);
-- not supported direct outer join
SELECT count(*) FROM citus_local_table LEFT JOIN distributed_table ON (true);
-- distinct in subquery on CTE
WITH one_row AS (
SELECT a from citus_local_table WHERE b = 1
)
SELECT
*
FROM
distributed_table
WHERE
b IN (SELECT DISTINCT a FROM one_row)
ORDER BY
1, 2
LIMIT
1;
WITH one_row_2 AS (
SELECT a from distributed_table WHERE b = 1
)
SELECT
*
FROM
citus_local_table
WHERE
b IN (SELECT DISTINCT a FROM one_row_2)
ORDER BY
1 ,2
LIMIT
1;
-- join between citus local tables and distributed tables would fail
SELECT count(*) FROM citus_local_table, distributed_table;
SELECT * FROM citus_local_table, distributed_table ORDER BY 1,2,3,4 FOR UPDATE;
-- join between citus local tables and postgres local tables are okey
SELECT count(citus_local_table.b), count(postgres_local_table.a)
FROM citus_local_table, postgres_local_table
WHERE citus_local_table.a = postgres_local_table.b;
-- select for update is just OK
SELECT * FROM citus_local_table ORDER BY 1,2 FOR UPDATE;
---------------------------
----- INSERT SELECT -----
---------------------------
-- simple INSERT SELECT is OK
SELECT clear_and_init_test_tables();
INSERT INTO citus_local_table
SELECT * from reference_table;
INSERT INTO reference_table
SELECT * from citus_local_table;
INSERT INTO citus_local_table
SELECT * from distributed_table;
INSERT INTO distributed_table
SELECT * from citus_local_table;
INSERT INTO citus_local_table
SELECT * from citus_local_table_2;
INSERT INTO citus_local_table
SELECT * from citus_local_table_2
ORDER BY 1,2
LIMIT 10;
INSERT INTO citus_local_table
SELECT * from postgres_local_table;
INSERT INTO postgres_local_table
SELECT * from citus_local_table;
-- INSERT SELECT with local joins are OK
SELECT clear_and_init_test_tables();
INSERT INTO citus_local_table
SELECT reference_table.* FROM reference_table
JOIN citus_local_table ON (true);
INSERT INTO reference_table
SELECT reference_table.* FROM reference_table
JOIN citus_local_table ON (true);
INSERT INTO reference_table
SELECT reference_table.* FROM reference_table, postgres_local_table
JOIN citus_local_table ON (true);
SELECT clear_and_init_test_tables();
INSERT INTO distributed_table
SELECT reference_table.* FROM reference_table
JOIN citus_local_table ON (true);
INSERT INTO distributed_table
SELECT reference_table.* FROM reference_table, postgres_local_table
JOIN citus_local_table ON (true);
INSERT INTO postgres_local_table
SELECT reference_table.* FROM reference_table
JOIN citus_local_table ON (true);
-- INSERT SELECT that joins reference and distributed tables is also OK
SELECT clear_and_init_test_tables();
INSERT INTO citus_local_table
SELECT reference_table.* FROM reference_table
JOIN distributed_table ON (true);
INSERT INTO citus_local_table
SELECT reference_table.*
FROM reference_table, distributed_table;
-- INSERT SELECT that joins citus local and distributed table directly will fail ..
INSERT INTO citus_local_table
SELECT distributed_table.* FROM distributed_table
JOIN citus_local_table ON (true);
-- .. but when wrapped into a CTE, join works fine
INSERT INTO citus_local_table
SELECT distributed_table.* FROM distributed_table
JOIN (WITH cte AS (SELECT * FROM citus_local_table) SELECT * FROM cte) as foo ON (true);
-- multi row insert is OK
INSERT INTO citus_local_table VALUES (1, 2), (3, 4);
---------------------------
----- DELETE / UPDATE -----
---------------------------
-- modifications using citus local tables and postgres local tables
-- are not supported, see below four tests
SELECT clear_and_init_test_tables();
DELETE FROM citus_local_table
USING postgres_local_table
WHERE citus_local_table.b = postgres_local_table.b;
UPDATE citus_local_table
SET b = 5
FROM postgres_local_table
WHERE citus_local_table.a = 3 AND citus_local_table.b = postgres_local_table.b;
DELETE FROM postgres_local_table
USING citus_local_table
WHERE citus_local_table.b = postgres_local_table.b;
UPDATE postgres_local_table
SET b = 5
FROM citus_local_table
WHERE citus_local_table.a = 3 AND citus_local_table.b = postgres_local_table.b;
-- no direct joins supported
UPDATE distributed_table
SET b = 6
FROM citus_local_table
WHERE citus_local_table.a = distributed_table.a;
UPDATE reference_table
SET b = 6
FROM citus_local_table
WHERE citus_local_table.a = reference_table.a;
-- should not work, add HINT use CTEs
UPDATE citus_local_table
SET b = 6
FROM distributed_table
WHERE citus_local_table.a = distributed_table.a;
-- should work, add HINT use CTEs
UPDATE citus_local_table
SET b = 6
FROM reference_table
WHERE citus_local_table.a = reference_table.a;
-- should not work, add HINT use CTEs
DELETE FROM distributed_table
USING citus_local_table
WHERE citus_local_table.a = distributed_table.a;
-- should not work, add HINT use CTEs
DELETE FROM citus_local_table
USING distributed_table
WHERE citus_local_table.a = distributed_table.a;
DELETE FROM reference_table
USING citus_local_table
WHERE citus_local_table.a = reference_table.a;
-- should work, add HINT use CTEs
DELETE FROM citus_local_table
USING reference_table
WHERE citus_local_table.a = reference_table.a;
-- just works
DELETE FROM citus_local_table
WHERE citus_local_table.a IN (SELECT a FROM distributed_table);
-- just works
DELETE FROM citus_local_table
WHERE citus_local_table.a IN (SELECT a FROM reference_table);
-- just works
WITH distributed_table_cte AS (SELECT * FROM distributed_table)
UPDATE citus_local_table
SET b = 6
FROM distributed_table_cte
WHERE citus_local_table.a = distributed_table_cte.a;
-- just works
WITH reference_table_cte AS (SELECT * FROM reference_table)
UPDATE citus_local_table
SET b = 6
FROM reference_table_cte
WHERE citus_local_table.a = reference_table_cte.a;
------------------------
----- VIEW QUERIES -----
------------------------
CREATE MATERIALIZED VIEW mat_view_4 AS
SELECT count(*)
FROM citus_local_table
JOIN reference_table
USING (a);
-- ok
SELECT count(*) FROM mat_view_4;
-- should work
SELECT count(*) FROM distributed_table WHERE b in
(SELECT count FROM mat_view_4);
CREATE VIEW view_2 AS
SELECT count(*)
FROM citus_local_table
JOIN citus_local_table_2 USING (a)
JOIN distributed_table USING (a);
-- should fail as view contains direct local dist join
SELECT count(*) FROM view_2;
CREATE VIEW view_3
AS SELECT count(*)
FROM citus_local_table_2
JOIN reference_table
USING (a);
-- ok
SELECT count(*) FROM view_3;
-- view treated as subquery, so should work
SELECT count(*) FROM view_3, distributed_table;
----------------------------------------------
-- Some other tests with subqueries & CTE's --
----------------------------------------------
SELECT clear_and_init_test_tables();
SELECT count(*) AS a, count(*) AS b
FROM reference_table
JOIN (SELECT count(*) as a, count(*) as b
FROM citus_local_table_2
JOIN (SELECT count(*) as a, count(*) as b
FROM postgres_local_table
JOIN (SELECT count(*) as a, count(*) as b
FROM reference_table as table_4677) subquery5108
USING (a)) subquery7132
USING (b)) subquery7294
USING (a);
-- direct join inside CTE not supported
WITH cte AS (
UPDATE citus_local_table lt SET a = mt.a
FROM distributed_table mt WHERE mt.b = lt.b
RETURNING lt.b, lt.a
) SELECT * FROM cte JOIN distributed_table mt ON mt.b = cte.b ORDER BY 1,2,3,4;
-- join with CTE just works
UPDATE citus_local_table
SET a=5
FROM (SELECT avg(distributed_table.b) as avg_b
FROM distributed_table) as foo
WHERE
foo.avg_b = citus_local_table.b;
-- should work
UPDATE distributed_table
SET b = avg_a
FROM (SELECT avg(citus_local_table.a) as avg_a FROM citus_local_table) as foo
WHERE foo.avg_a = distributed_table.a
RETURNING distributed_table.*;
-- it is unfortunate that recursive planner cannot detect this
-- but expected to not work
UPDATE citus_local_table
SET a=5
FROM (SELECT b FROM distributed_table) AS foo
WHERE foo.b = citus_local_table.b;
------------------------------------
-- test different execution paths --
------------------------------------
-- a bit different explain output than for postgres local tables
EXPLAIN (COSTS FALSE)
INSERT INTO citus_local_table
SELECT * FROM distributed_table
ORDER BY distributed_table.*
LIMIT 10;
-- show that we do not pull to coordinator
EXPLAIN (COSTS FALSE)
INSERT INTO citus_local_table
SELECT * FROM citus_local_table;
EXPLAIN (COSTS FALSE)
INSERT INTO citus_local_table
SELECT reference_table.* FROM reference_table;
EXPLAIN (COSTS FALSE)
INSERT INTO citus_local_table
SELECT reference_table.* FROM reference_table, postgres_local_table;
-- show that we pull to coordinator when a distributed table is involved
EXPLAIN (COSTS FALSE)
INSERT INTO citus_local_table
SELECT reference_table.* FROM reference_table, distributed_table;
-- truncate tables & add unique constraints to be able to define foreign keys
TRUNCATE reference_table, citus_local_table, distributed_table;
ALTER TABLE reference_table ADD CONSTRAINT pkey_ref PRIMARY KEY (a);
ALTER TABLE citus_local_table ADD CONSTRAINT pkey_c PRIMARY KEY (a);
-- define a foreign key chain distributed table -> reference table -> citus local table
-- to test sequential execution
ALTER TABLE distributed_table ADD CONSTRAINT fkey_dist_to_ref FOREIGN KEY(a) REFERENCES reference_table(a) ON DELETE RESTRICT;
ALTER TABLE reference_table ADD CONSTRAINT fkey_ref_to_local FOREIGN KEY(a) REFERENCES citus_local_table(a) ON DELETE RESTRICT;
INSERT INTO citus_local_table VALUES (1);
INSERT INTO reference_table VALUES (1);
BEGIN;
INSERT INTO citus_local_table VALUES (1) ON CONFLICT (a) DO NOTHING;
INSERT INTO distributed_table VALUES (1);
-- should show sequential as first inserting into citus local table
-- would force the xact block to use sequential execution
show citus.multi_shard_modify_mode;
ROLLBACK;
BEGIN;
TRUNCATE distributed_table;
-- should error out as we truncated distributed_table via parallel execution
TRUNCATE citus_local_table CASCADE;
ROLLBACK;
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
TRUNCATE distributed_table;
-- should work fine as we already switched to sequential execution
-- before parallel truncate
TRUNCATE citus_local_table CASCADE;
ROLLBACK;
ALTER TABLE distributed_table DROP CONSTRAINT fkey_dist_to_ref;
BEGIN;
INSERT INTO citus_local_table VALUES (1) ON CONFLICT (a) DO NOTHING;
show citus.multi_shard_modify_mode;
ROLLBACK;
-- remove uniqueness constraint and dependent foreign key constraint for next tests
ALTER TABLE reference_table DROP CONSTRAINT fkey_ref_to_local;
ALTER TABLE citus_local_table DROP CONSTRAINT pkey_c;
COPY citus_local_table(a) FROM PROGRAM 'seq 1';
-- should use local execution
BEGIN;
COPY citus_local_table(a) FROM PROGRAM 'seq 1';
COPY citus_local_table(a) FROM PROGRAM 'seq 1';
COMMIT;
COPY citus_local_table TO STDOUT;
COPY (SELECT * FROM citus_local_table) TO STDOUT;
BEGIN;
COPY citus_local_table TO STDOUT;
COMMIT;
BEGIN;
COPY (SELECT * FROM citus_local_table) TO STDOUT;
COMMIT;
-- truncate test tables for next test
TRUNCATE citus_local_table, reference_table, distributed_table;
BEGIN;
INSERT INTO citus_local_table VALUES (1), (2);
SAVEPOINT sp1;
INSERT INTO citus_local_table VALUES (3), (4);
ROLLBACK TO SAVEPOINT sp1;
SELECT * FROM citus_local_table ORDER BY 1,2;
SAVEPOINT sp2;
INSERT INTO citus_local_table VALUES (5), (6);
INSERT INTO distributed_table VALUES (5), (6);
ROLLBACK TO SAVEPOINT sp2;
SELECT * FROM citus_local_table ORDER BY 1,2;
SELECT * FROM distributed_table ORDER BY 1,2;
SAVEPOINT sp3;
INSERT INTO citus_local_table VALUES (7), (8);
INSERT INTO reference_table VALUES (7), (8);
ROLLBACK TO SAVEPOINT sp3;
SELECT * FROM citus_local_table ORDER BY 1,2;
SELECT * FROM reference_table ORDER BY 1,2;
COMMIT;
-- cleanup at exit
DROP SCHEMA citus_local_table_queries CASCADE;

View File

@ -0,0 +1,659 @@
\set VERBOSITY terse
SET citus.next_shard_id TO 1510000;
SET citus.shard_replication_factor TO 1;
SET citus.enable_local_execution TO ON;
SET citus.log_local_commands TO ON;
CREATE SCHEMA citus_local_table_queries_mx;
SET search_path TO citus_local_table_queries_mx;
-- ensure that coordinator is added to pg_dist_node
SET client_min_messages to ERROR;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
RESET client_min_messages;
-- start metadata sync to worker 1
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SET citus.replication_model TO streaming;
CREATE TABLE citus_local_table(a int, b int);
SELECT create_citus_local_table('citus_local_table');
CREATE TABLE citus_local_table_2(a int, b int);
SELECT create_citus_local_table('citus_local_table_2');
CREATE TABLE reference_table(a int, b int);
SELECT create_reference_table('reference_table');
CREATE TABLE distributed_table(a int, b int);
SELECT create_distributed_table('distributed_table', 'a');
\c - - - :worker_1_port
SET search_path TO citus_local_table_queries_mx;
CREATE TABLE postgres_local_table(a int, b int);
-- Define a helper function to truncate & insert some data into our test tables
-- We should call this function at some places in this test file to prevent
-- test to take a long time.
-- We shouldn't use LIMIT in INSERT SELECT queries to make the test faster as
-- LIMIT would force planner to wrap SELECT query in an intermediate result and
-- this might reduce the coverage of the test cases.
CREATE FUNCTION clear_and_init_test_tables() RETURNS void AS $$
BEGIN
SET client_min_messages to ERROR;
TRUNCATE postgres_local_table, citus_local_table, reference_table, distributed_table;
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO citus_local_table_2 SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO distributed_table SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
RESET client_min_messages;
END;
$$ LANGUAGE plpgsql;
----------------
---- SELECT ----
----------------
SELECT clear_and_init_test_tables();
-- join between citus local tables and reference tables would succeed
SELECT count(*) FROM citus_local_table, reference_table WHERE citus_local_table.a = reference_table.a;
SELECT * FROM citus_local_table, reference_table WHERE citus_local_table.a = reference_table.a ORDER BY 1,2,3,4 FOR UPDATE;
-- should work
WITH cte_1 AS
(SELECT * FROM citus_local_table, reference_table WHERE citus_local_table.a = reference_table.a ORDER BY 1,2,3,4 FOR UPDATE)
SELECT count(*) FROM cte_1;
-- should work as joins are between ctes
WITH cte_citus_local_table AS
(SELECT * FROM citus_local_table),
cte_postgres_local_table AS
(SELECT * FROM postgres_local_table),
cte_distributed_table AS
(SELECT * FROM distributed_table)
SELECT count(*) FROM cte_distributed_table, cte_citus_local_table, cte_postgres_local_table
WHERE cte_citus_local_table.a = 1 AND cte_distributed_table.a = 1;
-- should fail as we don't support direct joins between distributed/local tables
SELECT count(*) FROM distributed_table d1, distributed_table d2, citus_local_table;
-- local table inside subquery should just work
SELECT count(*) FROM
(
SELECT * FROM (SELECT * FROM citus_local_table) as subquery_inner
) as subquery_top;
SELECT clear_and_init_test_tables();
-- join between citus/postgres local tables wouldn't work as citus local table is on the coordinator
SELECT count(*) FROM
(
SELECT * FROM (SELECT count(*) FROM citus_local_table, postgres_local_table) as subquery_inner
) as subquery_top;
-- should fail as we don't support direct joins between distributed/local tables
SELECT count(*) FROM
(
SELECT *, random() FROM (SELECT *, random() FROM citus_local_table, distributed_table) as subquery_inner
) as subquery_top;
-- should fail as we don't support direct joins between distributed/local tables
SELECT count(*) FROM
(
SELECT *, random()
FROM (
WITH cte_1 AS (SELECT *, random() FROM citus_local_table, distributed_table) SELECT * FROM cte_1) as subquery_inner
) as subquery_top;
-- should be fine
SELECT count(*) FROM
(
SELECT *, random()
FROM (
WITH cte_1 AS (SELECT *, random() FROM citus_local_table), cte_2 AS (SELECT * FROM distributed_table) SELECT count(*) FROM cte_1, cte_2
) as subquery_inner
) as subquery_top;
SELECT clear_and_init_test_tables();
-- prepared statement
PREPARE citus_local_only AS SELECT count(*) FROM citus_local_table;
-- execute 6 times, local tables without params
EXECUTE citus_local_only;
EXECUTE citus_local_only;
EXECUTE citus_local_only;
EXECUTE citus_local_only;
EXECUTE citus_local_only;
EXECUTE citus_local_only;
-- execute 6 times, with param
PREPARE citus_local_only_p(int) AS SELECT count(*) FROM citus_local_table WHERE a = $1;
EXECUTE citus_local_only_p(1);
EXECUTE citus_local_only_p(1);
EXECUTE citus_local_only_p(1);
EXECUTE citus_local_only_p(1);
EXECUTE citus_local_only_p(1);
EXECUTE citus_local_only_p(1);
-- do not evalute the function
-- show the logs
EXECUTE citus_local_only_p(random());
EXECUTE citus_local_only_p(random());
PREPARE mixed_query(int, int, int) AS
WITH cte_citus_local_table AS
(SELECT * FROM citus_local_table WHERE a = $1),
cte_postgres_local_table AS
(SELECT * FROM postgres_local_table WHERE a = $2),
cte_distributed_table AS
(SELECT * FROM distributed_table WHERE a = $3),
cte_mixes AS (SELECT * FROM cte_distributed_table, cte_citus_local_table, cte_postgres_local_table)
SELECT count(*) FROM cte_mixes;
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
EXECUTE mixed_query(1,2,3);
SELECT clear_and_init_test_tables();
-- anonymous columns
WITH a AS (SELECT a, '' FROM citus_local_table GROUP BY a) SELECT a.a FROM a ORDER BY 1 LIMIT 5;
WITH a AS (SELECT b, '' FROM citus_local_table WHERE a = 1) SELECT * FROM a, a b ORDER BY 1 LIMIT 5;
-- set operations should just work
SELECT * FROM citus_local_table UNION SELECT * FROM postgres_local_table UNION SELECT * FROM distributed_table ORDER BY 1,2;
(SELECT * FROM citus_local_table ORDER BY 1,2 LIMIT 5) INTERSECT (SELECT i, i FROM generate_series(0, 100) i) ORDER BY 1, 2;
-- should just work as recursive planner kicks in
SELECT count(*) FROM distributed_table WHERE a IN (SELECT a FROM citus_local_table);
SELECT count(*) FROM citus_local_table WHERE a IN (SELECT a FROM distributed_table);
SELECT count(*) FROM reference_table WHERE a IN (SELECT a FROM citus_local_table);
SELECT count(*) FROM citus_local_table WHERE a IN (SELECT a FROM reference_table);
-- nested recursive queries should just work
SELECT count(*) FROM citus_local_table
WHERE a IN
(SELECT a FROM distributed_table WHERE a IN
(SELECT b FROM citus_local_table WHERE b IN (SELECT b FROM postgres_local_table)));
-- local outer joins
SELECT count(*) FROM citus_local_table LEFT JOIN reference_table ON (true);
SELECT count(*) FROM reference_table
LEFT JOIN citus_local_table ON (true)
LEFT JOIN postgres_local_table ON (true)
LEFT JOIN reference_table r2 ON (true);
-- not supported direct outer join
SELECT count(*) FROM citus_local_table LEFT JOIN distributed_table ON (true);
-- distinct in subquery on CTE
WITH one_row AS (
SELECT a from citus_local_table WHERE b = 1
)
SELECT
*
FROM
distributed_table
WHERE
b IN (SELECT DISTINCT a FROM one_row)
ORDER BY
1, 2
LIMIT
1;
WITH one_row_2 AS (
SELECT a from distributed_table WHERE b = 1
)
SELECT
*
FROM
citus_local_table
WHERE
b IN (SELECT DISTINCT a FROM one_row_2)
ORDER BY
1 ,2
LIMIT
1;
-- join between citus local tables and distributed tables would fail
SELECT count(*) FROM citus_local_table, distributed_table;
SELECT * FROM citus_local_table, distributed_table ORDER BY 1,2,3,4 FOR UPDATE;
-- join between citus local table and postgres local table would fail
-- as citus local table is on the coordinator
SELECT count(citus_local_table.b), count(postgres_local_table.a)
FROM citus_local_table, postgres_local_table
WHERE citus_local_table.a = postgres_local_table.b;
-- select for update is just OK
SELECT * FROM citus_local_table ORDER BY 1,2 FOR UPDATE;
---------------------------
----- INSERT SELECT -----
---------------------------
-- simple INSERT SELECT is OK
SELECT clear_and_init_test_tables();
INSERT INTO citus_local_table
SELECT * from reference_table;
INSERT INTO reference_table
SELECT * from citus_local_table;
INSERT INTO citus_local_table
SELECT * from distributed_table;
INSERT INTO distributed_table
SELECT * from citus_local_table;
INSERT INTO citus_local_table
SELECT * from citus_local_table_2;
INSERT INTO citus_local_table
SELECT * from citus_local_table_2
ORDER BY 1,2
LIMIT 10;
INSERT INTO citus_local_table
SELECT * from postgres_local_table;
INSERT INTO postgres_local_table
SELECT * from citus_local_table;
-- INSERT SELECT with local joins are OK
SELECT clear_and_init_test_tables();
INSERT INTO citus_local_table
SELECT reference_table.* FROM reference_table
JOIN citus_local_table ON (true);
INSERT INTO reference_table
SELECT reference_table.* FROM reference_table
JOIN citus_local_table ON (true);
INSERT INTO reference_table
SELECT reference_table.* FROM reference_table, postgres_local_table
JOIN citus_local_table ON (true);
SELECT clear_and_init_test_tables();
INSERT INTO distributed_table
SELECT reference_table.* FROM reference_table
JOIN citus_local_table ON (true);
INSERT INTO distributed_table
SELECT reference_table.* FROM reference_table, postgres_local_table
JOIN citus_local_table ON (true);
INSERT INTO postgres_local_table
SELECT reference_table.* FROM reference_table
JOIN citus_local_table ON (true);
-- INSERT SELECT that joins reference and distributed tables is also OK
SELECT clear_and_init_test_tables();
INSERT INTO citus_local_table
SELECT reference_table.* FROM reference_table
JOIN distributed_table ON (true);
INSERT INTO citus_local_table
SELECT reference_table.*
FROM reference_table, distributed_table;
-- INSERT SELECT that joins citus local and distributed table directly will fail ..
INSERT INTO citus_local_table
SELECT distributed_table.* FROM distributed_table
JOIN citus_local_table ON (true);
-- .. but when wrapped into a CTE, join works fine
INSERT INTO citus_local_table
SELECT distributed_table.* FROM distributed_table
JOIN (WITH cte AS (SELECT * FROM citus_local_table) SELECT * FROM cte) as foo ON (true);
-- multi row insert is OK
INSERT INTO citus_local_table VALUES (1, 2), (3, 4);
---------------------------
----- DELETE / UPDATE -----
---------------------------
-- modifications using citus local tables and postgres local tables
-- are not supported, see below four tests
SELECT clear_and_init_test_tables();
DELETE FROM citus_local_table
USING postgres_local_table
WHERE citus_local_table.b = postgres_local_table.b;
UPDATE citus_local_table
SET b = 5
FROM postgres_local_table
WHERE citus_local_table.a = 3 AND citus_local_table.b = postgres_local_table.b;
DELETE FROM postgres_local_table
USING citus_local_table
WHERE citus_local_table.b = postgres_local_table.b;
UPDATE postgres_local_table
SET b = 5
FROM citus_local_table
WHERE citus_local_table.a = 3 AND citus_local_table.b = postgres_local_table.b;
-- no direct joins supported
UPDATE distributed_table
SET b = 6
FROM citus_local_table
WHERE citus_local_table.a = distributed_table.a;
UPDATE reference_table
SET b = 6
FROM citus_local_table
WHERE citus_local_table.a = reference_table.a;
-- should not work, add HINT use CTEs
UPDATE citus_local_table
SET b = 6
FROM distributed_table
WHERE citus_local_table.a = distributed_table.a;
-- should work, add HINT use CTEs
UPDATE citus_local_table
SET b = 6
FROM reference_table
WHERE citus_local_table.a = reference_table.a;
-- should not work, add HINT use CTEs
DELETE FROM distributed_table
USING citus_local_table
WHERE citus_local_table.a = distributed_table.a;
-- should not work, add HINT use CTEs
DELETE FROM citus_local_table
USING distributed_table
WHERE citus_local_table.a = distributed_table.a;
DELETE FROM reference_table
USING citus_local_table
WHERE citus_local_table.a = reference_table.a;
-- should work, add HINT use CTEs
DELETE FROM citus_local_table
USING reference_table
WHERE citus_local_table.a = reference_table.a;
-- just works
DELETE FROM citus_local_table
WHERE citus_local_table.a IN (SELECT a FROM distributed_table);
-- just works
DELETE FROM citus_local_table
WHERE citus_local_table.a IN (SELECT a FROM reference_table);
-- just works
WITH distributed_table_cte AS (SELECT * FROM distributed_table)
UPDATE citus_local_table
SET b = 6
FROM distributed_table_cte
WHERE citus_local_table.a = distributed_table_cte.a;
-- just works
WITH reference_table_cte AS (SELECT * FROM reference_table)
UPDATE citus_local_table
SET b = 6
FROM reference_table_cte
WHERE citus_local_table.a = reference_table_cte.a;
------------------------
----- VIEW QUERIES -----
------------------------
CREATE MATERIALIZED VIEW mat_view_4 AS
SELECT count(*)
FROM citus_local_table
JOIN reference_table
USING (a);
-- ok
SELECT count(*) FROM mat_view_4;
-- should work
SELECT count(*) FROM distributed_table WHERE b in
(SELECT count FROM mat_view_4);
CREATE VIEW view_2 AS
SELECT count(*)
FROM citus_local_table
JOIN citus_local_table_2 USING (a)
JOIN distributed_table USING (a);
-- should fail as view contains direct local dist join
SELECT count(*) FROM view_2;
CREATE VIEW view_3
AS SELECT count(*)
FROM citus_local_table_2
JOIN reference_table
USING (a);
-- ok
SELECT count(*) FROM view_3;
-- view treated as subquery, so should work
SELECT count(*) FROM view_3, distributed_table;
----------------------------------------------
-- Some other tests with subqueries & CTE's --
----------------------------------------------
SELECT clear_and_init_test_tables();
SELECT count(*) AS a, count(*) AS b
FROM reference_table
JOIN (SELECT count(*) as a, count(*) as b
FROM citus_local_table_2
JOIN (SELECT count(*) as a, count(*) as b
FROM postgres_local_table
JOIN (SELECT count(*) as a, count(*) as b
FROM reference_table as table_4677) subquery5108
USING (a)) subquery7132
USING (b)) subquery7294
USING (a);
-- direct join inside CTE not supported
WITH cte AS (
UPDATE citus_local_table lt SET a = mt.a
FROM distributed_table mt WHERE mt.b = lt.b
RETURNING lt.b, lt.a
) SELECT * FROM cte JOIN distributed_table mt ON mt.b = cte.b ORDER BY 1,2,3,4;
-- join with CTE just works
UPDATE citus_local_table
SET a=5
FROM (SELECT avg(distributed_table.b) as avg_b
FROM distributed_table) as foo
WHERE
foo.avg_b = citus_local_table.b;
-- should work
UPDATE distributed_table
SET b = avg_a
FROM (SELECT avg(citus_local_table.a) as avg_a FROM citus_local_table) as foo
WHERE foo.avg_a = distributed_table.a
RETURNING distributed_table.*;
-- it is unfortunate that recursive planner cannot detect this
-- but expected to not work
UPDATE citus_local_table
SET a=5
FROM (SELECT b FROM distributed_table) AS foo
WHERE foo.b = citus_local_table.b;
------------------------------------
-- test different execution paths --
------------------------------------
-- a bit different explain output than for postgres local tables
EXPLAIN (COSTS FALSE)
INSERT INTO citus_local_table
SELECT * FROM distributed_table
ORDER BY distributed_table.*
LIMIT 10;
-- show that we do not pull to coordinator
EXPLAIN (COSTS FALSE)
INSERT INTO citus_local_table
SELECT * FROM citus_local_table;
EXPLAIN (COSTS FALSE)
INSERT INTO citus_local_table
SELECT reference_table.* FROM reference_table;
EXPLAIN (COSTS FALSE)
INSERT INTO citus_local_table
SELECT reference_table.* FROM reference_table, postgres_local_table;
-- show that we pull to coordinator when a distributed table is involved
EXPLAIN (COSTS FALSE)
INSERT INTO citus_local_table
SELECT reference_table.* FROM reference_table, distributed_table;
-- truncate tables & add unique constraints to be able to define foreign keys
TRUNCATE reference_table, citus_local_table, distributed_table;
\c - - - :master_port
SET search_path TO citus_local_table_queries_mx;
SET citus.replication_model TO streaming;
ALTER TABLE reference_table ADD CONSTRAINT pkey_ref PRIMARY KEY (a);
ALTER TABLE citus_local_table ADD CONSTRAINT pkey_c PRIMARY KEY (a);
-- define a foreign key chain distributed table -> reference table -> citus local table
-- to test sequential execution
ALTER TABLE distributed_table ADD CONSTRAINT fkey_dist_to_ref FOREIGN KEY(a) REFERENCES reference_table(a) ON DELETE RESTRICT;
ALTER TABLE reference_table ADD CONSTRAINT fkey_ref_to_local FOREIGN KEY(a) REFERENCES citus_local_table(a) ON DELETE RESTRICT;
\c - - - :worker_1_port
SET search_path TO citus_local_table_queries_mx;
INSERT INTO citus_local_table VALUES (1);
INSERT INTO reference_table VALUES (1);
BEGIN;
INSERT INTO citus_local_table VALUES (1) ON CONFLICT (a) DO NOTHING;
INSERT INTO distributed_table VALUES (1);
-- should show sequential as first inserting into citus local table
-- would force the xact block to use sequential execution
show citus.multi_shard_modify_mode;
ROLLBACK;
BEGIN;
TRUNCATE distributed_table;
-- should error out as we truncated distributed_table via parallel execution
TRUNCATE citus_local_table CASCADE;
ROLLBACK;
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
TRUNCATE distributed_table;
-- should work fine as we already switched to sequential execution
-- before parallel truncate
TRUNCATE citus_local_table CASCADE;
ROLLBACK;
\c - - - :master_port
SET search_path TO citus_local_table_queries_mx;
SET citus.replication_model TO streaming;
ALTER TABLE distributed_table DROP CONSTRAINT fkey_dist_to_ref;
\c - - - :worker_1_port
SET search_path TO citus_local_table_queries_mx;
BEGIN;
INSERT INTO citus_local_table VALUES (1) ON CONFLICT (a) DO NOTHING;
show citus.multi_shard_modify_mode;
ROLLBACK;
\c - - - :master_port
SET search_path TO citus_local_table_queries_mx;
SET citus.replication_model TO streaming;
-- remove uniqueness constraint and dependent foreign key constraint for next tests
ALTER TABLE reference_table DROP CONSTRAINT fkey_ref_to_local;
ALTER TABLE citus_local_table DROP CONSTRAINT pkey_c;
\c - - - :worker_1_port
SET search_path TO citus_local_table_queries_mx;
COPY citus_local_table(a) FROM PROGRAM 'seq 1';
BEGIN;
COPY citus_local_table(a) FROM PROGRAM 'seq 1';
COPY citus_local_table(a) FROM PROGRAM 'seq 1';
COMMIT;
COPY citus_local_table TO STDOUT;
COPY (SELECT * FROM citus_local_table) TO STDOUT;
BEGIN;
COPY citus_local_table TO STDOUT;
COMMIT;
BEGIN;
COPY (SELECT * FROM citus_local_table) TO STDOUT;
COMMIT;
-- truncate test tables for next test
TRUNCATE citus_local_table, reference_table, distributed_table;
BEGIN;
INSERT INTO citus_local_table VALUES (1), (2);
SAVEPOINT sp1;
INSERT INTO citus_local_table VALUES (3), (4);
ROLLBACK TO SAVEPOINT sp1;
SELECT * FROM citus_local_table ORDER BY 1,2;
SAVEPOINT sp2;
INSERT INTO citus_local_table VALUES (5), (6);
INSERT INTO distributed_table VALUES (5), (6);
ROLLBACK TO SAVEPOINT sp2;
SELECT * FROM citus_local_table ORDER BY 1,2;
SELECT * FROM distributed_table ORDER BY 1,2;
SAVEPOINT sp3;
INSERT INTO citus_local_table VALUES (7), (8);
INSERT INTO reference_table VALUES (7), (8);
ROLLBACK TO SAVEPOINT sp3;
SELECT * FROM citus_local_table ORDER BY 1,2;
SELECT * FROM reference_table ORDER BY 1,2;
COMMIT;
\c - - - :master_port
-- cleanup at exit
DROP SCHEMA citus_local_table_queries_mx CASCADE;

View File

@ -3,64 +3,135 @@
CREATE TABLE the_table (a int, b int, z bigserial);
SELECT create_distributed_table('the_table', 'a');
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
CREATE TABLE reference_table (a int, b int, z bigserial);
SELECT create_reference_table('reference_table');
CREATE TABLE citus_local_table (a int, b int, z bigserial);
SELECT create_citus_local_table('citus_local_table');
CREATE TABLE local (a int, b int);
\c - - - :follower_master_port
-- inserts normally do not work on a standby coordinator
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
INSERT INTO reference_table (a, b, z) VALUES (1, 2, 2);
INSERT INTO citus_local_table (a, b, z) VALUES (1, 2, 2);
-- we can allow DML on a writable standby coordinator
-- We can allow DML on a writable standby coordinator.
-- Note that it doesn't help to enable writes for citus local tables
-- and coordinator replicated reference tables. This is because, the
-- data is in the coordinator and will hit read-only tranaction checks
-- on Postgres
SET citus.writable_standby_coordinator TO on;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
SELECT * FROM the_table;
INSERT INTO reference_table (a, b, z) VALUES (1, 2, 2);
SELECT * FROM reference_table;
INSERT INTO citus_local_table (a, b, z) VALUES (1, 2, 2);
SELECT * FROM citus_local_table;
UPDATE the_table SET z = 3 WHERE a = 1;
UPDATE reference_table SET z = 3 WHERE a = 1;
UPDATE citus_local_table SET z = 3 WHERE a = 1;
SELECT * FROM the_table;
SELECT * FROM reference_table;
SELECT * FROM citus_local_table;
DELETE FROM the_table WHERE a = 1;
DELETE FROM reference_table WHERE a = 1;
DELETE FROM citus_local_table WHERE a = 1;
SELECT * FROM the_table;
SELECT * FROM reference_table;
SELECT * FROM citus_local_table;
-- drawing from a sequence is not possible
INSERT INTO the_table (a, b) VALUES (1, 2);
INSERT INTO reference_table (a, b) VALUES (1, 2);
INSERT INTO citus_local_table (a, b) VALUES (1, 2);
-- 2PC is not possible
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
INSERT INTO reference_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
INSERT INTO citus_local_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
-- COPY is not possible in 2PC mode
COPY the_table (a, b, z) FROM STDIN WITH CSV;
10,10,10
11,11,11
\.
COPY reference_table (a, b, z) FROM STDIN WITH CSV;
10,10,10
11,11,11
\.
COPY citus_local_table (a, b, z) FROM STDIN WITH CSV;
10,10,10
11,11,11
\.
-- 1PC is possible
SET citus.multi_shard_commit_protocol TO '1pc';
INSERT INTO the_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
SELECT * FROM the_table ORDER BY a;
INSERT INTO reference_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
SELECT * FROM reference_table ORDER BY a;
INSERT INTO citus_local_table (a, b, z) VALUES (2, 3, 4), (5, 6, 7);
SELECT * FROM citus_local_table ORDER BY a;
-- modifying CTEs are possible
WITH del AS (DELETE FROM the_table RETURNING *)
SELECT * FROM del ORDER BY a;
WITH del AS (DELETE FROM reference_table RETURNING *)
SELECT * FROM del ORDER BY a;
WITH del AS (DELETE FROM citus_local_table RETURNING *)
SELECT * FROM del ORDER BY a;
-- COPY is possible in 1PC mode
COPY the_table (a, b, z) FROM STDIN WITH CSV;
10,10,10
11,11,11
\.
COPY reference_table (a, b, z) FROM STDIN WITH CSV;
10,10,10
11,11,11
\.
COPY citus_local_table (a, b, z) FROM STDIN WITH CSV;
10,10,10
11,11,11
\.
SELECT * FROM the_table ORDER BY a;
SELECT * FROM reference_table ORDER BY a;
SELECT * FROM citus_local_table ORDER BY a;
DELETE FROM the_table;
DELETE FROM reference_table;
DELETE FROM citus_local_table;
-- DDL is not possible
TRUNCATE the_table;
TRUNCATE reference_table;
TRUNCATE citus_local_table;
ALTER TABLE the_table ADD COLUMN c int;
ALTER TABLE reference_table ADD COLUMN c int;
ALTER TABLE citus_local_table ADD COLUMN c int;
-- rollback is possible
BEGIN;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 2);
ROLLBACK;
BEGIN;
INSERT INTO reference_table (a, b, z) VALUES (1, 2, 2);
ROLLBACK;
BEGIN;
INSERT INTO citus_local_table (a, b, z) VALUES (1, 2, 2);
ROLLBACK;
SELECT * FROM the_table ORDER BY a;
SELECT * FROM reference_table ORDER BY a;
SELECT * FROM citus_local_table ORDER BY a;
-- we should still disallow writes to local tables
INSERT INTO local VALUES (1, 1);
@ -75,6 +146,13 @@ CREATE TEMP TABLE local_copy_of_the_table AS SELECT * FROM the_table;
SET citus.writable_standby_coordinator TO on;
INSERT INTO the_table (a, b, z) VALUES (1, 2, 3);
SELECT * FROM the_table ORDER BY a;
INSERT INTO reference_table (a, b, z) VALUES (1, 2, 3);
SELECT * FROM reference_table ORDER BY a;
INSERT INTO citus_local_table (a, b, z) VALUES (1, 2, 3);
SELECT * FROM citus_local_table ORDER BY a;
\c - - - :master_port
DROP TABLE the_table;
DROP TABLE reference_table;
DROP TABLE citus_local_table;
SELECT master_remove_node('localhost', :master_port);