Reference Table Support - Phase 1

With this commit, we implemented some basic features of reference tables.

To start with, a reference table is
  * a distributed table whithout a distribution column defined on it
  * the distributed table is single sharded
  * and the shard is replicated to all nodes

Reference tables follows the same code-path with a single sharded
tables. Thus, broadcast JOINs are applicable to reference tables.
But, since the table is replicated to all nodes, table fetching is
not required any more.

Reference tables support the uniqueness constraints for any column.

Reference tables can be used in INSERT INTO .. SELECT queries with
the following rules:
  * If a reference table is in the SELECT part of the query, it is
    safe join with another reference table and/or hash partitioned
    tables.
  * If a reference table is in the INSERT part of the query, all
    other participating tables should be reference tables.

Reference tables follow the regular co-location structure. Since
all reference tables are single sharded and replicated to all nodes,
they are always co-located with each other.

Queries involving only reference tables always follows router planner
and executor.

Reference tables can have composite typed columns and there is no need
to create/define the necessary support functions.

All modification queries, master_* UDFs, EXPLAIN, DDLs, TRUNCATE,
sequences, transactions, COPY, schema support works on reference
tables as expected. Plus, all the pre-requisites associated with
distribution columns are dismissed.
pull/1018/head
Onder Kalaci 2016-12-01 16:42:29 +02:00
parent a71b79983b
commit 9f0bd4cb36
44 changed files with 4665 additions and 182 deletions

View File

@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -104,7 +104,9 @@ $(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql
$(EXTENSION)--6.1-4.sql: $(EXTENSION)--6.1-3.sql $(EXTENSION)--6.1-3--6.1-4.sql
cat $^ > $@
$(EXTENSION)--6.1-5.sql: $(EXTENSION)--6.1-4.sql $(EXTENSION)--6.1-4--6.1-5.sql
cat $^ > $@
cat $^ > $@
$(EXTENSION)--6.1-6.sql: $(EXTENSION)--6.1-5.sql $(EXTENSION)--6.1-5--6.1-6.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,9 @@
/* citus--6.1-5--6.1-6.sql */
SET search_path = 'pg_catalog';
-- we don't need this constraint any more since reference tables
-- wouldn't have partition columns, which we represent as NULL
ALTER TABLE pg_dist_partition ALTER COLUMN partkey DROP NOT NULL;
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.1-5'
default_version = '6.1-6'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -58,8 +58,10 @@
/* local function forward declarations */
static void CreateReferenceTable(Oid relationId);
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, uint32 colocationId);
char distributionMethod, uint32 colocationId,
char replicationModel);
static char LookupDistributionMethod(Oid distributionMethodOid);
static void RecordDistributedRelationDependencies(Oid distributedRelationId,
Node *distributionKey);
@ -73,10 +75,12 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation,
Var *distributionColumn,
uint32 colocationId);
static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId);
Var *distributionColumn, uint32 colocationId,
char replicationModel);
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
char *colocateWithTableName,
int shardCount, int replicationFactor);
int shardCount, int replicationFactor,
char replicationModel);
static Oid ColumnType(Oid relationId, char *columnName);
@ -104,7 +108,8 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
ConvertToDistributedTable(distributedRelationId, distributionColumnName,
distributionMethod, INVALID_COLOCATION_ID);
distributionMethod, INVALID_COLOCATION_ID,
REPLICATION_MODEL_COORDINATOR);
PG_RETURN_VOID();
}
@ -159,14 +164,16 @@ create_distributed_table(PG_FUNCTION_ARGS)
if (distributionMethod != DISTRIBUTE_BY_HASH)
{
ConvertToDistributedTable(relationId, distributionColumnName,
distributionMethod, INVALID_COLOCATION_ID);
distributionMethod, INVALID_COLOCATION_ID,
REPLICATION_MODEL_COORDINATOR);
PG_RETURN_VOID();
}
/* use configuration values for shard count and shard replication factor */
CreateHashDistributedTable(relationId, distributionColumnName,
colocateWithTableName, ShardCount,
ShardReplicationFactor);
ShardReplicationFactor,
REPLICATION_MODEL_COORDINATOR);
PG_RETURN_VOID();
}
@ -175,32 +182,61 @@ create_distributed_table(PG_FUNCTION_ARGS)
/*
* create_reference_table accepts a table and then it creates a distributed
* table which has one shard and replication factor is set to
* shard_replication_factor configuration value.
* the worker count.
*/
Datum
create_reference_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
int shardCount = 1;
AttrNumber firstColumnAttrNumber = 1;
char *colocateWithTableName = "default";
char *firstColumnName = get_attname(relationId, firstColumnAttrNumber);
if (firstColumnName == NULL)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("reference table candidate %s needs to have at"
"least one column", relationName)));
}
CreateHashDistributedTable(relationId, firstColumnName, colocateWithTableName,
shardCount, ShardReplicationFactor);
CreateReferenceTable(relationId);
PG_RETURN_VOID();
}
/*
* CreateReferenceTable creates a distributed table with the given relationId. The
* created table has one shard and replication factor is set to the active worker
* count. In fact, the above is the definition of a reference table in Citus.
*/
static void
CreateReferenceTable(Oid relationId)
{
uint32 colocationId = INVALID_COLOCATION_ID;
List *workerNodeList = WorkerNodeList();
int shardCount = 1;
int replicationFactor = list_length(workerNodeList);
Oid distributionColumnType = InvalidOid;
char *distributionColumnName = NULL;
/* if there are no workers, error out */
if (replicationFactor == 0)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot create reference table \"%s\"", relationName),
errdetail("There are no active worker nodes.")));
}
/* check for existing colocations */
colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType);
if (colocationId == INVALID_COLOCATION_ID)
{
colocationId = CreateColocationGroup(shardCount, replicationFactor,
distributionColumnType);
}
/* first, convert the relation into distributed relation */
ConvertToDistributedTable(relationId, distributionColumnName,
DISTRIBUTE_BY_NONE, colocationId, REPLICATION_MODEL_2PC);
/* now, create the single shard replicated to all nodes */
CreateReferenceTableShard(relationId);
}
/*
* ConvertToDistributedTable converts the given regular PostgreSQL table into a
* distributed table. First, it checks if the given table can be distributed,
@ -214,7 +250,8 @@ create_reference_table(PG_FUNCTION_ARGS)
*/
static void
ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, uint32 colocationId)
char distributionMethod, uint32 colocationId,
char replicationModel)
{
Relation relation = NULL;
TupleDesc relationDesc = NULL;
@ -272,6 +309,10 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
errhint("Empty your table before distributing it.")));
}
/*
* Distribution column returns NULL for reference tables,
* but it is not used below for reference tables.
*/
distributionColumn = BuildDistributionKeyFromColumnName(relation,
distributionColumnName);
@ -310,7 +351,7 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
colocationId);
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
colocationId);
colocationId, replicationModel);
relation_close(relation, NoLock);
@ -329,6 +370,9 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
* ErrorIfNotSupportedConstraint run checks related to unique index / exclude
* constraints.
*
* The function skips the uniqeness checks for reference tables (i.e., distribution
* method is 'none').
*
* Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned
* tables, since currently there is no way of enforcing uniqueness for
* overlapping shards.
@ -344,10 +388,25 @@ static void
ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn, uint32 colocationId)
{
char *relationName = RelationGetRelationName(relation);
List *indexOidList = RelationGetIndexList(relation);
char *relationName = NULL;
List *indexOidList = NULL;
ListCell *indexOidCell = NULL;
/*
* Citus supports any kind of uniqueness constraints for reference tables
* given that they only consist of a single shard and we can simply rely on
* Postgres.
* TODO: Here we should be erroring out if there exists any foreign keys
* from/to a reference table.
*/
if (distributionMethod == DISTRIBUTE_BY_NONE)
{
return;
}
relationName = RelationGetRelationName(relation);
indexOidList = RelationGetIndexList(relation);
foreach(indexOidCell, indexOidList)
{
Oid indexOid = lfirst_oid(indexOidCell);
@ -630,10 +689,10 @@ ErrorIfNotSupportedForeignConstraint(Relation relation, char distributionMethod,
*/
static void
InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId)
Var *distributionColumn, uint32 colocationId,
char replicationModel)
{
Relation pgDistPartition = NULL;
const char replicationModel = 'c';
char *distributionColumnString = NULL;
HeapTuple newTuple = NULL;
@ -643,8 +702,6 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
/* open system catalog and insert new tuple */
pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
distributionColumnString = nodeToString((Node *) distributionColumn);
/* form new tuple for pg_dist_partition */
memset(newValues, 0, sizeof(newValues));
memset(newNulls, false, sizeof(newNulls));
@ -653,11 +710,23 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
ObjectIdGetDatum(relationId);
newValues[Anum_pg_dist_partition_partmethod - 1] =
CharGetDatum(distributionMethod);
newValues[Anum_pg_dist_partition_partkey - 1] =
CStringGetTextDatum(distributionColumnString);
newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
/* set partkey column to NULL for reference tables */
if (distributionMethod != DISTRIBUTE_BY_NONE)
{
distributionColumnString = nodeToString((Node *) distributionColumn);
newValues[Anum_pg_dist_partition_partkey - 1] =
CStringGetTextDatum(distributionColumnString);
}
else
{
newValues[Anum_pg_dist_partition_partkey - 1] = PointerGetDatum(NULL);
newNulls[Anum_pg_dist_partition_partkey - 1] = true;
}
newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls);
/* finally insert tuple, build index entries & register cache invalidation */
@ -883,8 +952,8 @@ CreateTruncateTrigger(Oid relationId)
*/
static void
CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
char *colocateWithTableName,
int shardCount, int replicationFactor)
char *colocateWithTableName, int shardCount,
int replicationFactor, char replicationModel)
{
Relation distributedRelation = NULL;
Relation pgDistColocation = NULL;
@ -935,7 +1004,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
/* create distributed table metadata */
ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
colocationId);
colocationId, REPLICATION_MODEL_COORDINATOR);
/* create shards */
if (sourceRelationId != InvalidOid)
@ -944,6 +1013,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
CheckReplicationModel(sourceRelationId, relationId);
CheckDistributionColumnType(sourceRelationId, relationId);
CreateColocatedShards(relationId, sourceRelationId);
}
else

View File

@ -219,7 +219,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
char partitionMethod = PartitionMethod(relationId);
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
DISTRIBUTE_BY_RANGE)
DISTRIBUTE_BY_RANGE || partitionMethod == DISTRIBUTE_BY_NONE)
{
CopyToExistingShards(copyStatement, completionTag);
}
@ -422,8 +422,9 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
}
}
/* error if any shard missing min/max values */
if (cacheEntry->hasUninitializedShardInterval)
/* error if any shard missing min/max values for non reference tables */
if (partitionMethod != DISTRIBUTE_BY_NONE &&
cacheEntry->hasUninitializedShardInterval)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("could not start copy"),
@ -515,22 +516,37 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
CHECK_FOR_INTERRUPTS();
/* find the partition column value */
if (columnNulls[partitionColumn->varattno - 1])
/*
* Find the partition column value and corresponding shard interval
* for non-reference tables.
* Get the existing (and only a single) shard interval for the reference
* tables. Note that, reference tables has NULL partition column values so
* skip the check.
*/
if (partitionColumn != NULL)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot copy row with NULL value "
"in partition column")));
if (columnNulls[partitionColumn->varattno - 1])
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("cannot copy row with NULL value "
"in partition column")));
}
partitionColumnValue = columnValues[partitionColumn->varattno - 1];
}
partitionColumnValue = columnValues[partitionColumn->varattno - 1];
/* find the shard interval and id for the partition column value */
shardInterval = FindShardInterval(partitionColumnValue, shardIntervalCache,
/*
* Find the shard interval and id for the partition column value for
* non-reference tables.
* For reference table, this function blindly returns the tables single
* shard.
*/
shardInterval = FindShardInterval(partitionColumnValue,
shardIntervalCache,
shardCount, partitionMethod,
compareFunction, hashFunction,
useBinarySearch);
if (shardInterval == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),

View File

@ -1201,6 +1201,15 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
ListCell *indexParameterCell = NULL;
bool indexContainsPartitionColumn = false;
/*
* Reference tables do not have partition key, and unique constraints
* are allowed for them. Thus, we added a short-circuit for reference tables.
*/
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
return;
}
if (partitionMethod == DISTRIBUTE_BY_APPEND)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -1347,7 +1356,10 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
if (HeapTupleIsValid(tuple))
{
Form_pg_attribute targetAttr = (Form_pg_attribute) GETSTRUCT(tuple);
if (targetAttr->attnum == partitionColumn->varattno)
/* reference tables do not have partition column, so allow them */
if (partitionColumn != NULL &&
targetAttr->attnum == partitionColumn->varattno)
{
ereport(ERROR, (errmsg("cannot execute ALTER TABLE command "
"involving partition column")));

View File

@ -321,6 +321,82 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
}
/*
* CreateReferenceTableShard creates a single shard for the given
* distributedTableId. The created shard does not have min/max values.
* Also, the shard is replicated to the all active nodes in the cluster.
*/
void
CreateReferenceTableShard(Oid distributedTableId)
{
char *relationOwner = NULL;
char shardStorageType = 0;
List *workerNodeList = NIL;
List *ddlCommandList = NIL;
int32 workerNodeCount = 0;
List *existingShardList = NIL;
uint64 shardId = INVALID_SHARD_ID;
int workerStartIndex = 0;
int replicationFactor = 0;
text *shardMinValue = NULL;
text *shardMaxValue = NULL;
/*
* In contrast to append/range partitioned tables it makes more sense to
* require ownership privileges - shards for reference tables are
* only created once, not continually during ingest as for the other
* partitioning types such as append and range.
*/
EnsureTableOwner(distributedTableId);
/* we plan to add shards: get an exclusive metadata lock */
LockRelationDistributionMetadata(distributedTableId, ExclusiveLock);
relationOwner = TableOwner(distributedTableId);
/* set shard storage type according to relation type */
shardStorageType = ShardStorageType(distributedTableId);
/* validate that shards haven't already been created for this table */
existingShardList = LoadShardList(distributedTableId);
if (existingShardList != NIL)
{
char *tableName = get_rel_name(distributedTableId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("table \"%s\" has already had shards created for it",
tableName)));
}
/* load and sort the worker node list for deterministic placement */
workerNodeList = WorkerNodeList();
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
/* get the next shard id */
shardId = GetNextShardId();
/* retrieve the DDL commands for the table */
ddlCommandList = GetTableDDLEvents(distributedTableId);
/* set the replication factor equal to the number of worker nodes */
workerNodeCount = list_length(workerNodeList);
replicationFactor = workerNodeCount;
/*
* Grabbing the shard metadata lock isn't technically necessary since
* we already hold an exclusive lock on the partition table, but we'll
* acquire it for the sake of completeness. As we're adding new active
* placements, the mode must be exclusive.
*/
LockShardDistributionMetadata(shardId, ExclusiveLock);
CreateShardPlacements(distributedTableId, shardId, ddlCommandList, relationOwner,
workerNodeList, workerStartIndex, replicationFactor);
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue);
}
/*
* CheckHashPartitionedTable looks up the partition information for the given
* tableId and checks if the table is hash partitioned. If not, the function

View File

@ -146,6 +146,14 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
errdetail("Delete statements on hash-partitioned tables "
"with where clause is not supported")));
}
else if (partitionMethod == DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot delete from distributed table"),
errdetail("Delete statements on reference tables "
"are not supported.")));
}
CheckDeleteCriteria(deleteCriteria);
CheckPartitionColumn(relationId, deleteCriteria);

View File

@ -95,6 +95,7 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
Oid relationId = ResolveRelationId(relationName);
DistTableCacheEntry *partitionEntry = NULL;
char *partitionKeyString = NULL;
TypeFuncClass resultTypeClass = 0;
Datum partitionKeyExpr = 0;
Datum partitionKey = 0;
@ -116,16 +117,27 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
ereport(ERROR, (errmsg("return type must be a row type")));
}
/* get decompiled expression tree for partition key */
partitionKeyExpr =
PointerGetDatum(cstring_to_text(partitionEntry->partitionKeyString));
partitionKey = DirectFunctionCall2(pg_get_expr, partitionKeyExpr,
ObjectIdGetDatum(relationId));
/* form heap tuple for table metadata */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
partitionKeyString = partitionEntry->partitionKeyString;
/* reference tables do not have partition key */
if (partitionKeyString == NULL)
{
partitionKey = PointerGetDatum(NULL);
isNulls[3] = true;
}
else
{
/* get decompiled expression tree for partition key */
partitionKeyExpr =
PointerGetDatum(cstring_to_text(partitionEntry->partitionKeyString));
partitionKey = DirectFunctionCall2(pg_get_expr, partitionKeyExpr,
ObjectIdGetDatum(relationId));
}
shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
/* get storage type */

View File

@ -114,6 +114,13 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
errdetail("We currently don't support creating shards "
"on hash-partitioned tables")));
}
else if (partitionMethod == DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errmsg("relation \"%s\" is a reference table",
relationName),
errdetail("We currently don't support creating shards "
"on reference tables")));
}
/* generate new and unique shardId from sequence */
shardId = GetNextShardId();
@ -219,11 +226,11 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
}
partitionMethod = PartitionMethod(relationId);
if (partitionMethod == DISTRIBUTE_BY_HASH)
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId),
errdetail("We currently don't support appending to shards "
"in hash-partitioned tables")));
"in hash-partitioned or reference tables")));
}
/* ensure that the shard placement metadata does not change during the append */

View File

@ -131,7 +131,8 @@ FixedJoinOrderList(FromExpr *fromExpr, List *tableEntryList)
Oid relationId = rangeTableEntry->relationId;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
if (cacheEntry->hasUninitializedShardInterval)
if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE &&
cacheEntry->hasUninitializedShardInterval)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning on this query"),
@ -1150,16 +1151,31 @@ BroadcastJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
/*
* If the table's shard count doesn't exceed the value specified in the
* configuration, then we assume table broadcasting is feasible. This assumption
* is valid only for inner joins.
* configuration or the table is a reference table, then we assume table
* broadcasting is feasible. This assumption is valid only for inner joins.
*
* Left join requires candidate table to have single shard, right join requires
* existing (left) table to have single shard, full outer join requires both tables
* to have single shard.
*/
if (joinType == JOIN_INNER && candidateShardCount < LargeTableShardCount)
if (joinType == JOIN_INNER)
{
performBroadcastJoin = true;
ShardInterval *initialCandidateShardInterval = NULL;
char candidatePartitionMethod = '\0';
if (candidateShardCount > 0)
{
initialCandidateShardInterval =
(ShardInterval *) linitial(candidateShardList);
candidatePartitionMethod =
PartitionMethod(initialCandidateShardInterval->relationId);
}
if (candidatePartitionMethod == DISTRIBUTE_BY_NONE ||
candidateShardCount < LargeTableShardCount)
{
performBroadcastJoin = true;
}
}
else if ((joinType == JOIN_LEFT || joinType == JOIN_ANTI) && candidateShardCount == 1)
{
@ -1276,7 +1292,8 @@ SinglePartitionJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
}
/* evaluate re-partitioning the current table only if the rule didn't apply above */
if (nextJoinNode == NULL && candidatePartitionMethod != DISTRIBUTE_BY_HASH)
if (nextJoinNode == NULL && candidatePartitionMethod != DISTRIBUTE_BY_HASH &&
candidatePartitionMethod != DISTRIBUTE_BY_NONE)
{
OpExpr *joinClause = SinglePartitionJoinClause(candidatePartitionColumn,
applicableJoinClauses);
@ -1501,11 +1518,23 @@ RightColumn(OpExpr *joinClause)
/*
* PartitionColumn builds the partition column for the given relation, and sets
* the partition column's range table references to the given table identifier.
*
* Note that reference tables do not have partition column. Thus, this function
* returns NULL when called for reference tables.
*/
Var *
PartitionColumn(Oid relationId, uint32 rangeTableId)
{
Var *partitionColumn = PartitionKey(relationId);
Var *partitionKey = PartitionKey(relationId);
Var *partitionColumn = NULL;
/* short circuit for reference tables */
if (partitionKey == NULL)
{
return partitionColumn;
}
partitionColumn = partitionKey;
partitionColumn->varno = rangeTableId;
partitionColumn->varnoold = rangeTableId;
@ -1518,16 +1547,27 @@ PartitionColumn(Oid relationId, uint32 rangeTableId)
* that in the context of distributed join and query planning, the callers of
* this function *must* set the partition key column's range table reference
* (varno) to match the table's location in the query range table list.
*
* Note that reference tables do not have partition column. Thus, this function
* returns NULL when called for reference tables.
*/
Var *
PartitionKey(Oid relationId)
{
DistTableCacheEntry *partitionEntry = DistributedTableCacheEntry(relationId);
Node *variableNode = NULL;
Var *partitionKey = NULL;
/* reference tables do not have partition column */
if (partitionEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
return NULL;
}
/* now obtain partition key and build the var node */
Node *variableNode = stringToNode(partitionEntry->partitionKeyString);
variableNode = stringToNode(partitionEntry->partitionKeyString);
Var *partitionKey = (Var *) variableNode;
partitionKey = (Var *) variableNode;
Assert(IsA(variableNode, Var));
return partitionKey;

View File

@ -3322,6 +3322,9 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
*
* Note that if the given expression is a field of a composite type, then this
* function checks if this composite column is a partition column.
*
* Also, the function returns always false for reference tables given that reference
* tables do not have partition column.
*/
bool
IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
@ -3368,7 +3371,12 @@ IsPartitionColumnRecursive(Expr *columnExpression, Query *query)
Oid relationId = rangeTableEntry->relid;
Var *partitionColumn = PartitionKey(relationId);
if (candidateColumn->varattno == partitionColumn->varattno)
/* reference tables do not have partition column */
if (partitionColumn == NULL)
{
isPartitionColumn = false;
}
else if (candidateColumn->varattno == partitionColumn->varattno)
{
isPartitionColumn = true;
}
@ -4189,7 +4197,8 @@ PartitionColumnOpExpressionList(Query *query)
relationId = rangeTableEntry->relid;
partitionColumn = PartitionKey(relationId);
if (candidatePartitionColumn->varattno == partitionColumn->varattno)
if (partitionColumn != NULL &&
candidatePartitionColumn->varattno == partitionColumn->varattno)
{
partitionColumnOpExpressionList = lappend(partitionColumnOpExpressionList,
whereClause);

View File

@ -2566,6 +2566,8 @@ RangeTableFragmentsList(List *rangeTableList, List *whereClauseList,
/*
* PruneShardList prunes shard intervals from given list based on the selection criteria,
* and returns remaining shard intervals in another list.
*
* For reference tables, the function simply returns the single shard that the table has.
*/
List *
PruneShardList(Oid relationId, Index tableId, List *whereClauseList,
@ -2579,6 +2581,12 @@ PruneShardList(Oid relationId, Index tableId, List *whereClauseList,
Var *partitionColumn = PartitionColumn(relationId, tableId);
char partitionMethod = PartitionMethod(relationId);
/* short circuit for reference tables */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
return shardIntervalList;
}
if (ContainsFalseClause(whereClauseList))
{
/* always return empty result if WHERE clause is of the form: false (AND ..) */
@ -3463,7 +3471,9 @@ JoinSequenceArray(List *rangeTableFragmentsList, Query *jobQuery, List *depended
/*
* PartitionedOnColumn finds the given column's range table entry, and checks if
* that range table is partitioned on the given column.
* that range table is partitioned on the given column. Note that since reference
* tables do not have partition columns, the function returns false when the distributed
* relation is a reference table.
*/
static bool
PartitionedOnColumn(Var *column, List *rangeTableList, List *dependedJobList)
@ -3476,8 +3486,17 @@ PartitionedOnColumn(Var *column, List *rangeTableList, List *dependedJobList)
if (rangeTableType == CITUS_RTE_RELATION)
{
Oid relationId = rangeTableEntry->relid;
char partitionMethod = PartitionMethod(relationId);
Var *partitionColumn = PartitionColumn(relationId, rangeTableId);
/* reference tables do not have partition columns */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
partitionedOnColumn = false;
return partitionedOnColumn;
}
if (partitionColumn->varattno == column->varattno)
{
partitionedOnColumn = true;

View File

@ -352,6 +352,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index
{
RelationRestrictionContext *restrictionContext = NULL;
RelationRestriction *relationRestriction = NULL;
DistTableCacheEntry *cacheEntry = NULL;
bool distributedTable = false;
bool localTable = false;
@ -378,6 +379,18 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index
restrictionContext->hasDistributedRelation |= distributedTable;
restrictionContext->hasLocalRelation |= localTable;
/*
* We're also keeping track of whether all participant
* tables are reference tables.
*/
if (distributedTable)
{
cacheEntry = DistributedTableCacheEntry(rte->relid);
restrictionContext->allReferenceTables &=
(cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE);
}
restrictionContext->relationRestrictionList =
lappend(restrictionContext->relationRestrictionList, relationRestriction);
}
@ -392,6 +405,10 @@ CreateAndPushRestrictionContext(void)
{
RelationRestrictionContext *restrictionContext =
palloc0(sizeof(RelationRestrictionContext));
/* we'll apply logical AND as we add tables */
restrictionContext->allReferenceTables = true;
relationRestrictionContextList = lcons(restrictionContext,
relationRestrictionContextList);

View File

@ -120,7 +120,8 @@ static RelationRestrictionContext * CopyRelationRestrictionContext(
static Node * InstantiatePartitionQual(Node *node, void *context);
static void ErrorIfInsertSelectQueryNotSupported(Query *queryTree,
RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte);
RangeTblEntry *subqueryRte,
bool allReferenceTables);
static void ErrorIfMultiTaskRouterSelectQueryUnsupported(Query *query);
static void ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query,
RangeTblEntry *insertRte,
@ -247,12 +248,14 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
Oid targetRelationId = insertRte->relid;
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
int shardCount = targetCacheEntry->shardIntervalArrayLength;
bool allReferenceTables = restrictionContext->allReferenceTables;
/*
* Error semantics for INSERT ... SELECT queries are different than regular
* modify queries. Thus, handle separately.
*/
ErrorIfInsertSelectQueryNotSupported(originalQuery, insertRte, subqueryRte);
ErrorIfInsertSelectQueryNotSupported(originalQuery, insertRte, subqueryRte,
allReferenceTables);
/*
* Plan select query for each shard in the target table. Do so by replacing the
@ -343,6 +346,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
bool routerPlannable = false;
bool upsertQuery = false;
bool replacePrunedQueryWithDummy = false;
bool allReferenceTables = restrictionContext->allReferenceTables;
/* grab shared metadata lock to stop concurrent placement additions */
LockShardDistributionMetadata(shardId, ShareLock);
@ -356,6 +360,15 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
RelationRestriction *restriction = lfirst(restrictionCell);
List *originalBaserestrictInfo = restriction->relOptInfo->baserestrictinfo;
/*
* We haven't added the quals if all participating tables are reference
* tables. Thus, now skip instantiating them.
*/
if (allReferenceTables)
{
break;
}
originalBaserestrictInfo =
(List *) InstantiatePartitionQual((Node *) originalBaserestrictInfo,
shardInterval);
@ -370,7 +383,10 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
* prevent shard pruning logic (i.e, namely UpdateRelationNames())
* modifies range table entries, which makes hard to add the quals.
*/
AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval);
if (!allReferenceTables)
{
AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval);
}
/* mark that we don't want the router planner to generate dummy hosts/queries */
replacePrunedQueryWithDummy = false;
@ -461,7 +477,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
* hashfunc(partitionColumn) <= $upper_bound
*
* The function expects and asserts that subquery's target list contains a partition
* column value.
* column value. Thus, this function should never be called with reference tables.
*/
static void
AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval)
@ -622,10 +638,12 @@ ExtractInsertRangeTableEntry(Query *query)
*/
static void
ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte)
RangeTblEntry *subqueryRte, bool allReferenceTables)
{
Query *subquery = NULL;
Oid selectPartitionColumnTableId = InvalidOid;
Oid targetRelationId = insertRte->relid;
char targetPartitionMethod = PartitionMethod(targetRelationId);
/* we only do this check for INSERT ... SELECT queries */
AssertArg(InsertSelectQuery(queryTree));
@ -645,17 +663,39 @@ ErrorIfInsertSelectQueryNotSupported(Query *queryTree, RangeTblEntry *insertRte,
/* we don't support LIMIT, OFFSET and WINDOW functions */
ErrorIfMultiTaskRouterSelectQueryUnsupported(subquery);
/* ensure that INSERT's partition column comes from SELECT's partition column */
ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree, insertRte, subqueryRte,
&selectPartitionColumnTableId);
/* we expect partition column values come from colocated tables */
if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
/*
* If we're inserting into a reference table, all participating tables
* should be reference tables as well.
*/
if (targetPartitionMethod == DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("INSERT target table and the source relation "
"of the SELECT partition column value "
"must be colocated")));
if (!allReferenceTables)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("If data inserted into a reference table, "
"all of the participating tables in the "
"INSERT INTO ... SELECT query should be "
"reference tables.")));
}
}
else
{
/* ensure that INSERT's partition column comes from SELECT's partition column */
ErrorIfInsertPartitionColumnDoesNotMatchSelect(queryTree, insertRte, subqueryRte,
&selectPartitionColumnTableId);
/*
* We expect partition column values come from colocated tables. Note that we
* skip this check from the reference table case given that all reference tables
* are already (and by default) co-located.
*/
if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("INSERT target table and the source relation "
"of the SELECT partition column value "
"must be colocated")));
}
}
}
@ -797,6 +837,16 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse
break;
}
/*
* Reference tables doesn't have a partition column, thus partition columns
* cannot match at all.
*/
if (PartitionMethod(subqeryTargetEntry->resorigtbl) == DISTRIBUTE_BY_NONE)
{
partitionColumnsMatch = false;
break;
}
if (!IsPartitionColumnRecursive(subqeryTargetEntry->expr, subquery))
{
partitionColumnsMatch = false;
@ -830,7 +880,10 @@ ErrorIfInsertPartitionColumnDoesNotMatchSelect(Query *query, RangeTblEntry *inse
* (i) Set operations are present on the top level query
* (ii) Target list does not include a bare partition column.
*
* Note that if the input query is not an INSERT .. SELECT the assertion fails.
* Note that if the input query is not an INSERT .. SELECT the assertion fails. Lastly,
* if all the participating tables in the query are reference tables, we implicitly
* skip adding the quals to the query since IsPartitionColumnRecursive() always returns
* false for reference tables.
*/
void
AddUninstantiatedPartitionRestriction(Query *originalQuery)
@ -1084,6 +1137,17 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
foreach(targetEntryCell, queryTree->targetList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
bool targetEntryPartitionColumn = false;
/* reference tables do not have partition column */
if (partitionColumn == NULL)
{
targetEntryPartitionColumn = false;
}
else if (targetEntry->resno == partitionColumn->varattno)
{
targetEntryPartitionColumn = true;
}
/* skip resjunk entries: UPDATE adds some for ctid, etc. */
if (targetEntry->resjunk)
@ -1099,15 +1163,14 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
"tables must not be VOLATILE")));
}
if (commandType == CMD_UPDATE &&
if (commandType == CMD_UPDATE && targetEntryPartitionColumn &&
TargetEntryChangesValue(targetEntry, partitionColumn,
queryTree->jointree))
{
specifiesPartitionValue = true;
}
if (commandType == CMD_INSERT &&
targetEntry->resno == partitionColumn->varattno &&
if (commandType == CMD_INSERT && targetEntryPartitionColumn &&
!IsA(targetEntry->expr, Const))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -1180,8 +1243,19 @@ ErrorIfModifyQueryNotSupported(Query *queryTree)
foreach(setTargetCell, onConflictSet)
{
TargetEntry *setTargetEntry = (TargetEntry *) lfirst(setTargetCell);
bool setTargetEntryPartitionColumn = false;
if (setTargetEntry->resno == partitionColumn->varattno)
/* reference tables do not have partition column */
if (partitionColumn == NULL)
{
setTargetEntryPartitionColumn = false;
}
else if (setTargetEntry->resno == partitionColumn->varattno)
{
setTargetEntryPartitionColumn = true;
}
if (setTargetEntryPartitionColumn)
{
Expr *setExpr = setTargetEntry->expr;
if (IsA(setExpr, Var) &&
@ -1724,17 +1798,30 @@ FastShardPruning(Oid distributedTableId, Const *partitionValue)
* statement these are the where-clause expressions. For INSERT statements we
* build an equality clause based on the partition-column and its supplied
* insert value.
*
* Since reference tables do not have partition columns, the function returns
* NIL for reference tables.
*/
static List *
QueryRestrictList(Query *query)
{
List *queryRestrictList = NIL;
CmdType commandType = query->commandType;
Oid distributedTableId = ExtractFirstDistributedTableId(query);
char partitionMethod = PartitionMethod(distributedTableId);
/*
* Reference tables do not have the notion of partition column. Thus,
* there are no restrictions on the partition column.
*/
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
return queryRestrictList;
}
if (commandType == CMD_INSERT)
{
/* build equality expression based on partition column value for row */
Oid distributedTableId = ExtractFirstDistributedTableId(query);
uint32 rangeTableId = 1;
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
Const *partitionValue = ExtractInsertPartitionValue(query, partitionColumn);
@ -2389,7 +2476,8 @@ MultiRouterPlannableQuery(Query *query, RelationRestrictionContext *restrictionC
Oid distributedTableId = rte->relid;
char partitionMethod = PartitionMethod(distributedTableId);
if (partitionMethod != DISTRIBUTE_BY_HASH)
if (!(partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
DISTRIBUTE_BY_NONE))
{
return false;
}
@ -2628,6 +2716,7 @@ CopyRelationRestrictionContext(RelationRestrictionContext *oldContext)
newContext->hasDistributedRelation = oldContext->hasDistributedRelation;
newContext->hasLocalRelation = oldContext->hasLocalRelation;
newContext->allReferenceTables = oldContext->allReferenceTables;
newContext->relationRestrictionList = NIL;
foreach(relationRestrictionCell, oldContext->relationRestrictionList)

View File

@ -38,6 +38,8 @@ static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId);
static void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId);
static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval);
static bool HashPartitionedShardIntervalsEqual(ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval);
static int CompareShardPlacementsByNode(const void *leftElement,
const void *rightElement);
static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId);
@ -96,9 +98,6 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
uint32 targetColocationId = INVALID_COLOCATION_ID;
Relation pgDistColocation = NULL;
CheckHashPartitionedTable(sourceRelationId);
CheckHashPartitionedTable(targetRelationId);
CheckReplicationModel(sourceRelationId, targetRelationId);
CheckDistributionColumnType(sourceRelationId, targetRelationId);
@ -123,7 +122,13 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId);
Var *sourceDistributionColumn = PartitionKey(sourceRelationId);
Oid sourceDistributionColumnType = sourceDistributionColumn->vartype;
Oid sourceDistributionColumnType = InvalidOid;
/* reference tables has NULL distribution column */
if (sourceDistributionColumn != NULL)
{
sourceDistributionColumnType = sourceDistributionColumn->vartype;
}
sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor,
sourceDistributionColumnType);
@ -283,7 +288,60 @@ ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId)
/*
* ShardsIntervalsEqual checks if two shard intervals of hash distributed
* ShardsIntervalsEqual checks if two shard intervals of distributed
* tables are equal.
*
* Notes on the function:
* (i) The function returns true if both shard intervals are the same.
* (ii) The function returns false even if the shard intervals equal, but,
* their distribution method are different.
* (iii) The function returns false for append and range partitioned tables
* excluding (i) case.
* (iv) For reference tables, all shards are equal (i.e., same replication factor
* and shard min/max values). Thus, always return true for shards of reference
* tables.
*/
static bool
ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval)
{
char leftIntervalPartitionMethod = PartitionMethod(leftShardInterval->relationId);
char rightIntervalPartitionMethod = PartitionMethod(rightShardInterval->relationId);
/* if both shards are the same, return true */
if (leftShardInterval->shardId == rightShardInterval->shardId)
{
return true;
}
/* if partition methods are not the same, shards cannot be considered as co-located */
leftIntervalPartitionMethod = PartitionMethod(leftShardInterval->relationId);
rightIntervalPartitionMethod = PartitionMethod(rightShardInterval->relationId);
if (leftIntervalPartitionMethod != rightIntervalPartitionMethod)
{
return false;
}
if (leftIntervalPartitionMethod == DISTRIBUTE_BY_HASH)
{
return HashPartitionedShardIntervalsEqual(leftShardInterval, rightShardInterval);
}
else if (leftIntervalPartitionMethod == DISTRIBUTE_BY_NONE)
{
/*
* Reference tables has only a single shard and all reference tables
* are always co-located with each other.
*/
return true;
}
/* append and range partitioned shard never co-located */
return false;
}
/*
* HashPartitionedShardIntervalsEqual checks if two shard intervals of hash distributed
* tables are equal. Note that, this function doesn't work with non-hash
* partitioned table's shards.
*
@ -292,7 +350,8 @@ ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId)
* but do index check, but we avoid it because this way it is more cheaper.
*/
static bool
ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval)
HashPartitionedShardIntervalsEqual(ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval)
{
int32 leftShardMinValue = DatumGetInt32(leftShardInterval->minValue);
int32 leftShardMaxValue = DatumGetInt32(leftShardInterval->maxValue);
@ -511,11 +570,27 @@ CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId)
Oid sourceDistributionColumnType = InvalidOid;
Oid targetDistributionColumnType = InvalidOid;
/* reference tables have NULL distribution column */
sourceDistributionColumn = PartitionKey(sourceRelationId);
sourceDistributionColumnType = sourceDistributionColumn->vartype;
if (sourceDistributionColumn == NULL)
{
sourceDistributionColumnType = InvalidOid;
}
else
{
sourceDistributionColumnType = sourceDistributionColumn->vartype;
}
/* reference tables have NULL distribution column */
targetDistributionColumn = PartitionKey(targetRelationId);
targetDistributionColumnType = targetDistributionColumn->vartype;
if (targetDistributionColumn == NULL)
{
targetDistributionColumnType = InvalidOid;
}
else
{
targetDistributionColumnType = targetDistributionColumn->vartype;
}
if (sourceDistributionColumnType != targetDistributionColumnType)
{
@ -755,10 +830,11 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
char partitionMethod = cacheEntry->partitionMethod;
/*
* If distribution type of the table is not hash, each shard of the table is only
* co-located with itself.
* If distribution type of the table is not hash or reference, each shard of
* the shard is only co-located with itself.
*/
if (partitionMethod != DISTRIBUTE_BY_HASH)
if ((partitionMethod == DISTRIBUTE_BY_APPEND) ||
(partitionMethod == DISTRIBUTE_BY_RANGE))
{
colocatedShardList = lappend(colocatedShardList, shardInterval);
return colocatedShardList;

View File

@ -120,6 +120,9 @@ column_to_column_name(PG_FUNCTION_ARGS)
* only out of a reference to the column of name columnName. Errors out if the
* specified column does not exist or is not suitable to be used as a
* distribution column.
*
* The function returns NULL if the passed column name is NULL. That case only
* corresponds to reference tables.
*/
Var *
BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnName)
@ -129,6 +132,12 @@ BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnNam
Var *distributionColumn = NULL;
char *tableName = RelationGetRelationName(distributedRelation);
/* short circuit for reference tables */
if (columnName == NULL)
{
return NULL;
}
/* it'd probably better to downcase identifiers consistent with SQL case folding */
truncate_identifier(columnName, strlen(columnName), true);

View File

@ -322,12 +322,12 @@ LookupDistTableCacheEntry(Oid relationId)
MemoryContext oldContext = NULL;
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
bool isNull = false;
bool partitionKeyIsNull = false;
partitionKeyDatum = heap_getattr(distPartitionTuple,
Anum_pg_dist_partition_partkey,
tupleDescriptor,
&isNull);
Assert(!isNull);
&partitionKeyIsNull);
colocationId = heap_getattr(distPartitionTuple,
Anum_pg_dist_partition_colocationid, tupleDescriptor,
@ -352,10 +352,15 @@ LookupDistTableCacheEntry(Oid relationId)
}
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
partitionKeyString = TextDatumGetCString(partitionKeyDatum);
partitionMethod = partitionForm->partmethod;
replicationModel = DatumGetChar(replicationModelDatum);
/* note that for reference tables isNull becomes true */
if (!partitionKeyIsNull)
{
partitionKeyString = TextDatumGetCString(partitionKeyDatum);
}
MemoryContextSwitchTo(oldContext);
heap_freetuple(distPartitionTuple);
@ -406,7 +411,11 @@ LookupDistTableCacheEntry(Oid relationId)
}
/* decide and allocate interval comparison function */
if (shardIntervalArrayLength > 0)
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
shardIntervalCompareFunction = NULL;
}
else if (shardIntervalArrayLength > 0)
{
MemoryContext oldContext = CurrentMemoryContext;
@ -419,14 +428,39 @@ LookupDistTableCacheEntry(Oid relationId)
MemoryContextSwitchTo(oldContext);
}
/* sort the interval array */
sortedShardIntervalArray = SortShardIntervalArray(shardIntervalArray,
shardIntervalArrayLength,
shardIntervalCompareFunction);
/* reference tables has a single shard which is not initialized */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
hasUninitializedShardInterval = true;
/* check if there exists any shard intervals with no min/max values */
hasUninitializedShardInterval =
HasUninitializedShardInterval(sortedShardIntervalArray, shardIntervalArrayLength);
/*
* Note that during create_reference_table() call,
* the reference table do not have any shards.
*/
if (shardIntervalArrayLength > 1)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("reference table \"%s\" has more than 1 shard",
relationName)));
}
/* since there is a zero or one shard, it is already sorted */
sortedShardIntervalArray = shardIntervalArray;
}
else
{
/* sort the interval array */
sortedShardIntervalArray = SortShardIntervalArray(shardIntervalArray,
shardIntervalArrayLength,
shardIntervalCompareFunction);
/* check if there exists any shard intervals with no min/max values */
hasUninitializedShardInterval =
HasUninitializedShardInterval(sortedShardIntervalArray,
shardIntervalArrayLength);
}
/* we only need hash functions for hash distributed tables */
if (partitionMethod == DISTRIBUTE_BY_HASH)
@ -1461,8 +1495,11 @@ ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
cacheEntry->hasUninitializedShardInterval = false;
cacheEntry->hasUniformHashDistribution = false;
pfree(cacheEntry->shardIntervalCompareFunction);
cacheEntry->shardIntervalCompareFunction = NULL;
if (cacheEntry->shardIntervalCompareFunction != NULL)
{
pfree(cacheEntry->shardIntervalCompareFunction);
cacheEntry->shardIntervalCompareFunction = NULL;
}
/* we only allocated hash function for hash distributed tables */
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH)
@ -1636,8 +1673,6 @@ LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId)
currentPartitionTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(currentPartitionTuple))
{
Assert(!HeapTupleHasNulls(currentPartitionTuple));
distPartitionTuple = heap_copytuple(currentPartitionTuple);
}
@ -1718,6 +1753,12 @@ GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
break;
}
case DISTRIBUTE_BY_NONE:
{
*intervalTypeId = InvalidOid;
break;
}
default:
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

View File

@ -133,10 +133,12 @@ CompareShardIntervalsById(const void *leftElement, const void *rightElement)
/*
* FindShardIntervalIndex finds index of given shard in sorted shard interval array. For
* this purpose, it calculates hash value of a number in its range(e.g. min value) and
* finds which shard should contain the hashed value. Therefore this function only works
* for hash distributed tables.
* FindShardIntervalIndex finds index of given shard in sorted shard interval array.
*
* For hash partitioned tables, it calculates hash value of a number in its
* range (e.g. min value) and finds which shard should contain the hashed
* value. For reference tables, it simply returns 0. For distribution methods
* other than hash and reference, the function errors out.
*/
int
FindShardIntervalIndex(ShardInterval *shardInterval)
@ -149,6 +151,15 @@ FindShardIntervalIndex(ShardInterval *shardInterval)
uint64 hashTokenIncrement = 0;
int shardIndex = -1;
/* short-circuit for reference tables */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
/* reference tables has only a single shard, so the index is fixed to 0 */
shardIndex = 0;
return shardIndex;
}
/*
* We can support it for other types of partitioned tables with simple binary scan
* but it is not necessary at the moment. If we need that simply check algorithm in
@ -184,7 +195,9 @@ FindShardIntervalIndex(ShardInterval *shardInterval)
/*
* FindShardInterval finds a single shard interval in the cache for the
* given partition column value.
* given partition column value. Note that reference tables do not have
* partition columns, thus, pass partitionColumnValue and compareFunction
* as NULL for them.
*/
ShardInterval *
FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache,
@ -225,6 +238,14 @@ FindShardInterval(Datum partitionColumnValue, ShardInterval **shardIntervalCache
shardInterval = shardIntervalCache[shardIndex];
}
}
else if (partitionMethod == DISTRIBUTE_BY_NONE)
{
int shardIndex = 0;
/* reference tables has a single shard, all values mapped to that shard */
Assert(shardCount == 1);
shardInterval = shardIntervalCache[shardIndex];
}
else
{
Assert(compareFunction != NULL);

View File

@ -106,6 +106,7 @@ extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor);
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId);
extern void CreateReferenceTableShard(Oid distributedTableId);
extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
int shardIndex, uint64 shardId, char *newShardOwner,
List *ddlCommandList, List *foreignConstraintCommadList);

View File

@ -23,6 +23,7 @@ typedef struct RelationRestrictionContext
{
bool hasDistributedRelation;
bool hasLocalRelation;
bool allReferenceTables;
List *relationRestrictionList;
} RelationRestrictionContext;

View File

@ -52,11 +52,13 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition;
#define DISTRIBUTE_BY_APPEND 'a'
#define DISTRIBUTE_BY_HASH 'h'
#define DISTRIBUTE_BY_RANGE 'r'
#define DISTRIBUTE_BY_NONE 'n'
#define REDISTRIBUTE_BY_HASH 'x'
/* valid values for repmodel are 'c' for coordinator and 's' for streaming */
#define REPLICATION_MODEL_COORDINATOR 'c'
#define REPLICATION_MODEL_STREAMING 's'
#define REPLICATION_MODEL_2PC 't'
#endif /* PG_DIST_PARTITION_H */

View File

@ -1,6 +1,12 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
-- Tests functions related to cluster membership
-- before starting the test, lets try to create reference table and see a
-- meaningful error
CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');
ERROR: cannot create reference table "test_reference_table"
DETAIL: There are no active worker nodes.
-- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
master_add_node

View File

@ -654,7 +654,7 @@ SELECT * FROM pg_dist_colocation
4 | 4 | 2 | 23
5 | 2 | 2 | 23
9 | 3 | 2 | 23
10 | 1 | 2 | 23
10 | 1 | 2 | 0
(6 rows)
-- cross check with internal colocation API
@ -796,10 +796,10 @@ ORDER BY
table1_group_default | 1300066 | t | 57637 | -715827883 | 715827881
table1_group_default | 1300067 | t | 57637 | 715827882 | 2147483647
table1_group_default | 1300067 | t | 57638 | 715827882 | 2147483647
table1_groupf | 1300068 | t | 57637 | -2147483648 | 2147483647
table1_groupf | 1300068 | t | 57638 | -2147483648 | 2147483647
table2_groupf | 1300069 | t | 57638 | -2147483648 | 2147483647
table2_groupf | 1300069 | t | 57637 | -2147483648 | 2147483647
table1_groupf | 1300068 | t | 57637 | |
table1_groupf | 1300068 | t | 57638 | |
table2_groupf | 1300069 | t | 57637 | |
table2_groupf | 1300069 | t | 57638 | |
(84 rows)
-- reset colocation ids to test mark_tables_colocated
@ -835,7 +835,7 @@ ERROR: cannot colocate tables table1_groupb and table1_groupe
DETAIL: Shard 1300026 of table1_groupb and shard 1300046 of table1_groupe have different number of shard placements.
SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupF']);
ERROR: cannot colocate tables table1_groupb and table1_groupf
DETAIL: Shard counts don't match for table1_groupb and table1_groupf.
DETAIL: Replication models don't match for table1_groupb and table1_groupf.
SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_groupD']);
ERROR: cannot colocate tables table1_groupb and table1_groupd
DETAIL: Shard counts don't match for table1_groupb and table1_groupd.
@ -917,7 +917,7 @@ SELECT * FROM pg_dist_colocation
3 | 2 | 2 | 25
4 | 4 | 2 | 23
5 | 2 | 2 | 23
6 | 1 | 2 | 23
6 | 1 | 2 | 0
(5 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition
@ -963,7 +963,7 @@ SELECT * FROM pg_dist_colocation
2 | 2 | 1 | 23
3 | 2 | 2 | 25
4 | 4 | 2 | 23
6 | 1 | 2 | 23
6 | 1 | 2 | 0
(4 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition

View File

@ -81,9 +81,9 @@ ERROR: cannot distribute relation "nation"
DETAIL: Relation "nation" contains data.
HINT: Empty your table before distributing it.
TRUNCATE nation;
SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
master_create_distributed_table
---------------------------------
SELECT create_reference_table('nation');
create_reference_table
------------------------
(1 row)
@ -113,7 +113,25 @@ CREATE TABLE supplier
s_acctbal decimal(15,2) not null,
s_comment varchar(101) not null
);
SELECT master_create_distributed_table('supplier', 's_suppkey', 'append');
SELECT create_reference_table('supplier');
create_reference_table
------------------------
(1 row)
-- create a single shard supplier table which is not
-- a reference table
CREATE TABLE supplier_single_shard
(
s_suppkey integer not null,
s_name char(25) not null,
s_address varchar(40) not null,
s_nationkey integer,
s_phone char(15) not null,
s_acctbal decimal(15,2) not null,
s_comment varchar(101) not null
);
SELECT master_create_distributed_table('supplier_single_shard', 's_suppkey', 'append');
master_create_distributed_table
---------------------------------

View File

@ -493,7 +493,7 @@ Master Query
SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
@ -512,7 +512,7 @@ Master Query
-> Seq Scan on pg_merge_job_570021
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
@ -558,14 +558,14 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
]
SELECT true AS valid FROM explain_json($$
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
@ -616,9 +616,22 @@ SELECT true AS valid FROM explain_xml($$
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
t
-- make sure that EXPLAIN works without
-- problems for queries that inlvolves only
-- reference tables
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM nation
WHERE n_name = 'CHINA'$$);
t
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM nation, supplier
WHERE nation.n_nationkey = supplier.s_nationkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
@ -642,8 +655,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Parallel Aware: false
Relation Name: "pg_merge_job_570036"
Alias: "pg_merge_job_570036"
Relation Name: "pg_merge_job_570035"
Alias: "pg_merge_job_570035"
-- test parallel aggregates
SET parallel_setup_cost=0;
SET parallel_tuple_cost=0;
@ -659,7 +672,7 @@ Finalize Aggregate
-> Parallel Seq Scan on lineitem_clone
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Distributed Query into pg_merge_job_570037
Distributed Query into pg_merge_job_570036
Executor: Task-Tracker
Task Count: 8
Tasks Shown: One of 8
@ -669,12 +682,12 @@ Distributed Query into pg_merge_job_570037
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570037
-> Seq Scan on pg_merge_job_570036
-- ensure EXPLAIN EXECUTE doesn't crash
PREPARE task_tracker_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query;
Distributed Query into pg_merge_job_570038
Distributed Query into pg_merge_job_570037
Executor: Task-Tracker
Task Count: 4
Tasks Shown: One of 4
@ -685,11 +698,11 @@ Distributed Query into pg_merge_job_570038
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570038
-> Seq Scan on pg_merge_job_570037
SET citus.task_executor_type TO 'real-time';
PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
EXPLAIN EXECUTE router_executor_query;
Distributed Query into pg_merge_job_570039
Distributed Query into pg_merge_job_570038
Executor: Router
Task Count: 1
Tasks Shown: All
@ -702,7 +715,7 @@ Distributed Query into pg_merge_job_570039
PREPARE real_time_executor_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
Distributed Query into pg_merge_job_570040
Distributed Query into pg_merge_job_570039
Executor: Real-Time
Task Count: 4
Tasks Shown: One of 4
@ -713,4 +726,4 @@ Distributed Query into pg_merge_job_570040
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570040
-> Seq Scan on pg_merge_job_570039

View File

@ -472,7 +472,7 @@ Master Query
SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
@ -491,7 +491,7 @@ Master Query
-> Seq Scan on pg_merge_job_570021
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
@ -534,14 +534,14 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
]
SELECT true AS valid FROM explain_json($$
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
@ -589,9 +589,22 @@ SELECT true AS valid FROM explain_xml($$
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
t
-- make sure that EXPLAIN works without
-- problems for queries that inlvolves only
-- reference tables
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM nation
WHERE n_name = 'CHINA'$$);
t
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM nation, supplier
WHERE nation.n_nationkey = supplier.s_nationkey$$);
t
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
@ -612,8 +625,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "pg_merge_job_570036"
Alias: "pg_merge_job_570036"
Relation Name: "pg_merge_job_570035"
Alias: "pg_merge_job_570035"
-- test parallel aggregates
SET parallel_setup_cost=0;
ERROR: unrecognized configuration parameter "parallel_setup_cost"
@ -630,7 +643,7 @@ Aggregate
-> Seq Scan on lineitem_clone
-- ensure distributed plans don't break
EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem;
Distributed Query into pg_merge_job_570037
Distributed Query into pg_merge_job_570036
Executor: Task-Tracker
Task Count: 8
Tasks Shown: One of 8
@ -640,12 +653,12 @@ Distributed Query into pg_merge_job_570037
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570037
-> Seq Scan on pg_merge_job_570036
-- ensure EXPLAIN EXECUTE doesn't crash
PREPARE task_tracker_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query;
Distributed Query into pg_merge_job_570038
Distributed Query into pg_merge_job_570037
Executor: Task-Tracker
Task Count: 4
Tasks Shown: One of 4
@ -656,11 +669,11 @@ Distributed Query into pg_merge_job_570038
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570038
-> Seq Scan on pg_merge_job_570037
SET citus.task_executor_type TO 'real-time';
PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
EXPLAIN EXECUTE router_executor_query;
Distributed Query into pg_merge_job_570039
Distributed Query into pg_merge_job_570038
Executor: Router
Task Count: 1
Tasks Shown: All
@ -673,7 +686,7 @@ Distributed Query into pg_merge_job_570039
PREPARE real_time_executor_query AS
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query;
Distributed Query into pg_merge_job_570040
Distributed Query into pg_merge_job_570039
Executor: Real-Time
Task Count: 4
Tasks Shown: One of 4
@ -684,4 +697,4 @@ Distributed Query into pg_merge_job_570040
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_570040
-> Seq Scan on pg_merge_job_570039

View File

@ -63,6 +63,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-2';
ALTER EXTENSION citus UPDATE TO '6.1-3';
ALTER EXTENSION citus UPDATE TO '6.1-4';
ALTER EXTENSION citus UPDATE TO '6.1-5';
ALTER EXTENSION citus UPDATE TO '6.1-6';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
FROM pg_depend AS pgd,

View File

@ -1225,7 +1225,7 @@ FROM
DEBUG: StartTransactionCommand
DEBUG: StartTransaction
DEBUG: name: unnamed; blockState: DEFAULT; state: INPROGR, xid/subid/cid: 0/1/0, nestlvl: 1, children:
ERROR: INSERT target table and the source relation of the SELECT partition column value must be colocated
ERROR: SELECT query should return bare partition column on the same ordinal position as the INSERT's partition column
-- unsupported joins between subqueries
-- we do not return bare partition column on the inner query
INSERT INTO agg_events

View File

@ -80,15 +80,15 @@ DEBUG: assigned task 21 to node localhost:57638
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: generated sql query for job 1251 and task 3
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000025".intermediate_column_1250_0, "pg_merge_job_1250.task_000025".intermediate_column_1250_1, "pg_merge_job_1250.task_000025".intermediate_column_1250_2, "pg_merge_job_1250.task_000025".intermediate_column_1250_3, "pg_merge_job_1250.task_000025".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000025 "pg_merge_job_1250.task_000025" JOIN part_290012 part ON (("pg_merge_job_1250.task_000025".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000025".intermediate_column_1250_0, "pg_merge_job_1250.task_000025".intermediate_column_1250_1, "pg_merge_job_1250.task_000025".intermediate_column_1250_2, "pg_merge_job_1250.task_000025".intermediate_column_1250_3, "pg_merge_job_1250.task_000025".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000025 "pg_merge_job_1250.task_000025" JOIN part_290011 part ON (("pg_merge_job_1250.task_000025".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: generated sql query for job 1251 and task 6
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000034".intermediate_column_1250_0, "pg_merge_job_1250.task_000034".intermediate_column_1250_1, "pg_merge_job_1250.task_000034".intermediate_column_1250_2, "pg_merge_job_1250.task_000034".intermediate_column_1250_3, "pg_merge_job_1250.task_000034".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000034 "pg_merge_job_1250.task_000034" JOIN part_280002 part ON (("pg_merge_job_1250.task_000034".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 25
DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 34
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]

File diff suppressed because it is too large Load Diff

View File

@ -262,7 +262,7 @@ CREATE INDEX single_index_3 ON single_shard_items(name);
COMMIT;
-- Nothing from the block should have committed
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items' ORDER BY 1;
-- Now try with 2pc off
RESET citus.multi_shard_commit_protocol;
@ -272,7 +272,7 @@ CREATE INDEX single_index_3 ON single_shard_items(name);
COMMIT;
-- The block should have committed with a warning
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items' ORDER BY 1;
\c - - - :worker_2_port
DROP EVENT TRIGGER log_ddl_tag;

View File

@ -18,4 +18,5 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 290000;
\copy customer FROM '@abs_srcdir@/data/customer.1.data' with delimiter '|'
\copy nation FROM '@abs_srcdir@/data/nation.data' with delimiter '|'
\copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|'
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'

View File

@ -0,0 +1,451 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1260000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1260000;
SET citus.log_multi_join_order to true;
SET client_min_messages TO LOG;
SET citus.shard_count TO 4;
CREATE TABLE multi_outer_join_left_hash
(
l_custkey integer not null,
l_name varchar(25) not null,
l_address varchar(40) not null,
l_nationkey integer not null,
l_phone char(15) not null,
l_acctbal decimal(15,2) not null,
l_mktsegment char(10) not null,
l_comment varchar(117) not null
);
SELECT create_distributed_table('multi_outer_join_left_hash', 'l_custkey');
CREATE TABLE multi_outer_join_right_reference
(
r_custkey integer not null,
r_name varchar(25) not null,
r_address varchar(40) not null,
r_nationkey integer not null,
r_phone char(15) not null,
r_acctbal decimal(15,2) not null,
r_mktsegment char(10) not null,
r_comment varchar(117) not null
);
SELECT create_reference_table('multi_outer_join_right_reference');
CREATE TABLE multi_outer_join_third_reference
(
t_custkey integer not null,
t_name varchar(25) not null,
t_address varchar(40) not null,
t_nationkey integer not null,
t_phone char(15) not null,
t_acctbal decimal(15,2) not null,
t_mktsegment char(10) not null,
t_comment varchar(117) not null
);
SELECT create_reference_table('multi_outer_join_third_reference');
CREATE TABLE multi_outer_join_right_hash
(
r_custkey integer not null,
r_name varchar(25) not null,
r_address varchar(40) not null,
r_nationkey integer not null,
r_phone char(15) not null,
r_acctbal decimal(15,2) not null,
r_mktsegment char(10) not null,
r_comment varchar(117) not null
);
SELECT create_distributed_table('multi_outer_join_right_hash', 'r_custkey');
-- Make sure we do not crash if both tables are emmpty
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_third_reference b ON (l_custkey = t_custkey);
-- Left table is a large table
\copy multi_outer_join_left_hash FROM '@abs_srcdir@/data/customer-1-10.data' with delimiter '|'
\copy multi_outer_join_left_hash FROM '@abs_srcdir@/data/customer-11-20.data' with delimiter '|'
-- Right table is a small table
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-1-15.data' with delimiter '|'
\copy multi_outer_join_right_hash FROM '@abs_srcdir@/data/customer-1-15.data' with delimiter '|'
-- Make sure we do not crash if one table has data
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_third_reference b ON (l_custkey = t_custkey);
SELECT
min(t_custkey), max(t_custkey)
FROM
multi_outer_join_third_reference a LEFT JOIN multi_outer_join_right_reference b ON (r_custkey = t_custkey);
-- Third table is a single shard table with all data
\copy multi_outer_join_third_reference FROM '@abs_srcdir@/data/customer-1-30.data' with delimiter '|'
\copy multi_outer_join_right_hash FROM '@abs_srcdir@/data/customer-1-30.data' with delimiter '|'
-- Regular outer join should return results for all rows
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey);
-- Since this is a broadcast join, we should be able to join on any key
SELECT
count(*)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_nationkey = r_nationkey);
-- Anti-join should return customers for which there is no row in the right table
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey IS NULL;
-- Partial anti-join with specific value
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey IS NULL OR r_custkey = 5;
-- This query is an INNER JOIN in disguise since there cannot be NULL results
-- Added extra filter to make query not router plannable
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey = 5 or r_custkey > 15;
-- Apply a filter before the join
SELECT
count(l_custkey), count(r_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b
ON (l_custkey = r_custkey AND r_custkey = 5);
-- Apply a filter before the join (no matches right)
SELECT
count(l_custkey), count(r_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b
ON (l_custkey = r_custkey AND r_custkey = -1 /* nonexistant */);
-- Apply a filter before the join (no matches left)
SELECT
count(l_custkey), count(r_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b
ON (l_custkey = r_custkey AND l_custkey = -1 /* nonexistant */);
-- Right join should be disallowed in this case
SELECT
min(r_custkey), max(r_custkey)
FROM
multi_outer_join_left_hash a RIGHT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey);
-- Reverse right join should be same as left join
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_right_reference a RIGHT JOIN multi_outer_join_left_hash b ON (l_custkey = r_custkey);
-- load some more data
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
-- Update shards so that they do not have 1-1 matching. We should error here.
UPDATE pg_dist_shard SET shardminvalue = '2147483646' WHERE shardid = 1260006;
UPDATE pg_dist_shard SET shardmaxvalue = '2147483647' WHERE shardid = 1260006;
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_hash b ON (l_custkey = r_custkey);
UPDATE pg_dist_shard SET shardminvalue = '-2147483648' WHERE shardid = 1260006;
UPDATE pg_dist_shard SET shardmaxvalue = '-1073741825' WHERE shardid = 1260006;
-- empty tables
SELECT master_modify_multiple_shards('DELETE FROM multi_outer_join_left_hash');
SELECT master_modify_multiple_shards('DELETE FROM multi_outer_join_right_hash');
DELETE FROM multi_outer_join_right_reference;
-- reload shards with 1-1 matching
\copy multi_outer_join_left_hash FROM '@abs_srcdir@/data/customer-1-15.data' with delimiter '|'
\copy multi_outer_join_left_hash FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-11-20.data' with delimiter '|'
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
\copy multi_outer_join_right_hash FROM '@abs_srcdir@/data/customer-11-20.data' with delimiter '|'
\copy multi_outer_join_right_hash FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
-- multi_outer_join_third_reference is a single shard table
-- Regular left join should work as expected
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_hash b ON (l_custkey = r_custkey);
-- Citus can use broadcast join here
SELECT
count(*)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_hash b ON (l_nationkey = r_nationkey);
-- Anti-join should return customers for which there is no row in the right table
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey IS NULL;
-- Partial anti-join with specific value (5, 11-15)
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey IS NULL OR r_custkey = 15;
-- This query is an INNER JOIN in disguise since there cannot be NULL results (21)
-- Added extra filter to make query not router plannable
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey = 21 or r_custkey < 10;
-- Apply a filter before the join
SELECT
count(l_custkey), count(r_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b
ON (l_custkey = r_custkey AND r_custkey = 21);
-- Right join should not be allowed in this case
SELECT
min(r_custkey), max(r_custkey)
FROM
multi_outer_join_left_hash a RIGHT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey);
-- Reverse right join should be same as left join
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_right_reference a RIGHT JOIN multi_outer_join_left_hash b ON (l_custkey = r_custkey);
-- complex query tree should error out
SELECT
*
FROM
multi_outer_join_left_hash l1
LEFT JOIN multi_outer_join_right_reference r1 ON (l1.l_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_right_reference r2 ON (l1.l_custkey = r2.r_custkey)
RIGHT JOIN multi_outer_join_left_hash l2 ON (r2.r_custkey = l2.l_custkey);
-- add an anti-join, this should also error out
SELECT
*
FROM
multi_outer_join_left_hash l1
LEFT JOIN multi_outer_join_right_reference r1 ON (l1.l_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_right_reference r2 ON (l1.l_custkey = r2.r_custkey)
RIGHT JOIN multi_outer_join_left_hash l2 ON (r2.r_custkey = l2.l_custkey)
WHERE
r1.r_custkey is NULL;
-- Three way join 2-1-1 (broadcast + broadcast join) should work
SELECT
l_custkey, r_custkey, t_custkey
FROM
multi_outer_join_left_hash l1
LEFT JOIN multi_outer_join_right_reference r1 ON (l1.l_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey)
ORDER BY 1;
-- Right join with single shard right most table should error out
SELECT
l_custkey, r_custkey, t_custkey
FROM
multi_outer_join_left_hash l1
LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey);
-- Right join with single shard left most table should work
SELECT
t_custkey, r_custkey, l_custkey
FROM
multi_outer_join_third_reference t1
RIGHT JOIN multi_outer_join_right_hash r1 ON (t1.t_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_left_hash l1 ON (r1.r_custkey = l1.l_custkey)
ORDER BY 1,2,3;
-- Make it anti-join, should display values with l_custkey is null
SELECT
t_custkey, r_custkey, l_custkey
FROM
multi_outer_join_third_reference t1
RIGHT JOIN multi_outer_join_right_hash r1 ON (t1.t_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_left_hash l1 ON (r1.r_custkey = l1.l_custkey)
WHERE
l_custkey is NULL
ORDER BY 1;
-- Cascading right join with single shard left most table should error out
SELECT
t_custkey, r_custkey, l_custkey
FROM
multi_outer_join_third_reference t1
RIGHT JOIN multi_outer_join_right_hash r1 ON (t1.t_custkey = r1.r_custkey)
RIGHT JOIN multi_outer_join_left_hash l1 ON (r1.r_custkey = l1.l_custkey);
-- full outer join should work with 1-1 matched shards
SELECT
l_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
FULL JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
ORDER BY 1,2;
-- full outer join + anti (right) should work with 1-1 matched shards
SELECT
l_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
FULL JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
WHERE
r_custkey is NULL
ORDER BY 1;
-- full outer join + anti (left) should work with 1-1 matched shards
SELECT
l_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
FULL JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
WHERE
l_custkey is NULL
ORDER BY 2;
-- full outer join + anti (both) should work with 1-1 matched shards
SELECT
l_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
FULL JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
WHERE
l_custkey is NULL or r_custkey is NULL
ORDER BY 1,2 DESC;
-- full outer join should error out for mismatched shards
SELECT
l_custkey, t_custkey
FROM
multi_outer_join_left_hash l1
FULL JOIN multi_outer_join_third_reference t1 ON (l1.l_custkey = t1.t_custkey);
-- inner join + single shard left join should work
SELECT
l_custkey, r_custkey, t_custkey
FROM
multi_outer_join_left_hash l1
INNER JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey)
ORDER BY 1;
-- inner (broadcast) join + 2 shards left (local) join should work
SELECT
l_custkey, t_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
INNER JOIN multi_outer_join_third_reference t1 ON (l1.l_custkey = t1.t_custkey)
LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
ORDER BY 1,2,3;
-- inner (local) join + 2 shards left (dual partition) join should error out
SELECT
t_custkey, l_custkey, r_custkey
FROM
multi_outer_join_third_reference t1
INNER JOIN multi_outer_join_left_hash l1 ON (l1.l_custkey = t1.t_custkey)
LEFT JOIN multi_outer_join_right_reference r1 ON (l1.l_custkey = r1.r_custkey);
-- inner (local) join + 2 shards left (dual partition) join should work
SELECT
l_custkey, t_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
INNER JOIN multi_outer_join_third_reference t1 ON (l1.l_custkey = t1.t_custkey)
LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
ORDER BY 1,2,3;
-- inner (broadcast) join + 2 shards left (local) + anti join should work
SELECT
l_custkey, t_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
INNER JOIN multi_outer_join_third_reference t1 ON (l1.l_custkey = t1.t_custkey)
LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
WHERE
r_custkey is NULL
ORDER BY 1;
-- Test joinExpr aliases by performing an outer-join.
SELECT
t_custkey
FROM
(multi_outer_join_right_hash r1
LEFT OUTER JOIN multi_outer_join_left_hash l1 ON (l1.l_custkey = r1.r_custkey)) AS
test(c_custkey, c_nationkey)
INNER JOIN multi_outer_join_third_reference t1 ON (test.c_custkey = t1.t_custkey)
ORDER BY 1;
-- flattened out subqueries with outer joins are not supported
SELECT
l1.l_custkey,
count(*) as cnt
FROM (
SELECT l_custkey, l_nationkey
FROM multi_outer_join_left_hash
WHERE l_comment like '%a%'
) l1
LEFT JOIN (
SELECT r_custkey, r_name
FROM multi_outer_join_right_reference
WHERE r_comment like '%b%'
) l2 ON l1.l_custkey = l2.r_custkey
GROUP BY l1.l_custkey
ORDER BY cnt DESC, l1.l_custkey DESC
LIMIT 20;
-- full join among reference tables should go thourgh router planner
SELECT
t_custkey, r_custkey
FROM
multi_outer_join_right_reference FULL JOIN
multi_outer_join_third_reference ON (t_custkey = r_custkey);

View File

@ -38,6 +38,8 @@ test: multi_deparse_shard_query
test: multi_basic_queries multi_complex_expressions multi_verify_no_subquery
test: multi_explain
test: multi_subquery
test: multi_reference_table
test: multi_outer_join_reference
test: multi_single_relation_subquery
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause multi_limit_clause_approximate
test: multi_average_expression multi_working_columns

View File

@ -561,7 +561,7 @@ DETAIL: Key (command)=(CREATE INDEX) already exists.
CONTEXT: while executing command on localhost:57638
ERROR: failure on connection marked as essential: localhost:57638
-- Nothing from the block should have committed
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items' ORDER BY 1;
indexname | tablename
-----------+-----------
(0 rows)
@ -579,7 +579,7 @@ DETAIL: Key (command)=(CREATE INDEX) already exists.
CONTEXT: while executing command on localhost:57638
WARNING: failed to commit critical transaction on localhost:57638, metadata is likely out of sync
-- The block should have committed with a warning
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items' ORDER BY 1;
indexname | tablename
----------------+--------------------
single_index_2 | single_shard_items

View File

@ -134,7 +134,7 @@ FROM
pg_dist_shard
WHERE 'multi_append_table_to_shard_right_hash'::regclass::oid = logicalrelid;
ERROR: cannot append to shardId 230000
DETAIL: We currently don't support appending to shards in hash-partitioned tables
DETAIL: We currently don't support appending to shards in hash-partitioned or reference tables
-- Clean up after test
SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_right');
master_apply_delete_command

View File

@ -15,3 +15,4 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 290000;
\copy nation FROM '@abs_srcdir@/data/nation.data' with delimiter '|'
\copy part FROM '@abs_srcdir@/data/part.data' with delimiter '|'
\copy supplier FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'
\copy supplier_single_shard FROM '@abs_srcdir@/data/supplier.data' with delimiter '|'

View File

@ -0,0 +1,836 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1260000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1260000;
SET citus.log_multi_join_order to true;
SET client_min_messages TO LOG;
SET citus.shard_count TO 4;
CREATE TABLE multi_outer_join_left_hash
(
l_custkey integer not null,
l_name varchar(25) not null,
l_address varchar(40) not null,
l_nationkey integer not null,
l_phone char(15) not null,
l_acctbal decimal(15,2) not null,
l_mktsegment char(10) not null,
l_comment varchar(117) not null
);
SELECT create_distributed_table('multi_outer_join_left_hash', 'l_custkey');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE multi_outer_join_right_reference
(
r_custkey integer not null,
r_name varchar(25) not null,
r_address varchar(40) not null,
r_nationkey integer not null,
r_phone char(15) not null,
r_acctbal decimal(15,2) not null,
r_mktsegment char(10) not null,
r_comment varchar(117) not null
);
SELECT create_reference_table('multi_outer_join_right_reference');
create_reference_table
------------------------
(1 row)
CREATE TABLE multi_outer_join_third_reference
(
t_custkey integer not null,
t_name varchar(25) not null,
t_address varchar(40) not null,
t_nationkey integer not null,
t_phone char(15) not null,
t_acctbal decimal(15,2) not null,
t_mktsegment char(10) not null,
t_comment varchar(117) not null
);
SELECT create_reference_table('multi_outer_join_third_reference');
create_reference_table
------------------------
(1 row)
CREATE TABLE multi_outer_join_right_hash
(
r_custkey integer not null,
r_name varchar(25) not null,
r_address varchar(40) not null,
r_nationkey integer not null,
r_phone char(15) not null,
r_acctbal decimal(15,2) not null,
r_mktsegment char(10) not null,
r_comment varchar(117) not null
);
SELECT create_distributed_table('multi_outer_join_right_hash', 'r_custkey');
create_distributed_table
--------------------------
(1 row)
-- Make sure we do not crash if both tables are emmpty
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_third_reference b ON (l_custkey = t_custkey);
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_third_reference" ]
min | max
-----+-----
|
(1 row)
-- Left table is a large table
\copy multi_outer_join_left_hash FROM '@abs_srcdir@/data/customer-1-10.data' with delimiter '|'
\copy multi_outer_join_left_hash FROM '@abs_srcdir@/data/customer-11-20.data' with delimiter '|'
-- Right table is a small table
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-1-15.data' with delimiter '|'
\copy multi_outer_join_right_hash FROM '@abs_srcdir@/data/customer-1-15.data' with delimiter '|'
-- Make sure we do not crash if one table has data
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_third_reference b ON (l_custkey = t_custkey);
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_third_reference" ]
min | max
-----+-----
1 | 20
(1 row)
SELECT
min(t_custkey), max(t_custkey)
FROM
multi_outer_join_third_reference a LEFT JOIN multi_outer_join_right_reference b ON (r_custkey = t_custkey);
min | max
-----+-----
|
(1 row)
-- Third table is a single shard table with all data
\copy multi_outer_join_third_reference FROM '@abs_srcdir@/data/customer-1-30.data' with delimiter '|'
\copy multi_outer_join_right_hash FROM '@abs_srcdir@/data/customer-1-30.data' with delimiter '|'
-- Regular outer join should return results for all rows
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey);
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
min | max
-----+-----
1 | 20
(1 row)
-- Since this is a broadcast join, we should be able to join on any key
SELECT
count(*)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_nationkey = r_nationkey);
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
count
-------
28
(1 row)
-- Anti-join should return customers for which there is no row in the right table
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey IS NULL;
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
min | max
-----+-----
16 | 20
(1 row)
-- Partial anti-join with specific value
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey IS NULL OR r_custkey = 5;
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
min | max
-----+-----
5 | 20
(1 row)
-- This query is an INNER JOIN in disguise since there cannot be NULL results
-- Added extra filter to make query not router plannable
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey = 5 or r_custkey > 15;
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
min | max
-----+-----
5 | 5
(1 row)
-- Apply a filter before the join
SELECT
count(l_custkey), count(r_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b
ON (l_custkey = r_custkey AND r_custkey = 5);
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
count | count
-------+-------
20 | 1
(1 row)
-- Apply a filter before the join (no matches right)
SELECT
count(l_custkey), count(r_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b
ON (l_custkey = r_custkey AND r_custkey = -1 /* nonexistant */);
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
count | count
-------+-------
20 | 0
(1 row)
-- Apply a filter before the join (no matches left)
SELECT
count(l_custkey), count(r_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b
ON (l_custkey = r_custkey AND l_custkey = -1 /* nonexistant */);
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
count | count
-------+-------
20 | 0
(1 row)
-- Right join should be disallowed in this case
SELECT
min(r_custkey), max(r_custkey)
FROM
multi_outer_join_left_hash a RIGHT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Shards of relations in outer join queries must have 1-to-1 shard partitioning
-- Reverse right join should be same as left join
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_right_reference a RIGHT JOIN multi_outer_join_left_hash b ON (l_custkey = r_custkey);
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
min | max
-----+-----
1 | 20
(1 row)
-- load some more data
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
-- Update shards so that they do not have 1-1 matching. We should error here.
UPDATE pg_dist_shard SET shardminvalue = '2147483646' WHERE shardid = 1260006;
UPDATE pg_dist_shard SET shardmaxvalue = '2147483647' WHERE shardid = 1260006;
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_hash b ON (l_custkey = r_custkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Shards of relations in outer join queries must have 1-to-1 shard partitioning
UPDATE pg_dist_shard SET shardminvalue = '-2147483648' WHERE shardid = 1260006;
UPDATE pg_dist_shard SET shardmaxvalue = '-1073741825' WHERE shardid = 1260006;
-- empty tables
SELECT master_modify_multiple_shards('DELETE FROM multi_outer_join_left_hash');
master_modify_multiple_shards
-------------------------------
20
(1 row)
SELECT master_modify_multiple_shards('DELETE FROM multi_outer_join_right_hash');
master_modify_multiple_shards
-------------------------------
45
(1 row)
DELETE FROM multi_outer_join_right_reference;
-- reload shards with 1-1 matching
\copy multi_outer_join_left_hash FROM '@abs_srcdir@/data/customer-1-15.data' with delimiter '|'
\copy multi_outer_join_left_hash FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-11-20.data' with delimiter '|'
\copy multi_outer_join_right_reference FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
\copy multi_outer_join_right_hash FROM '@abs_srcdir@/data/customer-11-20.data' with delimiter '|'
\copy multi_outer_join_right_hash FROM '@abs_srcdir@/data/customer-21-30.data' with delimiter '|'
-- multi_outer_join_third_reference is a single shard table
-- Regular left join should work as expected
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_hash b ON (l_custkey = r_custkey);
LOG: join order: [ "multi_outer_join_left_hash" ][ local partition join "multi_outer_join_right_hash" ]
min | max
-----+-----
1 | 30
(1 row)
-- Citus can use broadcast join here
SELECT
count(*)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_hash b ON (l_nationkey = r_nationkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Shards of relations in outer join queries must have 1-to-1 shard partitioning
-- Anti-join should return customers for which there is no row in the right table
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey IS NULL;
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
min | max
-----+-----
1 | 10
(1 row)
-- Partial anti-join with specific value (5, 11-15)
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey IS NULL OR r_custkey = 15;
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
min | max
-----+-----
1 | 15
(1 row)
-- This query is an INNER JOIN in disguise since there cannot be NULL results (21)
-- Added extra filter to make query not router plannable
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey)
WHERE
r_custkey = 21 or r_custkey < 10;
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
min | max
-----+-----
21 | 21
(1 row)
-- Apply a filter before the join
SELECT
count(l_custkey), count(r_custkey)
FROM
multi_outer_join_left_hash a LEFT JOIN multi_outer_join_right_reference b
ON (l_custkey = r_custkey AND r_custkey = 21);
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
count | count
-------+-------
25 | 1
(1 row)
-- Right join should not be allowed in this case
SELECT
min(r_custkey), max(r_custkey)
FROM
multi_outer_join_left_hash a RIGHT JOIN multi_outer_join_right_reference b ON (l_custkey = r_custkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Shards of relations in outer join queries must have 1-to-1 shard partitioning
-- Reverse right join should be same as left join
SELECT
min(l_custkey), max(l_custkey)
FROM
multi_outer_join_right_reference a RIGHT JOIN multi_outer_join_left_hash b ON (l_custkey = r_custkey);
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ]
min | max
-----+-----
1 | 30
(1 row)
-- complex query tree should error out
SELECT
*
FROM
multi_outer_join_left_hash l1
LEFT JOIN multi_outer_join_right_reference r1 ON (l1.l_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_right_reference r2 ON (l1.l_custkey = r2.r_custkey)
RIGHT JOIN multi_outer_join_left_hash l2 ON (r2.r_custkey = l2.l_custkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Complex join orders are currently unsupported
-- add an anti-join, this should also error out
SELECT
*
FROM
multi_outer_join_left_hash l1
LEFT JOIN multi_outer_join_right_reference r1 ON (l1.l_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_right_reference r2 ON (l1.l_custkey = r2.r_custkey)
RIGHT JOIN multi_outer_join_left_hash l2 ON (r2.r_custkey = l2.l_custkey)
WHERE
r1.r_custkey is NULL;
ERROR: cannot perform distributed planning on this query
DETAIL: Complex join orders are currently unsupported
-- Three way join 2-1-1 (broadcast + broadcast join) should work
SELECT
l_custkey, r_custkey, t_custkey
FROM
multi_outer_join_left_hash l1
LEFT JOIN multi_outer_join_right_reference r1 ON (l1.l_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey)
ORDER BY 1;
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_right_reference" ][ broadcast join "multi_outer_join_third_reference" ]
l_custkey | r_custkey | t_custkey
-----------+-----------+-----------
1 | |
2 | |
3 | |
4 | |
5 | |
6 | |
7 | |
8 | |
9 | |
10 | |
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(25 rows)
-- Right join with single shard right most table should error out
SELECT
l_custkey, r_custkey, t_custkey
FROM
multi_outer_join_left_hash l1
LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
RIGHT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Complex join orders are currently unsupported
-- Right join with single shard left most table should work
SELECT
t_custkey, r_custkey, l_custkey
FROM
multi_outer_join_third_reference t1
RIGHT JOIN multi_outer_join_right_hash r1 ON (t1.t_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_left_hash l1 ON (r1.r_custkey = l1.l_custkey)
ORDER BY 1,2,3;
LOG: join order: [ "multi_outer_join_right_hash" ][ broadcast join "multi_outer_join_third_reference" ][ local partition join "multi_outer_join_left_hash" ]
t_custkey | r_custkey | l_custkey
-----------+-----------+-----------
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
16 | 16 |
17 | 17 |
18 | 18 |
19 | 19 |
20 | 20 |
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(20 rows)
-- Make it anti-join, should display values with l_custkey is null
SELECT
t_custkey, r_custkey, l_custkey
FROM
multi_outer_join_third_reference t1
RIGHT JOIN multi_outer_join_right_hash r1 ON (t1.t_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_left_hash l1 ON (r1.r_custkey = l1.l_custkey)
WHERE
l_custkey is NULL
ORDER BY 1;
LOG: join order: [ "multi_outer_join_right_hash" ][ broadcast join "multi_outer_join_third_reference" ][ local partition join "multi_outer_join_left_hash" ]
t_custkey | r_custkey | l_custkey
-----------+-----------+-----------
16 | 16 |
17 | 17 |
18 | 18 |
19 | 19 |
20 | 20 |
(5 rows)
-- Cascading right join with single shard left most table should error out
SELECT
t_custkey, r_custkey, l_custkey
FROM
multi_outer_join_third_reference t1
RIGHT JOIN multi_outer_join_right_hash r1 ON (t1.t_custkey = r1.r_custkey)
RIGHT JOIN multi_outer_join_left_hash l1 ON (r1.r_custkey = l1.l_custkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Complex join orders are currently unsupported
-- full outer join should work with 1-1 matched shards
SELECT
l_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
FULL JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
ORDER BY 1,2;
LOG: join order: [ "multi_outer_join_left_hash" ][ local partition join "multi_outer_join_right_hash" ]
l_custkey | r_custkey
-----------+-----------
1 |
2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
10 |
11 | 11
12 | 12
13 | 13
14 | 14
15 | 15
21 | 21
22 | 22
23 | 23
24 | 24
25 | 25
26 | 26
27 | 27
28 | 28
29 | 29
30 | 30
| 16
| 17
| 18
| 19
| 20
(30 rows)
-- full outer join + anti (right) should work with 1-1 matched shards
SELECT
l_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
FULL JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
WHERE
r_custkey is NULL
ORDER BY 1;
LOG: join order: [ "multi_outer_join_left_hash" ][ local partition join "multi_outer_join_right_hash" ]
l_custkey | r_custkey
-----------+-----------
1 |
2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
10 |
(10 rows)
-- full outer join + anti (left) should work with 1-1 matched shards
SELECT
l_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
FULL JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
WHERE
l_custkey is NULL
ORDER BY 2;
LOG: join order: [ "multi_outer_join_left_hash" ][ local partition join "multi_outer_join_right_hash" ]
l_custkey | r_custkey
-----------+-----------
| 16
| 17
| 18
| 19
| 20
(5 rows)
-- full outer join + anti (both) should work with 1-1 matched shards
SELECT
l_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
FULL JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
WHERE
l_custkey is NULL or r_custkey is NULL
ORDER BY 1,2 DESC;
LOG: join order: [ "multi_outer_join_left_hash" ][ local partition join "multi_outer_join_right_hash" ]
l_custkey | r_custkey
-----------+-----------
1 |
2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
10 |
| 20
| 19
| 18
| 17
| 16
(15 rows)
-- full outer join should error out for mismatched shards
SELECT
l_custkey, t_custkey
FROM
multi_outer_join_left_hash l1
FULL JOIN multi_outer_join_third_reference t1 ON (l1.l_custkey = t1.t_custkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Shards of relations in outer join queries must have 1-to-1 shard partitioning
-- inner join + single shard left join should work
SELECT
l_custkey, r_custkey, t_custkey
FROM
multi_outer_join_left_hash l1
INNER JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
LEFT JOIN multi_outer_join_third_reference t1 ON (r1.r_custkey = t1.t_custkey)
ORDER BY 1;
LOG: join order: [ "multi_outer_join_left_hash" ][ local partition join "multi_outer_join_right_hash" ][ broadcast join "multi_outer_join_third_reference" ]
l_custkey | r_custkey | t_custkey
-----------+-----------+-----------
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(15 rows)
-- inner (broadcast) join + 2 shards left (local) join should work
SELECT
l_custkey, t_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
INNER JOIN multi_outer_join_third_reference t1 ON (l1.l_custkey = t1.t_custkey)
LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
ORDER BY 1,2,3;
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_third_reference" ][ local partition join "multi_outer_join_right_hash" ]
l_custkey | t_custkey | r_custkey
-----------+-----------+-----------
1 | 1 |
2 | 2 |
3 | 3 |
4 | 4 |
5 | 5 |
6 | 6 |
7 | 7 |
8 | 8 |
9 | 9 |
10 | 10 |
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(25 rows)
-- inner (local) join + 2 shards left (dual partition) join should error out
SELECT
t_custkey, l_custkey, r_custkey
FROM
multi_outer_join_third_reference t1
INNER JOIN multi_outer_join_left_hash l1 ON (l1.l_custkey = t1.t_custkey)
LEFT JOIN multi_outer_join_right_reference r1 ON (l1.l_custkey = r1.r_custkey);
ERROR: cannot perform distributed planning on this query
DETAIL: Shards of relations in outer join queries must have 1-to-1 shard partitioning
-- inner (local) join + 2 shards left (dual partition) join should work
SELECT
l_custkey, t_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
INNER JOIN multi_outer_join_third_reference t1 ON (l1.l_custkey = t1.t_custkey)
LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
ORDER BY 1,2,3;
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_third_reference" ][ local partition join "multi_outer_join_right_hash" ]
l_custkey | t_custkey | r_custkey
-----------+-----------+-----------
1 | 1 |
2 | 2 |
3 | 3 |
4 | 4 |
5 | 5 |
6 | 6 |
7 | 7 |
8 | 8 |
9 | 9 |
10 | 10 |
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(25 rows)
-- inner (broadcast) join + 2 shards left (local) + anti join should work
SELECT
l_custkey, t_custkey, r_custkey
FROM
multi_outer_join_left_hash l1
INNER JOIN multi_outer_join_third_reference t1 ON (l1.l_custkey = t1.t_custkey)
LEFT JOIN multi_outer_join_right_hash r1 ON (l1.l_custkey = r1.r_custkey)
WHERE
r_custkey is NULL
ORDER BY 1;
LOG: join order: [ "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_third_reference" ][ local partition join "multi_outer_join_right_hash" ]
l_custkey | t_custkey | r_custkey
-----------+-----------+-----------
1 | 1 |
2 | 2 |
3 | 3 |
4 | 4 |
5 | 5 |
6 | 6 |
7 | 7 |
8 | 8 |
9 | 9 |
10 | 10 |
(10 rows)
-- Test joinExpr aliases by performing an outer-join.
SELECT
t_custkey
FROM
(multi_outer_join_right_hash r1
LEFT OUTER JOIN multi_outer_join_left_hash l1 ON (l1.l_custkey = r1.r_custkey)) AS
test(c_custkey, c_nationkey)
INNER JOIN multi_outer_join_third_reference t1 ON (test.c_custkey = t1.t_custkey)
ORDER BY 1;
LOG: join order: [ "multi_outer_join_right_hash" ][ local partition join "multi_outer_join_left_hash" ][ broadcast join "multi_outer_join_third_reference" ]
t_custkey
-----------
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
(20 rows)
-- flattened out subqueries with outer joins are not supported
SELECT
l1.l_custkey,
count(*) as cnt
FROM (
SELECT l_custkey, l_nationkey
FROM multi_outer_join_left_hash
WHERE l_comment like '%a%'
) l1
LEFT JOIN (
SELECT r_custkey, r_name
FROM multi_outer_join_right_reference
WHERE r_comment like '%b%'
) l2 ON l1.l_custkey = l2.r_custkey
GROUP BY l1.l_custkey
ORDER BY cnt DESC, l1.l_custkey DESC
LIMIT 20;
ERROR: cannot perform distributed planning on this query
DETAIL: Subqueries in outer joins are not supported
-- full join among reference tables should go thourgh router planner
SELECT
t_custkey, r_custkey
FROM
multi_outer_join_right_reference FULL JOIN
multi_outer_join_third_reference ON (t_custkey = r_custkey);
t_custkey | r_custkey
-----------+-----------
11 | 11
12 | 12
13 | 13
14 | 14
15 | 15
16 | 16
17 | 17
18 | 18
19 | 19
20 | 20
21 | 21
22 | 22
23 | 23
24 | 24
25 | 25
26 | 26
27 | 27
28 | 28
29 | 29
30 | 30
10 |
2 |
5 |
8 |
6 |
4 |
1 |
3 |
9 |
7 |
(30 rows)

View File

@ -3,6 +3,11 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
-- Tests functions related to cluster membership
-- before starting the test, lets try to create reference table and see a
-- meaningful error
CREATE TABLE test_reference_table (y int primary key, name text);
SELECT create_reference_table('test_reference_table');
-- add the nodes to the cluster
SELECT master_add_node('localhost', :worker_1_port);
SELECT master_add_node('localhost', :worker_2_port);

View File

@ -72,7 +72,7 @@ SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
TRUNCATE nation;
SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
SELECT create_reference_table('nation');
CREATE TABLE part (
p_partkey integer not null,
@ -96,4 +96,18 @@ CREATE TABLE supplier
s_acctbal decimal(15,2) not null,
s_comment varchar(101) not null
);
SELECT master_create_distributed_table('supplier', 's_suppkey', 'append');
SELECT create_reference_table('supplier');
-- create a single shard supplier table which is not
-- a reference table
CREATE TABLE supplier_single_shard
(
s_suppkey integer not null,
s_name char(25) not null,
s_address varchar(40) not null,
s_nationkey integer,
s_phone char(15) not null,
s_acctbal decimal(15,2) not null,
s_comment varchar(101) not null
);
SELECT master_create_distributed_table('supplier_single_shard', 's_suppkey', 'append');

View File

@ -2,7 +2,6 @@
-- MULTI_EXPLAIN
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 570000;
@ -148,28 +147,28 @@ SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
SELECT true AS valid FROM explain_json($$
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
@ -181,9 +180,23 @@ SELECT true AS valid FROM explain_xml($$
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey$$);
-- make sure that EXPLAIN works without
-- problems for queries that inlvolves only
-- reference tables
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM nation
WHERE n_name = 'CHINA'$$);
SELECT true AS valid FROM explain_xml($$
SELECT count(*)
FROM nation, supplier
WHERE nation.n_nationkey = supplier.s_nationkey$$);
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT count(*)
FROM lineitem, orders, customer, supplier
FROM lineitem, orders, customer, supplier_single_shard
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;

View File

@ -63,6 +63,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-2';
ALTER EXTENSION citus UPDATE TO '6.1-3';
ALTER EXTENSION citus UPDATE TO '6.1-4';
ALTER EXTENSION citus UPDATE TO '6.1-5';
ALTER EXTENSION citus UPDATE TO '6.1-6';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)

View File

@ -0,0 +1,990 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1250000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000;
CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp);
-- insert some data, and make sure that cannot be create_distributed_table
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05');
-- should error out given that there exists data
SELECT create_reference_table('reference_table_test');
TRUNCATE reference_table_test;
-- now should be able to create the reference table
SELECT create_reference_table('reference_table_test');
-- see that partkey is NULL
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'reference_table_test'::regclass;
-- now see that shard min/max values are NULL
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'reference_table_test'::regclass;
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass);
-- now, execute some modification queries
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');
INSERT INTO reference_table_test VALUES (5, 5.0, '5', '2016-12-05');
-- most of the queries in this file are already tested on multi_router_planner.sql
-- However, for the sake of completeness we need to run similar tests with
-- reference tables as well
-- run some queries on top of the data
SELECT
*
FROM
reference_table_test;
SELECT
*
FROM
reference_table_test
WHERE
value_1 = 1;
SELECT
value_1,
value_2
FROM
reference_table_test
ORDER BY
2 ASC LIMIT 3;
SELECT
value_1, value_3
FROM
reference_table_test
WHERE
value_2 >= 4
ORDER BY
2 LIMIT 3;
SELECT
value_1, 15 * value_2
FROM
reference_table_test
ORDER BY
2 ASC
LIMIT 2;
SELECT
value_1, 15 * value_2
FROM
reference_table_test
ORDER BY
2 ASC LIMIT 2 OFFSET 2;
SELECT
value_2, value_4
FROM
reference_table_test
WHERE
value_2 = 2 OR value_2 = 3;
SELECT
value_2, value_4
FROM
reference_table_test
WHERE
value_2 = 2 AND value_2 = 3;
SELECT
value_2, value_4
FROM
reference_table_test
WHERE
value_3 = '2' OR value_1 = 3;
SELECT
value_2, value_4
FROM
reference_table_test
WHERE
(
value_3 = '2' OR value_1 = 3
)
AND FALSE;
SELECT
*
FROM
reference_table_test
WHERE
value_2 IN
(
SELECT
value_3::FLOAT
FROM
reference_table_test
)
AND value_1 < 3;
SELECT
value_4
FROM
reference_table_test
WHERE
value_3 IN
(
'1', '2'
);
SELECT
date_part('day', value_4)
FROM
reference_table_test
WHERE
value_3 IN
(
'5', '2'
);
SELECT
value_4
FROM
reference_table_test
WHERE
value_2 <= 2 AND value_2 >= 4;
SELECT
value_4
FROM
reference_table_test
WHERE
value_2 <= 20 AND value_2 >= 4;
SELECT
value_4
FROM
reference_table_test
WHERE
value_2 >= 5 AND value_2 <= random();
SELECT
value_1
FROM
reference_table_test
WHERE
value_4 BETWEEN '2016-12-01' AND '2016-12-03';
SELECT
value_1
FROM
reference_table_test
WHERE
FALSE;
SELECT
value_1
FROM
reference_table_test
WHERE
int4eq(1, 2);
-- rename output name and do some operations
SELECT
value_1 as id, value_2 * 15 as age
FROM
reference_table_test;
-- queries with CTEs are supported
WITH some_data AS ( SELECT value_2, value_4 FROM reference_table_test WHERE value_2 >=3)
SELECT
*
FROM
some_data;
-- queries with CTEs are supported even if CTE is not referenced inside query
WITH some_data AS ( SELECT value_2, value_4 FROM reference_table_test WHERE value_2 >=3)
SELECT * FROM reference_table_test ORDER BY 1 LIMIT 1;
-- queries which involve functions in FROM clause are supported if it goes to a single worker.
SELECT
*
FROM
reference_table_test, position('om' in 'Thomas')
WHERE
value_1 = 1;
SELECT
*
FROM
reference_table_test, position('om' in 'Thomas')
WHERE
value_1 = 1 OR value_1 = 2;
-- set operations are supported
(SELECT * FROM reference_table_test WHERE value_1 = 1)
UNION
(SELECT * FROM reference_table_test WHERE value_1 = 3);
(SELECT * FROM reference_table_test WHERE value_1 = 1)
EXCEPT
(SELECT * FROM reference_table_test WHERE value_1 = 3);
(SELECT * FROM reference_table_test WHERE value_1 = 1)
INTERSECT
(SELECT * FROM reference_table_test WHERE value_1 = 3);
-- to make the tests more interested for aggregation tests, ingest some more data
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
-- some aggregations
SELECT
value_4, SUM(value_2)
FROM
reference_table_test
GROUP BY
value_4
HAVING
SUM(value_2) > 3
ORDER BY
1;
SELECT
value_4,
value_3,
SUM(value_2)
FROM
reference_table_test
GROUP BY
GROUPING sets ((value_4), (value_3))
ORDER BY 1, 2, 3;
-- distinct clauses also work fine
SELECT DISTINCT
value_4
FROM
reference_table_test
ORDER BY
1;
-- window functions are also supported
SELECT
value_4, RANK() OVER (PARTITION BY value_1 ORDER BY value_4)
FROM
reference_table_test;
-- window functions are also supported
SELECT
value_4, AVG(value_1) OVER (PARTITION BY value_4 ORDER BY value_4)
FROM
reference_table_test;
SELECT
count(DISTINCT CASE
WHEN
value_2 >= 3
THEN
value_2
ELSE
NULL
END) as c
FROM
reference_table_test;
SELECT
value_1,
count(DISTINCT CASE
WHEN
value_2 >= 3
THEN
value_2
ELSE
NULL
END) as c
FROM
reference_table_test
GROUP BY
value_1
ORDER BY
1;
-- selects inside a transaction works fine as well
BEGIN;
SELECT * FROM reference_table_test;
SELECT * FROM reference_table_test WHERE value_1 = 1;
END;
-- cursor queries also works fine
BEGIN;
DECLARE test_cursor CURSOR FOR
SELECT *
FROM reference_table_test
WHERE value_1 = 1 OR value_1 = 2
ORDER BY value_1;
FETCH test_cursor;
FETCH ALL test_cursor;
FETCH test_cursor; -- fetch one row after the last
END;
-- table creation queries inside can be router plannable
CREATE TEMP TABLE temp_reference_test as
SELECT *
FROM reference_table_test
WHERE value_1 = 1;
-- all kinds of joins are supported among reference tables
-- first create two more tables
CREATE TABLE reference_table_test_second (value_1 int, value_2 float, value_3 text, value_4 timestamp);
SELECT create_reference_table('reference_table_test_second');
CREATE TABLE reference_table_test_third (value_1 int, value_2 float, value_3 text, value_4 timestamp);
SELECT create_reference_table('reference_table_test_third');
-- ingest some data to both tables
INSERT INTO reference_table_test_second VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO reference_table_test_second VALUES (2, 2.0, '2', '2016-12-02');
INSERT INTO reference_table_test_second VALUES (3, 3.0, '3', '2016-12-03');
INSERT INTO reference_table_test_third VALUES (4, 4.0, '4', '2016-12-04');
INSERT INTO reference_table_test_third VALUES (5, 5.0, '5', '2016-12-05');
-- some very basic tests
SELECT
DISTINCT t1.value_1
FROM
reference_table_test t1, reference_table_test_second t2
WHERE
t1.value_2 = t2.value_2
ORDER BY
1;
SELECT
DISTINCT t1.value_1
FROM
reference_table_test t1, reference_table_test_third t3
WHERE
t1.value_2 = t3.value_2
ORDER BY
1;
SELECT
DISTINCT t2.value_1
FROM
reference_table_test_second t2, reference_table_test_third t3
WHERE
t2.value_2 = t3.value_2
ORDER BY
1;
-- join on different columns and different data types via casts
SELECT
DISTINCT t1.value_1
FROM
reference_table_test t1, reference_table_test_second t2
WHERE
t1.value_2 = t2.value_1
ORDER BY
1;
SELECT
DISTINCT t1.value_1
FROM
reference_table_test t1, reference_table_test_second t2
WHERE
t1.value_2 = t2.value_3::int
ORDER BY
1;
SELECT
DISTINCT t1.value_1
FROM
reference_table_test t1, reference_table_test_second t2
WHERE
t1.value_2 = date_part('day', t2.value_4)
ORDER BY
1;
-- ingest a common row to see more meaningful results with joins involving 3 tables
INSERT INTO reference_table_test_third VALUES (3, 3.0, '3', '2016-12-03');
SELECT
DISTINCT t1.value_1
FROM
reference_table_test t1, reference_table_test_second t2, reference_table_test_third t3
WHERE
t1.value_2 = date_part('day', t2.value_4) AND t3.value_2 = t1.value_2
ORDER BY
1;
-- same query on different columns
SELECT
DISTINCT t1.value_1
FROM
reference_table_test t1, reference_table_test_second t2, reference_table_test_third t3
WHERE
t1.value_1 = date_part('day', t2.value_4) AND t3.value_2 = t1.value_1
ORDER BY
1;
-- with the JOIN syntax
SELECT
DISTINCT t1.value_1
FROM
reference_table_test t1 JOIN reference_table_test_second t2 USING (value_1)
JOIN reference_table_test_third t3 USING (value_1)
ORDER BY
1;
-- and left/right joins
SELECT
DISTINCT t1.value_1
FROM
reference_table_test t1 LEFT JOIN reference_table_test_second t2 USING (value_1)
LEFT JOIN reference_table_test_third t3 USING (value_1)
ORDER BY
1;
SELECT
DISTINCT t1.value_1
FROM
reference_table_test t1 RIGHT JOIN reference_table_test_second t2 USING (value_1)
RIGHT JOIN reference_table_test_third t3 USING (value_1)
ORDER BY
1;
-- now, lets have some tests on UPSERTs and uniquness
CREATE TABLE reference_table_test_fourth (value_1 int, value_2 float PRIMARY KEY, value_3 text, value_4 timestamp);
SELECT create_reference_table('reference_table_test_fourth');
-- insert a row
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01');
-- now get the unique key violation
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01');
-- now get null constraint violation due to primary key
INSERT INTO reference_table_test_fourth (value_1, value_3, value_4) VALUES (1, '1.0', '2016-12-01');
-- lets run some upserts
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '1', '2016-12-01') ON CONFLICT DO NOTHING RETURNING *;
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '10', '2016-12-01') ON CONFLICT (value_2) DO
UPDATE SET value_3 = EXCLUDED.value_3, value_2 = EXCLUDED.value_2
RETURNING *;
-- update all columns
INSERT INTO reference_table_test_fourth VALUES (1, 1.0, '10', '2016-12-01') ON CONFLICT (value_2) DO
UPDATE SET value_3 = EXCLUDED.value_3 || '+10', value_2 = EXCLUDED.value_2 + 10, value_1 = EXCLUDED.value_1 + 10, value_4 = '2016-12-10'
RETURNING *;
-- finally see that shard healths are OK
SELECT
shardid, shardstate, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test_fourth'::regclass);
-- let's not run some update/delete queries on arbitrary columns
DELETE FROM
reference_table_test
WHERE
value_1 = 1
RETURNING *;
DELETE FROM
reference_table_test
WHERE
value_4 = '2016-12-05'
RETURNING *;
UPDATE
reference_table_test
SET
value_2 = 15
WHERE
value_2 = 2
RETURNING *;
-- and some queries without any filters
UPDATE
reference_table_test
SET
value_2 = 15, value_1 = 45
RETURNING *;
DELETE FROM
reference_table_test
RETURNING *;
-- some tests with function evaluation and sequences
CREATE TABLE reference_table_test_fifth (value_1 serial PRIMARY KEY, value_2 float, value_3 text, value_4 timestamp);
SELECT create_reference_table('reference_table_test_fifth');
CREATE SEQUENCE example_ref_value_seq;
-- see that sequences work as expected
INSERT INTO
reference_table_test_fifth (value_2) VALUES (2)
RETURNING value_1, value_2;
INSERT INTO
reference_table_test_fifth (value_2) VALUES (2)
RETURNING value_1, value_2;
INSERT INTO
reference_table_test_fifth (value_2, value_3) VALUES (nextval('example_ref_value_seq'), nextval('example_ref_value_seq')::text)
RETURNING value_1, value_2, value_3;
UPDATE
reference_table_test_fifth SET value_4 = now()
WHERE
value_1 = 1
RETURNING value_1, value_2, value_4 > '2000-01-01';
-- test copying FROM / TO
-- first delete all the data
DELETE FROM
reference_table_test;
COPY reference_table_test FROM STDIN WITH CSV;
1,1.0,1,2016-01-01
\.
COPY reference_table_test (value_2, value_3, value_4) FROM STDIN WITH CSV;
2.0,2,2016-01-02
\.
COPY reference_table_test (value_3) FROM STDIN WITH CSV;
3
\.
COPY reference_table_test FROM STDIN WITH CSV;
,,,
\.
COPY reference_table_test TO STDOUT WITH CSV;
-- INSERT INTO SELECT among reference tables
DELETE FROM
reference_table_test_second;
INSERT INTO
reference_table_test_second
SELECT
*
FROM
reference_table_test
RETURNING *;
INSERT INTO
reference_table_test_second (value_2)
SELECT
reference_table_test.value_2
FROM
reference_table_test JOIN reference_table_test_second USING (value_1)
RETURNING *;
SET citus.shard_count TO 6;
SET citus.shard_replication_factor TO 2;
CREATE TABLE colocated_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp);
SELECT create_distributed_table('colocated_table_test', 'value_1');
CREATE TABLE colocated_table_test_2 (value_1 int, value_2 float, value_3 text, value_4 timestamp);
SELECT create_distributed_table('colocated_table_test_2', 'value_1');
DELETE FROM reference_table_test;
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
INSERT INTO colocated_table_test VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO colocated_table_test VALUES (2, 2.0, '2', '2016-12-02');
INSERT INTO colocated_table_test_2 VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO colocated_table_test_2 VALUES (2, 2.0, '2', '2016-12-02');
SET client_min_messages TO DEBUG1;
SET citus.log_multi_join_order TO TRUE;
SELECT
reference_table_test.value_1
FROM
reference_table_test, colocated_table_test
WHERE
colocated_table_test.value_1 = reference_table_test.value_1;
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test
WHERE
colocated_table_test.value_2 = reference_table_test.value_2;
SELECT
colocated_table_test.value_2
FROM
colocated_table_test, reference_table_test
WHERE
reference_table_test.value_1 = colocated_table_test.value_1;
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
colocated_table_test.value_2 = reference_table_test.value_2;
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
colocated_table_test.value_1 = colocated_table_test_2.value_1 AND colocated_table_test.value_2 = reference_table_test.value_2;
SET citus.task_executor_type to "task-tracker";
SELECT
colocated_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
colocated_table_test.value_2 = colocated_table_test_2.value_2 AND colocated_table_test.value_2 = reference_table_test.value_2;
SELECT
reference_table_test.value_2
FROM
reference_table_test, colocated_table_test, colocated_table_test_2
WHERE
colocated_table_test.value_1 = reference_table_test.value_1 AND colocated_table_test_2.value_1 = reference_table_test.value_1;
SET client_min_messages TO NOTICE;
SET citus.log_multi_join_order TO FALSE;
SET citus.shard_count TO DEFAULT;
SET citus.task_executor_type to "real-time";
-- some INSERT .. SELECT queries that involve both hash distributed and reference tables
-- should error out since we're inserting into reference table where
-- not all the participants are reference tables
INSERT INTO
reference_table_test (value_1)
SELECT
colocated_table_test.value_1
FROM
colocated_table_test, colocated_table_test_2
WHERE
colocated_table_test.value_1 = colocated_table_test.value_1;
-- should error out, same as the above
INSERT INTO
reference_table_test (value_1)
SELECT
colocated_table_test.value_1
FROM
colocated_table_test, reference_table_test
WHERE
colocated_table_test.value_1 = reference_table_test.value_1;
-- now, insert into the hash partitioned table and use reference
-- tables in the SELECT queries
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT
colocated_table_test_2.value_1, reference_table_test.value_2
FROM
colocated_table_test_2, reference_table_test
WHERE
colocated_table_test_2.value_4 = reference_table_test.value_4
RETURNING value_1, value_2;
-- some more complex queries (Note that there are more complex queries in multi_insert_select.sql)
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT
colocated_table_test_2.value_1, reference_table_test.value_2
FROM
colocated_table_test_2, reference_table_test
WHERE
colocated_table_test_2.value_2 = reference_table_test.value_2
RETURNING value_1, value_2;
-- partition column value comes from reference table which should error out
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT
reference_table_test.value_2, colocated_table_test_2.value_1
FROM
colocated_table_test_2, reference_table_test
WHERE
colocated_table_test_2.value_4 = reference_table_test.value_4
RETURNING value_1, value_2;
-- some tests for mark_tables_colocated
-- should error out
SELECT mark_tables_colocated('colocated_table_test_2', ARRAY['reference_table_test']);
-- should work sliently
SELECT mark_tables_colocated('reference_table_test', ARRAY['reference_table_test_fifth']);
-- ensure that reference tables on
-- different queries works as expected
CREATE SCHEMA reference_schema;
-- create with schema prefix
CREATE TABLE reference_schema.reference_table_test_sixth (value_1 serial PRIMARY KEY, value_2 float, value_3 text, value_4 timestamp);
SELECT create_reference_table('reference_schema.reference_table_test_sixth');
SET search_path TO 'reference_schema';
-- create on the schema
CREATE TABLE reference_table_test_seventh (value_1 serial PRIMARY KEY, value_2 float, value_3 text, value_4 timestamp);
SELECT create_reference_table('reference_table_test_seventh');
-- ingest some data
INSERT INTO reference_table_test_sixth VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO reference_table_test_seventh VALUES (1, 1.0, '1', '2016-12-01');
SET search_path TO 'public';
-- ingest some data
INSERT INTO reference_schema.reference_table_test_sixth VALUES (2, 2.0, '2', '2016-12-02');
INSERT INTO reference_schema.reference_table_test_seventh VALUES (2, 2.0, '2', '2016-12-02');
-- some basic queries
SELECT
value_1
FROM
reference_schema.reference_table_test_sixth;
SET search_path TO 'reference_schema';
SELECT
reference_table_test_sixth.value_1
FROM
reference_table_test_sixth, reference_table_test_seventh
WHERE
reference_table_test_sixth.value_4 = reference_table_test_seventh.value_4;
-- last test with cross schemas
SET search_path TO 'public';
SELECT
reftable.value_2, colocated_table_test_2.value_1
FROM
colocated_table_test_2, reference_schema.reference_table_test_sixth as reftable
WHERE
colocated_table_test_2.value_4 = reftable.value_4;
-- let's now test TRUNCATE and DROP TABLE
-- delete all rows and ingest some data
DELETE FROM reference_table_test;
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');
INSERT INTO reference_table_test VALUES (5, 5.0, '5', '2016-12-05');
SELECT
count(*)
FROM
reference_table_test;
-- truncate it and get the result back
TRUNCATE reference_table_test;
SELECT
count(*)
FROM
reference_table_test;
-- now try dropping one of the existing reference tables
-- and check the metadata
SELECT logicalrelid FROM pg_dist_partition WHERE logicalrelid::regclass::text LIKE '%reference_table_test_fifth%';
SELECT logicalrelid FROM pg_dist_shard WHERE logicalrelid::regclass::text LIKE '%reference_table_test_fifth%';
DROP TABLE reference_table_test_fifth;
SELECT logicalrelid FROM pg_dist_partition WHERE logicalrelid::regclass::text LIKE '%reference_table_test_fifth%';
SELECT logicalrelid FROM pg_dist_shard WHERE logicalrelid::regclass::text LIKE '%reference_table_test_fifth%';
-- now test DDL changes
CREATE TABLE reference_table_ddl (value_1 int, value_2 float, value_3 text, value_4 timestamp);
SELECT create_reference_table('reference_table_ddl');
-- CREATE & DROP index and check the workers
CREATE INDEX reference_index_1 ON reference_table_ddl(value_1);
CREATE INDEX reference_index_2 ON reference_table_ddl(value_2, value_3);
-- should be able to create/drop UNIQUE index on a reference table
CREATE UNIQUE INDEX reference_index_3 ON reference_table_ddl(value_1);
-- should be able to add a column
ALTER TABLE reference_table_ddl ADD COLUMN value_5 INTEGER;
ALTER TABLE reference_table_ddl ALTER COLUMN value_5 SET DATA TYPE FLOAT;
ALTER TABLE reference_table_ddl DROP COLUMN value_1;
ALTER TABLE reference_table_ddl ALTER COLUMN value_2 SET DEFAULT 25.0;
ALTER TABLE reference_table_ddl ALTER COLUMN value_3 SET NOT NULL;
-- see that Citus applied all DDLs to the table
\d reference_table_ddl
-- also to the shard placements
\c - - - :worker_1_port
\d reference_table_ddl*
\c - - - :master_port
DROP INDEX reference_index_2;
\c - - - :worker_1_port
\d reference_table_ddl*
\c - - - :master_port
-- as we expect, renaming and setting WITH OIDS does not work for reference tables
ALTER TABLE reference_table_ddl RENAME TO reference_table_ddl_test;
ALTER TABLE reference_table_ddl SET WITH OIDS;
-- now test reference tables against some helper UDFs that Citus provides
-- cannot delete / drop shards from a reference table
SELECT master_apply_delete_command('DELETE FROM reference_table_ddl');
-- cannot add shards
SELECT master_create_empty_shard('reference_table_ddl');
-- master_modify_multiple_shards works, but, does it make sense to use at all?
INSERT INTO reference_table_ddl (value_2, value_3) VALUES (7, 'aa');
SELECT master_modify_multiple_shards('DELETE FROM reference_table_ddl WHERE value_2 = 7');
INSERT INTO reference_table_ddl (value_2, value_3) VALUES (7, 'bb');
SELECT master_modify_multiple_shards('DELETE FROM reference_table_ddl');
-- get/update the statistics
SELECT part_storage_type, part_key, part_replica_count, part_max_size,
part_placement_policy FROM master_get_table_metadata('reference_table_ddl');
SELECT shardid AS a_shard_id FROM pg_dist_shard WHERE logicalrelid = 'reference_table_ddl'::regclass \gset
SELECT master_update_shard_statistics(:a_shard_id);
CREATE TABLE append_reference_tmp_table (id INT);
SELECT master_append_table_to_shard(:a_shard_id, 'append_reference_tmp_table', 'localhost', :master_port);
SELECT master_get_table_ddl_events('reference_table_ddl');
-- in reality, we wouldn't need to repair any reference table shard placements
-- however, the test could be relevant for other purposes
SELECT placementid AS a_placement_id FROM pg_dist_shard_placement WHERE shardid = :a_shard_id AND nodeport = :worker_1_port \gset
SELECT placementid AS b_placement_id FROM pg_dist_shard_placement WHERE shardid = :a_shard_id AND nodeport = :worker_2_port \gset
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE placementid = :a_placement_id;
SELECT master_copy_shard_placement(:a_shard_id, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
SELECT shardid, shardstate FROM pg_dist_shard_placement WHERE placementid = :a_placement_id;
-- some queries that are captured in functions
CREATE FUNCTION select_count_all() RETURNS bigint AS '
SELECT
count(*)
FROM
reference_table_test;
' LANGUAGE SQL;
CREATE FUNCTION insert_into_ref_table(value_1 int, value_2 float, value_3 text, value_4 timestamp)
RETURNS void AS '
INSERT INTO reference_table_test VALUES ($1, $2, $3, $4);
' LANGUAGE SQL;
TRUNCATE reference_table_test;
SELECT select_count_all();
SELECT insert_into_ref_table(1, 1.0, '1', '2016-12-01');
SELECT insert_into_ref_table(2, 2.0, '2', '2016-12-02');
SELECT insert_into_ref_table(3, 3.0, '3', '2016-12-03');
SELECT insert_into_ref_table(4, 4.0, '4', '2016-12-04');
SELECT insert_into_ref_table(5, 5.0, '5', '2016-12-05');
SELECT insert_into_ref_table(6, 6.0, '6', '2016-12-06');
SELECT select_count_all();
TRUNCATE reference_table_test;
-- some prepared queries and pl/pgsql functions
PREPARE insert_into_ref_table_pr (int, float, text, timestamp)
AS INSERT INTO reference_table_test VALUES ($1, $2, $3, $4);
-- reference tables do not have up-to-five execution limit as other tables
EXECUTE insert_into_ref_table_pr(1, 1.0, '1', '2016-12-01');
EXECUTE insert_into_ref_table_pr(2, 2.0, '2', '2016-12-02');
EXECUTE insert_into_ref_table_pr(3, 3.0, '3', '2016-12-03');
EXECUTE insert_into_ref_table_pr(4, 4.0, '4', '2016-12-04');
EXECUTE insert_into_ref_table_pr(5, 5.0, '5', '2016-12-05');
EXECUTE insert_into_ref_table_pr(6, 6.0, '6', '2016-12-06');
-- see the count, then truncate the table
SELECT select_count_all();
TRUNCATE reference_table_test;
-- reference tables work with composite key
-- and we even do not need to create hash
-- function etc.
-- first create the type on all nodes
CREATE TYPE reference_comp_key as (key text, value text);
\c - - - :worker_1_port
CREATE TYPE reference_comp_key as (key text, value text);
\c - - - :worker_2_port
CREATE TYPE reference_comp_key as (key text, value text);
\c - - - :master_port
CREATE TABLE reference_table_composite (id int PRIMARY KEY, data reference_comp_key);
SELECT create_reference_table('reference_table_composite');
-- insert and query some data
INSERT INTO reference_table_composite (id, data) VALUES (1, ('key_1', 'value_1')::reference_comp_key);
INSERT INTO reference_table_composite (id, data) VALUES (2, ('key_2', 'value_2')::reference_comp_key);
SELECT * FROM reference_table_composite;
SELECT (data).key FROM reference_table_composite;
-- make sure that reference tables obeys single shard transactions
TRUNCATE reference_table_test;
BEGIN;
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
SELECT * FROM reference_table_test;
ROLLBACK;
SELECT * FROM reference_table_test;
-- now insert a row and commit
BEGIN;
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
COMMIT;
SELECT * FROM reference_table_test;
-- one basic UPDATE test
BEGIN;
UPDATE reference_table_test SET value_1 = 10 WHERE value_1 = 2;
COMMIT;
SELECT * FROM reference_table_test;
-- do not allow mixing transactions
BEGIN;
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
ROLLBACK;
-- Do not allow DDL and modification in the same transaction
BEGIN;
ALTER TABLE reference_table_test ADD COLUMN value_dummy INT;
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
ROLLBACK;
-- clean up tables
DROP TABLE reference_table_test, reference_table_test_second, reference_table_test_third,
reference_table_test_fourth, reference_table_ddl;
DROP SCHEMA reference_schema CASCADE;