Add support for creating distributed tables without shard key [merging the main devel branch] (#6867)

DESCRIPTION: Adds support for creating distributed tables without shard
key

Commits proposed in this PR have already been reviewed in other PRs
noted
for each commit.

With this PR, we allow creating distributed tables without specifying a
shard key via create_distributed_table(). Here are the important details
about those tables (see the SQL sketch after this list):
* Specifying `shard_count` is not allowed because it is assumed to be 1.
* We mostly refer to such tables as "single-shard" distributed tables in
  code / comments.
* The `colocate_with` param allows colocating such single-shard tables
  with each other.
* We define this table type, i.e., SINGLE_SHARD_DISTRIBUTED, as a subclass
  of DISTRIBUTED_TABLE because we mostly want to treat them as distributed
  tables in terms of SQL / DDL / operation support.
* Metadata for such tables looks like:
  - distribution method => DISTRIBUTE_BY_NONE
  - replication model => REPLICATION_MODEL_STREAMING
  - colocation id => **!=** INVALID_COLOCATION_ID (distinguishes them from
    Citus local tables)
* We assign colocation groups for such tables to different nodes in a
  round-robin fashion based on the modulo of "colocation id".
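
As a quick illustration, here is a minimal SQL sketch of the new API. It is
based on the regression tests further below; the table names are made up and
only meant as examples:

```sql
-- Two regular tables that we will turn into single-shard distributed tables.
CREATE TABLE events (id bigint, payload text);
CREATE TABLE event_details (id bigint, detail text);

-- Pass NULL as the distribution column to create a single-shard distributed
-- table. Specifying shard_count is not allowed; it is implicitly 1.
SELECT create_distributed_table('events', null, colocate_with => 'none');

-- Colocate another single-shard table with the first one.
SELECT create_distributed_table('event_details', null, colocate_with => 'events');
```

As noted above, such a colocation group gets its shard placed on a node
chosen in a round-robin fashion based on the modulo of the colocation id.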

There is still more work to be done, such as improving SQL support, making
sure that Citus operations work well with such distributed tables, and
making sure that the latest features merged in 11.3 / 12.0 (such as CDC)
work fine. We will take care of these in subsequent PRs.

In this release, we will build schema-based sharding on top of this
infrastructure, and it's likely that we will use this infrastructure for
other nice features in the future too.
Onur Tirtir 2023-05-03 17:15:22 +03:00 committed by GitHub
commit aeaa48c197
50 changed files with 8679 additions and 110 deletions


@ -141,6 +141,8 @@ static void CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId);
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams,
Var *distributionColumn);
@ -216,51 +218,86 @@ create_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
if (PG_ARGISNULL(0) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
text *distributionColumnText = PG_ARGISNULL(1) ? NULL : PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
bool shardCountIsStrict = false;
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
if (distributionColumnText)
{
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
if (PG_ARGISNULL(2))
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
PG_RETURN_VOID();
}
shardCount = PG_GETARG_INT32(4);
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (!IsColocateWithDefault(colocateWithTableName) &&
!IsColocateWithNone(colocateWithTableName))
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}
/*
* if shard_count parameter is given than we have to
* make sure table has that many shards
*/
shardCountIsStrict = true;
shardCount = PG_GETARG_INT32(4);
/*
* If shard_count parameter is given, then we have to
* make sure table has that many shards.
*/
shardCountIsStrict = true;
}
char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
}
CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
}
char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
else
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
}
if (!PG_ARGISNULL(4))
{
ereport(ERROR, (errmsg("shard_count can't be specified when the "
"distribution column is null because in "
"that case it's automatically set to 1")));
}
CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
if (!PG_ARGISNULL(2) &&
LookupDistributionMethod(PG_GETARG_OID(2)) != DISTRIBUTE_BY_HASH)
{
/*
* As we do for shard_count parameter, we could throw an error if
* distribution_type is not NULL when creating a single-shard table.
* However, this requires changing the default value of distribution_type
* parameter to NULL and this would mean a breaking change for most
* users because they're mostly using this API to create sharded
* tables. For this reason, here we instead do nothing if the distribution
* method is DISTRIBUTE_BY_HASH.
*/
ereport(ERROR, (errmsg("distribution_type can't be specified "
"when the distribution column is null ")));
}
CreateSingleShardTable(relationId, colocateWithTableName);
}
PG_RETURN_VOID();
}
@ -276,11 +313,18 @@ create_distributed_table_concurrently(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
if (PG_ARGISNULL(0) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}
if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("cannot use create_distributed_table_concurrently "
"to create a distributed table with a null shard "
"key, consider using create_distributed_table()")));
}
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
char *distributionColumnName = text_to_cstring(distributionColumnText);
@ -982,6 +1026,23 @@ CreateReferenceTable(Oid relationId)
}
/*
* CreateSingleShardTable is a wrapper around CreateCitusTable that creates a
* single shard distributed table that doesn't have a shard key.
*/
void
CreateSingleShardTable(Oid relationId, char *colocateWithTableName)
{
DistributedTableParams distributedTableParams = {
.colocateWithTableName = colocateWithTableName,
.shardCount = 1,
.shardCountIsStrict = true,
.distributionColumnName = NULL
};
CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams);
}
/*
* CreateCitusTable is the internal method that creates a Citus table in
* given configuration.
@ -1000,7 +1061,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams)
{
if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED ||
tableType == RANGE_DISTRIBUTED) != (distributedTableParams != NULL))
tableType == RANGE_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED) !=
(distributedTableParams != NULL))
{
ereport(ERROR, (errmsg("distributed table params must be provided "
"when creating a distributed table and must "
@ -1078,7 +1140,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
PropagatePrerequisiteObjectsForDistributedTable(relationId);
Var *distributionColumn = NULL;
if (distributedTableParams)
if (distributedTableParams && distributedTableParams->distributionColumnName)
{
distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributedTableParams->
@ -1150,6 +1212,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
{
CreateReferenceTableShard(relationId);
}
else if (tableType == SINGLE_SHARD_DISTRIBUTED)
{
CreateSingleShardTableShard(relationId, colocatedTableId,
colocationId);
}
if (ShouldSyncTableMetadata(relationId))
{
@ -1204,7 +1271,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
}
/* copy over data for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED || tableType == REFERENCE_TABLE)
if (tableType == HASH_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED ||
tableType == REFERENCE_TABLE)
{
if (RegularTable(relationId))
{
@ -1268,6 +1336,13 @@ DecideCitusTableParams(CitusTableType tableType,
break;
}
case SINGLE_SHARD_DISTRIBUTED:
{
citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
citusTableParams.replicationModel = REPLICATION_MODEL_STREAMING;
break;
}
case REFERENCE_TABLE:
{
citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
@ -1630,6 +1705,41 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
}
/*
* CreateSingleShardTableShard creates the shard of the given single-shard
* distributed table.
*/
static void
CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId)
{
if (colocatedTableId != InvalidOid)
{
/*
* We currently allow concurrent distribution of colocated tables (which
* we probably should not be allowing because of foreign keys /
* partitioning etc).
*
* We also prevent concurrent shard moves / copies / splits while creating
* a colocated table.
*/
AcquirePlacementColocationLock(colocatedTableId, ShareLock,
"colocate distributed table");
/*
* We don't need to force using exclusive connections because we're anyway
* creating a single shard.
*/
bool useExclusiveConnection = false;
CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
}
else
{
CreateSingleShardTableShardWithRoundRobinPolicy(relationId, colocationId);
}
}
/*
* ColocationIdForNewTable returns a colocation id for given table
* according to given configuration. If there is no such configuration, it
@ -1662,8 +1772,8 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
errdetail("Currently, colocate_with option is not supported "
"for append / range distributed tables.")));
}
return colocationId;
@ -1679,10 +1789,11 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
* can be sure that there will no modifications on the colocation table
* until this transaction is committed.
*/
Assert(citusTableParams.distributionMethod == DISTRIBUTE_BY_HASH);
Oid distributionColumnType = distributionColumn->vartype;
Oid distributionColumnCollation = get_typcollation(distributionColumnType);
Oid distributionColumnType =
distributionColumn ? distributionColumn->vartype : InvalidOid;
Oid distributionColumnCollation =
distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid;
/* get an advisory lock to serialize concurrent default group creations */
if (IsColocateWithDefault(distributedTableParams->colocateWithTableName))
@ -1871,8 +1982,15 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
*/
if (PartitionedTableNoLock(relationId))
{
/* distributing partitioned tables in only supported for hash-distribution */
if (distributionMethod != DISTRIBUTE_BY_HASH)
/*
* Distributing partitioned tables is only supported for hash-distribution
* or single-shard tables.
*/
bool isSingleShardTable =
distributionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_STREAMING &&
colocationId != INVALID_COLOCATION_ID;
if (distributionMethod != DISTRIBUTE_BY_HASH && !isSingleShardTable)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables in only supported "


@ -303,6 +303,11 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
/*
* Foreign keys from citus local tables or reference tables to distributed
* tables are not supported.
*
* We could support foreign keys from reference tables to single-shard
* tables, but this doesn't seem very useful. However, if we decide to
* support this, then we need to expand the relation access tracking check
* for single-shard tables too.
*/
if (referencingIsCitusLocalOrRefTable && !referencedIsCitusLocalOrRefTable)
{
@ -361,7 +366,12 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
* if tables are hash-distributed and colocated, we need to make sure that
* the distribution key is included in foreign constraint.
*/
if (!referencedIsCitusLocalOrRefTable && !foreignConstraintOnDistKey)
bool referencedIsSingleShardTable =
IsSingleShardTableByDistParams(referencedDistMethod,
referencedReplicationModel,
referencedColocationId);
if (!referencedIsCitusLocalOrRefTable && !referencedIsSingleShardTable &&
!foreignConstraintOnDistKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),


@ -2146,6 +2146,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
}
if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) &&
!IsCitusTableTypeCacheEntry(cacheEntry, SINGLE_SHARD_DISTRIBUTED) &&
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),


@ -384,6 +384,11 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
*/
if (IsCitusTable(parentRelationId))
{
/*
* We can create Citus local tables and single-shard distributed tables
* right away, without switching to sequential mode, because they are going to
* have only one shard.
*/
if (IsCitusTableType(parentRelationId, CITUS_LOCAL_TABLE))
{
CreateCitusLocalTablePartitionOf(createStatement, relationId,
@ -391,11 +396,18 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
return;
}
char *parentRelationName = generate_qualified_relation_name(parentRelationId);
if (IsCitusTableType(parentRelationId, SINGLE_SHARD_DISTRIBUTED))
{
CreateSingleShardTable(relationId, parentRelationName);
return;
}
Var *parentDistributionColumn = DistPartitionKeyOrError(parentRelationId);
char *distributionColumnName =
ColumnToColumnName(parentRelationId, (Node *) parentDistributionColumn);
char parentDistributionMethod = DISTRIBUTE_BY_HASH;
char *parentRelationName = generate_qualified_relation_name(parentRelationId);
SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(parentRelationId,
relationId);
@ -589,19 +601,32 @@ PreprocessAttachCitusPartitionToCitusTable(Oid parentCitusRelationId, Oid
/*
* DistributePartitionUsingParent takes a parent and a partition relation and
* distributes the partition, using the same distribution column as the parent.
* It creates a *hash* distributed table by default, as partitioned tables can only be
* distributed by hash.
* distributes the partition, using the same distribution column as the parent, if the
* parent has a distribution column. It creates a *hash* distributed table by default, as
* partitioned tables can only be distributed by hash, unless it's null key distributed.
*
* If the parent has no distribution key, we distribute the partition with null key too.
*/
static void
DistributePartitionUsingParent(Oid parentCitusRelationId, Oid partitionRelationId)
{
char *parentRelationName = generate_qualified_relation_name(parentCitusRelationId);
if (!HasDistributionKey(parentCitusRelationId))
{
/*
* If the parent is null key distributed, we should distribute the partition
* with null distribution key as well.
*/
CreateSingleShardTable(partitionRelationId, parentRelationName);
return;
}
Var *distributionColumn = DistPartitionKeyOrError(parentCitusRelationId);
char *distributionColumnName = ColumnToColumnName(parentCitusRelationId,
(Node *) distributionColumn);
char distributionMethod = DISTRIBUTE_BY_HASH;
char *parentRelationName = generate_qualified_relation_name(parentCitusRelationId);
SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(
parentCitusRelationId, partitionRelationId);


@ -324,7 +324,8 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
{
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
if (IsCitusTable(relationId) && !HasDistributionKey(relationId) &&
if ((IsCitusTableType(relationId, REFERENCE_TABLE) ||
IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) &&
TableReferenced(relationId))
{
char *relationName = get_rel_name(relationId);


@ -508,11 +508,21 @@ IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
return partitionMethod == DISTRIBUTE_BY_RANGE;
}
case SINGLE_SHARD_DISTRIBUTED:
{
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID;
}
case DISTRIBUTED_TABLE:
{
return partitionMethod == DISTRIBUTE_BY_HASH ||
partitionMethod == DISTRIBUTE_BY_RANGE ||
partitionMethod == DISTRIBUTE_BY_APPEND;
partitionMethod == DISTRIBUTE_BY_APPEND ||
(partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID);
}
case STRICTLY_PARTITIONED_DISTRIBUTED_TABLE:
@ -815,6 +825,21 @@ IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel,
}
/*
* IsSingleShardTableByDistParams returns true if given partitionMethod,
* replicationModel and colocationId would identify a single-shard distributed
* table that has a null shard key.
*/
bool
IsSingleShardTableByDistParams(char partitionMethod, char replicationModel,
uint32 colocationId)
{
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID;
}
/*
* CitusTableList returns a list that includes all the valid distributed table
* cache entries.


@ -515,7 +515,7 @@ ShouldSyncUserCommandForObject(ObjectAddress objectAddress)
/*
* ShouldSyncTableMetadata checks if the metadata of a distributed table should be
* propagated to metadata workers, i.e. the table is a hash distributed table or
* reference/citus local table.
* a Citus table that doesn't have shard key.
*/
bool
ShouldSyncTableMetadata(Oid relationId)
@ -537,10 +537,11 @@ ShouldSyncTableMetadata(Oid relationId)
/*
* ShouldSyncTableMetadataViaCatalog checks if the metadata of a distributed table should
* be propagated to metadata workers, i.e. the table is an MX table or reference table.
* ShouldSyncTableMetadataViaCatalog checks if the metadata of a Citus table should
* be propagated to metadata workers, i.e. the table is an MX table or Citus table
* that doesn't have shard key.
* Tables with streaming replication model (which means RF=1) and hash distribution are
* considered as MX tables while tables with none distribution are reference tables.
* considered as MX tables.
*
* ShouldSyncTableMetadataViaCatalog does not use the CitusTableCache and instead reads
* from catalog tables directly.
@ -1080,7 +1081,7 @@ EnsureObjectMetadataIsSane(int distributionArgumentIndex, int colocationId)
/*
* DistributionCreateCommands generates a commands that can be
* executed to replicate the metadata for a distributed table.
* executed to replicate the metadata for a Citus table.
*/
char *
DistributionCreateCommand(CitusTableCacheEntry *cacheEntry)


@ -217,9 +217,9 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
List *insertedShardPlacements = NIL;
List *insertedShardIds = NIL;
/* make sure that tables are hash partitioned */
CheckHashPartitionedTable(targetRelationId);
CheckHashPartitionedTable(sourceRelationId);
CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId);
Assert(targetCacheEntry->partitionMethod == DISTRIBUTE_BY_HASH ||
targetCacheEntry->partitionMethod == DISTRIBUTE_BY_NONE);
/*
* In contrast to append/range partitioned tables it makes more sense to
@ -259,10 +259,20 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
*newShardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, newShardIdPtr);
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue);
text *shardMinValueText = NULL;
text *shardMaxValueText = NULL;
if (targetCacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
Assert(list_length(sourceShardIntervalList) == 1);
}
else
{
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
shardMinValueText = IntegerToText(shardMinValue);
shardMaxValueText = IntegerToText(shardMaxValue);
}
List *sourceShardPlacementList = ShardPlacementListSortedByWorker(
sourceShardId);
@ -362,6 +372,72 @@ CreateReferenceTableShard(Oid distributedTableId)
}
/*
* CreateSingleShardTableShardWithRoundRobinPolicy creates a single
* shard for the given distributedTableId. The created shard does not
* have min/max values. Unlike CreateReferenceTableShard, the shard is
* _not_ replicated to all nodes but would have a single placement like
* Citus local tables.
*
* However, this placement doesn't necessarily end up on the coordinator.
* The target node is determined based on the modulo of the colocation
* id that the given table has been associated with.
*/
void
CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocationId)
{
EnsureTableOwner(relationId);
/* we plan to add shards: get an exclusive lock on relation oid */
LockRelationOid(relationId, ExclusiveLock);
/*
* Load and sort the worker node list for deterministic placement.
*
* Also take a RowShareLock on pg_dist_node to disallow concurrent
* node list changes that require an exclusive lock.
*/
List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
int32 workerNodeCount = list_length(workerNodeList);
if (workerNodeCount == 0)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("couldn't find any worker nodes"),
errhint("Add more worker nodes")));
}
char shardStorageType = ShardStorageType(relationId);
text *minHashTokenText = NULL;
text *maxHashTokenText = NULL;
uint64 shardId = GetNextShardId();
InsertShardRow(relationId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText);
/* determine the node index based on colocation id */
int roundRobinNodeIdx = colocationId % workerNodeCount;
int replicationFactor = 1;
List *insertedShardPlacements = InsertShardPlacementRows(
relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);
/*
* We don't need to force using exclusive connections because we're anyway
* creating a single shard.
*/
bool useExclusiveConnection = false;
bool colocatedShard = false;
CreateShardsOnWorkers(relationId, insertedShardPlacements,
useExclusiveConnection, colocatedShard);
}
/*
* CheckHashPartitionedTable looks up the partition information for the given
* tableId and checks if the table is hash partitioned. If not, the function


@ -138,6 +138,13 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
errdetail("We currently don't support creating shards "
"on hash-partitioned tables")));
}
else if (IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED))
{
ereport(ERROR, (errmsg("relation \"%s\" is a single shard table",
relationName),
errdetail("We currently don't support creating shards "
"on single shard tables")));
}
else if (IsCitusTableType(relationId, REFERENCE_TABLE))
{
ereport(ERROR, (errmsg("relation \"%s\" is a reference table",
@ -521,7 +528,8 @@ RelationShardListForShardCreate(ShardInterval *shardInterval)
relationShard->shardId = shardInterval->shardId;
List *relationShardList = list_make1(relationShard);
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) &&
if ((IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(cacheEntry, SINGLE_SHARD_DISTRIBUTED)) &&
cacheEntry->colocationId != INVALID_COLOCATION_ID)
{
shardIndex = ShardIndex(shardInterval);


@ -1025,6 +1025,17 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
{
return distributedPlan;
}
else if (ContainsSingleShardTable(originalQuery))
{
/*
* We only support router queries if the query contains reference to
* a single-shard table. This temporary restriction will be removed
* once we support recursive planning for the queries that reference
* single-shard tables.
*/
WrapRouterErrorForSingleShardTable(distributedPlan->planningError);
RaiseDeferredError(distributedPlan->planningError, ERROR);
}
else
{
RaiseDeferredError(distributedPlan->planningError, DEBUG2);
@ -2462,6 +2473,18 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
}
/*
* ContainsSingleShardTable returns true if given query contains reference
* to a single-shard table.
*/
bool
ContainsSingleShardTable(Query *query)
{
RTEListProperties *rteListProperties = GetRTEListPropertiesForQuery(query);
return rteListProperties->hasSingleShardDistTable;
}
/*
* GetRTEListPropertiesForQuery is a wrapper around GetRTEListProperties that
* returns RTEListProperties for the rte list retrieved from query.
@ -2538,6 +2561,15 @@ GetRTEListProperties(List *rangeTableList)
else if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE))
{
rteListProperties->hasDistributedTable = true;
if (!HasDistributionKeyCacheEntry(cacheEntry))
{
rteListProperties->hasSingleShardDistTable = true;
}
else
{
rteListProperties->hasDistTableWithShardKey = true;
}
}
else
{


@ -212,6 +212,16 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
return false;
}
/*
* If the table doesn't have a distribution column, we don't need to
* check anything further.
*/
Var *distributionKey = PartitionColumn(distributedTableId, 1);
if (!distributionKey)
{
return true;
}
/* WHERE clause should not be empty for distributed tables */
if (joinTree == NULL ||
(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals ==
@ -220,13 +230,6 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
return false;
}
/* if that's a reference table, we don't need to check anything further */
Var *distributionKey = PartitionColumn(distributedTableId, 1);
if (!distributionKey)
{
return true;
}
/* convert list of expressions into expression tree for further processing */
quals = joinTree->quals;
if (quals != NULL && IsA(quals, List))


@ -730,25 +730,49 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
"table", NULL, NULL);
}
/* ensure that INSERT's partition column comes from SELECT's partition column */
error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte,
&selectPartitionColumnTableId);
if (error)
if (!HasDistributionKey(targetRelationId) ||
subqueryRteListProperties->hasSingleShardDistTable)
{
return error;
/*
* XXX: Better to check this regardless of the fact that the target table
* has a distribution column or not.
*/
List *distributedRelationIdList = DistributedRelationIdList(subquery);
distributedRelationIdList = lappend_oid(distributedRelationIdList,
targetRelationId);
if (!AllDistributedRelationsInListColocated(distributedRelationIdList))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot reference a "
"distributed table without a shard key together "
"with non-colocated distributed tables",
NULL, NULL);
}
}
/*
* We expect partition column values come from colocated tables. Note that we
* skip this check from the reference table case given that all reference tables
* are already (and by default) co-located.
*/
if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
if (HasDistributionKey(targetRelationId))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT target table and the source relation of the SELECT partition "
"column value must be colocated in distributed INSERT ... SELECT",
NULL, NULL);
/* ensure that INSERT's partition column comes from SELECT's partition column */
error = InsertPartitionColumnMatchesSelect(queryTree, insertRte, subqueryRte,
&selectPartitionColumnTableId);
if (error)
{
return error;
}
/*
* We expect partition column values come from colocated tables. Note that we
* skip this check from the reference table case given that all reference tables
* are already (and by default) co-located.
*/
if (!TablesColocated(insertRte->relid, selectPartitionColumnTableId))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"INSERT target table and the source relation of the SELECT partition "
"column value must be colocated in distributed INSERT ... SELECT",
NULL, NULL);
}
}
}
@ -867,7 +891,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery,
*/
RTEListProperties *subqueryRteListProperties = GetRTEListPropertiesForQuery(
copiedSubquery);
if (subqueryRteListProperties->hasDistributedTable)
if (subqueryRteListProperties->hasDistTableWithShardKey)
{
AddPartitionKeyNotNullFilterToSelect(copiedSubquery);
}
@ -1537,6 +1561,19 @@ CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse, ParamListInfo bou
bool repartitioned = IsRedistributablePlan(selectPlan->planTree) &&
IsSupportedRedistributionTarget(targetRelationId);
/*
* Today it's not possible to generate a distributed plan for a SELECT
* having more than one task if it references a single-shard table.
* This is because, we don't support queries beyond router planner
* if the query references a single-shard table.
*
* For this reason, right now we don't expect an INSERT .. SELECT
* query to go through the repartitioned INSERT .. SELECT logic if the
* SELECT query references a single-shard table.
*/
Assert(!repartitioned ||
!GetRTEListPropertiesForQuery(selectQueryCopy)->hasSingleShardDistTable);
distributedPlan->insertSelectQuery = insertSelectQuery;
distributedPlan->selectPlanForInsertSelect = selectPlan;
distributedPlan->insertSelectMethod = repartitioned ?


@ -509,6 +509,11 @@ InsertDistributionColumnMatchesSource(Oid targetRelationId, Query *query)
return NULL;
}
if (!HasDistributionKey(targetRelationId))
{
return NULL;
}
bool foundDistributionColumn = false;
MergeAction *action = NULL;
foreach_ptr(action, query->mergeActionList)


@ -1404,7 +1404,7 @@ DistPartitionKeyOrError(Oid relationId)
if (partitionKey == NULL)
{
ereport(ERROR, (errmsg(
"no distribution column found for relation %d, because it is a reference table",
"no distribution column found for relation %d",
relationId)));
}


@ -272,7 +272,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
if (!targetListOnPartitionColumn)
{
if (!FindNodeMatchingCheckFunctionInRangeTableList(query->rtable,
IsDistributedTableRTE))
IsTableWithDistKeyRTE))
{
targetListOnPartitionColumn = true;
}
@ -379,6 +379,20 @@ IsReferenceTableRTE(Node *node)
}
/*
* IsTableWithDistKeyRTE gets a node and returns true if the node
* is a range table relation entry that points to a distributed table
* that has a distribution column.
*/
bool
IsTableWithDistKeyRTE(Node *node)
{
Oid relationId = NodeTryGetRteRelid(node);
return relationId != InvalidOid && IsCitusTable(relationId) &&
HasDistributionKey(relationId);
}
/*
* FullCompositeFieldList gets a composite field list, and checks if all fields
* of composite type are used in the list.


@ -2487,7 +2487,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
/* non-distributed tables have only one shard */
shardInterval = cacheEntry->sortedShardIntervalArray[0];
/* only use reference table as anchor shard if none exists yet */
/* use as anchor shard only if we couldn't find any yet */
if (anchorShardId == INVALID_SHARD_ID)
{
anchorShardId = shardInterval->shardId;


@ -258,6 +258,22 @@ CreateModifyPlan(Query *originalQuery, Query *query,
}
/*
* WrapRouterErrorForSingleShardTable wraps given planning error with a
* generic error message if given query references a distributed table
* that doesn't have a distribution key.
*/
void
WrapRouterErrorForSingleShardTable(DeferredErrorMessage *planningError)
{
planningError->detail = planningError->message;
planningError->message = pstrdup("queries that reference a distributed "
"table without a shard key can only "
"reference colocated distributed "
"tables or reference tables");
}
/*
* CreateSingleTaskRouterSelectPlan creates a physical plan for given SELECT query.
* The returned plan is a router task that returns query results from a single worker.
@ -1870,6 +1886,11 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
*/
if (IsMergeQuery(originalQuery))
{
if (ContainsSingleShardTable(originalQuery))
{
WrapRouterErrorForSingleShardTable(*planningError);
}
RaiseDeferredError(*planningError, ERROR);
}
else
@ -2684,7 +2705,7 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery,
if (!HasDistributionKey(relationId))
{
/* we don't need to do shard pruning for non-distributed tables */
/* we don't need to do shard pruning for single shard tables */
return list_make1(LoadShardIntervalList(relationId));
}
@ -2974,7 +2995,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
Assert(query->commandType == CMD_INSERT);
/* reference tables and citus local tables can only have one shard */
/* tables that don't have distribution column can only have one shard */
if (!HasDistributionKeyCacheEntry(cacheEntry))
{
List *shardIntervalList = LoadShardIntervalList(distributedTableId);
@ -2992,6 +3013,12 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
ereport(ERROR, (errmsg("local table cannot have %d shards",
shardCount)));
}
else if (IsCitusTableTypeCacheEntry(cacheEntry, SINGLE_SHARD_DISTRIBUTED))
{
ereport(ERROR, (errmsg("distributed tables having a null shard key "
"cannot have %d shards",
shardCount)));
}
}
ShardInterval *shardInterval = linitial(shardIntervalList);
@ -3849,7 +3876,8 @@ ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree)
CitusTableCacheEntry *modificationTableCacheEntry =
GetCitusTableCacheEntry(distributedTableId);
if (!HasDistributionKeyCacheEntry(modificationTableCacheEntry))
if (!IsCitusTableTypeCacheEntry(modificationTableCacheEntry,
DISTRIBUTED_TABLE))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot router plan modification of a non-distributed table",


@ -195,7 +195,7 @@ RecordRelationAccessIfNonDistTable(Oid relationId, ShardPlacementAccessType acce
* recursively calling RecordRelationAccessBase(), so becareful about
* removing this check.
*/
if (IsCitusTable(relationId) && HasDistributionKey(relationId))
if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
{
return;
}
@ -732,7 +732,7 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (HasDistributionKeyCacheEntry(cacheEntry) ||
if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) ||
cacheEntry->referencingRelationsViaForeignKey == NIL)
{
return;
@ -931,7 +931,7 @@ HoldsConflictingLockWithReferencedRelations(Oid relationId, ShardPlacementAccess
* We're only interested in foreign keys to reference tables and citus
* local tables.
*/
if (IsCitusTable(referencedRelation) && HasDistributionKey(referencedRelation))
if (IsCitusTableType(referencedRelation, DISTRIBUTED_TABLE))
{
continue;
}
@ -993,7 +993,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
bool holdsConflictingLocks = false;
Assert(!HasDistributionKeyCacheEntry(cacheEntry));
Assert(!IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE));
Oid referencingRelation = InvalidOid;
foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey)


@ -1384,17 +1384,19 @@ EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType, Oid sourceRelationId)
{
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
char sourceReplicationModel = sourceTableEntry->replicationModel;
Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId);
if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED))
if (IsCitusTableTypeCacheEntry(sourceTableEntry, APPEND_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(sourceTableEntry, RANGE_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(sourceTableEntry, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
errdetail("Currently, colocate_with option is not supported "
"with append / range distributed tables and local "
"tables added to metadata.")));
}
char sourceReplicationModel = sourceTableEntry->replicationModel;
if (sourceReplicationModel != replicationModel)
{
char *relationName = get_rel_name(relationId);
@ -1406,7 +1408,9 @@ EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
sourceRelationName, relationName)));
}
Oid sourceDistributionColumnType = sourceDistributionColumn->vartype;
Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId);
Oid sourceDistributionColumnType = !sourceDistributionColumn ? InvalidOid :
sourceDistributionColumn->vartype;
if (sourceDistributionColumnType != distributionColumnType)
{
char *relationName = get_rel_name(relationId);


@ -135,7 +135,7 @@ BuildDistributionKeyFromColumnName(Oid relationId, char *columnName, LOCKMODE lo
char *tableName = get_rel_name(relationId);
/* short circuit for reference tables */
/* short circuit for reference tables and single-shard tables */
if (columnName == NULL)
{
return NULL;


@ -206,7 +206,7 @@ CompareRelationShards(const void *leftElement, const void *rightElement)
*
* For hash partitioned tables, it calculates hash value of a number in its
* range (e.g. min value) and finds which shard should contain the hashed
* value. For reference tables and citus local tables, it simply returns 0.
* value. For the tables that don't have a shard key, it simply returns 0.
* For the other table types, the function errors out.
*/
int
@ -231,12 +231,11 @@ ShardIndex(ShardInterval *shardInterval)
"tables that are added to citus metadata")));
}
/* short-circuit for reference tables */
/* short-circuit for the tables that don't have a distribution key */
if (!HasDistributionKeyCacheEntry(cacheEntry))
{
/*
* Reference tables and citus local tables have only a single shard,
* so the index is fixed to 0.
* Such tables have only a single shard, so the index is fixed to 0.
*/
shardIndex = 0;


@ -262,6 +262,8 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
bool useExclusiveConnections);
extern void CreateReferenceTableShard(Oid distributedTableId);
extern void CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId,
uint32 colocationId);
extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
List *ddlCommandList,
List *foreignConstraintCommandList);


@ -147,9 +147,19 @@ typedef struct RTEListProperties
bool hasReferenceTable;
bool hasCitusLocalTable;
/* includes hash, append and range partitioned tables */
/* includes hash, single-shard, append and range partitioned tables */
bool hasDistributedTable;
/*
* Effectively, hasDistributedTable is equal to
* "hasDistTableWithShardKey || hasSingleShardDistTable".
*
* We provide the two fields below for callers that want to know what kind of
* distributed tables the given query references.
*/
bool hasDistTableWithShardKey;
bool hasSingleShardDistTable;
/* union of hasReferenceTable, hasCitusLocalTable and hasDistributedTable */
bool hasCitusTable;
@ -243,6 +253,7 @@ extern int32 BlessRecordExpression(Expr *expr);
extern void DissuadePlannerFromUsingPlan(PlannedStmt *plan);
extern PlannedStmt * FinalizePlan(PlannedStmt *localPlan,
struct DistributedPlan *distributedPlan);
extern bool ContainsSingleShardTable(Query *query);
extern RTEListProperties * GetRTEListPropertiesForQuery(Query *query);


@ -123,6 +123,7 @@ typedef enum
HASH_DISTRIBUTED,
APPEND_DISTRIBUTED,
RANGE_DISTRIBUTED,
SINGLE_SHARD_DISTRIBUTED,
/* hash, range or append distributed table */
DISTRIBUTED_TABLE,
@ -157,6 +158,8 @@ extern uint32 ColocationIdViaCatalog(Oid relationId);
bool IsReferenceTableByDistParams(char partitionMethod, char replicationModel);
extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel,
uint32 colocationId);
extern bool IsSingleShardTableByDistParams(char partitionMethod, char replicationModel,
uint32 colocationId);
extern List * CitusTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern bool ShardExists(uint64 shardId);


@ -326,6 +326,7 @@ extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardRow(uint64 shardId);
extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
extern void DeleteShardPlacementRow(uint64 placementId);
extern void CreateSingleShardTable(Oid relationId, char *colocateWithTableName);
extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName);


@ -200,6 +200,7 @@ extern bool IsCitusTableRTE(Node *node);
extern bool IsDistributedOrReferenceTableRTE(Node *node);
extern bool IsDistributedTableRTE(Node *node);
extern bool IsReferenceTableRTE(Node *node);
extern bool IsTableWithDistKeyRTE(Node *node);
extern bool IsCitusExtraDataContainerRelation(RangeTblEntry *rte);
extern bool ContainsReadIntermediateResultFunction(Node *node);
extern bool ContainsReadIntermediateResultArrayFunction(Node *node);


@ -36,6 +36,7 @@ extern DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query,
extern DistributedPlan * CreateModifyPlan(Query *originalQuery, Query *query,
PlannerRestrictionContext *
plannerRestrictionContext);
extern void WrapRouterErrorForSingleShardTable(DeferredErrorMessage *planningError);
extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext,


@ -76,6 +76,19 @@ def run_for_config(config, lock, sql_schedule_name):
cfg.SUPER_USER_NAME,
)
common.save_regression_diff("postgres", config.output_dir)
elif config.all_null_dist_key:
exitCode |= common.run_pg_regress_without_exit(
config.bindir,
config.pg_srcdir,
config.coordinator_port(),
cfg.SINGLE_SHARD_PREP_SCHEDULE,
config.output_dir,
config.input_dir,
cfg.SUPER_USER_NAME,
)
common.save_regression_diff(
"single_shard_table_prep_regression", config.output_dir
)
exitCode |= _run_pg_regress_on_port(
config, config.coordinator_port(), cfg.CREATE_SCHEDULE


@ -22,6 +22,7 @@ ARBITRARY_SCHEDULE_NAMES = [
"sql_schedule",
"sql_base_schedule",
"postgres_schedule",
"single_shard_table_prep_schedule",
]
BEFORE_PG_UPGRADE_SCHEDULE = "./before_pg_upgrade_schedule"
@ -29,6 +30,7 @@ AFTER_PG_UPGRADE_SCHEDULE = "./after_pg_upgrade_schedule"
CREATE_SCHEDULE = "./create_schedule"
POSTGRES_SCHEDULE = "./postgres_schedule"
SINGLE_SHARD_PREP_SCHEDULE = "./single_shard_table_prep_schedule"
SQL_SCHEDULE = "./sql_schedule"
SQL_BASE_SCHEDULE = "./sql_base_schedule"
@ -101,6 +103,7 @@ class CitusBaseClusterConfig(object, metaclass=NewInitCaller):
self.dbname = DATABASE_NAME
self.is_mx = True
self.is_citus = True
self.all_null_dist_key = False
self.name = type(self).__name__
self.settings = {
"shared_preload_libraries": "citus",
@ -203,6 +206,43 @@ class PostgresConfig(CitusDefaultClusterConfig):
]
class AllSingleShardTableDefaultConfig(CitusDefaultClusterConfig):
def __init__(self, arguments):
super().__init__(arguments)
self.all_null_dist_key = True
self.skip_tests += [
# i) Skip the following tests because they require SQL support beyond
# router planner / supporting more DDL command types.
#
# group 1
"dropped_columns_create_load",
"dropped_columns_1",
# group 2
"distributed_planning_create_load",
"distributed_planning",
# group 4
"views_create",
"views",
# group 5
"intermediate_result_pruning_create",
"intermediate_result_pruning_queries_1",
"intermediate_result_pruning_queries_2",
# group 6
"local_dist_join_load",
"local_dist_join",
"arbitrary_configs_recurring_outer_join",
# group 7
"sequences_create",
"sequences",
# group 8
"function_create",
"functions",
#
# ii) Skip the following test as it requires support for create_distributed_function.
"nested_execution",
]
class CitusSingleNodeClusterConfig(CitusDefaultClusterConfig):
def __init__(self, arguments):
super().__init__(arguments)


@ -128,6 +128,7 @@ DEPS = {
"multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_schema_support": TestDeps(None, ["multi_mx_copy_data"]),
"multi_simple_queries": TestDeps("base_schedule"),
"create_single_shard_table": TestDeps("minimal_schedule"),
}


@ -0,0 +1,154 @@
CREATE SCHEMA alter_null_dist_key;
SET search_path TO alter_null_dist_key;
SET citus.next_shard_id TO 1720000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE SEQUENCE dist_seq;
CREATE TABLE null_dist_table(a bigint DEFAULT nextval('dist_seq') UNIQUE, "b" text, c bigint GENERATED BY DEFAULT AS IDENTITY);
INSERT INTO null_dist_table("b") VALUES ('test');
SELECT create_distributed_table('null_dist_table', null, colocate_with=>'none', distribution_type=>null);
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$alter_null_dist_key.null_dist_table$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- add column
ALTER TABLE null_dist_table ADD COLUMN d bigint DEFAULT 2;
SELECT * FROM null_dist_table ORDER BY c;
a | b | c | d
---------------------------------------------------------------------
1 | test | 1 | 2
(1 row)
-- alter default, set to 3
ALTER TABLE null_dist_table ALTER COLUMN d SET DEFAULT 3;
INSERT INTO null_dist_table("b") VALUES ('test');
SELECT * FROM null_dist_table ORDER BY c;
a | b | c | d
---------------------------------------------------------------------
1 | test | 1 | 2
2 | test | 2 | 3
(2 rows)
-- drop default, see null
ALTER TABLE null_dist_table ALTER COLUMN d DROP DEFAULT;
INSERT INTO null_dist_table("b") VALUES ('test');
SELECT * FROM null_dist_table ORDER BY c;
a | b | c | d
---------------------------------------------------------------------
1 | test | 1 | 2
2 | test | 2 | 3
3 | test | 3 |
(3 rows)
-- cleanup the rows that were added to test the default behavior
DELETE FROM null_dist_table WHERE "b" = 'test' AND a > 1;
-- alter column type
ALTER TABLE null_dist_table ALTER COLUMN d TYPE text;
UPDATE null_dist_table SET d = 'this is a text' WHERE d = '2';
SELECT * FROM null_dist_table ORDER BY c;
a | b | c | d
---------------------------------------------------------------------
1 | test | 1 | this is a text
(1 row)
-- drop seq column
ALTER TABLE null_dist_table DROP COLUMN a;
SELECT * FROM null_dist_table ORDER BY c;
b | c | d
---------------------------------------------------------------------
test | 1 | this is a text
(1 row)
-- add not null constraint
ALTER TABLE null_dist_table ALTER COLUMN b SET NOT NULL;
-- not null constraint violation, error out
INSERT INTO null_dist_table VALUES (NULL, 2, 'test');
ERROR: null value in column "b" violates not-null constraint
DETAIL: Failing row contains (null, 2, test).
CONTEXT: while executing command on localhost:xxxxx
-- drop not null constraint and try again
ALTER TABLE null_dist_table ALTER COLUMN b DROP NOT NULL;
INSERT INTO null_dist_table VALUES (NULL, 3, 'test');
SELECT * FROM null_dist_table ORDER BY c;
b | c | d
---------------------------------------------------------------------
test | 1 | this is a text
| 3 | test
(2 rows)
-- add exclusion constraint
ALTER TABLE null_dist_table ADD CONSTRAINT exc_b EXCLUDE USING btree (b with =);
-- rename the exclusion constraint, errors out
ALTER TABLE null_dist_table RENAME CONSTRAINT exc_b TO exc_b_1;
ERROR: renaming constraints belonging to distributed tables is currently unsupported
-- create exclusion constraint without a name
ALTER TABLE null_dist_table ADD EXCLUDE USING btree (b with =);
-- test setting autovacuum option
ALTER TABLE null_dist_table SET (autovacuum_enabled = false);
-- test multiple subcommands
ALTER TABLE null_dist_table ADD COLUMN int_column1 INTEGER,
DROP COLUMN d;
SELECT * FROM null_dist_table ORDER BY c;
b | c | int_column1
---------------------------------------------------------------------
test | 1 |
| 3 |
(2 rows)
-- test policy and row level security
CREATE TABLE null_dist_key_with_policy (table_user text);
INSERT INTO null_dist_key_with_policy VALUES ('user_1');
SELECT create_distributed_table('null_dist_key_with_policy', null);
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$alter_null_dist_key.null_dist_key_with_policy$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- enable rls
ALTER TABLE null_dist_key_with_policy ENABLE ROW LEVEL SECURITY;
-- user_1 will be allowed to see the inserted row
CREATE ROLE user_1 WITH LOGIN;
GRANT ALL ON SCHEMA alter_null_dist_key TO user_1;
GRANT ALL ON TABLE alter_null_dist_key.null_dist_key_with_policy TO user_1;
CREATE POLICY table_policy ON null_dist_key_with_policy TO user_1
USING (table_user = current_user);
-- user_2 will not be allowed to see the inserted row
CREATE ROLE user_2 WITH LOGIN;
GRANT ALL ON SCHEMA alter_null_dist_key TO user_2;
GRANT ALL ON TABLE alter_null_dist_key.null_dist_key_with_policy TO user_2;
CREATE POLICY table_policy_1 ON null_dist_key_with_policy TO user_2
USING (table_user = current_user);
\c - user_1 -
SELECT * FROM alter_null_dist_key.null_dist_key_with_policy;
table_user
---------------------------------------------------------------------
user_1
(1 row)
\c - user_2 -
SELECT * FROM alter_null_dist_key.null_dist_key_with_policy;
table_user
---------------------------------------------------------------------
(0 rows)
-- postgres will always be allowed to see the row as a superuser
\c - postgres -
SELECT * FROM alter_null_dist_key.null_dist_key_with_policy;
table_user
---------------------------------------------------------------------
user_1
(1 row)
-- cleanup
SET client_min_messages TO ERROR;
DROP SCHEMA alter_null_dist_key CASCADE;
DROP ROLE user_1, user_2;

File diff suppressed because it is too large.

@ -0,0 +1,814 @@
CREATE SCHEMA insert_select_single_shard_table;
SET search_path TO insert_select_single_shard_table;
SET citus.next_shard_id TO 1820000;
SET citus.shard_count TO 32;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
SET client_min_messages TO NOTICE;
CREATE TABLE nullkey_c1_t1(a int, b int);
CREATE TABLE nullkey_c1_t2(a int, b int);
SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE nullkey_c2_t1(a int, b int);
SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE reference_table(a int, b int);
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_c1_t1(a int, b int);
SELECT create_distributed_table('distributed_table_c1_t1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_c1_t2(a int, b int);
SELECT create_distributed_table('distributed_table_c1_t2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_c2_t1(a int, b int);
SELECT create_distributed_table('distributed_table_c2_t1', 'a', colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE citus_local_table(a int, b int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
CREATE TABLE postgres_local_table(a int, b int);
CREATE FUNCTION reload_tables() RETURNS void AS $$
BEGIN
SET LOCAL client_min_messages TO WARNING;
TRUNCATE nullkey_c1_t1, nullkey_c1_t2, nullkey_c2_t1, reference_table, distributed_table_c1_t1,
distributed_table_c1_t2, distributed_table_c2_t1, citus_local_table, postgres_local_table;
INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i;
INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i;
INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i;
INSERT INTO distributed_table_c1_t2 SELECT i, i FROM generate_series(2, 9) i;
INSERT INTO distributed_table_c2_t1 SELECT i, i FROM generate_series(5, 10) i;
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
END;
$$ LANGUAGE plpgsql;
SELECT reload_tables();
reload_tables
---------------------------------------------------------------------
(1 row)
CREATE TABLE append_table (a int, b int);
SELECT create_distributed_table('append_table', 'a', 'append');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT master_create_empty_shard('append_table') AS shardid1 \gset
SELECT master_create_empty_shard('append_table') AS shardid2 \gset
SELECT master_create_empty_shard('append_table') AS shardid3 \gset
COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid1);
COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid2);
CREATE TABLE range_table(a int, b int);
SELECT create_distributed_table('range_table', 'a', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}');
INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50);
CREATE MATERIALIZED VIEW matview AS SELECT b*2+a AS a, a*a AS b FROM nullkey_c1_t1;
SET client_min_messages TO DEBUG2;
-- Test inserting into a distributed table by selecting from a combination of
-- different table types together with single-shard tables.
-- use a single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a reference table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 RIGHT JOIN reference_table USING (b) WHERE reference_table.a >= 1 AND reference_table.a <= 5;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 INTERSECT SELECT * FROM reference_table;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a colocated single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a non-colocated single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: found no worker with all shard placements
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: found no worker with all shard placements
-- use a distributed table that is colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a distributed table that is not colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
-- use a citus local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a postgres local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use append / range distributed tables
INSERT INTO range_table SELECT * FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO append_table SELECT * FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: INSERT ... SELECT into an append-distributed table is not supported
SELECT avg(a), avg(b) FROM distributed_table_c1_t1 ORDER BY 1, 2;
DEBUG: Router planner cannot handle multi-shard select queries
avg | avg
---------------------------------------------------------------------
4.2105263157894737 | 4.2105263157894737
(1 row)
TRUNCATE distributed_table_c1_t1;
INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Test inserting into a reference table by selecting from a combination of
-- different table types together with single-shard tables.
-- use a single-shard table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a reference table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 UNION SELECT * FROM reference_table;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b) WHERE b IN (SELECT b FROM matview);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a colocated single-shard table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a non-colocated single-shard table
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: found no worker with all shard placements
-- use a distributed table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a citus local table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a postgres local table
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
SELECT avg(a), avg(b) FROM reference_table ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
4.0428571428571429 | 4.0428571428571429
(1 row)
TRUNCATE reference_table;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Test inserting into a citus local table by selecting from a combination of
-- different table types together with single-shard tables.
-- use a single-shard table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a reference table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a colocated single-shard table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a distributed table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
-- use a citus local table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a postgres local table
INSERT INTO citus_local_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
SELECT avg(a), avg(b) FROM citus_local_table ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
4.4333333333333333 | 4.4333333333333333
(1 row)
TRUNCATE citus_local_table;
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Test inserting into a single-shard table by selecting from a combination of
-- different table types, together with or without single-shard tables.
-- use a postgres local table
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table JOIN reference_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from a local table
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a citus local table
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table;
DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN reference_table USING (a) JOIN postgres_local_table USING (a) ORDER BY 1,2 OFFSET 7;
DEBUG: distributed INSERT ... SELECT cannot select from a local table
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN nullkey_c1_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from distributed tables and local tables at the same time
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use a distributed table
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN reference_table USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
-- use a non-colocated single-shard table
INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- use a materialized view
INSERT INTO nullkey_c1_t1 SELECT * FROM matview;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT reference_table.a, reference_table.b FROM reference_table JOIN matview ON (reference_table.a = matview.a);
DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table JOIN nullkey_c1_t1 USING (a)) q JOIN matview USING (a);
DEBUG: distributed INSERT ... SELECT cannot select from a local relation when inserting into a distributed table
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
-- use append / range distributed tables
INSERT INTO nullkey_c1_t1 SELECT * FROM range_table;
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 SELECT * FROM append_table;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Router planner does not support append-partitioned tables.
DEBUG: Collecting INSERT ... SELECT results on coordinator
SELECT avg(a), avg(b) FROM nullkey_c1_t1 ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
5.8611111111111111 | 13.9305555555555556
(1 row)
SELECT avg(a), avg(b) FROM nullkey_c2_t1 ORDER BY 1, 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
3.8750000000000000 | 3.8750000000000000
(1 row)
TRUNCATE nullkey_c1_t1, nullkey_c2_t1;
INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Test inserting into a local table by selecting from a combination of
-- different table types, together with or without single-shard tables.
INSERT INTO postgres_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
DEBUG: Creating router plan
INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 ORDER BY 1,2 OFFSET 3 LIMIT 2;
DEBUG: Creating router plan
WITH cte_1 AS (
DELETE FROM nullkey_c1_t1 WHERE a >= 1 and a <= 4 RETURNING *
)
INSERT INTO postgres_local_table SELECT cte_1.* FROM cte_1 LEFT JOIN nullkey_c1_t2 USING (a) WHERE nullkey_c1_t2.a IS NULL;
DEBUG: Creating router plan
INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 EXCEPT SELECT * FROM postgres_local_table;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
SELECT avg(a), avg(b) FROM postgres_local_table ORDER BY 1, 2;
avg | avg
---------------------------------------------------------------------
5.0000000000000000 | 5.0000000000000000
(1 row)
TRUNCATE postgres_local_table;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
-- Try slightly more complex queries.
WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
),
cte_2 AS (
SELECT reference_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN reference_table USING (b)
)
INSERT INTO distributed_table_c1_t1
SELECT cte_1.* FROM cte_1 JOIN cte_2 USING (a) JOIN distributed_table_c1_t2 USING (a) ORDER BY 1,2;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
DEBUG: CTE cte_1 is going to be inlined via distributed planning
DEBUG: CTE cte_2 is going to be inlined via distributed planning
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Local tables cannot be used in distributed queries.
WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
),
cte_2 AS (
SELECT * FROM nullkey_c1_t2 WHERE EXISTS (
SELECT 1 FROM reference_table WHERE reference_table.a = nullkey_c1_t2.a
)
ORDER BY 1,2 OFFSET 1 LIMIT 4
)
INSERT INTO distributed_table_c1_t1
SELECT * FROM cte_1 UNION SELECT * FROM cte_2 EXCEPT SELECT * FROM reference_table;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: CTE cte_1 is going to be inlined via distributed planning
DEBUG: CTE cte_2 is going to be inlined via distributed planning
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT b FROM nullkey_c1_t2 ORDER BY b DESC LIMIT 1
) t2
ON t1.b < t2.b;
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1 (a, b)
WITH cte AS (
SELECT a, b,
(SELECT a FROM nullkey_c1_t2 WHERE b = t.b) AS d1,
(SELECT a FROM reference_table WHERE b = t.b) AS d2
FROM nullkey_c1_t1 t
)
SELECT d1, COALESCE(d2, a) FROM cte WHERE d1 IS NOT NULL AND d2 IS NOT NULL;
DEBUG: CTE cte is going to be inlined via distributed planning
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO citus_local_table (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
CROSS JOIN (
SELECT b FROM nullkey_c2_t1 ORDER BY b LIMIT 1
) t2;
DEBUG: distributed INSERT ... SELECT cannot insert into a local table that is added to metadata
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: found no worker with all shard placements
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM reference_table t1
LEFT JOIN (
SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn
FROM nullkey_c1_t1
) t2 ON t1.b = t2.b
WHERE t2.rn > 0;
DEBUG: Window functions without PARTITION BY on distribution column is currently unsupported
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT rn, b
FROM (
SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn
FROM distributed_table_c2_t1
) q
) t2 ON t1.b = t2.b
WHERE t2.rn > 2;
DEBUG: Window functions without PARTITION BY on distribution column is currently unsupported
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT sum_val, b
FROM (
SELECT b, SUM(a) OVER (PARTITION BY b) AS sum_val
FROM nullkey_c1_t1
) q
) t2 ON t1.b = t2.b
WHERE t2.sum_val > 2;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- MultiTaskRouterSelectQuerySupported() is unnecessarily restrictive
-- about pushing down queries with a DISTINCT ON clause even if the table
-- doesn't have a shard key. See https://github.com/citusdata/citus/pull/6752.
INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2;
DEBUG: DISTINCT ON (non-partition column) clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
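For contrast, a hedged sketch that is not captured in this output: the same DISTINCT ON clause is accepted when the statement is planned as an ordinary router query against the single shard, so the limitation comes from the INSERT ... SELECT pushdown check rather than from the clause itself.
-- Sketch (not executed by this test): a plain router query on the single shard
-- runs DISTINCT ON without any restriction.
-- SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2 ORDER BY a, b;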
-- Similarly, we could push down the following query as well. See
-- https://github.com/citusdata/citus/pull/6831.
INSERT INTO nullkey_c1_t1 SELECT b, SUM(a) OVER (ORDER BY b) AS sum_val FROM nullkey_c1_t1;
DEBUG: Window functions without PARTITION BY on distribution column is currently unsupported
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c2_t1
SELECT t2.a, t2.b
FROM nullkey_c1_t1 AS t2
JOIN reference_table AS t3 ON (t2.a = t3.a)
WHERE NOT EXISTS (
SELECT 1 FROM nullkey_c1_t2 AS t1 WHERE t1.b = t3.b
);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO distributed_table_c1_t1
SELECT t1.a, t1.b
FROM nullkey_c1_t1 AS t1
WHERE t1.a NOT IN (
SELECT DISTINCT t2.a FROM distributed_table_c1_t2 AS t2
);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: Router planner cannot handle multi-shard select queries
INSERT INTO distributed_table_c1_t1
SELECT t1.a, t1.b
FROM reference_table AS t1
JOIN (
SELECT t2.a FROM (
SELECT a FROM nullkey_c1_t1
UNION
SELECT a FROM nullkey_c1_t2
) AS t2
) AS t3 ON t1.a = t3.a;
DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries
DEBUG: Creating router plan
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- Temporarily reduce the verbosity to avoid noise
-- in the output of the next query.
SET client_min_messages TO DEBUG1;
INSERT INTO nullkey_c1_t1
SELECT t1.a, t1.b
FROM reference_table AS t1
WHERE t1.a IN (
SELECT t2.a FROM (
SELECT t3.a FROM (
SELECT a FROM distributed_table_c1_t1 WHERE b > 4
) AS t3
JOIN (
SELECT a FROM distributed_table_c1_t2 WHERE b < 7
) AS t4 ON t3.a = t4.a
) AS t2
);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM (SELECT t3.a FROM ((SELECT distributed_table_c1_t1.a FROM insert_select_single_shard_table.distributed_table_c1_t1 WHERE (distributed_table_c1_t1.b OPERATOR(pg_catalog.>) 4)) t3 JOIN (SELECT distributed_table_c1_t2.a FROM insert_select_single_shard_table.distributed_table_c1_t2 WHERE (distributed_table_c1_t2.b OPERATOR(pg_catalog.<) 7)) t4 ON ((t3.a OPERATOR(pg_catalog.=) t4.a)))) t2
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM insert_select_single_shard_table.reference_table t1 WHERE (a OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)))
DEBUG: Collecting INSERT ... SELECT results on coordinator
SET client_min_messages TO DEBUG2;
-- test upsert with plain INSERT query
CREATE TABLE upsert_test_1
(
unique_col int UNIQUE,
other_col int,
third_col int
);
DEBUG: CREATE TABLE / UNIQUE will create implicit index "upsert_test_1_unique_col_key" for table "upsert_test_1"
SELECT create_distributed_table('upsert_test_1', null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE upsert_test_2(key int primary key, value text);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "upsert_test_2_pkey" for table "upsert_test_2"
SELECT create_distributed_table('upsert_test_2', null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO upsert_test_2 AS upsert_test_2_alias (key, value) VALUES (1, '5') ON CONFLICT(key)
DO UPDATE SET value = (upsert_test_2_alias.value::int * 2)::text;
DEBUG: Creating router plan
INSERT INTO upsert_test_2 (key, value) VALUES (1, '5') ON CONFLICT(key)
DO UPDATE SET value = (upsert_test_2.value::int * 3)::text;
DEBUG: Creating router plan
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1);
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = random()::int;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
INSERT INTO upsert_test_1 VALUES (3, 5, 7);
DEBUG: Creating router plan
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) WHERE unique_col = random()::int
DO UPDATE SET other_col = 5;
ERROR: queries that reference a distributed table without a shard key can only reference colocated distributed tables or reference tables
DETAIL: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
CREATE TABLE upsert_test_3 (key_1 int, key_2 bigserial, value text DEFAULT 'default_value', PRIMARY KEY (key_1, key_2));
DEBUG: CREATE TABLE will create implicit sequence "upsert_test_3_key_2_seq" for serial column "upsert_test_3.key_2"
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "upsert_test_3_pkey" for table "upsert_test_3"
SELECT create_distributed_table('upsert_test_3', null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO upsert_test_3 VALUES (1, DEFAULT, '1') RETURNING *;
DEBUG: Creating router plan
key_1 | key_2 | value
---------------------------------------------------------------------
1 | 1 | 1
(1 row)
INSERT INTO upsert_test_3 VALUES (5, DEFAULT, DEFAULT) RETURNING *;
DEBUG: Creating router plan
key_1 | key_2 | value
---------------------------------------------------------------------
5 | 2 | default_value
(1 row)
SET client_min_messages TO DEBUG1;
INSERT INTO upsert_test_3 SELECT 7, other_col, 'harcoded_text_value' FROM upsert_test_1 RETURNING *;
key_1 | key_2 | value
---------------------------------------------------------------------
7 | 5 | harcoded_text_value
(1 row)
SET client_min_messages TO DEBUG2;
-- test upsert with INSERT .. SELECT queries
SET client_min_messages TO DEBUG1;
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = upsert_test_1.other_col + 1;
-- Fails due to https://github.com/citusdata/citus/issues/6826.
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1);
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: while executing command on localhost:xxxxx
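As a hedged aside, the opt-in mentioned in the HINT can be written as the setting below; the DETAIL above warns that it may lead to incorrect results, so the test leaves it disabled.
-- Sketch of the HINT's opt-in (not run here; may lead to incorrect results):
-- ALTER USER current_user SET citus.allow_nested_distributed_execution TO on;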
SET client_min_messages TO DEBUG2;
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = random()::int;
ERROR: functions used in the DO UPDATE SET clause of INSERTs on distributed tables must be marked IMMUTABLE
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int;
ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
SELECT reload_tables();
DEBUG: function does not have co-located tables
reload_tables
---------------------------------------------------------------------
(1 row)
ALTER TABLE nullkey_c1_t1 ADD PRIMARY KEY (a);
DEBUG: ALTER TABLE / ADD PRIMARY KEY will create implicit index "nullkey_c1_t1_pkey" for table "nullkey_c1_t1"
DEBUG: verifying table "nullkey_c1_t1"
ALTER TABLE distributed_table_c1_t1 ADD PRIMARY KEY (a,b);
DEBUG: ALTER TABLE / ADD PRIMARY KEY will create implicit index "distributed_table_c1_t1_pkey" for table "distributed_table_c1_t1"
DEBUG: verifying table "distributed_table_c1_t1"
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a)
DO UPDATE SET a = t1.a + 10;
DEBUG: distributed statement: INSERT INTO insert_select_single_shard_table.nullkey_c1_t1_1820000 AS t1 (a, b) SELECT t3.a, t3.b FROM (insert_select_single_shard_table.nullkey_c1_t2_1820001 t2 JOIN insert_select_single_shard_table.reference_table_1820003 t3 ON ((t2.a OPERATOR(pg_catalog.=) t3.a))) ON CONFLICT(a) DO UPDATE SET a = (t1.a OPERATOR(pg_catalog.+) 10)
SET client_min_messages TO DEBUG1;
INSERT INTO distributed_table_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a, b)
DO UPDATE SET b = t1.b + 10;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a)
DO UPDATE SET a = t1.a + 10;
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- This also fails due to https://github.com/citusdata/citus/issues/6826.
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) WHERE t2.a = 3 ON CONFLICT (a)
DO UPDATE SET a = (SELECT max(b)+1 FROM distributed_table_c1_t1 WHERE a = 3);
DEBUG: distributed INSERT ... SELECT cannot reference a distributed table without a shard key together with non-colocated distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
ERROR: cannot execute a distributed query from a query on a shard
DETAIL: Executing a distributed query in a function call that may be pushed to a remote node can lead to incorrect results.
HINT: Avoid nesting of distributed queries or use alter user current_user set citus.allow_nested_distributed_execution to on to allow it with possible incorrectness.
CONTEXT: while executing command on localhost:xxxxx
SET client_min_messages TO DEBUG2;
SELECT avg(a), avg(b) FROM distributed_table_c1_t1;
DEBUG: Router planner cannot handle multi-shard select queries
avg | avg
---------------------------------------------------------------------
5.0000000000000000 | 9.2857142857142857
(1 row)
SELECT avg(a), avg(b) FROM nullkey_c1_t1;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
7.5000000000000000 | 4.1666666666666667
(1 row)
SELECT avg(a), avg(b) FROM nullkey_c1_t2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
avg | avg
---------------------------------------------------------------------
4.5000000000000000 | 4.5000000000000000
(1 row)
SELECT * FROM upsert_test_1 ORDER BY unique_col;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
unique_col | other_col | third_col
---------------------------------------------------------------------
3 | 6 | 7
(1 row)
SELECT * FROM upsert_test_2 ORDER BY key;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
key | value
---------------------------------------------------------------------
1 | 15
(1 row)
SELECT * FROM upsert_test_3 ORDER BY key_1, key_2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
key_1 | key_2 | value
---------------------------------------------------------------------
1 | 1 | 1
5 | 2 | default_value
7 | 5 | harcoded_text_value
(3 rows)
SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_single_shard_table CASCADE;
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)

@ -3228,6 +3228,154 @@ WHEN NOT MATCHED THEN
INSERT VALUES(dist_source.id, dist_source.val);
ERROR: For MERGE command, all the distributed tables must be colocated, for append/range distribution, colocation is not supported
HINT: Consider using hash distribution instead
-- test merge with single-shard tables
CREATE SCHEMA query_single_shard_table;
SET search_path TO query_single_shard_table;
SET client_min_messages TO DEBUG2;
CREATE TABLE nullkey_c1_t1(a int, b int);
CREATE TABLE nullkey_c1_t2(a int, b int);
SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE nullkey_c2_t1(a int, b int);
CREATE TABLE nullkey_c2_t2(a int, b int);
SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE reference_table(a int, b int);
SELECT create_reference_table('reference_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CREATE TABLE distributed_table(a int, b int);
SELECT create_distributed_table('distributed_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CREATE TABLE citus_local_table(a int, b int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CREATE TABLE postgres_local_table(a int, b int);
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
-- with a colocated table
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000147 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000148 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b>
DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN DELETE;
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000147 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000148 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE>
DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000147 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000148 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
DEBUG: Creating MERGE router plan
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000147 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000148 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
DEBUG: Creating MERGE router plan
-- with non-colocated single-shard table
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b;
ERROR: For MERGE command, all the distributed tables must be colocated
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
ERROR: For MERGE command, all the distributed tables must be colocated
-- with a distributed table
MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a)
WHEN MATCHED THEN UPDATE SET b = distributed_table.b
WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b);
ERROR: For MERGE command, all the distributed tables must be colocated
MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a)
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
ERROR: For MERGE command, all the distributed tables must be colocated
-- with a reference table
MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a)
WHEN MATCHED THEN UPDATE SET b = reference_table.b;
ERROR: MERGE command is not supported with combination of distributed/reference yet
HINT: If target is distributed, source must be distributed and co-located
MERGE INTO reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = reference_table.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
ERROR: Reference table as target is not allowed in MERGE command
-- with a citus local table
MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a)
WHEN MATCHED THEN UPDATE SET b = citus_local_table.b;
ERROR: MERGE command is not supported with combination of distributed/local tables yet
MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported with combination of distributed/local tables yet
-- with a postgres table
MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a)
WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b;
ERROR: MERGE command is not supported with combination of distributed/local tables yet
MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
ERROR: MERGE command is not supported with combination of distributed/local tables yet
-- using ctes
WITH cte AS (
SELECT * FROM nullkey_c1_t1
)
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b;
DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM query_single_shard_table.nullkey_c1_t1_4000147 nullkey_c1_t1_1) MERGE INTO query_single_shard_table.nullkey_c1_t1_4000147 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
DEBUG: Creating MERGE router plan
WITH cte AS (
SELECT * FROM distributed_table
)
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b;
ERROR: For MERGE command, all the distributed tables must be colocated
WITH cte AS materialized (
SELECT * FROM distributed_table
)
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b;
ERROR: For MERGE command, all the distributed tables must be colocated
SET client_min_messages TO WARNING;
DROP SCHEMA query_single_shard_table CASCADE;
RESET client_min_messages;
SET search_path TO merge_schema;
DROP SERVER foreign_server CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to user mapping for postgres on server foreign_server

@ -612,10 +612,10 @@ CREATE TABLE table_postgresql( id int );
CREATE TABLE table_failing ( id int );
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append');
ERROR: cannot distribute relation
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
DETAIL: Currently, colocate_with option is not supported with append / range distributed tables and local tables added to metadata.
SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE');
ERROR: cannot distribute relation
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
DETAIL: Currently, colocate_with option is not supported for append / range distributed tables.
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql');
ERROR: relation table_postgresql is not distributed
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table');

@ -1735,6 +1735,33 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a single-shard table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test', null, colocate_with=>'none', distribution_type=>null);
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- and make sure that we can't remove the coordinator due to "test"
SELECT citus_remove_node('localhost', :master_port);
ERROR: cannot remove or disable the node localhost:xxxxx because because it contains the only shard placement for shard xxxxx
DETAIL: One of the table(s) that prevents the operation complete successfully is public.test
HINT: To proceed, either drop the tables or use undistribute_table() function to convert them to local tables
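The HINT also points at an alternative to dropping the table; a hedged sketch of that path, which this test does not exercise, is:
-- Sketch (not run here): converting the single-shard table back to a local
-- table releases its placement, after which the coordinator can be removed
-- without losing the data.
-- SELECT undistribute_table('test');
-- SELECT citus_remove_node('localhost', :master_port);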
DROP TABLE test;
-- and now we should be able to remove the coordinator
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- confirm that we can create a reference table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);

File diff suppressed because it is too large
@ -101,8 +101,63 @@ SELECT pg_reload_conf();
t
(1 row)
CREATE TABLE single_node_nullkey_c1(a int, b int);
SELECT create_distributed_table('single_node_nullkey_c1', null, colocate_with=>'none', distribution_type=>null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE single_node_nullkey_c2(a int, b int);
SELECT create_distributed_table('single_node_nullkey_c2', null, colocate_with=>'none', distribution_type=>null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- created in different colocation groups ..
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
-- .. but both are associated with the coordinator
SELECT groupid = 0 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT groupid = 0 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
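On this single-node cluster both placements necessarily land on the coordinator; as a hedged sketch that is not part of this test's output, the same placement check could be written against a multi-node cluster to list the node hosting each single-shard table's shard.
-- Sketch (not run here): list the node of each single-shard table's placement.
-- SELECT logicalrelid, nodename, nodeport
-- FROM pg_dist_shard
-- JOIN pg_dist_placement USING (shardid)
-- JOIN pg_dist_node USING (groupid)
-- WHERE logicalrelid::text LIKE '%single_node_nullkey%';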
-- try creating a single-shard table from a shard relation
SELECT shardid AS round_robin_test_c1_shard_id FROM pg_dist_shard WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass \gset
SELECT create_distributed_table('single_node_nullkey_c1_' || :round_robin_test_c1_shard_id , null, colocate_with=>'none', distribution_type=>null);
ERROR: relation "single_node_nullkey_c1_90630532" is a shard relation
SET client_min_messages TO WARNING;
DROP TABLE failover_to_local;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2;
RESET client_min_messages;
-- so that we don't have to update the rest of the test output
SET citus.next_shard_id TO 90630500;

View File

@@ -0,0 +1,13 @@
ALTER FUNCTION create_distributed_table RENAME TO create_distributed_table_internal;
CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type DEFAULT 'hash',
colocate_with text DEFAULT 'default',
shard_count int DEFAULT NULL)
RETURNS void
LANGUAGE plpgsql
AS $function$
BEGIN
PERFORM create_distributed_table_internal(table_name, NULL, NULL, colocate_with, NULL);
END;
$function$;
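As a brief, hedged illustration of what this override does (the table name below is hypothetical and not part of the test suite): every create_distributed_table() call is forwarded with a NULL distribution column, NULL distribution type and NULL shard count, so the resulting table is a single-shard distributed table regardless of the arguments given.
CREATE TABLE sketch_table (a int, b text);
-- the distribution column and shard count passed here are ignored by the wrapper ...
SELECT create_distributed_table('sketch_table', 'a', shard_count => 4);
-- ... because the call is effectively forwarded as:
-- SELECT create_distributed_table_internal('sketch_table', NULL, NULL, 'default', NULL);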

View File

@@ -32,6 +32,7 @@ test: escape_extension_name
test: ref_citus_local_fkeys
test: alter_database_owner
test: distributed_triggers
test: create_single_shard_table
test: multi_test_catalog_views
test: multi_table_ddl
@@ -67,7 +68,7 @@ test: multi_master_protocol multi_load_data multi_load_data_superuser multi_beha
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select multi_behavioral_analytics_create_table_superuser
test: multi_shard_update_delete recursive_dml_with_different_planners_executors
test: insert_select_repartition window_functions dml_recursive multi_insert_select_window
test: multi_insert_select_conflict citus_table_triggers
test: multi_insert_select_conflict citus_table_triggers alter_table_single_shard_table
test: multi_row_insert insert_select_into_local_table alter_index
# following should not run in parallel because it relies on connection counts to workers
@@ -199,6 +200,8 @@ test: local_table_join
test: local_dist_join_mixed
test: citus_local_dist_joins
test: recurring_outer_join
test: query_single_shard_table
test: insert_select_single_shard_table
test: pg_dump
# ---------

View File

@@ -0,0 +1 @@
test: single_shard_table_prep

View File

@@ -0,0 +1,98 @@
CREATE SCHEMA alter_null_dist_key;
SET search_path TO alter_null_dist_key;
SET citus.next_shard_id TO 1720000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE SEQUENCE dist_seq;
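-- a single-shard table that exercises a sequence-backed default, a unique constraint and an identity column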
CREATE TABLE null_dist_table(a bigint DEFAULT nextval('dist_seq') UNIQUE, "b" text, c bigint GENERATED BY DEFAULT AS IDENTITY);
INSERT INTO null_dist_table("b") VALUES ('test');
SELECT create_distributed_table('null_dist_table', null, colocate_with=>'none', distribution_type=>null);
-- add column
ALTER TABLE null_dist_table ADD COLUMN d bigint DEFAULT 2;
SELECT * FROM null_dist_table ORDER BY c;
-- alter default, set to 3
ALTER TABLE null_dist_table ALTER COLUMN d SET DEFAULT 3;
INSERT INTO null_dist_table("b") VALUES ('test');
SELECT * FROM null_dist_table ORDER BY c;
-- drop default, see null
ALTER TABLE null_dist_table ALTER COLUMN d DROP DEFAULT;
INSERT INTO null_dist_table("b") VALUES ('test');
SELECT * FROM null_dist_table ORDER BY c;
-- cleanup the rows that were added to test the default behavior
DELETE FROM null_dist_table WHERE "b" = 'test' AND a > 1;
-- alter column type
ALTER TABLE null_dist_table ALTER COLUMN d TYPE text;
UPDATE null_dist_table SET d = 'this is a text' WHERE d = '2';
SELECT * FROM null_dist_table ORDER BY c;
-- drop seq column
ALTER TABLE null_dist_table DROP COLUMN a;
SELECT * FROM null_dist_table ORDER BY c;
-- add not null constraint
ALTER TABLE null_dist_table ALTER COLUMN b SET NOT NULL;
-- not null constraint violation, error out
INSERT INTO null_dist_table VALUES (NULL, 2, 'test');
-- drop not null constraint and try again
ALTER TABLE null_dist_table ALTER COLUMN b DROP NOT NULL;
INSERT INTO null_dist_table VALUES (NULL, 3, 'test');
SELECT * FROM null_dist_table ORDER BY c;
-- add exclusion constraint
ALTER TABLE null_dist_table ADD CONSTRAINT exc_b EXCLUDE USING btree (b with =);
-- rename the exclusion constraint, errors out
ALTER TABLE null_dist_table RENAME CONSTRAINT exc_b TO exc_b_1;
-- create exclusion constraint without a name
ALTER TABLE null_dist_table ADD EXCLUDE USING btree (b with =);
-- test setting autovacuum option
ALTER TABLE null_dist_table SET (autovacuum_enabled = false);
-- test multiple subcommands
ALTER TABLE null_dist_table ADD COLUMN int_column1 INTEGER,
DROP COLUMN d;
SELECT * FROM null_dist_table ORDER BY c;
-- test policy and row level security
CREATE TABLE null_dist_key_with_policy (table_user text);
INSERT INTO null_dist_key_with_policy VALUES ('user_1');
SELECT create_distributed_table('null_dist_key_with_policy', null);
-- enable rls
ALTER TABLE null_dist_key_with_policy ENABLE ROW LEVEL SECURITY;
-- user_1 will be allowed to see the inserted row
CREATE ROLE user_1 WITH LOGIN;
GRANT ALL ON SCHEMA alter_null_dist_key TO user_1;
GRANT ALL ON TABLE alter_null_dist_key.null_dist_key_with_policy TO user_1;
CREATE POLICY table_policy ON null_dist_key_with_policy TO user_1
USING (table_user = current_user);
-- user_2 will not be allowed to see the inserted row
CREATE ROLE user_2 WITH LOGIN;
GRANT ALL ON SCHEMA alter_null_dist_key TO user_2;
GRANT ALL ON TABLE alter_null_dist_key.null_dist_key_with_policy TO user_2;
CREATE POLICY table_policy_1 ON null_dist_key_with_policy TO user_2
USING (table_user = current_user);
\c - user_1 -
SELECT * FROM alter_null_dist_key.null_dist_key_with_policy;
\c - user_2 -
SELECT * FROM alter_null_dist_key.null_dist_key_with_policy;
-- postgres will always be allowed to see the row as a superuser
\c - postgres -
SELECT * FROM alter_null_dist_key.null_dist_key_with_policy;
-- cleanup
SET client_min_messages TO ERROR;
DROP SCHEMA alter_null_dist_key CASCADE;
DROP ROLE user_1, user_2;

File diff suppressed because it is too large

View File

@@ -0,0 +1,470 @@
CREATE SCHEMA insert_select_single_shard_table;
SET search_path TO insert_select_single_shard_table;
SET citus.next_shard_id TO 1820000;
SET citus.shard_count TO 32;
SET client_min_messages TO WARNING;
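-- add the coordinator (groupid 0) to the metadata so that the citus local table used below can be created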
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
SET client_min_messages TO NOTICE;
CREATE TABLE nullkey_c1_t1(a int, b int);
CREATE TABLE nullkey_c1_t2(a int, b int);
SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none');
SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1');
CREATE TABLE nullkey_c2_t1(a int, b int);
SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
CREATE TABLE reference_table(a int, b int);
SELECT create_reference_table('reference_table');
CREATE TABLE distributed_table_c1_t1(a int, b int);
SELECT create_distributed_table('distributed_table_c1_t1', 'a');
CREATE TABLE distributed_table_c1_t2(a int, b int);
SELECT create_distributed_table('distributed_table_c1_t2', 'a');
CREATE TABLE distributed_table_c2_t1(a int, b int);
SELECT create_distributed_table('distributed_table_c2_t1', 'a', colocate_with=>'none');
CREATE TABLE citus_local_table(a int, b int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
CREATE TABLE postgres_local_table(a int, b int);
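-- helper that resets all of the tables above to the same initial data set, so that each test block below starts from a known state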
CREATE FUNCTION reload_tables() RETURNS void AS $$
BEGIN
SET LOCAL client_min_messages TO WARNING;
TRUNCATE nullkey_c1_t1, nullkey_c1_t2, nullkey_c2_t1, reference_table, distributed_table_c1_t1,
distributed_table_c1_t2, distributed_table_c2_t1, citus_local_table, postgres_local_table;
INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i;
INSERT INTO nullkey_c1_t2 SELECT i, i FROM generate_series(2, 7) i;
INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i;
INSERT INTO distributed_table_c1_t2 SELECT i, i FROM generate_series(2, 9) i;
INSERT INTO distributed_table_c2_t1 SELECT i, i FROM generate_series(5, 10) i;
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
END;
$$ LANGUAGE plpgsql;
SELECT reload_tables();
CREATE TABLE append_table (a int, b int);
SELECT create_distributed_table('append_table', 'a', 'append');
SELECT master_create_empty_shard('append_table') AS shardid1 \gset
SELECT master_create_empty_shard('append_table') AS shardid2 \gset
SELECT master_create_empty_shard('append_table') AS shardid3 \gset
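-- load rows into the first two shards of the append-distributed table; the third shard stays empty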
COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid1);
1, 40
2, 42
3, 44
4, 46
5, 48
\.
COPY append_table (a, b) FROM STDIN WITH (format 'csv', append_to_shard :shardid2);
6, 50
7, 52
8, 54
9, 56
10, 58
\.
CREATE TABLE range_table(a int, b int);
SELECT create_distributed_table('range_table', 'a', 'range');
CALL public.create_range_partitioned_shards('range_table', '{"0","25"}','{"24","49"}');
INSERT INTO range_table VALUES (0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 50);
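-- a materialized view defined on top of a single-shard table, used as a source relation in some of the queries below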
CREATE MATERIALIZED VIEW matview AS SELECT b*2+a AS a, a*a AS b FROM nullkey_c1_t1;
SET client_min_messages TO DEBUG2;
-- Test inserting into a distributed table by selecting from a combination of
-- different table types together with single-shard tables.
-- use a single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
-- use a reference table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 RIGHT JOIN reference_table USING (b) WHERE reference_table.a >= 1 AND reference_table.a <= 5;
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b);
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 INTERSECT SELECT * FROM reference_table;
-- use a colocated single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN matview USING (a);
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c1_t2;
-- use a non-colocated single-shard table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT * FROM nullkey_c1_t1 UNION SELECT * FROM nullkey_c2_t1;
-- use a distributed table that is colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
INSERT INTO distributed_table_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
-- use a distributed table that is not colocated with the target table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN distributed_table_c2_t1 USING (a);
-- use a citus local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
-- use a postgres local table
INSERT INTO distributed_table_c1_t1 SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
-- use append / range distributed tables
INSERT INTO range_table SELECT * FROM nullkey_c1_t1;
INSERT INTO append_table SELECT * FROM nullkey_c1_t1;
SELECT avg(a), avg(b) FROM distributed_table_c1_t1 ORDER BY 1, 2;
TRUNCATE distributed_table_c1_t1;
INSERT INTO distributed_table_c1_t1 SELECT i, i FROM generate_series(3, 8) i;
-- Test inserting into a reference table by selecting from a combination of
-- different table types together with single-shard tables.
-- use a single-shard table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
-- use a reference table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b);
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 UNION SELECT * FROM reference_table;
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN reference_table USING (b) WHERE b IN (SELECT b FROM matview);
-- use a colocated single-shard table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 FULL JOIN nullkey_c1_t2 USING (a);
-- use a non-colocated single-shard table
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 LEFT JOIN nullkey_c2_t1 USING (a);
-- use a distributed table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (b);
INSERT INTO reference_table SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a) WHERE distributed_table_c1_t2.a = 1;
-- use a citus local table
INSERT INTO reference_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
-- use a postgres local table
INSERT INTO reference_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
SELECT avg(a), avg(b) FROM reference_table ORDER BY 1, 2;
TRUNCATE reference_table;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
-- Test inserting into a citus local table by selecting from a combination of
-- different table types together with single-shard tables.
-- use a single-shard table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1;
-- use a reference table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
-- use a colocated single-shard table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN nullkey_c1_t2 USING (b);
-- use a distributed table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN distributed_table_c1_t2 USING (a);
-- use a citus local table
INSERT INTO citus_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN citus_local_table USING (a);
-- use a postgres local table
INSERT INTO citus_local_table SELECT nullkey_c1_t2.a, nullkey_c1_t2.b FROM nullkey_c1_t2 JOIN postgres_local_table USING (a);
SELECT avg(a), avg(b) FROM citus_local_table ORDER BY 1, 2;
TRUNCATE citus_local_table;
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
-- Test inserting into a single-shard table by selecting from a combination of
-- different table types, together with or without single-shard tables.
-- use a postgres local table
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table;
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table JOIN reference_table USING (a);
INSERT INTO nullkey_c1_t1 SELECT postgres_local_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN nullkey_c1_t1 USING (a);
-- use a citus local table
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table;
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN reference_table USING (a) JOIN postgres_local_table USING (a) ORDER BY 1,2 OFFSET 7;
INSERT INTO nullkey_c1_t1 SELECT citus_local_table.a, citus_local_table.b FROM citus_local_table JOIN nullkey_c1_t1 USING (a);
-- use a distributed table
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2;
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN reference_table USING (a);
INSERT INTO nullkey_c1_t1 SELECT distributed_table_c1_t2.a, distributed_table_c1_t2.b FROM distributed_table_c1_t2 JOIN nullkey_c1_t1 USING (a);
-- use a non-colocated single-shard table
INSERT INTO nullkey_c2_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table LEFT JOIN nullkey_c1_t1 USING (a)) q JOIN nullkey_c1_t2 USING (a);
-- use a materialized view
INSERT INTO nullkey_c1_t1 SELECT * FROM matview;
INSERT INTO nullkey_c1_t1 SELECT reference_table.a, reference_table.b FROM reference_table JOIN matview ON (reference_table.a = matview.a);
INSERT INTO nullkey_c1_t1 SELECT q.* FROM (SELECT reference_table.* FROM reference_table JOIN nullkey_c1_t1 USING (a)) q JOIN matview USING (a);
-- use append / range distributed tables
INSERT INTO nullkey_c1_t1 SELECT * FROM range_table;
INSERT INTO nullkey_c1_t1 SELECT * FROM append_table;
SELECT avg(a), avg(b) FROM nullkey_c1_t1 ORDER BY 1, 2;
SELECT avg(a), avg(b) FROM nullkey_c2_t1 ORDER BY 1, 2;
TRUNCATE nullkey_c1_t1, nullkey_c2_t1;
INSERT INTO nullkey_c1_t1 SELECT i, i FROM generate_series(1, 8) i;
INSERT INTO nullkey_c2_t1 SELECT i, i FROM generate_series(2, 7) i;
-- Test inserting into a local table by selecting from a combination of
-- different table types, together with or without single-shard tables.
INSERT INTO postgres_local_table SELECT nullkey_c1_t1.a, nullkey_c1_t1.b FROM nullkey_c1_t1 JOIN reference_table USING (a);
INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 ORDER BY 1,2 OFFSET 3 LIMIT 2;
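-- use a data-modifying CTE: rows deleted from the single-shard table that have no match in nullkey_c1_t2 are inserted into the local table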
WITH cte_1 AS (
DELETE FROM nullkey_c1_t1 WHERE a >= 1 and a <= 4 RETURNING *
)
INSERT INTO postgres_local_table SELECT cte_1.* FROM cte_1 LEFT JOIN nullkey_c1_t2 USING (a) WHERE nullkey_c1_t2.a IS NULL;
INSERT INTO postgres_local_table SELECT * FROM nullkey_c1_t1 EXCEPT SELECT * FROM postgres_local_table;
SELECT avg(a), avg(b) FROM postgres_local_table ORDER BY 1, 2;
TRUNCATE postgres_local_table;
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
-- Try slightly more complex queries.
WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
),
cte_2 AS (
SELECT reference_table.a, postgres_local_table.b FROM postgres_local_table LEFT JOIN reference_table USING (b)
)
INSERT INTO distributed_table_c1_t1
SELECT cte_1.* FROM cte_1 JOIN cte_2 USING (a) JOIN distributed_table_c1_t2 USING (a) ORDER BY 1,2;
WITH cte_1 AS (
SELECT nullkey_c1_t1.a, reference_table.b FROM nullkey_c1_t1 JOIN reference_table USING (a)
),
cte_2 AS (
SELECT * FROM nullkey_c1_t2 WHERE EXISTS (
SELECT 1 FROM reference_table WHERE reference_table.a = nullkey_c1_t2.a
)
ORDER BY 1,2 OFFSET 1 LIMIT 4
)
INSERT INTO distributed_table_c1_t1
SELECT * FROM cte_1 UNION SELECT * FROM cte_2 EXCEPT SELECT * FROM reference_table;
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT b FROM nullkey_c1_t2 ORDER BY b DESC LIMIT 1
) t2
ON t1.b < t2.b;
INSERT INTO distributed_table_c1_t1 (a, b)
WITH cte AS (
SELECT a, b,
(SELECT a FROM nullkey_c1_t2 WHERE b = t.b) AS d1,
(SELECT a FROM reference_table WHERE b = t.b) AS d2
FROM nullkey_c1_t1 t
)
SELECT d1, COALESCE(d2, a) FROM cte WHERE d1 IS NOT NULL AND d2 IS NOT NULL;
INSERT INTO citus_local_table (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
CROSS JOIN (
SELECT b FROM nullkey_c2_t1 ORDER BY b LIMIT 1
) t2;
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM reference_table t1
LEFT JOIN (
SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn
FROM nullkey_c1_t1
) t2 ON t1.b = t2.b
WHERE t2.rn > 0;
INSERT INTO nullkey_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT rn, b
FROM (
SELECT b, ROW_NUMBER() OVER (ORDER BY b DESC) AS rn
FROM distributed_table_c2_t1
) q
) t2 ON t1.b = t2.b
WHERE t2.rn > 2;
INSERT INTO distributed_table_c1_t1 (a, b)
SELECT t1.a, t2.b
FROM nullkey_c1_t1 t1
JOIN (
SELECT sum_val, b
FROM (
SELECT b, SUM(a) OVER (PARTITION BY b) AS sum_val
FROM nullkey_c1_t1
) q
) t2 ON t1.b = t2.b
WHERE t2.sum_val > 2;
-- MultiTaskRouterSelectQuerySupported() is unnecessarily restrictive
-- about pushing down queries with a DISTINCT ON clause even if the table
-- doesn't have a shard key. See https://github.com/citusdata/citus/pull/6752.
INSERT INTO nullkey_c1_t1 SELECT DISTINCT ON (a) a, b FROM nullkey_c1_t2;
-- Similarly, we could push down the following query as well. See
-- https://github.com/citusdata/citus/pull/6831.
INSERT INTO nullkey_c1_t1 SELECT b, SUM(a) OVER (ORDER BY b) AS sum_val FROM nullkey_c1_t1;
INSERT INTO nullkey_c2_t1
SELECT t2.a, t2.b
FROM nullkey_c1_t1 AS t2
JOIN reference_table AS t3 ON (t2.a = t3.a)
WHERE NOT EXISTS (
SELECT 1 FROM nullkey_c1_t2 AS t1 WHERE t1.b = t3.b
);
INSERT INTO distributed_table_c1_t1
SELECT t1.a, t1.b
FROM nullkey_c1_t1 AS t1
WHERE t1.a NOT IN (
SELECT DISTINCT t2.a FROM distributed_table_c1_t2 AS t2
);
INSERT INTO distributed_table_c1_t1
SELECT t1.a, t1.b
FROM reference_table AS t1
JOIN (
SELECT t2.a FROM (
SELECT a FROM nullkey_c1_t1
UNION
SELECT a FROM nullkey_c1_t2
) AS t2
) AS t3 ON t1.a = t3.a;
-- Temporarily reduce the verbosity to avoid noise
-- in the output of the next query.
SET client_min_messages TO DEBUG1;
INSERT INTO nullkey_c1_t1
SELECT t1.a, t1.b
FROM reference_table AS t1
WHERE t1.a IN (
SELECT t2.a FROM (
SELECT t3.a FROM (
SELECT a FROM distributed_table_c1_t1 WHERE b > 4
) AS t3
JOIN (
SELECT a FROM distributed_table_c1_t2 WHERE b < 7
) AS t4 ON t3.a = t4.a
) AS t2
);
SET client_min_messages TO DEBUG2;
-- test upsert with plain INSERT query
CREATE TABLE upsert_test_1
(
unique_col int UNIQUE,
other_col int,
third_col int
);
SELECT create_distributed_table('upsert_test_1', null);
CREATE TABLE upsert_test_2(key int primary key, value text);
SELECT create_distributed_table('upsert_test_2', null);
INSERT INTO upsert_test_2 AS upsert_test_2_alias (key, value) VALUES (1, '5') ON CONFLICT(key)
DO UPDATE SET value = (upsert_test_2_alias.value::int * 2)::text;
INSERT INTO upsert_test_2 (key, value) VALUES (1, '5') ON CONFLICT(key)
DO UPDATE SET value = (upsert_test_2.value::int * 3)::text;
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1);
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = random()::int;
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col)
DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int;
INSERT INTO upsert_test_1 VALUES (3, 5, 7);
INSERT INTO upsert_test_1 (unique_col, other_col) VALUES (1, 1) ON CONFLICT (unique_col) WHERE unique_col = random()::int
DO UPDATE SET other_col = 5;
CREATE TABLE upsert_test_3 (key_1 int, key_2 bigserial, value text DEFAULT 'default_value', PRIMARY KEY (key_1, key_2));
SELECT create_distributed_table('upsert_test_3', null);
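-- exercise DEFAULT in VALUES: key_2 is drawn from the bigserial sequence, and the second insert also falls back to the column default for value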
INSERT INTO upsert_test_3 VALUES (1, DEFAULT, '1') RETURNING *;
INSERT INTO upsert_test_3 VALUES (5, DEFAULT, DEFAULT) RETURNING *;
SET client_min_messages TO DEBUG1;
INSERT INTO upsert_test_3 SELECT 7, other_col, 'hardcoded_text_value' FROM upsert_test_1 RETURNING *;
SET client_min_messages TO DEBUG2;
-- test upsert with INSERT .. SELECT queries
SET client_min_messages TO DEBUG1;
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = upsert_test_1.other_col + 1;
-- Fails due to https://github.com/citusdata/citus/issues/6826.
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = (SELECT count(*) from upsert_test_1);
SET client_min_messages TO DEBUG2;
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = random()::int;
INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col FROM upsert_test_1 ON CONFLICT (unique_col)
DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int;
SELECT reload_tables();
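-- add primary keys so that the INSERT .. SELECT .. ON CONFLICT queries below have valid conflict targets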
ALTER TABLE nullkey_c1_t1 ADD PRIMARY KEY (a);
ALTER TABLE distributed_table_c1_t1 ADD PRIMARY KEY (a,b);
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a)
DO UPDATE SET a = t1.a + 10;
SET client_min_messages TO DEBUG1;
INSERT INTO distributed_table_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM nullkey_c1_t2 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a, b)
DO UPDATE SET b = t1.b + 10;
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) ON CONFLICT (a)
DO UPDATE SET a = t1.a + 10;
-- This also fails due to https://github.com/citusdata/citus/issues/6826.
INSERT INTO nullkey_c1_t1 AS t1 (a, b) SELECT t3.a, t3.b FROM distributed_table_c1_t1 t2 JOIN reference_table t3 ON (t2.a = t3.a) WHERE t2.a = 3 ON CONFLICT (a)
DO UPDATE SET a = (SELECT max(b)+1 FROM distributed_table_c1_t1 WHERE a = 3);
SET client_min_messages TO DEBUG2;
SELECT avg(a), avg(b) FROM distributed_table_c1_t1;
SELECT avg(a), avg(b) FROM nullkey_c1_t1;
SELECT avg(a), avg(b) FROM nullkey_c1_t2;
SELECT * FROM upsert_test_1 ORDER BY unique_col;
SELECT * FROM upsert_test_2 ORDER BY key;
SELECT * FROM upsert_test_3 ORDER BY key_1, key_2;
SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_single_shard_table CASCADE;
SELECT citus_remove_node('localhost', :master_port);

View File

@@ -2051,6 +2051,118 @@ UPDATE SET val = dist_source.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_source.id, dist_source.val);
-- test merge with single-shard tables
CREATE SCHEMA query_single_shard_table;
SET search_path TO query_single_shard_table;
SET client_min_messages TO DEBUG2;
CREATE TABLE nullkey_c1_t1(a int, b int);
CREATE TABLE nullkey_c1_t2(a int, b int);
SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none');
SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1');
CREATE TABLE nullkey_c2_t1(a int, b int);
CREATE TABLE nullkey_c2_t2(a int, b int);
SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null);
CREATE TABLE reference_table(a int, b int);
SELECT create_reference_table('reference_table');
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
CREATE TABLE distributed_table(a int, b int);
SELECT create_distributed_table('distributed_table', 'a');
INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i;
CREATE TABLE citus_local_table(a int, b int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
CREATE TABLE postgres_local_table(a int, b int);
INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
-- with a colocated table
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN DELETE;
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
-- with non-colocated single-shard table
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b;
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
-- with a distributed table
MERGE INTO nullkey_c1_t1 USING distributed_table ON (nullkey_c1_t1.a = distributed_table.a)
WHEN MATCHED THEN UPDATE SET b = distributed_table.b
WHEN NOT MATCHED THEN INSERT VALUES (distributed_table.a, distributed_table.b);
MERGE INTO distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = distributed_table.a)
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
-- with a reference table
MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a)
WHEN MATCHED THEN UPDATE SET b = reference_table.b;
MERGE INTO reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = reference_table.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
-- with a citus local table
MERGE INTO nullkey_c1_t1 USING citus_local_table ON (nullkey_c1_t1.a = citus_local_table.a)
WHEN MATCHED THEN UPDATE SET b = citus_local_table.b;
MERGE INTO citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = citus_local_table.a)
WHEN MATCHED THEN DELETE;
-- with a postgres table
MERGE INTO nullkey_c1_t1 USING postgres_local_table ON (nullkey_c1_t1.a = postgres_local_table.a)
WHEN MATCHED THEN UPDATE SET b = postgres_local_table.b;
MERGE INTO postgres_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = postgres_local_table.a)
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
-- using ctes
WITH cte AS (
SELECT * FROM nullkey_c1_t1
)
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b;
WITH cte AS (
SELECT * FROM distributed_table
)
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b;
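-- same as the previous query, but force the CTE to be materialized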
WITH cte AS materialized (
SELECT * FROM distributed_table
)
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
WHEN MATCHED THEN UPDATE SET b = cte.b;
SET client_min_messages TO WARNING;
DROP SCHEMA query_single_shard_table CASCADE;
RESET client_min_messages;
SET search_path TO merge_schema;
DROP SERVER foreign_server CASCADE;
DROP FUNCTION merge_when_and_write();
DROP SCHEMA merge_schema CASCADE;

View File

@@ -904,6 +904,20 @@ SELECT create_distributed_table('test','x');
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a single-shard table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test', null, colocate_with=>'none', distribution_type=>null);
-- and make sure that we can't remove the coordinator due to "test"
SELECT citus_remove_node('localhost', :master_port);
DROP TABLE test;
-- and now we should be able to remove the coordinator
SELECT citus_remove_node('localhost', :master_port);
-- confirm that we can create a reference table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);

File diff suppressed because it is too large

View File

@@ -63,8 +63,43 @@ ALTER SYSTEM RESET citus.local_shared_pool_size;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
CREATE TABLE single_node_nullkey_c1(a int, b int);
SELECT create_distributed_table('single_node_nullkey_c1', null, colocate_with=>'none', distribution_type=>null);
CREATE TABLE single_node_nullkey_c2(a int, b int);
SELECT create_distributed_table('single_node_nullkey_c2', null, colocate_with=>'none', distribution_type=>null);
-- created in different colocation groups ..
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass
);
-- .. but both are associated with the coordinator
SELECT groupid = 0 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass
);
SELECT groupid = 0 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass
);
-- try creating a single-shard table from a shard relation
SELECT shardid AS round_robin_test_c1_shard_id FROM pg_dist_shard WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass \gset
SELECT create_distributed_table('single_node_nullkey_c1_' || :round_robin_test_c1_shard_id , null, colocate_with=>'none', distribution_type=>null);
SET client_min_messages TO WARNING;
DROP TABLE failover_to_local;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2;
RESET client_min_messages;
-- so that we don't have to update the rest of the test output

View File

@@ -0,0 +1,14 @@
ALTER FUNCTION create_distributed_table RENAME TO create_distributed_table_internal;
CREATE OR REPLACE FUNCTION pg_catalog.create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type DEFAULT 'hash',
colocate_with text DEFAULT 'default',
shard_count int DEFAULT NULL)
RETURNS void
LANGUAGE plpgsql
AS $function$
BEGIN
PERFORM create_distributed_table_internal(table_name, NULL, NULL, colocate_with, NULL);
END;
$function$;