Merge remote-tracking branch 'upstream/main' into issue/6694

issue/6694
Gokhan Gulbiz 2023-03-09 11:37:18 +03:00
commit b649df41c5
No known key found for this signature in database
GPG Key ID: 608EF06B6BD1B45B
36 changed files with 1022 additions and 390 deletions

View File

@ -1352,8 +1352,7 @@ CreateCitusTableLike(TableConversionState *con)
}
else if (IsCitusTableType(con->relationId, REFERENCE_TABLE))
{
CreateDistributedTable(con->newRelationId, NULL, DISTRIBUTE_BY_NONE, 0, false,
NULL);
CreateReferenceTable(con->newRelationId);
}
else if (IsCitusTableType(con->relationId, CITUS_LOCAL_TABLE))
{

View File

@ -107,11 +107,17 @@ static void CreateDistributedTableConcurrently(Oid relationId,
char *colocateWithTableName,
int shardCount,
bool shardCountIsStrict);
static char DecideReplicationModel(char distributionMethod, char *colocateWithTableName);
static char DecideDistTableReplicationModel(char distributionMethod,
char *colocateWithTableName);
static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static void CreateCitusTable(Oid relationId, char *distributionColumnName,
char distributionMethod,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName,
char replicationModel);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
@ -380,8 +386,8 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
EnsureForeignKeysForDistributedTableConcurrently(relationId);
char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName);
char replicationModel = DecideDistTableReplicationModel(distributionMethod,
colocateWithTableName);
/*
* we fail transaction before local table conversion if the table could not be colocated with
@ -627,8 +633,8 @@ static void
EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
char *distributionColumnName, char *colocateWithTableName)
{
char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName);
char replicationModel = DecideDistTableReplicationModel(distributionMethod,
colocateWithTableName);
/*
* we fail transaction before local table conversion if the table could not be colocated with
@ -865,9 +871,6 @@ create_reference_table(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
char *colocateWithTableName = NULL;
char *distributionColumnName = NULL;
EnsureCitusTableCanBeCreated(relationId);
/* enable create_reference_table on an empty node */
@ -900,8 +903,7 @@ create_reference_table(PG_FUNCTION_ARGS)
errdetail("There are no active worker nodes.")));
}
CreateDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_NONE,
ShardCount, false, colocateWithTableName);
CreateReferenceTable(relationId);
PG_RETURN_VOID();
}
@ -956,17 +958,61 @@ EnsureRelationExists(Oid relationId)
/*
* CreateDistributedTable creates distributed table in the given configuration.
* CreateReferenceTable is a wrapper around CreateCitusTable that creates a
* distributed table.
*/
void
CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName)
{
Assert(distributionMethod != DISTRIBUTE_BY_NONE);
char replicationModel = DecideDistTableReplicationModel(distributionMethod,
colocateWithTableName);
CreateCitusTable(relationId, distributionColumnName,
distributionMethod, shardCount,
shardCountIsStrict, colocateWithTableName,
replicationModel);
}
/*
* CreateReferenceTable is a wrapper around CreateCitusTable that creates a
* reference table.
*/
void
CreateReferenceTable(Oid relationId)
{
char *distributionColumnName = NULL;
char distributionMethod = DISTRIBUTE_BY_NONE;
int shardCount = 1;
bool shardCountIsStrict = true;
char *colocateWithTableName = NULL;
char replicationModel = REPLICATION_MODEL_2PC;
CreateCitusTable(relationId, distributionColumnName,
distributionMethod, shardCount,
shardCountIsStrict, colocateWithTableName,
replicationModel);
}
/*
* CreateCitusTable is the internal method that creates a Citus table in
* given configuration.
*
* This functions contains all necessary logic to create distributed tables. It
* performs necessary checks to ensure distributing the table is safe. If it is
* safe to distribute the table, this function creates distributed table metadata,
* creates shards and copies local data to shards. This function also handles
* partitioned tables by distributing its partitions as well.
*/
void
CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName)
static void
CreateCitusTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName,
char replicationModel)
{
ErrorIfTableHasUnsupportedIdentityColumn(relationId);
@ -1029,9 +1075,6 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
PropagatePrerequisiteObjectsForDistributedTable(relationId);
char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName);
Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributionColumnName,
NoLock);
@ -1429,18 +1472,16 @@ DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag)
/*
* DecideReplicationModel function decides which replication model should be
* used depending on given distribution configuration.
* DecideDistTableReplicationModel function decides which replication model should be
* used for a distributed table depending on given distribution configuration.
*/
static char
DecideReplicationModel(char distributionMethod, char *colocateWithTableName)
DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTableName)
{
if (distributionMethod == DISTRIBUTE_BY_NONE)
{
return REPLICATION_MODEL_2PC;
}
else if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
!IsColocateWithNone(colocateWithTableName))
Assert(distributionMethod != DISTRIBUTE_BY_NONE);
if (!IsColocateWithDefault(colocateWithTableName) &&
!IsColocateWithNone(colocateWithTableName))
{
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
Oid colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false);

View File

@ -985,7 +985,7 @@ AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval)
appendStringInfo(selectQuery, "SELECT " UINT64_FORMAT " AS shard_id, ", shardId);
appendStringInfo(selectQuery, "%s AS shard_name, ", quotedShardName);
appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName);
appendStringInfo(selectQuery, PG_TOTAL_RELATION_SIZE_FUNCTION, quotedShardName);
}

View File

@ -1918,6 +1918,10 @@ ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode)
{
int32 groupId = workerNode->groupId;
List *shardPlacements = AllShardPlacementsOnNodeGroup(groupId);
/* sort the list to prevent regression tests getting flaky */
shardPlacements = SortList(shardPlacements, CompareGroupShardPlacements);
GroupShardPlacement *placement = NULL;
foreach_ptr(placement, shardPlacements)
{

View File

@ -475,6 +475,7 @@ GetRebalanceSteps(RebalanceOptions *options)
/* sort the lists to make the function more deterministic */
List *activeWorkerList = SortedActiveWorkers();
List *activeShardPlacementListList = NIL;
List *unbalancedShards = NIL;
Oid relationId = InvalidOid;
foreach_oid(relationId, options->relationIdList)
@ -490,8 +491,29 @@ GetRebalanceSteps(RebalanceOptions *options)
shardPlacementList, options->workerNode);
}
activeShardPlacementListList =
lappend(activeShardPlacementListList, activeShardPlacementListForRelation);
if (list_length(activeShardPlacementListForRelation) >= list_length(
activeWorkerList))
{
activeShardPlacementListList = lappend(activeShardPlacementListList,
activeShardPlacementListForRelation);
}
else
{
/*
* If the number of shard groups are less than the number of worker nodes,
* at least one of the worker nodes will remain empty. For such cases,
* we consider those shard groups as a colocation group and try to
* distribute them across the cluster.
*/
unbalancedShards = list_concat(unbalancedShards,
activeShardPlacementListForRelation);
}
}
if (list_length(unbalancedShards) > 0)
{
activeShardPlacementListList = lappend(activeShardPlacementListList,
unbalancedShards);
}
if (options->threshold < options->rebalanceStrategy->minimumThreshold)

View File

@ -53,8 +53,14 @@ worker_copy_table_to_node(PG_FUNCTION_ARGS)
targetNodeId);
StringInfo selectShardQueryForCopy = makeStringInfo();
/*
* Even though we do COPY(SELECT ...) all the columns, we can't just do SELECT * because we need to not COPY generated colums.
*/
const char *columnList = CopyableColumnNamesFromRelationName(relationSchemaName,
relationName);
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;", relationQualifiedName);
"SELECT %s FROM %s;", columnList, relationQualifiedName);
ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,

View File

@ -73,7 +73,7 @@ static void ShardCopyDestReceiverDestroy(DestReceiver *destReceiver);
static bool CanUseLocalCopy(uint32_t destinationNodeId);
static StringInfo ConstructShardCopyStatement(List *destinationShardFullyQualifiedName,
bool
useBinaryFormat);
useBinaryFormat, TupleDesc tupleDesc);
static void WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
@ -105,7 +105,8 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary);
copyDest->copyOutState->binary,
copyDest->tupleDescriptor);
if (!SendRemoteCommand(copyDest->connection, copyStatement->data))
{
@ -344,21 +345,80 @@ ShardCopyDestReceiverDestroy(DestReceiver *dest)
}
/*
* CopyableColumnNamesFromTupleDesc function creates and returns a comma seperated column names string to be used in COPY
* and SELECT statements when copying a table. The COPY and SELECT statements should filter out the GENERATED columns since COPY
* statement fails to handle them. Iterating over the attributes of the table we also need to skip the dropped columns.
*/
const char *
CopyableColumnNamesFromTupleDesc(TupleDesc tupDesc)
{
StringInfo columnList = makeStringInfo();
bool firstInList = true;
for (int i = 0; i < tupDesc->natts; i++)
{
Form_pg_attribute att = TupleDescAttr(tupDesc, i);
if (att->attgenerated || att->attisdropped)
{
continue;
}
if (!firstInList)
{
appendStringInfo(columnList, ",");
}
firstInList = false;
appendStringInfo(columnList, "%s", quote_identifier(NameStr(att->attname)));
}
return columnList->data;
}
/*
* CopyableColumnNamesFromRelationName function is a wrapper for CopyableColumnNamesFromTupleDesc.
*/
const char *
CopyableColumnNamesFromRelationName(const char *schemaName, const char *relationName)
{
Oid namespaceOid = get_namespace_oid(schemaName, true);
Oid relationId = get_relname_relid(relationName, namespaceOid);
Relation relation = relation_open(relationId, AccessShareLock);
TupleDesc tupleDesc = RelationGetDescr(relation);
const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
relation_close(relation, NoLock);
return columnList;
}
/*
* ConstructShardCopyStatement constructs the text of a COPY statement
* for copying into a result table
*/
static StringInfo
ConstructShardCopyStatement(List *destinationShardFullyQualifiedName, bool
useBinaryFormat)
useBinaryFormat,
TupleDesc tupleDesc)
{
char *destinationShardSchemaName = linitial(destinationShardFullyQualifiedName);
char *destinationShardRelationName = lsecond(destinationShardFullyQualifiedName);
StringInfo command = makeStringInfo();
appendStringInfo(command, "COPY %s.%s FROM STDIN",
const char *columnList = CopyableColumnNamesFromTupleDesc(tupleDesc);
appendStringInfo(command, "COPY %s.%s (%s) FROM STDIN",
quote_identifier(destinationShardSchemaName), quote_identifier(
destinationShardRelationName));
destinationShardRelationName), columnList);
if (useBinaryFormat)
{

View File

@ -110,8 +110,13 @@ worker_split_copy(PG_FUNCTION_ARGS)
splitCopyInfoList))));
StringInfo selectShardQueryForCopy = makeStringInfo();
const char *columnList = CopyableColumnNamesFromRelationName(
sourceShardToCopySchemaName,
sourceShardToCopyName);
appendStringInfo(selectShardQueryForCopy,
"SELECT * FROM %s;", sourceShardToCopyQualifiedName);
"SELECT %s FROM %s;", columnList,
sourceShardToCopyQualifiedName);
ParamListInfo params = NULL;
ExecuteQueryStringIntoDestReceiver(selectShardQueryForCopy->data, params,

View File

@ -5343,8 +5343,7 @@ ActiveShardPlacementLists(List *taskList)
/*
* CompareShardPlacements compares two shard placements by their tuple oid; this
* oid reflects the tuple's insertion order into pg_dist_placement.
* CompareShardPlacements compares two shard placements by placement id.
*/
int
CompareShardPlacements(const void *leftElement, const void *rightElement)
@ -5370,6 +5369,35 @@ CompareShardPlacements(const void *leftElement, const void *rightElement)
}
/*
* CompareGroupShardPlacements compares two group shard placements by placement id.
*/
int
CompareGroupShardPlacements(const void *leftElement, const void *rightElement)
{
const GroupShardPlacement *leftPlacement =
*((const GroupShardPlacement **) leftElement);
const GroupShardPlacement *rightPlacement =
*((const GroupShardPlacement **) rightElement);
uint64 leftPlacementId = leftPlacement->placementId;
uint64 rightPlacementId = rightPlacement->placementId;
if (leftPlacementId < rightPlacementId)
{
return -1;
}
else if (leftPlacementId > rightPlacementId)
{
return 1;
}
else
{
return 0;
}
}
/*
* LeftRotateList returns a copy of the given list that has been cyclically
* shifted to the left by the given rotation count. For this, the function

View File

@ -147,6 +147,26 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS)
shardPlacementList = SortList(shardPlacementList, CompareShardPlacements);
shardPlacementListList = lappend(shardPlacementListList, shardPlacementList);
List *unbalancedShards = NIL;
ListCell *shardPlacementListCell = NULL;
foreach(shardPlacementListCell, shardPlacementListList)
{
List *placementList = (List *) lfirst(shardPlacementListCell);
if (list_length(placementList) < list_length(workerNodeList))
{
unbalancedShards = list_concat(unbalancedShards,
placementList);
shardPlacementListList = foreach_delete_current(shardPlacementListList,
shardPlacementListCell);
}
}
if (list_length(unbalancedShards) > 0)
{
shardPlacementListList = lappend(shardPlacementListList, unbalancedShards);
}
rebalancePlanFunctions.context = &context;
/* sort the lists to make the function more deterministic */

View File

@ -503,45 +503,6 @@ SetLocktagForShardDistributionMetadata(int64 shardId, LOCKTAG *tag)
}
/*
* LockPlacementCleanup takes an exclusive lock to ensure that only one process
* can cleanup placements at the same time.
*/
void
LockPlacementCleanup(void)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = false;
/* Moves acquire lock with a constant operation id CITUS_SHARD_MOVE.
* This will change as we add support for parallel moves.
*/
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_SHARD_MOVE);
(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
}
/*
* TryLockPlacementCleanup takes an exclusive lock to ensure that only one
* process can cleanup placements at the same time.
*/
bool
TryLockPlacementCleanup(void)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = true;
/* Moves acquire lock with a constant operation id CITUS_SHARD_MOVE.
* This will change as we add support for parallel moves.
*/
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_SHARD_MOVE);
bool lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
return lockAcquired;
}
/*
* LockReferencedReferenceShardDistributionMetadata acquires shard distribution
* metadata locks with the given lock mode on the reference tables which has a

View File

@ -325,6 +325,7 @@ extern void DeleteShardPlacementRow(uint64 placementId);
extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName);
extern void CreateReferenceTable(Oid relationId);
extern void CreateTruncateTrigger(Oid relationId);
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);

View File

@ -553,6 +553,7 @@ extern bool BinaryOpExpression(Expr *clause, Node **leftOperand, Node **rightOpe
/* helper functions */
extern Var * MakeInt4Column(void);
extern int CompareShardPlacements(const void *leftElement, const void *rightElement);
extern int CompareGroupShardPlacements(const void *leftElement, const void *rightElement);
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
ShardInterval *secondInterval);
extern bool ShardIntervalsOverlapWithParams(Datum firstMin, Datum firstMax,

View File

@ -53,8 +53,7 @@ typedef enum CitusOperations
CITUS_NONBLOCKING_SPLIT = 1,
CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY = 2,
CITUS_CREATE_COLOCATION_DEFAULT = 3,
CITUS_SHARD_MOVE = 4,
CITUS_BACKGROUND_TASK_MONITOR = 5
CITUS_BACKGROUND_TASK_MONITOR = 4
} CitusOperations;
/* reuse advisory lock, but with different, unused field 4 (4)*/
@ -165,8 +164,6 @@ enum DistLockConfigs
/* Lock shard/relation metadata for safe modifications */
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
extern void LockPlacementCleanup(void);
extern bool TryLockPlacementCleanup(void);
extern void EnsureShardOwner(uint64 shardId, bool missingOk);
extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList);
extern void BlockWritesToShardList(List *shardList);

View File

@ -19,4 +19,9 @@ extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState,
List *destinationShardFullyQualifiedName,
uint32_t destinationNodeId);
extern const char * CopyableColumnNamesFromRelationName(const char *schemaName, const
char *relationName);
extern const char * CopyableColumnNamesFromTupleDesc(TupleDesc tupdesc);
#endif /* WORKER_SHARD_COPY_H_ */

View File

@ -151,14 +151,24 @@ def copy_test_files_with_names(test_names, sql_dir_path, expected_dir_path, conf
continue
sql_name = os.path.join("./sql", test_name + ".sql")
output_name = os.path.join("./expected", test_name + ".out")
shutil.copy(sql_name, sql_dir_path)
if os.path.isfile(output_name):
# for a test named <t>, all files:
# <t>.out, <t>_0.out, <t>_1.out ...
# are considered as valid outputs for the test
# by the testing tool (pg_regress)
# so copy such files to the testing directory
output_name = os.path.join("./expected", test_name + ".out")
alt_output_version_no = 0
while os.path.isfile(output_name):
# it might be the first time we run this test and the expected file
# might not be there yet, in that case, we don't want to error out
# while copying the file.
shutil.copy(output_name, expected_dir_path)
output_name = os.path.join(
"./expected", f"{test_name}_{alt_output_version_no}.out"
)
alt_output_version_no += 1
def run_tests(configs, sql_schedule_name):

View File

@ -84,9 +84,20 @@ if __name__ == "__main__":
),
"create_role_propagation": TestDeps(None, ["multi_cluster_management"]),
"single_node_enterprise": TestDeps(None),
"single_node": TestDeps(None),
"single_node_truncate": TestDeps(None),
"multi_extension": TestDeps(None, repeatable=False),
"multi_test_helpers": TestDeps(None),
"multi_insert_select": TestDeps("base_schedule"),
"multi_mx_create_table": TestDeps(
None,
[
"multi_test_helpers_superuser",
"multi_mx_node_metadata",
"multi_cluster_management",
"multi_mx_function_table_reference",
],
),
}
if not (test_file_name or test_file_path):

View File

@ -60,7 +60,7 @@ SELECT create_reference_table('reference_table');
(1 row)
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
create_distributed_table
@ -84,8 +84,9 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
-- END : Create Foreign key constraints.
-- BEGIN : Load data into tables.
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
SELECT COUNT(*) FROM sensors;
count
---------------------------------------------------------------------

View File

@ -56,7 +56,7 @@ SELECT create_reference_table('reference_table');
(1 row)
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
create_distributed_table
@ -80,8 +80,9 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
-- END : Create Foreign key constraints.
-- BEGIN : Load data into tables.
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
SELECT COUNT(*) FROM sensors;
count
---------------------------------------------------------------------

View File

@ -64,11 +64,11 @@ SET citus.multi_shard_modify_mode TO sequential;
SELECT citus_update_table_statistics('test_table_statistics_hash');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_total_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -152,11 +152,11 @@ SET citus.multi_shard_modify_mode TO sequential;
SELECT citus_update_table_statistics('test_table_statistics_append');
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_total_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_total_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx');
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_total_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_total_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx

View File

@ -238,8 +238,40 @@ ORDER BY
LIMIT 1 OFFSET 1;
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
-- Check that shards of a table with GENERATED columns can be moved.
\c - - - :master_port
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
CREATE TABLE mx_table_with_generated_column (a int, b int GENERATED ALWAYS AS ( a + 3 ) STORED, c int);
SELECT create_distributed_table('mx_table_with_generated_column', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Check that dropped columns are handled properly in a move.
ALTER TABLE mx_table_with_generated_column DROP COLUMN c;
-- Move a shard from worker 1 to worker 2
SELECT
citus_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_table_with_generated_column'::regclass
AND nodeport = :worker_1_port
ORDER BY
shardid
LIMIT 1;
citus_move_shard_placement
---------------------------------------------------------------------
(1 row)
-- Cleanup
\c - - - :master_port
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
DROP TABLE mx_table_with_generated_column;
DROP TABLE mx_table_1;
DROP TABLE mx_table_2;
DROP TABLE mx_table_3;

View File

@ -497,22 +497,22 @@ ORDER BY table_name::text;
SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards ORDER BY shard_name::text;
shard_name | table_name | citus_table_type | shard_size
---------------------------------------------------------------------
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220097 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220099 | app_analytics_events_mx | distributed | 0
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220096 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220097 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220098 | app_analytics_events_mx | distributed | 8192
app_analytics_events_mx_1220099 | app_analytics_events_mx | distributed | 8192
articles_hash_mx_1220104 | articles_hash_mx | distributed | 0
articles_hash_mx_1220104 | articles_hash_mx | distributed | 0
articles_hash_mx_1220104 | articles_hash_mx | distributed | 0
@ -608,22 +608,22 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
citus_mx_test_schema.nation_hash_collation_search_path_1220046 | citus_mx_test_schema.nation_hash_collation_search_path | distributed | 0
citus_mx_test_schema.nation_hash_collation_search_path_1220046 | citus_mx_test_schema.nation_hash_collation_search_path | distributed | 0
citus_mx_test_schema.nation_hash_collation_search_path_1220047 | citus_mx_test_schema.nation_hash_collation_search_path | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220049 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 0
citus_mx_test_schema.nation_hash_composite_types_1220051 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384
citus_mx_test_schema.nation_hash_composite_types_1220048 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384
citus_mx_test_schema.nation_hash_composite_types_1220049 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220050 | citus_mx_test_schema.nation_hash_composite_types | distributed | 8192
citus_mx_test_schema.nation_hash_composite_types_1220051 | citus_mx_test_schema.nation_hash_composite_types | distributed | 16384
citus_mx_test_schema_join_1.nation_hash_1220032 | citus_mx_test_schema_join_1.nation_hash | distributed | 0
citus_mx_test_schema_join_1.nation_hash_1220032 | citus_mx_test_schema_join_1.nation_hash | distributed | 0
citus_mx_test_schema_join_1.nation_hash_1220032 | citus_mx_test_schema_join_1.nation_hash | distributed | 0
@ -696,109 +696,109 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
customer_mx_1220084 | customer_mx | reference | 0
customer_mx_1220084 | customer_mx | reference | 0
customer_mx_1220084 | customer_mx | reference | 0
labs_mx_1220102 | labs_mx | distributed | 0
labs_mx_1220102 | labs_mx | distributed | 0
labs_mx_1220102 | labs_mx | distributed | 0
labs_mx_1220102 | labs_mx | distributed | 0
labs_mx_1220102 | labs_mx | distributed | 0
labs_mx_1220102 | labs_mx | distributed | 0
labs_mx_1220102 | labs_mx | distributed | 0
limit_orders_mx_1220092 | limit_orders_mx | distributed | 0
limit_orders_mx_1220092 | limit_orders_mx | distributed | 0
limit_orders_mx_1220092 | limit_orders_mx | distributed | 0
limit_orders_mx_1220092 | limit_orders_mx | distributed | 0
limit_orders_mx_1220092 | limit_orders_mx | distributed | 0
limit_orders_mx_1220092 | limit_orders_mx | distributed | 0
limit_orders_mx_1220092 | limit_orders_mx | distributed | 0
limit_orders_mx_1220093 | limit_orders_mx | distributed | 0
lineitem_mx_1220052 | lineitem_mx | distributed | 0
lineitem_mx_1220052 | lineitem_mx | distributed | 0
lineitem_mx_1220052 | lineitem_mx | distributed | 0
lineitem_mx_1220052 | lineitem_mx | distributed | 0
lineitem_mx_1220052 | lineitem_mx | distributed | 0
lineitem_mx_1220052 | lineitem_mx | distributed | 0
lineitem_mx_1220052 | lineitem_mx | distributed | 0
lineitem_mx_1220053 | lineitem_mx | distributed | 0
lineitem_mx_1220054 | lineitem_mx | distributed | 0
lineitem_mx_1220054 | lineitem_mx | distributed | 0
lineitem_mx_1220054 | lineitem_mx | distributed | 0
lineitem_mx_1220054 | lineitem_mx | distributed | 0
lineitem_mx_1220054 | lineitem_mx | distributed | 0
lineitem_mx_1220054 | lineitem_mx | distributed | 0
lineitem_mx_1220054 | lineitem_mx | distributed | 0
lineitem_mx_1220055 | lineitem_mx | distributed | 0
lineitem_mx_1220056 | lineitem_mx | distributed | 0
lineitem_mx_1220056 | lineitem_mx | distributed | 0
lineitem_mx_1220056 | lineitem_mx | distributed | 0
lineitem_mx_1220056 | lineitem_mx | distributed | 0
lineitem_mx_1220056 | lineitem_mx | distributed | 0
lineitem_mx_1220056 | lineitem_mx | distributed | 0
lineitem_mx_1220056 | lineitem_mx | distributed | 0
lineitem_mx_1220057 | lineitem_mx | distributed | 0
lineitem_mx_1220058 | lineitem_mx | distributed | 0
lineitem_mx_1220058 | lineitem_mx | distributed | 0
lineitem_mx_1220058 | lineitem_mx | distributed | 0
lineitem_mx_1220058 | lineitem_mx | distributed | 0
lineitem_mx_1220058 | lineitem_mx | distributed | 0
lineitem_mx_1220058 | lineitem_mx | distributed | 0
lineitem_mx_1220058 | lineitem_mx | distributed | 0
lineitem_mx_1220059 | lineitem_mx | distributed | 0
lineitem_mx_1220060 | lineitem_mx | distributed | 0
lineitem_mx_1220060 | lineitem_mx | distributed | 0
lineitem_mx_1220060 | lineitem_mx | distributed | 0
lineitem_mx_1220060 | lineitem_mx | distributed | 0
lineitem_mx_1220060 | lineitem_mx | distributed | 0
lineitem_mx_1220060 | lineitem_mx | distributed | 0
lineitem_mx_1220060 | lineitem_mx | distributed | 0
lineitem_mx_1220061 | lineitem_mx | distributed | 0
lineitem_mx_1220062 | lineitem_mx | distributed | 0
lineitem_mx_1220062 | lineitem_mx | distributed | 0
lineitem_mx_1220062 | lineitem_mx | distributed | 0
lineitem_mx_1220062 | lineitem_mx | distributed | 0
lineitem_mx_1220062 | lineitem_mx | distributed | 0
lineitem_mx_1220062 | lineitem_mx | distributed | 0
lineitem_mx_1220062 | lineitem_mx | distributed | 0
lineitem_mx_1220063 | lineitem_mx | distributed | 0
lineitem_mx_1220064 | lineitem_mx | distributed | 0
lineitem_mx_1220064 | lineitem_mx | distributed | 0
lineitem_mx_1220064 | lineitem_mx | distributed | 0
lineitem_mx_1220064 | lineitem_mx | distributed | 0
lineitem_mx_1220064 | lineitem_mx | distributed | 0
lineitem_mx_1220064 | lineitem_mx | distributed | 0
lineitem_mx_1220064 | lineitem_mx | distributed | 0
lineitem_mx_1220065 | lineitem_mx | distributed | 0
lineitem_mx_1220066 | lineitem_mx | distributed | 0
lineitem_mx_1220066 | lineitem_mx | distributed | 0
lineitem_mx_1220066 | lineitem_mx | distributed | 0
lineitem_mx_1220066 | lineitem_mx | distributed | 0
lineitem_mx_1220066 | lineitem_mx | distributed | 0
lineitem_mx_1220066 | lineitem_mx | distributed | 0
lineitem_mx_1220066 | lineitem_mx | distributed | 0
lineitem_mx_1220067 | lineitem_mx | distributed | 0
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 0
multiple_hash_mx_1220095 | multiple_hash_mx | distributed | 0
mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220088 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220089 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220090 | mx_ddl_table | distributed | 8192
mx_ddl_table_1220091 | mx_ddl_table | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
labs_mx_1220102 | labs_mx | distributed | 8192
limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384
limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384
limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384
limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384
limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384
limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384
limit_orders_mx_1220092 | limit_orders_mx | distributed | 16384
limit_orders_mx_1220093 | limit_orders_mx | distributed | 16384
lineitem_mx_1220052 | lineitem_mx | distributed | 16384
lineitem_mx_1220052 | lineitem_mx | distributed | 16384
lineitem_mx_1220052 | lineitem_mx | distributed | 16384
lineitem_mx_1220052 | lineitem_mx | distributed | 16384
lineitem_mx_1220052 | lineitem_mx | distributed | 16384
lineitem_mx_1220052 | lineitem_mx | distributed | 16384
lineitem_mx_1220052 | lineitem_mx | distributed | 16384
lineitem_mx_1220053 | lineitem_mx | distributed | 16384
lineitem_mx_1220054 | lineitem_mx | distributed | 16384
lineitem_mx_1220054 | lineitem_mx | distributed | 16384
lineitem_mx_1220054 | lineitem_mx | distributed | 16384
lineitem_mx_1220054 | lineitem_mx | distributed | 16384
lineitem_mx_1220054 | lineitem_mx | distributed | 16384
lineitem_mx_1220054 | lineitem_mx | distributed | 16384
lineitem_mx_1220054 | lineitem_mx | distributed | 16384
lineitem_mx_1220055 | lineitem_mx | distributed | 16384
lineitem_mx_1220056 | lineitem_mx | distributed | 16384
lineitem_mx_1220056 | lineitem_mx | distributed | 16384
lineitem_mx_1220056 | lineitem_mx | distributed | 16384
lineitem_mx_1220056 | lineitem_mx | distributed | 16384
lineitem_mx_1220056 | lineitem_mx | distributed | 16384
lineitem_mx_1220056 | lineitem_mx | distributed | 16384
lineitem_mx_1220056 | lineitem_mx | distributed | 16384
lineitem_mx_1220057 | lineitem_mx | distributed | 16384
lineitem_mx_1220058 | lineitem_mx | distributed | 16384
lineitem_mx_1220058 | lineitem_mx | distributed | 16384
lineitem_mx_1220058 | lineitem_mx | distributed | 16384
lineitem_mx_1220058 | lineitem_mx | distributed | 16384
lineitem_mx_1220058 | lineitem_mx | distributed | 16384
lineitem_mx_1220058 | lineitem_mx | distributed | 16384
lineitem_mx_1220058 | lineitem_mx | distributed | 16384
lineitem_mx_1220059 | lineitem_mx | distributed | 16384
lineitem_mx_1220060 | lineitem_mx | distributed | 16384
lineitem_mx_1220060 | lineitem_mx | distributed | 16384
lineitem_mx_1220060 | lineitem_mx | distributed | 16384
lineitem_mx_1220060 | lineitem_mx | distributed | 16384
lineitem_mx_1220060 | lineitem_mx | distributed | 16384
lineitem_mx_1220060 | lineitem_mx | distributed | 16384
lineitem_mx_1220060 | lineitem_mx | distributed | 16384
lineitem_mx_1220061 | lineitem_mx | distributed | 16384
lineitem_mx_1220062 | lineitem_mx | distributed | 16384
lineitem_mx_1220062 | lineitem_mx | distributed | 16384
lineitem_mx_1220062 | lineitem_mx | distributed | 16384
lineitem_mx_1220062 | lineitem_mx | distributed | 16384
lineitem_mx_1220062 | lineitem_mx | distributed | 16384
lineitem_mx_1220062 | lineitem_mx | distributed | 16384
lineitem_mx_1220062 | lineitem_mx | distributed | 16384
lineitem_mx_1220063 | lineitem_mx | distributed | 16384
lineitem_mx_1220064 | lineitem_mx | distributed | 16384
lineitem_mx_1220064 | lineitem_mx | distributed | 16384
lineitem_mx_1220064 | lineitem_mx | distributed | 16384
lineitem_mx_1220064 | lineitem_mx | distributed | 16384
lineitem_mx_1220064 | lineitem_mx | distributed | 16384
lineitem_mx_1220064 | lineitem_mx | distributed | 16384
lineitem_mx_1220064 | lineitem_mx | distributed | 16384
lineitem_mx_1220065 | lineitem_mx | distributed | 16384
lineitem_mx_1220066 | lineitem_mx | distributed | 16384
lineitem_mx_1220066 | lineitem_mx | distributed | 16384
lineitem_mx_1220066 | lineitem_mx | distributed | 16384
lineitem_mx_1220066 | lineitem_mx | distributed | 16384
lineitem_mx_1220066 | lineitem_mx | distributed | 16384
lineitem_mx_1220066 | lineitem_mx | distributed | 16384
lineitem_mx_1220066 | lineitem_mx | distributed | 16384
lineitem_mx_1220067 | lineitem_mx | distributed | 16384
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192
multiple_hash_mx_1220094 | multiple_hash_mx | distributed | 8192
multiple_hash_mx_1220095 | multiple_hash_mx | distributed | 8192
mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220088 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220089 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220090 | mx_ddl_table | distributed | 24576
mx_ddl_table_1220091 | mx_ddl_table | distributed | 24576
nation_hash_1220000 | nation_hash | distributed | 0
nation_hash_1220000 | nation_hash | distributed | 0
nation_hash_1220000 | nation_hash | distributed | 0
@ -871,77 +871,77 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
nation_mx_1220085 | nation_mx | reference | 0
nation_mx_1220085 | nation_mx | reference | 0
nation_mx_1220085 | nation_mx | reference | 0
objects_mx_1220103 | objects_mx | distributed | 0
objects_mx_1220103 | objects_mx | distributed | 0
objects_mx_1220103 | objects_mx | distributed | 0
objects_mx_1220103 | objects_mx | distributed | 0
objects_mx_1220103 | objects_mx | distributed | 0
objects_mx_1220103 | objects_mx | distributed | 0
objects_mx_1220103 | objects_mx | distributed | 0
orders_mx_1220068 | orders_mx | distributed | 0
orders_mx_1220068 | orders_mx | distributed | 0
orders_mx_1220068 | orders_mx | distributed | 0
orders_mx_1220068 | orders_mx | distributed | 0
orders_mx_1220068 | orders_mx | distributed | 0
orders_mx_1220068 | orders_mx | distributed | 0
orders_mx_1220068 | orders_mx | distributed | 0
orders_mx_1220069 | orders_mx | distributed | 0
orders_mx_1220070 | orders_mx | distributed | 0
orders_mx_1220070 | orders_mx | distributed | 0
orders_mx_1220070 | orders_mx | distributed | 0
orders_mx_1220070 | orders_mx | distributed | 0
orders_mx_1220070 | orders_mx | distributed | 0
orders_mx_1220070 | orders_mx | distributed | 0
orders_mx_1220070 | orders_mx | distributed | 0
orders_mx_1220071 | orders_mx | distributed | 0
orders_mx_1220072 | orders_mx | distributed | 0
orders_mx_1220072 | orders_mx | distributed | 0
orders_mx_1220072 | orders_mx | distributed | 0
orders_mx_1220072 | orders_mx | distributed | 0
orders_mx_1220072 | orders_mx | distributed | 0
orders_mx_1220072 | orders_mx | distributed | 0
orders_mx_1220072 | orders_mx | distributed | 0
orders_mx_1220073 | orders_mx | distributed | 0
orders_mx_1220074 | orders_mx | distributed | 0
orders_mx_1220074 | orders_mx | distributed | 0
orders_mx_1220074 | orders_mx | distributed | 0
orders_mx_1220074 | orders_mx | distributed | 0
orders_mx_1220074 | orders_mx | distributed | 0
orders_mx_1220074 | orders_mx | distributed | 0
orders_mx_1220074 | orders_mx | distributed | 0
orders_mx_1220075 | orders_mx | distributed | 0
orders_mx_1220076 | orders_mx | distributed | 0
orders_mx_1220076 | orders_mx | distributed | 0
orders_mx_1220076 | orders_mx | distributed | 0
orders_mx_1220076 | orders_mx | distributed | 0
orders_mx_1220076 | orders_mx | distributed | 0
orders_mx_1220076 | orders_mx | distributed | 0
orders_mx_1220076 | orders_mx | distributed | 0
orders_mx_1220077 | orders_mx | distributed | 0
orders_mx_1220078 | orders_mx | distributed | 0
orders_mx_1220078 | orders_mx | distributed | 0
orders_mx_1220078 | orders_mx | distributed | 0
orders_mx_1220078 | orders_mx | distributed | 0
orders_mx_1220078 | orders_mx | distributed | 0
orders_mx_1220078 | orders_mx | distributed | 0
orders_mx_1220078 | orders_mx | distributed | 0
orders_mx_1220079 | orders_mx | distributed | 0
orders_mx_1220080 | orders_mx | distributed | 0
orders_mx_1220080 | orders_mx | distributed | 0
orders_mx_1220080 | orders_mx | distributed | 0
orders_mx_1220080 | orders_mx | distributed | 0
orders_mx_1220080 | orders_mx | distributed | 0
orders_mx_1220080 | orders_mx | distributed | 0
orders_mx_1220080 | orders_mx | distributed | 0
orders_mx_1220081 | orders_mx | distributed | 0
orders_mx_1220082 | orders_mx | distributed | 0
orders_mx_1220082 | orders_mx | distributed | 0
orders_mx_1220082 | orders_mx | distributed | 0
orders_mx_1220082 | orders_mx | distributed | 0
orders_mx_1220082 | orders_mx | distributed | 0
orders_mx_1220082 | orders_mx | distributed | 0
orders_mx_1220082 | orders_mx | distributed | 0
orders_mx_1220083 | orders_mx | distributed | 0
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
objects_mx_1220103 | objects_mx | distributed | 16384
orders_mx_1220068 | orders_mx | distributed | 8192
orders_mx_1220068 | orders_mx | distributed | 8192
orders_mx_1220068 | orders_mx | distributed | 8192
orders_mx_1220068 | orders_mx | distributed | 8192
orders_mx_1220068 | orders_mx | distributed | 8192
orders_mx_1220068 | orders_mx | distributed | 8192
orders_mx_1220068 | orders_mx | distributed | 8192
orders_mx_1220069 | orders_mx | distributed | 8192
orders_mx_1220070 | orders_mx | distributed | 8192
orders_mx_1220070 | orders_mx | distributed | 8192
orders_mx_1220070 | orders_mx | distributed | 8192
orders_mx_1220070 | orders_mx | distributed | 8192
orders_mx_1220070 | orders_mx | distributed | 8192
orders_mx_1220070 | orders_mx | distributed | 8192
orders_mx_1220070 | orders_mx | distributed | 8192
orders_mx_1220071 | orders_mx | distributed | 8192
orders_mx_1220072 | orders_mx | distributed | 8192
orders_mx_1220072 | orders_mx | distributed | 8192
orders_mx_1220072 | orders_mx | distributed | 8192
orders_mx_1220072 | orders_mx | distributed | 8192
orders_mx_1220072 | orders_mx | distributed | 8192
orders_mx_1220072 | orders_mx | distributed | 8192
orders_mx_1220072 | orders_mx | distributed | 8192
orders_mx_1220073 | orders_mx | distributed | 8192
orders_mx_1220074 | orders_mx | distributed | 8192
orders_mx_1220074 | orders_mx | distributed | 8192
orders_mx_1220074 | orders_mx | distributed | 8192
orders_mx_1220074 | orders_mx | distributed | 8192
orders_mx_1220074 | orders_mx | distributed | 8192
orders_mx_1220074 | orders_mx | distributed | 8192
orders_mx_1220074 | orders_mx | distributed | 8192
orders_mx_1220075 | orders_mx | distributed | 8192
orders_mx_1220076 | orders_mx | distributed | 8192
orders_mx_1220076 | orders_mx | distributed | 8192
orders_mx_1220076 | orders_mx | distributed | 8192
orders_mx_1220076 | orders_mx | distributed | 8192
orders_mx_1220076 | orders_mx | distributed | 8192
orders_mx_1220076 | orders_mx | distributed | 8192
orders_mx_1220076 | orders_mx | distributed | 8192
orders_mx_1220077 | orders_mx | distributed | 8192
orders_mx_1220078 | orders_mx | distributed | 8192
orders_mx_1220078 | orders_mx | distributed | 8192
orders_mx_1220078 | orders_mx | distributed | 8192
orders_mx_1220078 | orders_mx | distributed | 8192
orders_mx_1220078 | orders_mx | distributed | 8192
orders_mx_1220078 | orders_mx | distributed | 8192
orders_mx_1220078 | orders_mx | distributed | 8192
orders_mx_1220079 | orders_mx | distributed | 8192
orders_mx_1220080 | orders_mx | distributed | 8192
orders_mx_1220080 | orders_mx | distributed | 8192
orders_mx_1220080 | orders_mx | distributed | 8192
orders_mx_1220080 | orders_mx | distributed | 8192
orders_mx_1220080 | orders_mx | distributed | 8192
orders_mx_1220080 | orders_mx | distributed | 8192
orders_mx_1220080 | orders_mx | distributed | 8192
orders_mx_1220081 | orders_mx | distributed | 8192
orders_mx_1220082 | orders_mx | distributed | 8192
orders_mx_1220082 | orders_mx | distributed | 8192
orders_mx_1220082 | orders_mx | distributed | 8192
orders_mx_1220082 | orders_mx | distributed | 8192
orders_mx_1220082 | orders_mx | distributed | 8192
orders_mx_1220082 | orders_mx | distributed | 8192
orders_mx_1220082 | orders_mx | distributed | 8192
orders_mx_1220083 | orders_mx | distributed | 8192
part_mx_1220086 | part_mx | reference | 0
part_mx_1220086 | part_mx | reference | 0
part_mx_1220086 | part_mx | reference | 0
@ -950,14 +950,14 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
part_mx_1220086 | part_mx | reference | 0
part_mx_1220086 | part_mx | reference | 0
part_mx_1220086 | part_mx | reference | 0
researchers_mx_1220100 | researchers_mx | distributed | 0
researchers_mx_1220100 | researchers_mx | distributed | 0
researchers_mx_1220100 | researchers_mx | distributed | 0
researchers_mx_1220100 | researchers_mx | distributed | 0
researchers_mx_1220100 | researchers_mx | distributed | 0
researchers_mx_1220100 | researchers_mx | distributed | 0
researchers_mx_1220100 | researchers_mx | distributed | 0
researchers_mx_1220101 | researchers_mx | distributed | 0
researchers_mx_1220100 | researchers_mx | distributed | 8192
researchers_mx_1220100 | researchers_mx | distributed | 8192
researchers_mx_1220100 | researchers_mx | distributed | 8192
researchers_mx_1220100 | researchers_mx | distributed | 8192
researchers_mx_1220100 | researchers_mx | distributed | 8192
researchers_mx_1220100 | researchers_mx | distributed | 8192
researchers_mx_1220100 | researchers_mx | distributed | 8192
researchers_mx_1220101 | researchers_mx | distributed | 8192
supplier_mx_1220087 | supplier_mx | reference | 0
supplier_mx_1220087 | supplier_mx | reference | 0
supplier_mx_1220087 | supplier_mx | reference | 0

View File

@ -2626,6 +2626,94 @@ RESET citus.shard_count;
DROP VIEW table_placements_per_node;
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2';
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1';
-- add colocation groups with shard group count < worker count
-- the rebalancer should balance those "unbalanced shards" evenly as much as possible
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
create table single_shard_colocation_1a (a int primary key);
create table single_shard_colocation_1b (a int primary key);
create table single_shard_colocation_1c (a int primary key);
SET citus.shard_replication_factor = 1;
select create_distributed_table('single_shard_colocation_1a','a', colocate_with => 'none', shard_count => 1);
create_distributed_table
---------------------------------------------------------------------
(1 row)
select create_distributed_table('single_shard_colocation_1b','a',colocate_with=>'single_shard_colocation_1a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
select create_distributed_table('single_shard_colocation_1c','a',colocate_with=>'single_shard_colocation_1b');
create_distributed_table
---------------------------------------------------------------------
(1 row)
create table single_shard_colocation_2a (a bigint);
create table single_shard_colocation_2b (a bigint);
select create_distributed_table('single_shard_colocation_2a','a', colocate_with => 'none', shard_count => 1);
create_distributed_table
---------------------------------------------------------------------
(1 row)
select create_distributed_table('single_shard_colocation_2b','a',colocate_with=>'single_shard_colocation_2a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- all shards are placed on the first worker node
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b')
ORDER BY sh.logicalrelid;
logicalrelid | nodeport
---------------------------------------------------------------------
single_shard_colocation_1a | 57637
single_shard_colocation_1b | 57637
single_shard_colocation_1c | 57637
single_shard_colocation_2a | 57637
single_shard_colocation_2b | 57637
(5 rows)
-- add the second node back, then rebalance
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
select 1 from citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
select rebalance_table_shards();
NOTICE: Moving shard xxxxx from localhost:xxxxx to localhost:xxxxx ...
rebalance_table_shards
---------------------------------------------------------------------
(1 row)
-- verify some shards are moved to the new node
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b')
ORDER BY sh.logicalrelid;
logicalrelid | nodeport
---------------------------------------------------------------------
single_shard_colocation_1a | 57638
single_shard_colocation_1b | 57638
single_shard_colocation_1c | 57638
single_shard_colocation_2a | 57637
single_shard_colocation_2b | 57637
(5 rows)
DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO OFF;
REVOKE ALL ON SCHEMA public FROM testrole;

View File

@ -742,3 +742,75 @@ HINT: If you do want these moves to happen, try changing improvement_threshold
{"updatetype":1,"shardid":2,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
(2 rows)
-- Test single shard colocation groups
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a", "next_colocation": true}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a", "next_colocation": true}'
]::json[],
improvement_threshold := 0.1
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
{"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
(2 rows)
-- Test colocation groups with shard count < worker count
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a"}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a"}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a"}'
]::json[],
improvement_threshold := 0.1
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
{"updatetype":1,"shardid":5,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432}
{"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432}
(3 rows)
-- Test colocation groups with shard count < worker count
-- mixed with a colocation group shard_count > worker count
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a"}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a"}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a"}',
'{"shardid":7, "cost":50, "nodename":"b", "next_colocation": true}',
'{"shardid":8, "cost":50, "nodename":"b"}',
'{"shardid":9, "cost":50, "nodename":"b"}',
'{"shardid":10, "cost":50, "nodename":"b"}',
'{"shardid":11, "cost":50, "nodename":"b"}',
'{"shardid":12, "cost":50, "nodename":"b"}'
]::json[],
improvement_threshold := 0.1
));
unnest
---------------------------------------------------------------------
{"updatetype":1,"shardid":7,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432}
{"updatetype":1,"shardid":8,"sourcename":"b","sourceport":5432,"targetname":"c","targetport":5432}
{"updatetype":1,"shardid":9,"sourcename":"b","sourceport":5432,"targetname":"a","targetport":5432}
{"updatetype":1,"shardid":10,"sourcename":"b","sourceport":5432,"targetname":"c","targetport":5432}
{"updatetype":1,"shardid":4,"sourcename":"a","sourceport":5432,"targetname":"b","targetport":5432}
{"updatetype":1,"shardid":5,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432}
{"updatetype":1,"shardid":1,"sourcename":"a","sourceport":5432,"targetname":"c","targetport":5432}
(7 rows)

View File

@ -1829,6 +1829,7 @@ SELECT pg_sleep(0.1);
-- backend(s) that execute on the shards will be terminated
-- so show that there no internal backends
SET search_path TO single_node;
SET citus.next_shard_id TO 90730500;
SELECT count(*) from should_commit;
count
---------------------------------------------------------------------
@ -1882,6 +1883,7 @@ BEGIN;
ROLLBACK;
\c - - - :master_port
SET search_path TO single_node;
SET citus.next_shard_id TO 90830500;
-- simulate that even if there is no connection slots
-- to connect, Citus can switch to local execution
SET citus.force_max_query_parallelization TO false;
@ -2106,10 +2108,10 @@ NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS co
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_disabling_drop_and_truncate (a int);
SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a');
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830500, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830500, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830501, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830501, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830502, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830502, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830503, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830503, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
create_distributed_table
---------------------------------------------------------------------
@ -2117,24 +2119,24 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
SET citus.enable_manual_changes_to_shards TO off;
-- these should error out
DROP TABLE test_disabling_drop_and_truncate_102040;
ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table
DROP TABLE test_disabling_drop_and_truncate_90830500;
ERROR: cannot modify "test_disabling_drop_and_truncate_90830500" because it is a shard of a distributed table
HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly
TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table
TRUNCATE TABLE test_disabling_drop_and_truncate_90830500;
ERROR: cannot modify "test_disabling_drop_and_truncate_90830500" because it is a shard of a distributed table
HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly
RESET citus.enable_manual_changes_to_shards ;
-- these should work as expected
TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
DROP TABLE test_disabling_drop_and_truncate_102040;
TRUNCATE TABLE test_disabling_drop_and_truncate_90830500;
DROP TABLE test_disabling_drop_and_truncate_90830500;
DROP TABLE test_disabling_drop_and_truncate;
-- test creating distributed or reference tables from shards
CREATE TABLE test_creating_distributed_relation_table_from_shard (a int);
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a');
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830504, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830504, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830505, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830505, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830506, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830506, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830507, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830507, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
create_distributed_table
---------------------------------------------------------------------
@ -2142,11 +2144,11 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
-- these should error because shards cannot be used to:
-- create distributed table
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a');
ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_90830504', 'a');
ERROR: relation "test_creating_distributed_relation_table_from_shard_90830504" is a shard relation
-- create reference table
SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044');
ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation
SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_90830504');
ERROR: relation "test_creating_distributed_relation_table_from_shard_90830504" is a shard relation
RESET citus.shard_replication_factor;
DROP TABLE test_creating_distributed_relation_table_from_shard;
-- lets flush the copy often to make sure everyhing is fine

View File

@ -1829,6 +1829,7 @@ SELECT pg_sleep(0.1);
-- backend(s) that execute on the shards will be terminated
-- so show that there no internal backends
SET search_path TO single_node;
SET citus.next_shard_id TO 90730500;
SELECT count(*) from should_commit;
count
---------------------------------------------------------------------
@ -1882,6 +1883,7 @@ BEGIN;
ROLLBACK;
\c - - - :master_port
SET search_path TO single_node;
SET citus.next_shard_id TO 90830500;
-- simulate that even if there is no connection slots
-- to connect, Citus can switch to local execution
SET citus.force_max_query_parallelization TO false;
@ -2106,10 +2108,10 @@ NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS co
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_disabling_drop_and_truncate (a int);
SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a');
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102040, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102041, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102042, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102043, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830500, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830500, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830501, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830501, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830502, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830502, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830503, 'single_node', 'CREATE TABLE single_node.test_disabling_drop_and_truncate (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830503, 'single_node', 'ALTER TABLE single_node.test_disabling_drop_and_truncate OWNER TO postgres')
create_distributed_table
---------------------------------------------------------------------
@ -2117,24 +2119,24 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
SET citus.enable_manual_changes_to_shards TO off;
-- these should error out
DROP TABLE test_disabling_drop_and_truncate_102040;
ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table
DROP TABLE test_disabling_drop_and_truncate_90830500;
ERROR: cannot modify "test_disabling_drop_and_truncate_90830500" because it is a shard of a distributed table
HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly
TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
ERROR: cannot modify "test_disabling_drop_and_truncate_102040" because it is a shard of a distributed table
TRUNCATE TABLE test_disabling_drop_and_truncate_90830500;
ERROR: cannot modify "test_disabling_drop_and_truncate_90830500" because it is a shard of a distributed table
HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly
RESET citus.enable_manual_changes_to_shards ;
-- these should work as expected
TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
DROP TABLE test_disabling_drop_and_truncate_102040;
TRUNCATE TABLE test_disabling_drop_and_truncate_90830500;
DROP TABLE test_disabling_drop_and_truncate_90830500;
DROP TABLE test_disabling_drop_and_truncate;
-- test creating distributed or reference tables from shards
CREATE TABLE test_creating_distributed_relation_table_from_shard (a int);
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a');
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830504, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830504, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830505, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830505, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830506, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830506, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90830507, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) USING heap');SELECT worker_apply_shard_ddl_command (90830507, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres')
create_distributed_table
---------------------------------------------------------------------
@ -2142,11 +2144,11 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
-- these should error because shards cannot be used to:
-- create distributed table
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a');
ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_90830504', 'a');
ERROR: relation "test_creating_distributed_relation_table_from_shard_90830504" is a shard relation
-- create reference table
SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044');
ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation
SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_90830504');
ERROR: relation "test_creating_distributed_relation_table_from_shard_90830504" is a shard relation
RESET citus.shard_replication_factor;
DROP TABLE test_creating_distributed_relation_table_from_shard;
-- lets flush the copy often to make sure everyhing is fine

View File

@ -1,6 +1,7 @@
CREATE SCHEMA single_node_truncate;
SET search_path TO single_node_truncate;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 91630500;
-- helper view that prints out local table names and sizes in the schema
CREATE VIEW table_sizes AS
SELECT
@ -28,12 +29,12 @@ CREATE TABLE citus_local(id int, ref_id int REFERENCES ref(id));
INSERT INTO citus_local SELECT x,x FROM generate_series(1,10000) x;
-- verify that shell tables for citus local tables are empty
SELECT * FROM table_sizes;
name | has_data
name | has_data
---------------------------------------------------------------------
citus_local | f
citus_local_102049 | t
ref | t
ref_102048 | t
citus_local | f
citus_local_91630501 | t
ref | t
ref_91630500 | t
(4 rows)
-- verify that this UDF is noop on Citus local tables
@ -44,12 +45,12 @@ SELECT truncate_local_data_after_distributing_table('citus_local');
(1 row)
SELECT * FROM table_sizes;
name | has_data
name | has_data
---------------------------------------------------------------------
citus_local | f
citus_local_102049 | t
ref | t
ref_102048 | t
citus_local | f
citus_local_91630501 | t
ref | t
ref_91630500 | t
(4 rows)
-- test that we allow cascading truncates to citus local tables
@ -62,12 +63,12 @@ NOTICE: truncate cascades to table "citus_local"
(1 row)
SELECT * FROM table_sizes;
name | has_data
name | has_data
---------------------------------------------------------------------
citus_local | f
citus_local_102049 | t
ref | f
ref_102048 | t
citus_local | f
citus_local_91630501 | t
ref | f
ref_91630500 | t
(4 rows)
ROLLBACK;
@ -95,17 +96,17 @@ NOTICE: truncate cascades to table "dist"
(1 row)
SELECT * FROM table_sizes;
name | has_data
name | has_data
---------------------------------------------------------------------
citus_local | f
citus_local_102049 | t
dist | f
dist_102051 | t
dist_102052 | t
dist_102053 | t
dist_102054 | t
ref | f
ref_102048 | t
citus_local | f
citus_local_91630501 | t
dist | f
dist_91630503 | t
dist_91630504 | t
dist_91630505 | t
dist_91630506 | t
ref | f
ref_91630500 | t
(9 rows)
ROLLBACK;
@ -118,17 +119,17 @@ SELECT truncate_local_data_after_distributing_table('dist');
(1 row)
SELECT * FROM table_sizes;
name | has_data
name | has_data
---------------------------------------------------------------------
citus_local | f
citus_local_102049 | t
dist | f
dist_102051 | t
dist_102052 | t
dist_102053 | t
dist_102054 | t
ref | t
ref_102048 | t
citus_local | f
citus_local_91630501 | t
dist | f
dist_91630503 | t
dist_91630504 | t
dist_91630505 | t
dist_91630506 | t
ref | t
ref_91630500 | t
(9 rows)
ROLLBACK;

View File

@ -142,8 +142,90 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700
(1 row)
-- END: List updated row count for local targets shard.
-- Check that GENERATED columns are handled properly in a shard split operation.
\c - - - :master_port
SET search_path TO worker_split_copy_test;
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 81080000;
-- BEGIN: Create distributed table and insert data.
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char, col_todrop int);
SELECT create_distributed_table('dist_table_with_generated_col', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Check that dropped columns are filtered out in COPY command.
ALTER TABLE dist_table_with_generated_col DROP COLUMN col_todrop;
INSERT INTO dist_table_with_generated_col (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id));
-- END: Create distributed table and insert data.
-- BEGIN: Create target shards in Worker1 and Worker2 for a 2-way split copy.
\c - - - :worker_1_port
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080015(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
\c - - - :worker_2_port
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080016(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
-- BEGIN: List row count for source shard and targets shard in Worker1.
\c - - - :worker_1_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080000;
count
---------------------------------------------------------------------
510
(1 row)
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
count
---------------------------------------------------------------------
0
(1 row)
-- BEGIN: List row count for target shard in Worker2.
\c - - - :worker_2_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
count
---------------------------------------------------------------------
0
(1 row)
\c - - - :worker_1_port
SELECT * from worker_split_copy(
81080000, -- source shard id to copy
'id',
ARRAY[
-- split copy info for split children 1
ROW(81080015, -- destination shard id
-2147483648, -- split range begin
-1073741824, --split range end
:worker_1_node)::pg_catalog.split_copy_info,
-- split copy info for split children 2
ROW(81080016, --destination shard id
-1073741823, --split range begin
-1, --split range end
:worker_2_node)::pg_catalog.split_copy_info
]
);
worker_split_copy
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
count
---------------------------------------------------------------------
247
(1 row)
\c - - - :worker_2_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
count
---------------------------------------------------------------------
263
(1 row)
-- BEGIN: CLEANUP.
\c - - - :master_port
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
DROP SCHEMA worker_split_copy_test CASCADE;
-- END: CLEANUP.

View File

@ -53,7 +53,7 @@ SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
SELECT create_reference_table('reference_table');
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
@ -70,9 +70,11 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
-- BEGIN : Load data into tables.
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
SELECT COUNT(*) FROM sensors;
SELECT COUNT(*) FROM reference_table;
SELECT COUNT(*) FROM colocated_dist_table;

View File

@ -49,7 +49,7 @@ SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
SELECT create_reference_table('reference_table');
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY, genid integer GENERATED ALWAYS AS ( measureid + 3 ) stored, value varchar(44), col_todrop integer);
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors');
@ -66,9 +66,11 @@ ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) RE
-- BEGIN : Load data into tables.
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
INSERT INTO colocated_dist_table(measureid, value, col_todrop) SELECT i,'Value',i FROM generate_series(0,1000)i;
INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i;
ALTER TABLE colocated_dist_table DROP COLUMN col_todrop;
SELECT COUNT(*) FROM sensors;
SELECT COUNT(*) FROM reference_table;
SELECT COUNT(*) FROM colocated_dist_table;

View File

@ -151,8 +151,34 @@ ORDER BY
shardid
LIMIT 1 OFFSET 1;
-- Check that shards of a table with GENERATED columns can be moved.
\c - - - :master_port
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
CREATE TABLE mx_table_with_generated_column (a int, b int GENERATED ALWAYS AS ( a + 3 ) STORED, c int);
SELECT create_distributed_table('mx_table_with_generated_column', 'a');
-- Check that dropped columns are handled properly in a move.
ALTER TABLE mx_table_with_generated_column DROP COLUMN c;
-- Move a shard from worker 1 to worker 2
SELECT
citus_move_shard_placement(shardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical')
FROM
pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_table_with_generated_column'::regclass
AND nodeport = :worker_1_port
ORDER BY
shardid
LIMIT 1;
-- Cleanup
\c - - - :master_port
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
DROP TABLE mx_table_with_generated_column;
DROP TABLE mx_table_1;
DROP TABLE mx_table_2;
DROP TABLE mx_table_3;

View File

@ -1462,6 +1462,41 @@ DROP VIEW table_placements_per_node;
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='capacity_high_worker_2';
DELETE FROM pg_catalog.pg_dist_rebalance_strategy WHERE name='only_worker_1';
-- add colocation groups with shard group count < worker count
-- the rebalancer should balance those "unbalanced shards" evenly as much as possible
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
create table single_shard_colocation_1a (a int primary key);
create table single_shard_colocation_1b (a int primary key);
create table single_shard_colocation_1c (a int primary key);
SET citus.shard_replication_factor = 1;
select create_distributed_table('single_shard_colocation_1a','a', colocate_with => 'none', shard_count => 1);
select create_distributed_table('single_shard_colocation_1b','a',colocate_with=>'single_shard_colocation_1a');
select create_distributed_table('single_shard_colocation_1c','a',colocate_with=>'single_shard_colocation_1b');
create table single_shard_colocation_2a (a bigint);
create table single_shard_colocation_2b (a bigint);
select create_distributed_table('single_shard_colocation_2a','a', colocate_with => 'none', shard_count => 1);
select create_distributed_table('single_shard_colocation_2b','a',colocate_with=>'single_shard_colocation_2a');
-- all shards are placed on the first worker node
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b')
ORDER BY sh.logicalrelid;
-- add the second node back, then rebalance
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 16;
select 1 from citus_add_node('localhost', :worker_2_port);
select rebalance_table_shards();
-- verify some shards are moved to the new node
SELECT sh.logicalrelid, pl.nodeport
FROM pg_dist_shard sh JOIN pg_dist_shard_placement pl ON sh.shardid = pl.shardid
WHERE sh.logicalrelid::text IN ('single_shard_colocation_1a', 'single_shard_colocation_1b', 'single_shard_colocation_1c', 'single_shard_colocation_2a', 'single_shard_colocation_2b')
ORDER BY sh.logicalrelid;
DROP TABLE single_shard_colocation_1a, single_shard_colocation_1b, single_shard_colocation_1c, single_shard_colocation_2a, single_shard_colocation_2b CASCADE;
\c - - - :worker_1_port
SET citus.enable_ddl_propagation TO OFF;
REVOKE ALL ON SCHEMA public FROM testrole;

View File

@ -530,3 +530,57 @@ SELECT unnest(shard_placement_rebalance_array(
]::json[],
improvement_threshold := 0.6
));
-- Test single shard colocation groups
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a", "next_colocation": true}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a", "next_colocation": true}'
]::json[],
improvement_threshold := 0.1
));
-- Test colocation groups with shard count < worker count
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a"}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a"}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a"}'
]::json[],
improvement_threshold := 0.1
));
-- Test colocation groups with shard count < worker count
-- mixed with a colocation group shard_count > worker count
SELECT unnest(shard_placement_rebalance_array(
ARRAY['{"node_name": "a"}',
'{"node_name": "b"}',
'{"node_name": "c"}']::json[],
ARRAY['{"shardid":1, "cost":20, "nodename":"a"}',
'{"shardid":2, "cost":10, "nodename":"a"}',
'{"shardid":3, "cost":10, "nodename":"a", "next_colocation": true}',
'{"shardid":4, "cost":100, "nodename":"a"}',
'{"shardid":5, "cost":50, "nodename":"a", "next_colocation": true}',
'{"shardid":6, "cost":50, "nodename":"a"}',
'{"shardid":7, "cost":50, "nodename":"b", "next_colocation": true}',
'{"shardid":8, "cost":50, "nodename":"b"}',
'{"shardid":9, "cost":50, "nodename":"b"}',
'{"shardid":10, "cost":50, "nodename":"b"}',
'{"shardid":11, "cost":50, "nodename":"b"}',
'{"shardid":12, "cost":50, "nodename":"b"}'
]::json[],
improvement_threshold := 0.1
));

View File

@ -975,6 +975,7 @@ SELECT pg_sleep(0.1);
-- backend(s) that execute on the shards will be terminated
-- so show that there no internal backends
SET search_path TO single_node;
SET citus.next_shard_id TO 90730500;
SELECT count(*) from should_commit;
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'citus_internal%';
SELECT get_all_active_client_backend_count();
@ -998,6 +999,7 @@ ROLLBACK;
\c - - - :master_port
SET search_path TO single_node;
SET citus.next_shard_id TO 90830500;
-- simulate that even if there is no connection slots
-- to connect, Citus can switch to local execution
@ -1069,14 +1071,14 @@ SELECT create_distributed_table('test_disabling_drop_and_truncate', 'a');
SET citus.enable_manual_changes_to_shards TO off;
-- these should error out
DROP TABLE test_disabling_drop_and_truncate_102040;
TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
DROP TABLE test_disabling_drop_and_truncate_90830500;
TRUNCATE TABLE test_disabling_drop_and_truncate_90830500;
RESET citus.enable_manual_changes_to_shards ;
-- these should work as expected
TRUNCATE TABLE test_disabling_drop_and_truncate_102040;
DROP TABLE test_disabling_drop_and_truncate_102040;
TRUNCATE TABLE test_disabling_drop_and_truncate_90830500;
DROP TABLE test_disabling_drop_and_truncate_90830500;
DROP TABLE test_disabling_drop_and_truncate;
@ -1086,10 +1088,10 @@ SELECT create_distributed_table('test_creating_distributed_relation_table_from_s
-- these should error because shards cannot be used to:
-- create distributed table
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a');
SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_90830504', 'a');
-- create reference table
SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044');
SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_90830504');
RESET citus.shard_replication_factor;
DROP TABLE test_creating_distributed_relation_table_from_shard;

View File

@ -1,6 +1,7 @@
CREATE SCHEMA single_node_truncate;
SET search_path TO single_node_truncate;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 91630500;
-- helper view that prints out local table names and sizes in the schema
CREATE VIEW table_sizes AS

View File

@ -110,8 +110,66 @@ SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_810700
SELECT COUNT(*) FROM worker_split_copy_test."test !/ \n _""dist_123_table_81070016";
-- END: List updated row count for local targets shard.
-- Check that GENERATED columns are handled properly in a shard split operation.
\c - - - :master_port
SET search_path TO worker_split_copy_test;
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 81080000;
-- BEGIN: Create distributed table and insert data.
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char, col_todrop int);
SELECT create_distributed_table('dist_table_with_generated_col', 'id');
-- Check that dropped columns are filtered out in COPY command.
ALTER TABLE dist_table_with_generated_col DROP COLUMN col_todrop;
INSERT INTO dist_table_with_generated_col (id, value) (SELECT g.id, 'N' FROM generate_series(1, 1000) AS g(id));
-- END: Create distributed table and insert data.
-- BEGIN: Create target shards in Worker1 and Worker2 for a 2-way split copy.
\c - - - :worker_1_port
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080015(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
\c - - - :worker_2_port
CREATE TABLE worker_split_copy_test.dist_table_with_generated_col_81080016(id int primary key, new_id int GENERATED ALWAYS AS ( id + 3 ) stored, value char);
-- BEGIN: List row count for source shard and targets shard in Worker1.
\c - - - :worker_1_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080000;
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
-- BEGIN: List row count for target shard in Worker2.
\c - - - :worker_2_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
\c - - - :worker_1_port
SELECT * from worker_split_copy(
81080000, -- source shard id to copy
'id',
ARRAY[
-- split copy info for split children 1
ROW(81080015, -- destination shard id
-2147483648, -- split range begin
-1073741824, --split range end
:worker_1_node)::pg_catalog.split_copy_info,
-- split copy info for split children 2
ROW(81080016, --destination shard id
-1073741823, --split range begin
-1, --split range end
:worker_2_node)::pg_catalog.split_copy_info
]
);
\c - - - :worker_1_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080015;
\c - - - :worker_2_port
SELECT COUNT(*) FROM worker_split_copy_test.dist_table_with_generated_col_81080016;
-- BEGIN: CLEANUP.
\c - - - :master_port
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
DROP SCHEMA worker_split_copy_test CASCADE;
-- END: CLEANUP.