Add support for creating distributed tables with a null shard key (#6745)

With this PR, we allow creating distributed tables without specifying a
shard key via create_distributed_table(). Here are the important details
about those tables:
* Specifying `shard_count` is not allowed because it is assumed to be 1.
* We mostly refer to such tables as "null shard-key" tables in code /
  comments.
* To avoid a breaking change to create_distributed_table()'s signature, we
  keep the default value of the `distribution_type` param; the param is simply
  ignored for such tables when it's NULL or 'h' (the default), and any other
  value results in an error.
* The `colocate_with` param allows colocating such null shard-key tables
  with each other.
* We define this table type, i.e., NULL_KEY_DISTRIBUTED_TABLE, as a subclass
  of DISTRIBUTED_TABLE because we mostly want to treat them as distributed
  tables in terms of SQL / DDL / operation support.
* Metadata for such tables looks like:
  - distribution method => DISTRIBUTE_BY_NONE
  - replication model => REPLICATION_MODEL_STREAMING
  - colocation id => != INVALID_COLOCATION_ID (distinguishes them from
    Citus local tables)
* We assign the colocation groups of such tables to nodes in a round-robin
  fashion, based on the colocation id modulo the worker-node count (see the
  sketch after this list).
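
For illustration, here's a minimal sketch of creating and inspecting such tables (the `events` / `events_sibling` names are hypothetical; the expected metadata values follow from the list above):

```sql
-- create two null shard-key tables colocated with each other
CREATE TABLE events (event_id bigint, payload jsonb);
CREATE TABLE events_sibling (event_id bigint, note text);
SELECT create_distributed_table('events', null, colocate_with => 'none');
SELECT create_distributed_table('events_sibling', null, colocate_with => 'events');

-- inspect the metadata described above; we expect partmethod 'n'
-- (DISTRIBUTE_BY_NONE), repmodel 's' (REPLICATION_MODEL_STREAMING) and
-- a non-zero colocationid shared by both tables
SELECT logicalrelid, partmethod, repmodel, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('events'::regclass, 'events_sibling'::regclass);
```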

Note that this PR doesn't address DDL (except CREATE TABLE) / SQL /
operation (i.e., Citus UDF) support for such tables; it only adds a
preliminary API.
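
For reference, a sketch of calls that the new checks reject (hypothetical `events` table; error wording paraphrased from the patch):

```sql
-- shard_count can't be combined with a null shard key (it's implicitly 1)
SELECT create_distributed_table('events', null, shard_count => 2);

-- a distribution_type other than NULL or the default 'hash' is rejected
SELECT create_distributed_table('events', null, distribution_type => 'append');

-- the concurrent variant doesn't support null shard keys; it points the
-- user back to create_distributed_table()
SELECT create_distributed_table_concurrently('events', null);
```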
pull/6867/head
Onur Tirtir 2023-03-17 14:17:36 +03:00
parent 2d005ac777
commit fa467e05e7
27 changed files with 2911 additions and 77 deletions

View File

@ -134,6 +134,7 @@ static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static void CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName);
static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
DistributedTableParams *
distributedTableParams);
@ -141,6 +142,8 @@ static void CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static void CreateNullShardKeyDistTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId);
static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams,
Var *distributionColumn);
@ -216,51 +219,86 @@ create_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
if (PG_ARGISNULL(0) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
text *distributionColumnText = PG_ARGISNULL(1) ? NULL : PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
bool shardCountIsStrict = false;
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
if (distributionColumnText)
{
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
if (PG_ARGISNULL(2))
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
PG_RETURN_VOID();
}
shardCount = PG_GETARG_INT32(4);
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (!IsColocateWithDefault(colocateWithTableName) &&
!IsColocateWithNone(colocateWithTableName))
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}
/*
* if shard_count parameter is given than we have to
* make sure table has that many shards
*/
shardCountIsStrict = true;
shardCount = PG_GETARG_INT32(4);
/*
* If shard_count parameter is given, then we have to
* make sure table has that many shards.
*/
shardCountIsStrict = true;
}
char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
}
CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
}
char *distributionColumnName = text_to_cstring(distributionColumnText);
Assert(distributionColumnName != NULL);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
else
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
}
if (!PG_ARGISNULL(4))
{
ereport(ERROR, (errmsg("shard_count can't be specified when the "
"distribution column is null because in "
"that case it's automatically set to 1")));
}
CreateDistributedTable(relationId, distributionColumnName, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName);
if (!PG_ARGISNULL(2) &&
LookupDistributionMethod(PG_GETARG_OID(2)) != DISTRIBUTE_BY_HASH)
{
/*
* As we do for shard_count parameter, we could throw an error if
* distribution_type is not NULL when creating a null-shard-key table.
* However, this requires changing the default value of distribution_type
* parameter to NULL and this would mean a breaking change for most
* users because they're mostly using this API to create sharded
* tables. For this reason, here we instead do nothing if the distribution
* method is DISTRIBUTE_BY_HASH.
*/
ereport(ERROR, (errmsg("distribution_type can't be specified "
"when the distribution column is null ")));
}
CreateNullShardKeyDistTable(relationId, colocateWithTableName);
}
PG_RETURN_VOID();
}
@ -276,11 +314,18 @@ create_distributed_table_concurrently(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
if (PG_ARGISNULL(0) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}
if (PG_ARGISNULL(1))
{
ereport(ERROR, (errmsg("cannot use create_distributed_table_concurrently "
"to create a distributed table with a null shard "
"key, consider using create_distributed_table()")));
}
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
char *distributionColumnName = text_to_cstring(distributionColumnText);
@ -982,6 +1027,23 @@ CreateReferenceTable(Oid relationId)
}
/*
* CreateNullShardKeyDistTable is a wrapper around CreateCitusTable that creates a
* single shard distributed table that doesn't have a shard key.
*/
static void
CreateNullShardKeyDistTable(Oid relationId, char *colocateWithTableName)
{
DistributedTableParams distributedTableParams = {
.colocateWithTableName = colocateWithTableName,
.shardCount = 1,
.shardCountIsStrict = true,
.distributionColumnName = NULL
};
CreateCitusTable(relationId, NULL_KEY_DISTRIBUTED_TABLE, &distributedTableParams);
}
/*
* CreateCitusTable is the internal method that creates a Citus table in
* given configuration.
@ -1000,7 +1062,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams)
{
if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED ||
tableType == RANGE_DISTRIBUTED) != (distributedTableParams != NULL))
tableType == RANGE_DISTRIBUTED || tableType == NULL_KEY_DISTRIBUTED_TABLE) !=
(distributedTableParams != NULL))
{
ereport(ERROR, (errmsg("distributed table params must be provided "
"when creating a distributed table and must "
@ -1078,7 +1141,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
PropagatePrerequisiteObjectsForDistributedTable(relationId);
Var *distributionColumn = NULL;
if (distributedTableParams)
if (distributedTableParams && distributedTableParams->distributionColumnName)
{
distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributedTableParams->
@ -1150,6 +1213,11 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
{
CreateReferenceTableShard(relationId);
}
else if (tableType == NULL_KEY_DISTRIBUTED_TABLE)
{
CreateNullShardKeyDistTableShard(relationId, colocatedTableId,
colocationId);
}
if (ShouldSyncTableMetadata(relationId))
{
@ -1204,7 +1272,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
}
/* copy over data for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED || tableType == REFERENCE_TABLE)
if (tableType == HASH_DISTRIBUTED || tableType == NULL_KEY_DISTRIBUTED_TABLE ||
tableType == REFERENCE_TABLE)
{
if (RegularTable(relationId))
{
@ -1268,6 +1337,13 @@ DecideCitusTableParams(CitusTableType tableType,
break;
}
case NULL_KEY_DISTRIBUTED_TABLE:
{
citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
citusTableParams.replicationModel = REPLICATION_MODEL_STREAMING;
break;
}
case REFERENCE_TABLE:
{
citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
@ -1630,6 +1706,41 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
}
/*
* CreateNullShardKeyDistTableShard creates the shard of the given null-shard-key
* distributed table.
*/
static void
CreateNullShardKeyDistTableShard(Oid relationId, Oid colocatedTableId,
uint32 colocationId)
{
if (colocatedTableId != InvalidOid)
{
/*
* We currently allow concurrent distribution of colocated tables (which
* we probably should not be allowing because of foreign keys /
* partitioning etc).
*
* We also prevent concurrent shard moves / copies / splits while creating
* a colocated table.
*/
AcquirePlacementColocationLock(colocatedTableId, ShareLock,
"colocate distributed table");
/*
* We don't need to force using exclusive connections because we're anyway
* creating a single shard.
*/
bool useExclusiveConnection = false;
CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
}
else
{
CreateNullKeyShardWithRoundRobinPolicy(relationId, colocationId);
}
}
/*
* ColocationIdForNewTable returns a colocation id for given table
* according to given configuration. If there is no such configuration, it
@ -1662,8 +1773,8 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
errdetail("Currently, colocate_with option is not supported "
"for append / range distributed tables.")));
}
return colocationId;
@ -1679,10 +1790,11 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
* can be sure that there will no modifications on the colocation table
* until this transaction is committed.
*/
Assert(citusTableParams.distributionMethod == DISTRIBUTE_BY_HASH);
Oid distributionColumnType = distributionColumn->vartype;
Oid distributionColumnCollation = get_typcollation(distributionColumnType);
Oid distributionColumnType =
distributionColumn ? distributionColumn->vartype : InvalidOid;
Oid distributionColumnCollation =
distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid;
/* get an advisory lock to serialize concurrent default group creations */
if (IsColocateWithDefault(distributedTableParams->colocateWithTableName))
@ -1871,8 +1983,15 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
*/
if (PartitionedTableNoLock(relationId))
{
/* distributing partitioned tables in only supported for hash-distribution */
if (distributionMethod != DISTRIBUTE_BY_HASH)
/*
* Distributing partitioned tables is only supported for hash-distribution
* or null-shard-key tables.
*/
bool isNullShardKeyTable =
distributionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_STREAMING &&
colocationId != INVALID_COLOCATION_ID;
if (distributionMethod != DISTRIBUTE_BY_HASH && !isNullShardKeyTable)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables in only supported "

View File

@ -303,6 +303,11 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
/*
* Foreign keys from citus local tables or reference tables to distributed
* tables are not supported.
*
* We could support foreign keys from reference tables to null-shard-key
* tables, but this doesn't seem very useful. However, if we decide to support
* this, then we need to expand the relation access tracking check for
* null-shard-key tables too.
*/
if (referencingIsCitusLocalOrRefTable && !referencedIsCitusLocalOrRefTable)
{
@ -361,7 +366,12 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
* if tables are hash-distributed and colocated, we need to make sure that
* the distribution key is included in foreign constraint.
*/
if (!referencedIsCitusLocalOrRefTable && !foreignConstraintOnDistKey)
bool referencedIsNullShardKeyTable =
IsNullShardKeyTableByDistParams(referencedDistMethod,
referencedReplicationModel,
referencedColocationId);
if (!referencedIsCitusLocalOrRefTable && !referencedIsNullShardKeyTable &&
!foreignConstraintOnDistKey)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),

View File

@ -2146,6 +2146,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
}
if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) &&
!IsCitusTableTypeCacheEntry(cacheEntry, NULL_KEY_DISTRIBUTED_TABLE) &&
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),

View File

@ -324,7 +324,8 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
{
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
if (IsCitusTable(relationId) && !HasDistributionKey(relationId) &&
if ((IsCitusTableType(relationId, REFERENCE_TABLE) ||
IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) &&
TableReferenced(relationId))
{
char *relationName = get_rel_name(relationId);

View File

@ -508,11 +508,21 @@ IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
return partitionMethod == DISTRIBUTE_BY_RANGE;
}
case NULL_KEY_DISTRIBUTED_TABLE:
{
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID;
}
case DISTRIBUTED_TABLE:
{
return partitionMethod == DISTRIBUTE_BY_HASH ||
partitionMethod == DISTRIBUTE_BY_RANGE ||
partitionMethod == DISTRIBUTE_BY_APPEND;
partitionMethod == DISTRIBUTE_BY_APPEND ||
(partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID);
}
case STRICTLY_PARTITIONED_DISTRIBUTED_TABLE:
@ -815,6 +825,21 @@ IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel,
}
/*
* IsNullShardKeyTableByDistParams returns true if given partitionMethod,
* replicationModel and colocationId would identify a distributed table that
* has a null shard key.
*/
bool
IsNullShardKeyTableByDistParams(char partitionMethod, char replicationModel,
uint32 colocationId)
{
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC &&
colocationId != INVALID_COLOCATION_ID;
}
/*
* CitusTableList returns a list that includes all the valid distributed table
* cache entries.

View File

@ -515,7 +515,7 @@ ShouldSyncUserCommandForObject(ObjectAddress objectAddress)
/*
* ShouldSyncTableMetadata checks if the metadata of a distributed table should be
* propagated to metadata workers, i.e. the table is a hash distributed table or
* reference/citus local table.
* a Citus table that doesn't have a shard key.
*/
bool
ShouldSyncTableMetadata(Oid relationId)
@ -537,10 +537,11 @@ ShouldSyncTableMetadata(Oid relationId)
/*
* ShouldSyncTableMetadataViaCatalog checks if the metadata of a distributed table should
* be propagated to metadata workers, i.e. the table is an MX table or reference table.
* ShouldSyncTableMetadataViaCatalog checks if the metadata of a Citus table should
* be propagated to metadata workers, i.e. the table is an MX table or Citus table
* that doesn't have a shard key.
* Tables with streaming replication model (which means RF=1) and hash distribution are
* considered as MX tables while tables with none distribution are reference tables.
* considered as MX tables.
*
* ShouldSyncTableMetadataViaCatalog does not use the CitusTableCache and instead reads
* from catalog tables directly.
@ -1080,7 +1081,7 @@ EnsureObjectMetadataIsSane(int distributionArgumentIndex, int colocationId)
/*
* DistributionCreateCommand generates a command that can be
* executed to replicate the metadata for a distributed table.
* executed to replicate the metadata for a Citus table.
*/
char *
DistributionCreateCommand(CitusTableCacheEntry *cacheEntry)

View File

@ -217,9 +217,9 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
List *insertedShardPlacements = NIL;
List *insertedShardIds = NIL;
/* make sure that tables are hash partitioned */
CheckHashPartitionedTable(targetRelationId);
CheckHashPartitionedTable(sourceRelationId);
CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId);
Assert(targetCacheEntry->partitionMethod == DISTRIBUTE_BY_HASH ||
targetCacheEntry->partitionMethod == DISTRIBUTE_BY_NONE);
/*
* In contrast to append/range partitioned tables it makes more sense to
@ -259,10 +259,20 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
*newShardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, newShardIdPtr);
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue);
text *shardMinValueText = NULL;
text *shardMaxValueText = NULL;
if (targetCacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{
Assert(list_length(sourceShardIntervalList) == 1);
}
else
{
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
shardMinValueText = IntegerToText(shardMinValue);
shardMaxValueText = IntegerToText(shardMaxValue);
}
List *sourceShardPlacementList = ShardPlacementListSortedByWorker(
sourceShardId);
@ -362,6 +372,70 @@ CreateReferenceTableShard(Oid distributedTableId)
}
/*
* CreateNullKeyShardWithRoundRobinPolicy creates a single shard for the given
* relationId. The created shard does not have min/max values. Unlike
* CreateReferenceTableShard, the shard is _not_ replicated to all nodes;
* it has a single placement, like Citus local tables. However, this placement
* doesn't necessarily need to be on the coordinator; the target node is
* determined based on the modulo of the colocation id that the given table
* has been associated with.
*/
void
CreateNullKeyShardWithRoundRobinPolicy(Oid relationId, uint32 colocationId)
{
EnsureTableOwner(relationId);
/* we plan to add shards: get an exclusive lock on relation oid */
LockRelationOid(relationId, ExclusiveLock);
/*
* Load and sort the worker node list for deterministic placement.
*
* Also take a RowShareLock on pg_dist_node to disallow concurrent
* node list changes that require an exclusive lock.
*/
List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
int32 workerNodeCount = list_length(workerNodeList);
if (workerNodeCount == 0)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("couldn't find any worker nodes"),
errhint("Add more worker nodes")));
}
char shardStorageType = ShardStorageType(relationId);
text *minHashTokenText = NULL;
text *maxHashTokenText = NULL;
uint64 shardId = GetNextShardId();
InsertShardRow(relationId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText);
/* determine the node index based on colocation id */
int roundRobinNodeIdx = colocationId % workerNodeCount;
int replicationFactor = 1;
List *insertedShardPlacements = InsertShardPlacementRows(
relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);
/*
* We don't need to force using exclusive connections because we're anyway
* creating a single shard.
*/
bool useExclusiveConnection = false;
bool colocatedShard = false;
CreateShardsOnWorkers(relationId, insertedShardPlacements,
useExclusiveConnection, colocatedShard);
}
/*
* CheckHashPartitionedTable looks up the partition information for the given
* tableId and checks if the table is hash partitioned. If not, the function

View File

@ -521,7 +521,8 @@ RelationShardListForShardCreate(ShardInterval *shardInterval)
relationShard->shardId = shardInterval->shardId;
List *relationShardList = list_make1(relationShard);
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) &&
if ((IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(cacheEntry, NULL_KEY_DISTRIBUTED_TABLE)) &&
cacheEntry->colocationId != INVALID_COLOCATION_ID)
{
shardIndex = ShardIndex(shardInterval);

View File

@ -715,6 +715,13 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
NULL, NULL);
}
}
else if (IsCitusTableType(targetRelationId, NULL_KEY_DISTRIBUTED_TABLE))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"distributed INSERT ... SELECT cannot target a distributed "
"table with a null shard key",
NULL, NULL);
}
else
{
/*

View File

@ -2487,7 +2487,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
/* non-distributed tables have only one shard */
shardInterval = cacheEntry->sortedShardIntervalArray[0];
/* only use reference table as anchor shard if none exists yet */
/* use as anchor shard only if we couldn't find any yet */
if (anchorShardId == INVALID_SHARD_ID)
{
anchorShardId = shardInterval->shardId;

View File

@ -2684,7 +2684,7 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery,
if (!HasDistributionKey(relationId))
{
/* we don't need to do shard pruning for non-distributed tables */
/* we don't need to do shard pruning for single shard tables */
return list_make1(LoadShardIntervalList(relationId));
}
@ -2974,7 +2974,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
Assert(query->commandType == CMD_INSERT);
/* reference tables and citus local tables can only have one shard */
/* tables that don't have a distribution column can only have one shard */
if (!HasDistributionKeyCacheEntry(cacheEntry))
{
List *shardIntervalList = LoadShardIntervalList(distributedTableId);
@ -2992,6 +2992,12 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
ereport(ERROR, (errmsg("local table cannot have %d shards",
shardCount)));
}
else if (IsCitusTableTypeCacheEntry(cacheEntry, NULL_KEY_DISTRIBUTED_TABLE))
{
ereport(ERROR, (errmsg("distributed tables having a null shard key "
"cannot have %d shards",
shardCount)));
}
}
ShardInterval *shardInterval = linitial(shardIntervalList);

View File

@ -195,7 +195,7 @@ RecordRelationAccessIfNonDistTable(Oid relationId, ShardPlacementAccessType acce
* recursively calling RecordRelationAccessBase(), so be careful about
* removing this check.
*/
if (IsCitusTable(relationId) && HasDistributionKey(relationId))
if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
{
return;
}
@ -732,7 +732,7 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (HasDistributionKeyCacheEntry(cacheEntry) ||
if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) ||
cacheEntry->referencingRelationsViaForeignKey == NIL)
{
return;
@ -931,7 +931,7 @@ HoldsConflictingLockWithReferencedRelations(Oid relationId, ShardPlacementAccess
* We're only interested in foreign keys to reference tables and citus
* local tables.
*/
if (IsCitusTable(referencedRelation) && HasDistributionKey(referencedRelation))
if (IsCitusTableType(referencedRelation, DISTRIBUTED_TABLE))
{
continue;
}
@ -993,7 +993,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
bool holdsConflictingLocks = false;
Assert(!HasDistributionKeyCacheEntry(cacheEntry));
Assert(!IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE));
Oid referencingRelation = InvalidOid;
foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey)

View File

@ -1384,17 +1384,19 @@ EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType, Oid sourceRelationId)
{
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
char sourceReplicationModel = sourceTableEntry->replicationModel;
Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId);
if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED))
if (IsCitusTableTypeCacheEntry(sourceTableEntry, APPEND_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(sourceTableEntry, RANGE_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(sourceTableEntry, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
errdetail("Currently, colocate_with option is not supported "
"with append / range distributed tables and local "
"tables added to metadata.")));
}
char sourceReplicationModel = sourceTableEntry->replicationModel;
if (sourceReplicationModel != replicationModel)
{
char *relationName = get_rel_name(relationId);
@ -1406,7 +1408,9 @@ EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
sourceRelationName, relationName)));
}
Oid sourceDistributionColumnType = sourceDistributionColumn->vartype;
Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId);
Oid sourceDistributionColumnType = !sourceDistributionColumn ? InvalidOid :
sourceDistributionColumn->vartype;
if (sourceDistributionColumnType != distributionColumnType)
{
char *relationName = get_rel_name(relationId);

View File

@ -135,7 +135,7 @@ BuildDistributionKeyFromColumnName(Oid relationId, char *columnName, LOCKMODE lo
char *tableName = get_rel_name(relationId);
/* short circuit for reference tables */
/* short circuit for reference tables and null-shard-key tables */
if (columnName == NULL)
{
return NULL;

View File

@ -206,7 +206,7 @@ CompareRelationShards(const void *leftElement, const void *rightElement)
*
* For hash partitioned tables, it calculates hash value of a number in its
* range (e.g. min value) and finds which shard should contain the hashed
* value. For reference tables and citus local tables, it simply returns 0.
* value. For the tables that don't have a shard key, it simply returns 0.
* For the other table types, the function errors out.
*/
int
@ -231,12 +231,11 @@ ShardIndex(ShardInterval *shardInterval)
"tables that are added to citus metadata")));
}
/* short-circuit for reference tables */
/* short-circuit for the tables that don't have a distribution key */
if (!HasDistributionKeyCacheEntry(cacheEntry))
{
/*
* Reference tables and citus local tables have only a single shard,
* so the index is fixed to 0.
* Such tables have only a single shard, so the index is fixed to 0.
*/
shardIndex = 0;

View File

@ -262,6 +262,7 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
bool useExclusiveConnections);
extern void CreateReferenceTableShard(Oid distributedTableId);
extern void CreateNullKeyShardWithRoundRobinPolicy(Oid relationId, uint32 colocationId);
extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
List *ddlCommandList,
List *foreignConstraintCommandList);

View File

@ -123,6 +123,7 @@ typedef enum
HASH_DISTRIBUTED,
APPEND_DISTRIBUTED,
RANGE_DISTRIBUTED,
NULL_KEY_DISTRIBUTED_TABLE,
/* hash, range or append distributed table */
DISTRIBUTED_TABLE,
@ -157,6 +158,8 @@ extern uint32 ColocationIdViaCatalog(Oid relationId);
bool IsReferenceTableByDistParams(char partitionMethod, char replicationModel);
extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel,
uint32 colocationId);
extern bool IsNullShardKeyTableByDistParams(char partitionMethod, char replicationModel,
uint32 colocationId);
extern List * CitusTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern bool ShardExists(uint64 shardId);

View File

@ -128,6 +128,7 @@ DEPS = {
"multi_mx_copy_data": TestDeps(None, ["multi_mx_create_table"]),
"multi_mx_schema_support": TestDeps(None, ["multi_mx_copy_data"]),
"multi_simple_queries": TestDeps("base_schedule"),
"create_null_dist_key": TestDeps("minimal_schedule"),
}

File diff suppressed because it is too large

View File

@ -612,10 +612,10 @@ CREATE TABLE table_postgresql( id int );
CREATE TABLE table_failing ( id int );
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append');
ERROR: cannot distribute relation
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
DETAIL: Currently, colocate_with option is not supported with append / range distributed tables and local tables added to metadata.
SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE');
ERROR: cannot distribute relation
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
DETAIL: Currently, colocate_with option is not supported for append / range distributed tables.
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql');
ERROR: relation table_postgresql is not distributed
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table');

View File

@ -1735,6 +1735,33 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a null shard key table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test', null, colocate_with=>'none', distribution_type=>null);
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- and make sure that we can't remove the coordinator due to "test"
SELECT citus_remove_node('localhost', :master_port);
ERROR: cannot remove or disable the node localhost:xxxxx because because it contains the only shard placement for shard xxxxx
DETAIL: One of the table(s) that prevents the operation complete successfully is public.test
HINT: To proceed, either drop the tables or use undistribute_table() function to convert them to local tables
DROP TABLE test;
-- and now we should be able to remove the coordinator
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- confirm that we can create a reference table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);

View File

@ -101,8 +101,63 @@ SELECT pg_reload_conf();
t
(1 row)
CREATE TABLE single_node_nullkey_c1(a int, b int);
SELECT create_distributed_table('single_node_nullkey_c1', null, colocate_with=>'none', distribution_type=>null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE single_node_nullkey_c2(a int, b int);
SELECT create_distributed_table('single_node_nullkey_c2', null, colocate_with=>'none', distribution_type=>null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- created on different colocation groups ..
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
-- .. but both are associated to coordinator
SELECT groupid = 0 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT groupid = 0 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
-- try creating a null-shard-key distributed table from a shard relation
SELECT shardid AS round_robin_test_c1_shard_id FROM pg_dist_shard WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass \gset
SELECT create_distributed_table('single_node_nullkey_c1_' || :round_robin_test_c1_shard_id , null, colocate_with=>'none', distribution_type=>null);
ERROR: relation "single_node_nullkey_c1_90630532" is a shard relation
SET client_min_messages TO WARNING;
DROP TABLE failover_to_local;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2;
RESET client_min_messages;
-- so that we don't have to update rest of the test output
SET citus.next_shard_id TO 90630500;

View File

@ -101,8 +101,63 @@ SELECT pg_reload_conf();
t
(1 row)
CREATE TABLE single_node_nullkey_c1(a int, b int);
SELECT create_distributed_table('single_node_nullkey_c1', null, colocate_with=>'none', distribution_type=>null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE single_node_nullkey_c2(a int, b int);
SELECT create_distributed_table('single_node_nullkey_c2', null, colocate_with=>'none', distribution_type=>null);
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- created on different colocation groups ..
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
-- .. but both are associated to coordinator
SELECT groupid = 0 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT groupid = 0 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
-- try creating a null-shard-key distributed table from a shard relation
SELECT shardid AS round_robin_test_c1_shard_id FROM pg_dist_shard WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass \gset
SELECT create_distributed_table('single_node_nullkey_c1_' || :round_robin_test_c1_shard_id , null, colocate_with=>'none', distribution_type=>null);
ERROR: relation "single_node_nullkey_c1_90630532" is a shard relation
SET client_min_messages TO WARNING;
DROP TABLE failover_to_local;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2;
RESET client_min_messages;
-- so that we don't have to update rest of the test output
SET citus.next_shard_id TO 90630500;

View File

@ -32,6 +32,7 @@ test: escape_extension_name
test: ref_citus_local_fkeys
test: alter_database_owner
test: distributed_triggers
test: create_null_dist_key
test: multi_test_catalog_views
test: multi_table_ddl

View File

@ -0,0 +1,962 @@
CREATE SCHEMA create_null_dist_key;
SET search_path TO create_null_dist_key;
SET citus.next_shard_id TO 1720000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
SELECT 1 FROM citus_remove_node('localhost', :worker_1_port);
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
CREATE TABLE add_node_test(a int, "b" text);
-- add a node before creating the null-shard-key table
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
SELECT create_distributed_table('add_node_test', null, colocate_with=>'none', distribution_type=>null);
-- add a node after creating the null-shard-key table
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
-- make sure that table is created on the worker nodes added before/after create_distributed_table
SELECT result FROM run_command_on_workers($$
SELECT COUNT(*)=1 FROM pg_class WHERE relnamespace = 'create_null_dist_key'::regnamespace AND
relname='add_node_test'
$$);
-- and check the metadata tables
SELECT result FROM run_command_on_workers($$
SELECT (partmethod, partkey, repmodel, autoconverted) FROM pg_dist_partition
WHERE logicalrelid = 'create_null_dist_key.add_node_test'::regclass
$$);
SELECT result FROM run_command_on_workers($$
SELECT (shardstorage, shardminvalue, shardmaxvalue) FROM pg_dist_shard
WHERE logicalrelid = 'create_null_dist_key.add_node_test'::regclass
$$);
SELECT result FROM run_command_on_workers($$
SELECT COUNT(*)=1 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'create_null_dist_key.add_node_test'::regclass
);
$$);
SELECT result FROM run_command_on_workers($$
SELECT (shardcount, replicationfactor, distributioncolumntype, distributioncolumncollation) FROM pg_dist_colocation
WHERE colocationid = (
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_null_dist_key.add_node_test'::regclass
);
$$);
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
SET client_min_messages TO NOTICE;
CREATE TABLE invalid_configs_1(a int primary key);
SELECT create_distributed_table('invalid_configs_1', null, shard_count=>2);
SELECT create_distributed_table('invalid_configs_1', null, shard_count=>1);
CREATE TABLE nullkey_c1_t1(a int, b int);
CREATE TABLE nullkey_c1_t2(a int, b int);
CREATE TABLE nullkey_c1_t3(a int, b int);
SELECT create_distributed_table('nullkey_c1_t1', null, colocate_with=>'none');
SELECT colocationid AS nullkey_c1_t1_colocation_id FROM pg_dist_partition WHERE logicalrelid = 'create_null_dist_key.nullkey_c1_t1'::regclass \gset
BEGIN;
DROP TABLE nullkey_c1_t1;
-- make sure that we delete the colocation group after dropping the last table that belongs to it
SELECT COUNT(*)=0 FROM pg_dist_colocation WHERE colocationid = :'nullkey_c1_t1_colocation_id';
ROLLBACK;
SELECT create_distributed_table('nullkey_c1_t2', null, colocate_with=>'nullkey_c1_t1');
SELECT create_distributed_table('nullkey_c1_t3', null, colocate_with=>'nullkey_c1_t1', distribution_type=>'append');
SELECT create_distributed_table('nullkey_c1_t3', null, colocate_with=>'nullkey_c1_t1');
CREATE TABLE nullkey_c2_t1(a int, b int);
CREATE TABLE nullkey_c2_t2(a int, b int);
CREATE TABLE nullkey_c2_t3(a int, b int);
-- create_distributed_table_concurrently is not supported yet
SELECT create_distributed_table_concurrently('nullkey_c2_t1', null);
SELECT create_distributed_table_concurrently('nullkey_c2_t1', null, colocate_with=>'none');
SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>'hash'); -- distribution_type is ignored anyway
SELECT create_distributed_table('nullkey_c2_t3', null, colocate_with=>'nullkey_c2_t2', distribution_type=>null);
-- check the metadata for the colocated tables whose names start with nullkey_c1_
SELECT logicalrelid, partmethod, partkey, repmodel, autoconverted FROM pg_dist_partition
WHERE logicalrelid IN (
SELECT oid FROM pg_class
WHERE relnamespace = 'create_null_dist_key'::regnamespace AND
relname LIKE 'nullkey_c1_%'
)
ORDER BY 1;
-- make sure that all those 3 tables belong to same colocation group
SELECT COUNT(*) FROM pg_dist_partition
WHERE logicalrelid IN (
SELECT oid FROM pg_class
WHERE relnamespace = 'create_null_dist_key'::regnamespace AND
relname LIKE 'nullkey_c1_%'
)
GROUP BY colocationid;
SELECT logicalrelid, shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard
WHERE logicalrelid IN (
SELECT oid FROM pg_class
WHERE relnamespace = 'create_null_dist_key'::regnamespace AND
relname LIKE 'nullkey_c1_%'
)
ORDER BY 1;
-- make sure that all those 3 shards are created on the same node group
SELECT COUNT(*) FROM pg_dist_placement
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid IN (
SELECT oid FROM pg_class
WHERE relnamespace = 'create_null_dist_key'::regnamespace AND
relname LIKE 'nullkey_c1_%'
)
)
GROUP BY groupid;
-- check the metadata for the colocated tables whose names start with nullkey_c2_
SELECT logicalrelid, partmethod, partkey, repmodel, autoconverted FROM pg_dist_partition
WHERE logicalrelid IN (
SELECT oid FROM pg_class
WHERE relnamespace = 'create_null_dist_key'::regnamespace AND
relname LIKE 'nullkey_c2_%'
)
ORDER BY 1;
-- make sure that all those 3 tables belong to same colocation group
SELECT COUNT(*) FROM pg_dist_partition
WHERE logicalrelid IN (
SELECT oid FROM pg_class
WHERE relnamespace = 'create_null_dist_key'::regnamespace AND
relname LIKE 'nullkey_c2_%'
)
GROUP BY colocationid;
SELECT logicalrelid, shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard
WHERE logicalrelid IN (
SELECT oid FROM pg_class
WHERE relnamespace = 'create_null_dist_key'::regnamespace AND
relname LIKE 'nullkey_c2_%'
)
ORDER BY 1;
-- make sure that all those 3 shards are created on the same node group
SELECT COUNT(*) FROM pg_dist_placement
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid IN (
SELECT oid FROM pg_class
WHERE relnamespace = 'create_null_dist_key'::regnamespace AND
relname LIKE 'nullkey_c2_%'
)
)
GROUP BY groupid;
-- Make sure that the colocated tables whose names start with nullkey_c1_
-- belong to a different colocation group than the ones whose names start
-- with nullkey_c2_.
--
-- It's ok to only compare nullkey_c1_t1 and nullkey_c2_t1 because we already
-- verified that create_null_dist_key.nullkey_c1_t1 is colocated with the other two
-- and create_null_dist_key.nullkey_c2_t1 is colocated with the other two.
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_null_dist_key.nullkey_c1_t1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_null_dist_key.nullkey_c2_t1'::regclass
);
-- Since we determine the node for the placement based on the modulo of the colocation id,
-- we don't expect those two colocation groups to get assigned to the same node.
SELECT
(
SELECT groupid FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'create_null_dist_key.nullkey_c1_t1'::regclass
)
)
!=
(
SELECT groupid FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'create_null_dist_key.nullkey_c2_t1'::regclass
)
);
-- It's ok to only check nullkey_c1_t1 and nullkey_c2_t1 because we already
-- verified that create_null_dist_key.nullkey_c1_t1 is colocated with the other two
-- and create_null_dist_key.nullkey_c2_t1 is colocated with the other two.
SELECT shardcount, replicationfactor, distributioncolumntype, distributioncolumncollation FROM pg_dist_colocation
WHERE colocationid = (
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_null_dist_key.nullkey_c1_t1'::regclass
);
SELECT shardcount, replicationfactor, distributioncolumntype, distributioncolumncollation FROM pg_dist_colocation
WHERE colocationid = (
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_null_dist_key.nullkey_c2_t1'::regclass
);
CREATE TABLE round_robin_test_c1(a int, b int);
SELECT create_distributed_table('round_robin_test_c1', null, colocate_with=>'none', distribution_type=>null);
\c - - - :master_port
SET search_path TO create_null_dist_key;
SET citus.next_shard_id TO 1730000;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
SET client_min_messages TO NOTICE;
CREATE TABLE round_robin_test_c2(a int, b int);
SELECT create_distributed_table('round_robin_test_c2', null, colocate_with=>'none', distribution_type=>null);
-- Since we determine the node for the placement based on the modulo of the colocation id,
-- we don't expect those two colocation groups to get assigned to the same node even
-- after reconnecting to the coordinator.
SELECT
(
SELECT groupid FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'create_null_dist_key.round_robin_test_c1'::regclass
)
)
!=
(
SELECT groupid FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'create_null_dist_key.round_robin_test_c2'::regclass
)
);
CREATE TABLE distributed_table(a int, b int);
-- cannot colocate a sharded table with a null shard key table
SELECT create_distributed_table('distributed_table', 'a', colocate_with=>'nullkey_c1_t1');
CREATE TABLE reference_table(a int, b int);
CREATE TABLE local(a int, b int);
SELECT create_reference_table('reference_table');
SELECT create_distributed_table('distributed_table', 'a');
-- cannot colocate null shard key tables with other table types
CREATE TABLE cannot_colocate_with_other_types (a int, b int);
SELECT create_distributed_table('cannot_colocate_with_other_types', null, colocate_with=>'reference_table');
SELECT create_distributed_table('cannot_colocate_with_other_types', null, colocate_with=>'distributed_table');
SELECT create_distributed_table('cannot_colocate_with_other_types', null, colocate_with=>'local'); -- postgres local
SELECT citus_add_local_table_to_metadata('local');
-- cannot colocate null shard key tables with citus local tables
SELECT create_distributed_table('cannot_colocate_with_other_types', null, colocate_with=>'local'); -- citus local
SET client_min_messages TO WARNING;
-- can't create such a distributed table from another Citus table, except Citus local tables
SELECT create_distributed_table('reference_table', null, colocate_with=>'none');
SELECT create_distributed_table('distributed_table', null, colocate_with=>'none');
SELECT create_distributed_table('local', null, colocate_with=>'none');
BEGIN;
-- creating a null-shard-key table from a temporary table is not supported
CREATE TEMPORARY TABLE temp_table (a int);
SELECT create_distributed_table('temp_table', null, colocate_with=>'none', distribution_type=>null);
ROLLBACK;
-- creating a null-shard-key table from a catalog table is not supported
SELECT create_distributed_table('pg_catalog.pg_index', NULL, distribution_type=>null);
-- creating a null-shard-key table from an unlogged table is supported
CREATE UNLOGGED TABLE unlogged_table (a int);
SELECT create_distributed_table('unlogged_table', null, colocate_with=>'none', distribution_type=>null);
-- creating a null-shard-key table from a foreign table is not supported
CREATE FOREIGN TABLE foreign_table (
id bigint not null,
full_name text not null default ''
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true', table_name 'foreign_table');
SELECT create_distributed_table('foreign_table', null, colocate_with=>'none', distribution_type=>null);
-- create a null dist key table that has no tuples
CREATE TABLE null_dist_key_table_1 (a int primary key);
SELECT create_distributed_table('null_dist_key_table_1', null, colocate_with=>'none');
-- create a null dist key table that has some tuples
CREATE TABLE null_dist_key_table_2(a int primary key);
INSERT INTO null_dist_key_table_2 VALUES(1);
SELECT create_distributed_table('null_dist_key_table_2', null, colocate_with=>'none');
SELECT * FROM null_dist_key_table_2 ORDER BY a;
DROP TABLE null_dist_key_table_1, null_dist_key_table_2;
-- create indexes before creating the null dist key tables
-- .. for an initially empty table
CREATE TABLE null_dist_key_table_1(a int);
CREATE INDEX null_dist_key_table_1_idx ON null_dist_key_table_1(a);
SELECT create_distributed_table('null_dist_key_table_1', null, colocate_with=>'none');
-- .. and for another table having data in it before creating null dist key table
CREATE TABLE null_dist_key_table_2(a int);
INSERT INTO null_dist_key_table_2 VALUES(1);
CREATE INDEX null_dist_key_table_2_idx ON null_dist_key_table_2(a);
SELECT create_distributed_table('null_dist_key_table_2', null, colocate_with=>'none');
SELECT * FROM null_dist_key_table_2 ORDER BY a;
-- show that we do not support inheritance relationships
CREATE TABLE parent_table (a int, b text);
CREATE TABLE child_table () INHERITS (parent_table);
-- both of below should error out
SELECT create_distributed_table('parent_table', null, colocate_with=>'none');
SELECT create_distributed_table('child_table', null, colocate_with=>'none');
-- show that we support policies
BEGIN;
CREATE TABLE null_dist_key_table_3 (table_user text);
ALTER TABLE null_dist_key_table_3 ENABLE ROW LEVEL SECURITY;
CREATE ROLE table_users;
CREATE POLICY table_policy ON null_dist_key_table_3 TO table_users
USING (table_user = current_user);
SELECT create_distributed_table('null_dist_key_table_3', null, colocate_with=>'none');
ROLLBACK;
-- drop them for next tests
DROP TABLE null_dist_key_table_1, null_dist_key_table_2, distributed_table;
-- tests for object names that should be escaped properly
CREATE SCHEMA "NULL_!_dist_key";
CREATE TABLE "NULL_!_dist_key"."my_TABLE.1!?!"(id int, "Second_Id" int);
SELECT create_distributed_table('"NULL_!_dist_key"."my_TABLE.1!?!"', null, colocate_with=>'none');
-- drop the table before creating it when the search path is set
SET search_path to "NULL_!_dist_key" ;
DROP TABLE "my_TABLE.1!?!";
CREATE TYPE int_jsonb_type AS (key int, value jsonb);
CREATE DOMAIN age_with_default AS int CHECK (value >= 0) DEFAULT 0;
CREATE TYPE yes_no_enum AS ENUM ('yes', 'no');
CREATE EXTENSION btree_gist;
CREATE SEQUENCE my_seq_1 START WITH 10;
CREATE TABLE "Table?!.1Table"(
id int PRIMARY KEY,
"Second_Id" int,
"local_Type" int_jsonb_type,
"jsondata" jsonb NOT NULL,
name text,
price numeric CHECK (price > 0),
age_with_default_col age_with_default,
yes_no_enum_col yes_no_enum,
seq_col_1 bigserial,
seq_col_2 int DEFAULT nextval('my_seq_1'),
generated_column int GENERATED ALWAYS AS (seq_col_1 * seq_col_2 + 4) STORED,
UNIQUE (id, price),
EXCLUDE USING GIST (name WITH =));
-- create some objects before create_distributed_table
CREATE INDEX "my!Index1" ON "Table?!.1Table"(id) WITH ( fillfactor = 80 ) WHERE id > 10;
CREATE INDEX text_index ON "Table?!.1Table"(name);
CREATE UNIQUE INDEX uniqueIndex ON "Table?!.1Table" (id);
CREATE STATISTICS stats_1 ON id, price FROM "Table?!.1Table";
CREATE TEXT SEARCH CONFIGURATION text_search_cfg (parser = default);
CREATE INDEX text_search_index ON "Table?!.1Table"
USING gin (to_tsvector('text_search_cfg'::regconfig, (COALESCE(name, ''::character varying))::text));
-- ingest some data before create_distributed_table
INSERT INTO "Table?!.1Table" VALUES (1, 1, (1, row_to_json(row(1,1)))::int_jsonb_type, row_to_json(row(1,1), true)),
(2, 1, (2, row_to_json(row(2,2)))::int_jsonb_type, row_to_json(row(2,2), 'false'));
-- create a replica identity before create_distributed_table
ALTER TABLE "Table?!.1Table" REPLICA IDENTITY USING INDEX uniqueIndex;
SELECT create_distributed_table('"Table?!.1Table"', null, colocate_with=>'none');
INSERT INTO "Table?!.1Table" VALUES (10, 15, (150, row_to_json(row(4,8)))::int_jsonb_type, '{}', 'text_1', 10, 27, 'yes', 60, 70);
INSERT INTO "Table?!.1Table" VALUES (5, 5, (5, row_to_json(row(5,5)))::int_jsonb_type, row_to_json(row(5,5), true));
-- tuples that are supposed to violate different data type / check constraints
INSERT INTO "Table?!.1Table"(id, jsondata, name) VALUES (101, '{"a": 1}', 'text_1');
INSERT INTO "Table?!.1Table"(id, jsondata, price) VALUES (101, '{"a": 1}', -1);
INSERT INTO "Table?!.1Table"(id, jsondata, age_with_default_col) VALUES (101, '{"a": 1}', -1);
INSERT INTO "Table?!.1Table"(id, jsondata, yes_no_enum_col) VALUES (101, '{"a": 1}', 'what?');
SELECT * FROM "Table?!.1Table" ORDER BY id;
SET search_path TO create_null_dist_key;
-- create a partitioned table with some columns that
-- are going to be dropped within the tests
CREATE TABLE sensors(
col_to_drop_1 text,
measureid integer,
eventdatetime date,
measure_data jsonb,
PRIMARY KEY (measureid, eventdatetime, measure_data))
PARTITION BY RANGE(eventdatetime);
-- drop column even before attaching any partitions
ALTER TABLE sensors DROP COLUMN col_to_drop_1;
CREATE TABLE sensors_2000 PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2001-01-01');
-- cannot distribute child table without distributing the parent
SELECT create_distributed_table('sensors_2000', NULL, distribution_type=>null);
SELECT create_distributed_table('sensors', NULL, distribution_type=>null);
CREATE TABLE multi_level_partitioning_parent(
measureid integer,
eventdatetime date,
measure_data jsonb)
PARTITION BY RANGE(eventdatetime);
CREATE TABLE multi_level_partitioning_level_1(
measureid integer,
eventdatetime date,
measure_data jsonb)
PARTITION BY RANGE(eventdatetime);
ALTER TABLE multi_level_partitioning_parent ATTACH PARTITION multi_level_partitioning_level_1
FOR VALUES FROM ('2000-01-01') TO ('2001-01-01');
CREATE TABLE multi_level_partitioning_level_2 PARTITION OF multi_level_partitioning_level_1
FOR VALUES FROM ('2000-01-01') TO ('2000-06-06');
-- multi-level partitioning is not supported
SELECT create_distributed_table('multi_level_partitioning_parent', NULL, distribution_type=>null);
CREATE FUNCTION normalize_generate_always_as_error(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE 'cannot insert into column %' OR
SQLERRM LIKE 'cannot insert a non-DEFAULT value into column %'
THEN
RAISE 'cannot insert a non-DEFAULT value into column';
ELSE
RAISE 'unknown error';
END IF;
END;
$$LANGUAGE plpgsql;
CREATE TABLE identity_test (
a int GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10),
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 100 INCREMENT BY 100),
c bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 1000 INCREMENT BY 1000)
);
SELECT create_distributed_table('identity_test', NULL, distribution_type=>null);
DROP TABLE identity_test;
-- Above failed because we don't support using a data type other than BIGINT
-- for identity columns, so drop the table and create a new one with BIGINT
-- identity columns.
CREATE TABLE identity_test (
a bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 10 INCREMENT BY 10),
b bigint GENERATED ALWAYS AS IDENTITY (START WITH 100 INCREMENT BY 100),
c bigint GENERATED BY DEFAULT AS IDENTITY (START WITH 1000 INCREMENT BY 1000)
);
SELECT create_distributed_table('identity_test', NULL, distribution_type=>null);
INSERT INTO identity_test (a) VALUES (5);
SELECT normalize_generate_always_as_error($$INSERT INTO identity_test (b) VALUES (5)$$); -- fails due to missing OVERRIDING SYSTEM VALUE
INSERT INTO identity_test (b) OVERRIDING SYSTEM VALUE VALUES (5);
INSERT INTO identity_test (c) VALUES (5);
SELECT result, success FROM run_command_on_workers($$
INSERT INTO create_null_dist_key.identity_test (a) VALUES (6)
$$);
SELECT result, success FROM run_command_on_workers($$
SELECT create_null_dist_key.normalize_generate_always_as_error('INSERT INTO create_null_dist_key.identity_test (b) VALUES (1)')
$$);
-- This should fail due to missing OVERRIDING SYSTEM VALUE.
SELECT result, success FROM run_command_on_workers($$
SELECT create_null_dist_key.normalize_generate_always_as_error('INSERT INTO create_null_dist_key.identity_test (a, b) VALUES (1, 1)')
$$);
SELECT result, success FROM run_command_on_workers($$
INSERT INTO create_null_dist_key.identity_test (a, b) OVERRIDING SYSTEM VALUE VALUES (7, 7)
$$);
SELECT result, success FROM run_command_on_workers($$
INSERT INTO create_null_dist_key.identity_test (c, a) OVERRIDING SYSTEM VALUE VALUES (8, 8)
$$);
-- test foreign keys
CREATE TABLE referenced_table(a int UNIQUE, b int);
CREATE TABLE referencing_table(a int, b int,
FOREIGN KEY (a) REFERENCES referenced_table(a));
-- to a colocated null dist key table
BEGIN;
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'referenced_table');
INSERT INTO referenced_table VALUES (1, 1);
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ROLLBACK;
-- to a non-colocated null dist key table
BEGIN;
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'none');
ROLLBACK;
-- to a sharded table
BEGIN;
SELECT create_distributed_table('referenced_table', 'a');
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null);
ROLLBACK;
-- to a reference table
BEGIN;
SELECT create_reference_table('referenced_table');
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null);
INSERT INTO referenced_table VALUES (1, 1);
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ROLLBACK;
-- to a citus local table
BEGIN;
SELECT citus_add_local_table_to_metadata('referenced_table', cascade_via_foreign_keys=>true);
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null);
ROLLBACK;
-- to a postgres table
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null);
-- from a reference table
BEGIN;
SELECT create_reference_table('referencing_table');
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
ROLLBACK;
BEGIN;
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
SELECT create_reference_table('referencing_table');
ROLLBACK;
-- from a sharded table
BEGIN;
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
SELECT create_distributed_table('referencing_table', 'a');
ROLLBACK;
-- from a citus local table
BEGIN;
SELECT citus_add_local_table_to_metadata('referencing_table', cascade_via_foreign_keys=>true);
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
ROLLBACK;
BEGIN;
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
SELECT citus_add_local_table_to_metadata('referencing_table', cascade_via_foreign_keys=>true);
ROLLBACK;
-- from a postgres table (only useful to preserve legacy behavior)
BEGIN;
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
ROLLBACK;
-- make sure that we enforce the foreign key constraint when inserting from workers too
SELECT create_reference_table('referenced_table');
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null);
INSERT INTO referenced_table VALUES (1, 1);
-- ok
SELECT result, success FROM run_command_on_workers($$
INSERT INTO create_null_dist_key.referencing_table VALUES (1, 2)
$$);
-- fails
SELECT result, success FROM run_command_on_workers($$
INSERT INTO create_null_dist_key.referencing_table VALUES (2, 2)
$$);
DROP TABLE referencing_table, referenced_table;
CREATE TABLE self_fkey_test(a int UNIQUE, b int,
FOREIGN KEY (b) REFERENCES self_fkey_test(a),
FOREIGN KEY (a) REFERENCES self_fkey_test(a));
SELECT create_distributed_table('self_fkey_test', NULL, distribution_type=>null);
INSERT INTO self_fkey_test VALUES (1, 1); -- ok
INSERT INTO self_fkey_test VALUES (2, 3); -- fails
-- similar foreign key tests, but this time the referencing table is created only after the referenced table has been distributed
-- referencing table is a null shard key table
-- to a colocated null dist key table
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a));
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'referenced_table');
INSERT INTO referenced_table VALUES (1, 1);
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ROLLBACK;
BEGIN;
CREATE TABLE referenced_table(a int, b int, UNIQUE(b, a));
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a, b) REFERENCES referenced_table(b, a));
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'referenced_table');
INSERT INTO referenced_table VALUES (1, 2);
INSERT INTO referencing_table VALUES (2, 1);
-- fails
INSERT INTO referencing_table VALUES (1, 2);
ROLLBACK;
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a) ON UPDATE SET NULL);
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'referenced_table');
INSERT INTO referenced_table VALUES (1, 1);
INSERT INTO referencing_table VALUES (1, 2);
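-- Updating the referenced key should set referencing_table.a to NULL via
-- ON UPDATE SET NULL, so the SELECT below is expected to return (NULL, 2).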
UPDATE referenced_table SET a = 5;
SELECT * FROM referencing_table;
ROLLBACK;
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
CREATE TABLE referencing_table(a serial, b int, FOREIGN KEY (a) REFERENCES referenced_table(a) ON UPDATE SET DEFAULT);
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'referenced_table');
ROLLBACK;
-- to a non-colocated null dist key table
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a));
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'none');
ROLLBACK;
-- to a sharded table
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_distributed_table('referenced_table', 'a');
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a));
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'none');
ROLLBACK;
-- to a reference table
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_reference_table('referenced_table');
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a));
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'none');
INSERT INTO referenced_table VALUES (1, 1);
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ROLLBACK;
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_reference_table('referenced_table');
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a) ON DELETE CASCADE);
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'none');
INSERT INTO referenced_table VALUES (1, 1);
INSERT INTO referencing_table VALUES (1, 2);
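-- Deleting the referenced row should cascade into referencing_table via
-- ON DELETE CASCADE, so the SELECT below is expected to return no rows.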
DELETE FROM referenced_table CASCADE;
SELECT * FROM referencing_table;
ROLLBACK;
-- to a citus local table
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT citus_add_local_table_to_metadata('referenced_table');
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a));
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'none');
ROLLBACK;
-- to a postgres table
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a));
SELECT create_distributed_table('referencing_table', NULL, distribution_type=>null, colocate_with=>'none');
ROLLBACK;
-- referenced table is a null shard key table
-- from a sharded table
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null, colocate_with=>'none');
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a));
SELECT create_distributed_table('referencing_table', 'a');
ROLLBACK;
-- from a reference table
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null, colocate_with=>'none');
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a));
SELECT create_reference_table('referencing_table');
ROLLBACK;
-- from a citus local table
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null, colocate_with=>'none');
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a));
SELECT citus_add_local_table_to_metadata('referencing_table', cascade_via_foreign_keys=>true);
ROLLBACK;
-- from a postgres table (only useful to preserve legacy behavior)
BEGIN;
CREATE TABLE referenced_table(a int UNIQUE, b int);
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null, colocate_with=>'none');
CREATE TABLE referencing_table(a int, b int, FOREIGN KEY (a) REFERENCES referenced_table(a));
ROLLBACK;
-- Test whether we switch to sequential execution to enforce foreign
-- key restrictions.
CREATE TABLE referenced_table(id int PRIMARY KEY, value_1 int);
SELECT create_reference_table('referenced_table');
CREATE TABLE referencing_table(id int PRIMARY KEY, value_1 int, CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referenced_table(id) ON UPDATE CASCADE);
SELECT create_distributed_table('referencing_table', null, colocate_with=>'none', distribution_type=>null);
SET client_min_messages TO DEBUG1;
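-- With client_min_messages at DEBUG1, any switch to sequential execution in the
-- blocks below should show up as a debug message in the test output.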
BEGIN;
-- Switches to sequential execution because referenced_table is a reference table
-- and referenced by a null-shard-key distributed table.
--
-- Given that we cannot do parallel access on a null-shard-key table anyway, the switch is not strictly needed here.
-- However, this is already what we're doing for, e.g., a foreign key from a
-- reference table to another reference table.
TRUNCATE referenced_table CASCADE;
SELECT COUNT(*) FROM referencing_table;
COMMIT;
BEGIN;
SELECT COUNT(*) FROM referencing_table;
-- Doesn't fail because the SELECT didn't perform parallel execution.
TRUNCATE referenced_table CASCADE;
COMMIT;
BEGIN;
UPDATE referencing_table SET value_1 = 15;
-- Doesn't fail because the UPDATE didn't perform parallel execution.
TRUNCATE referenced_table CASCADE;
COMMIT;
BEGIN;
SELECT COUNT(*) FROM referenced_table;
-- doesn't switch to sequential execution
ALTER TABLE referencing_table ADD COLUMN X INT;
ROLLBACK;
BEGIN;
-- Switches to sequential execution because referenced_table is a reference table
-- and referenced by a null-shard-key distributed table.
--
-- Given that we cannot do parallel access on a null-shard-key table anyway, the switch is not strictly needed here.
-- However, this is already what we're doing for, e.g., a foreign key from a
-- reference table to another reference table.
UPDATE referenced_table SET id = 101 WHERE id = 99;
UPDATE referencing_table SET value_1 = 15;
ROLLBACK;
BEGIN;
UPDATE referencing_table SET value_1 = 15;
-- Doesn't fail because prior UPDATE didn't perform parallel execution.
UPDATE referenced_table SET id = 101 WHERE id = 99;
ROLLBACK;
SET client_min_messages TO WARNING;
DROP TABLE referenced_table, referencing_table;
-- Test whether we unnecessarily switch to sequential execution
-- when the referenced relation is a null-shard-key table.
CREATE TABLE referenced_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('referenced_table', null, colocate_with=>'none', distribution_type=>null);
CREATE TABLE referencing_table(id int PRIMARY KEY, value_1 int, CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES referenced_table(id) ON UPDATE CASCADE);
SELECT create_distributed_table('referencing_table', null, colocate_with=>'referenced_table', distribution_type=>null);
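-- Since the referenced relation is a null-shard-key table, there is presumably no
-- parallel multi-shard access to guard against, so the blocks below are expected
-- not to switch to sequential execution.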
SET client_min_messages TO DEBUG1;
BEGIN;
SELECT COUNT(*) FROM referenced_table;
-- Doesn't switch to sequential execution because the referenced_table is
-- a null-shard-key distributed table.
ALTER TABLE referencing_table ADD COLUMN X INT;
ROLLBACK;
BEGIN;
-- Doesn't switch to sequential execution because the referenced_table is
-- a null-shard-key distributed table.
TRUNCATE referenced_table CASCADE;
SELECT COUNT(*) FROM referencing_table;
COMMIT;
SET client_min_messages TO WARNING;
CREATE FUNCTION increment_value() RETURNS trigger AS $increment_value$
BEGIN
NEW.value := NEW.value+1;
RETURN NEW;
END;
$increment_value$ LANGUAGE plpgsql;
CREATE TABLE trigger_table_1 (value int);
CREATE TRIGGER trigger_1
BEFORE INSERT ON trigger_table_1
FOR EACH ROW EXECUTE FUNCTION increment_value();
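-- The first attempt below is expected to error out because the table has a
-- trigger; enabling citus.enable_unsafe_triggers should let the retry succeed.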
SELECT create_distributed_table('trigger_table_1', NULL, distribution_type=>null);
SET citus.enable_unsafe_triggers TO ON;
SELECT create_distributed_table('trigger_table_1', NULL, distribution_type=>null);
INSERT INTO trigger_table_1 VALUES(1), (2);
SELECT * FROM trigger_table_1 ORDER BY value;
CREATE FUNCTION insert_some() RETURNS trigger AS $insert_some$
BEGIN
RAISE NOTICE 'inserted some rows';
RETURN NEW;
END;
$insert_some$ LANGUAGE plpgsql;
CREATE TABLE trigger_table_2 (value int);
CREATE TRIGGER trigger_2
AFTER INSERT ON trigger_table_2
FOR EACH STATEMENT EXECUTE FUNCTION insert_some();
ALTER TABLE trigger_table_2 DISABLE TRIGGER trigger_2;
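-- The trigger is disabled before the table is distributed, so the INSERT further
-- below is presumably not expected to emit the 'inserted some rows' notice.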
SELECT create_distributed_table('trigger_table_2', NULL, distribution_type=>null);
SET client_min_messages TO NOTICE;
INSERT INTO trigger_table_2 VALUES(3), (4);
SET client_min_messages TO WARNING;
SELECT * FROM trigger_table_2 ORDER BY value;
CREATE FUNCTION combine_old_new_val() RETURNS trigger AS $combine_old_new_val$
BEGIN
NEW.value = NEW.value * 10 + OLD.value;
RETURN NEW;
END;
$combine_old_new_val$ LANGUAGE plpgsql;
CREATE FUNCTION notice_truncate() RETURNS trigger AS $notice_truncate$
BEGIN
RAISE NOTICE 'notice_truncate()';
RETURN NEW;
END;
$notice_truncate$ LANGUAGE plpgsql;
CREATE TABLE trigger_table_3 (value int);
CREATE TRIGGER trigger_3
BEFORE UPDATE ON trigger_table_3
FOR EACH ROW EXECUTE FUNCTION combine_old_new_val();
CREATE TRIGGER trigger_4
AFTER TRUNCATE ON trigger_table_3
FOR EACH STATEMENT EXECUTE FUNCTION notice_truncate();
INSERT INTO trigger_table_3 VALUES(3), (4);
SELECT create_distributed_table('trigger_table_3', NULL, distribution_type=>null);
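-- Both triggers were created (and the rows inserted) before distributing the table;
-- the statements below check that they still fire afterwards. With combine_old_new_val(),
-- updating rows 3 and 4 to 5 should yield 5 * 10 + 3 = 53 and 5 * 10 + 4 = 54, and the
-- TRUNCATE should raise the notice_truncate() notice.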
UPDATE trigger_table_3 SET value = 5;
SELECT * FROM trigger_table_3 ORDER BY value;
SET client_min_messages TO NOTICE;
TRUNCATE trigger_table_3;
SET client_min_messages TO WARNING;
-- try a few simple queries at least to make sure that we don't crash
BEGIN;
INSERT INTO nullkey_c1_t1 SELECT * FROM nullkey_c2_t1;
ROLLBACK;
-- cleanup at exit
SET client_min_messages TO ERROR;
DROP SCHEMA create_null_dist_key, "NULL_!_dist_key" CASCADE;
@ -904,6 +904,20 @@ SELECT create_distributed_table('test','x');
DROP TABLE test;
TRUNCATE pg_dist_node;
-- confirm that we can create a null shard key table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test', null, colocate_with=>'none', distribution_type=>null);
-- and make sure that we can't remove the coordinator due to "test"
SELECT citus_remove_node('localhost', :master_port);
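-- The failure above is presumably because the single placement of "test" ended up
-- on the coordinator, so the node can only be removed after the table is dropped.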
DROP TABLE test;
-- and now we should be able to remove the coordinator
SELECT citus_remove_node('localhost', :master_port);
-- confirm that we can create a reference table on an empty node
CREATE TABLE test (x int, y int);
INSERT INTO test VALUES (1,2);
@ -63,8 +63,43 @@ ALTER SYSTEM RESET citus.local_shared_pool_size;
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
SELECT pg_reload_conf();
CREATE TABLE single_node_nullkey_c1(a int, b int);
SELECT create_distributed_table('single_node_nullkey_c1', null, colocate_with=>'none', distribution_type=>null);
CREATE TABLE single_node_nullkey_c2(a int, b int);
SELECT create_distributed_table('single_node_nullkey_c2', null, colocate_with=>'none', distribution_type=>null);
-- created in different colocation groups ..
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass
);
-- .. but both are placed on the coordinator
SELECT groupid = 0 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass
);
SELECT groupid = 0 FROM pg_dist_placement
WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'single_node.single_node_nullkey_c2'::regclass
);
-- try creating a null-shard-key distributed table from a shard relation
SELECT shardid AS round_robin_test_c1_shard_id FROM pg_dist_shard WHERE logicalrelid = 'single_node.single_node_nullkey_c1'::regclass \gset
SELECT create_distributed_table('single_node_nullkey_c1_' || :round_robin_test_c1_shard_id, null, colocate_with=>'none', distribution_type=>null);
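-- The call above is expected to fail, since a shard relation itself presumably
-- cannot be turned into a distributed table.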
SET client_min_messages TO WARNING;
DROP TABLE failover_to_local;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2;
RESET client_min_messages;
-- so that we don't have to update the rest of the test output