From 7a818b78fb34f1fd9e0e1824bce0528e75d4f0c8 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Tue, 13 Dec 2022 14:19:24 +0100 Subject: [PATCH] implement splits when a target colocation has been set --- .../citus_add_local_table_to_metadata.c | 3 +- .../distributed/metadata/metadata_cache.c | 8 ++ .../distributed/metadata/metadata_sync.c | 8 +- .../distributed/operations/create_shards.c | 15 +++- .../distributed/operations/shard_split.c | 81 +++++++++++++++---- .../sql/downgrades/citus--11.3-1--11.2-1.sql | 4 +- src/backend/distributed/utils/shardgroup.c | 68 ++++++++++++++++ .../distributed/coordinator_protocol.h | 1 + src/include/distributed/metadata_cache.h | 1 + src/include/distributed/metadata_utility.h | 4 +- src/include/distributed/relay_utility.h | 1 + src/include/distributed/shardgroup.h | 11 +++ 12 files changed, 176 insertions(+), 29 deletions(-) create mode 100644 src/backend/distributed/utils/shardgroup.c create mode 100644 src/include/distributed/shardgroup.h diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index b94b44cac..5e58a26b2 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -1406,6 +1406,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId, char replicationModel = REPLICATION_MODEL_STREAMING; uint32 colocationId = INVALID_COLOCATION_ID; + int64 shardgroupId = INVALID_SHARDGROUP_ID; Var *distributionColumn = NULL; InsertIntoPgDistPartition(citusLocalTableId, distributionMethod, distributionColumn, colocationId, @@ -1417,7 +1418,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId, text *shardMinValue = NULL; text *shardMaxValue = NULL; InsertShardRow(citusLocalTableId, shardId, shardStorageType, - shardMinValue, shardMaxValue, NULL); + shardMinValue, shardMaxValue, &shardgroupId); List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError()); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 802010ddd..8fe7ae931 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -2885,6 +2885,14 @@ DistShardgroupPkeyId(void) } +Oid +DistShardgroupColocaionidIndexId(void) +{ + /* TODO add index on colocation id */ + return InvalidOid; +} + + /* return oid of pg_dist_placement_shardid_index */ Oid DistPlacementShardidIndexId(void) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 57c0da666..fadc814aa 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1409,10 +1409,10 @@ ShardgroupInsertCommand(List *shardgroups) appendStringInfo(&command, "(%ld, %d, %s, %s)", shardgroup->shardgroupId, shardgroup->colocationId, - TextToSQLLiteral(IntegerToText( - DatumGetInt32(shardgroup->minShardValue))), - TextToSQLLiteral(IntegerToText( - DatumGetInt32(shardgroup->maxShardValue)))); + shardgroup->minShardValue == NULL ? "NULL" : quote_literal_cstr( + shardgroup->minShardValue), + shardgroup->maxShardValue == NULL ? "NULL" : quote_literal_cstr( + shardgroup->maxShardValue)); } appendStringInfo(&command, ") "); diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index ecccaa59d..818e87860 100644 --- a/src/backend/distributed/operations/create_shards.c +++ b/src/backend/distributed/operations/create_shards.c @@ -494,14 +494,21 @@ CheckHashPartitionedTable(Oid distributedTableId) } -/* Helper function to convert an integer value to a text type */ -text * -IntegerToText(int32 value) +char * +IntegerToCStr(int32 value) { StringInfo valueString = makeStringInfo(); appendStringInfo(valueString, "%d", value); - text *valueText = cstring_to_text(valueString->data); + return valueString->data; +} + + +/* Helper function to convert an integer value to a text type */ +text * +IntegerToText(int32 value) +{ + text *valueText = cstring_to_text(IntegerToCStr(value)); return valueText; } diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 03c0332de..d4f000f4b 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -44,6 +44,7 @@ #include "distributed/shardsplit_logical_replication.h" #include "distributed/deparse_shard_query.h" #include "distributed/shard_rebalancer.h" +#include "distributed/shardgroup.h" #include "postmaster/postmaster.h" /* @@ -129,7 +130,8 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList, DistributionColumnMap *distCols, char distributionMethod, int shardCount, - uint32 colocationId); + uint32 colocationId, + List **shardgroupSplits); static void InsertSplitChildrenShardgroupMetadata(List *newShardgroups); static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList); @@ -1066,11 +1068,11 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval, Shardgroup *shardgroup = palloc0(sizeof(Shardgroup)); shardgroup->shardgroupId = GetNextShardgroupIdForSplitChild(); shardgroup->colocationId = colocationId; - shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue); - shardgroup->maxShardValue = splitPoint; + shardgroup->minShardValue = IntegerToCStr(currentSplitChildMinValue); + shardgroup->maxShardValue = IntegerToCStr(DatumGetInt32(splitPoint)); /* increment for next shardgroup */ - currentSplitChildMinValue = Int32GetDatum(DatumGetInt32(splitPoint) + 1); + currentSplitChildMinValue = DatumGetInt32(splitPoint) + 1; newShardgroups = lappend(newShardgroups, shardgroup); } @@ -1083,8 +1085,8 @@ CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval, Shardgroup *shardgroup = palloc0(sizeof(Shardgroup)); shardgroup->shardgroupId = GetNextShardgroupIdForSplitChild(); shardgroup->colocationId = colocationId; - shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue); - shardgroup->maxShardValue = splitParentMaxValue; + shardgroup->minShardValue = IntegerToCStr(currentSplitChildMinValue); + shardgroup->maxShardValue = IntegerToCStr(DatumGetInt32(splitParentMaxValue)); newShardgroups = lappend(newShardgroups, shardgroup); return newShardgroups; @@ -1135,9 +1137,12 @@ CreateShardIntervalsForNewShardgroups(ShardInterval *sourceShard, splitChildShardInterval->shardId = GetNextShardIdForSplitChild(); splitChildShardInterval->shardGroupId = shardgroup->shardgroupId; splitChildShardInterval->minValueExists = true; - splitChildShardInterval->minValue = shardgroup->minShardValue; + splitChildShardInterval->minValue = + Int32GetDatum(pg_strtoint32(shardgroup->minShardValue)); splitChildShardInterval->maxValueExists = true; - splitChildShardInterval->maxValue = shardgroup->maxShardValue; + splitChildShardInterval->maxValue = + Int32GetDatum(pg_strtoint32(shardgroup->maxShardValue)); + shardSplitChildrenIntervalList = lappend(shardSplitChildrenIntervalList, splitChildShardInterval); @@ -1165,8 +1170,25 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList, DistributionColumnMap *distributionColumnMap, char distributionMethod, int shardCount, - uint32 colocationId) + uint32 colocationId, + List **shardgroupSplits) { + if (colocationId != INVALID_COLOCATION_ID) + { + /* + * We know we are colocating the shards with already existing shards in their + * respective shardgroups. To prevent the new groups from being inserted we forget + * them here. + */ + + *shardgroupSplits = NIL; + + /* + * TODO make sure we have either already used the existing shardgroup id's or + * update the newly created shards to have to correct shardgroup id's + */ + } + ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, colocatedShardList) { @@ -1187,6 +1209,13 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList, ShardReplicationFactor, distributionColumn->vartype, distributionColumn->varcollid); + + /* Update the shardgroup splits with the newly assigned colocation id */ + Shardgroup *shardgroup = NULL; + foreach_ptr(shardgroup, *shardgroupSplits) + { + shardgroup->colocationId = colocationId; + } } UpdateDistributionColumnGlobally(relationId, distributionMethod, @@ -1198,14 +1227,19 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList, static void InsertSplitChildrenShardgroupMetadata(List *newShardgroups) { + if (list_length(newShardgroups) <= 0) + { + return; + } + Shardgroup *shardgroup = NULL; foreach_ptr(shardgroup, newShardgroups) { InsertShardGroupRow( shardgroup->shardgroupId, shardgroup->colocationId, - IntegerToText(DatumGetInt32(shardgroup->minShardValue)), - IntegerToText(DatumGetInt32(shardgroup->maxShardValue))); + cstring_to_text(shardgroup->minShardValue), + cstring_to_text(shardgroup->maxShardValue)); } if (/* TODO check if shardgroup needs to propagate */ true) @@ -1479,10 +1513,23 @@ NonBlockingShardSplit(SplitOperation splitOperation, char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); - List *shardgroupSplits = CreateNewShardgroups( - colocationId, - (ShardInterval *) linitial(sourceColocatedShardIntervalList), - shardSplitPointsList); + List *shardgroupSplits = NIL; + if (targetColocationId == INVALID_COLOCATION_ID) + { + shardgroupSplits = CreateNewShardgroups( + colocationId, + (ShardInterval *) linitial(sourceColocatedShardIntervalList), + shardSplitPointsList); + } + else + { + /* + * We are distributing a new table onto an existing colocation id, load + * shardgroupSplits from there. + */ + + shardgroupSplits = ShardgroupForColocationId(targetColocationId); + } /* First create shard interval metadata for split children */ List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup( @@ -1685,7 +1732,9 @@ NonBlockingShardSplit(SplitOperation splitOperation, distributionColumnOverrides, distributionMethod, shardCount, - targetColocationId); + targetColocationId, + &shardgroupSplits); + } /* 12) Insert new shardgroup, shard and placement metadata */ InsertSplitChildrenShardgroupMetadata(shardgroupSplits); diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index c7f75cf3e..3f5544e59 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -1,7 +1,7 @@ -- citus--11.3-1--11.2-1 -- this is an empty downgrade path since citus--11.2-1--11.3-1.sql is empty for now DROP TABLE pg_catalog.pg_dist_shardgroup; -DROP SEQUENCE citus.pg_dist_shardgroupid_seq; +DROP SEQUENCE pg_catalog.pg_dist_shardgroupid_seq; ALTER TABLE pg_catalog.pg_dist_shard DROP COLUMN shardgroupid; @@ -9,4 +9,4 @@ DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "ch #include "../udfs/citus_internal_add_shard_metadata/10.2-1.sql" DROP FUNCTION pg_catalog.citus_internal_add_shardgroup_metadata(bigint, integer, text, text); -DROP FUNCTION FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint, int); +DROP FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(bigint, int); diff --git a/src/backend/distributed/utils/shardgroup.c b/src/backend/distributed/utils/shardgroup.c new file mode 100644 index 000000000..094535196 --- /dev/null +++ b/src/backend/distributed/utils/shardgroup.c @@ -0,0 +1,68 @@ +/*------------------------------------------------------------------------- + * + * shardgroup.c + * + * This file contains functions to perform useful operations on shardgroups. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "utils/fmgroids.h" +#include "access/skey.h" +#include "utils/relcache.h" +#include "storage/lockdefs.h" +#include "utils/builtins.h" + +#include "distributed/listutils.h" +#include "distributed/shardgroup.h" +#include "distributed/pg_dist_shardgroup.h" +#include "distributed/metadata_cache.h" + +List * +ShardgroupForColocationId(uint32 colocationId) +{ + ScanKeyData scanKey[1] = { 0 }; + Form_pg_dist_shardgroup shardgroupForm = NULL; + Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistShardgroup); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_shardgroup_colocationid, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId)); + + SysScanDesc scanDescriptor = + systable_beginscan(pgDistShardgroup, DistShardgroupColocaionidIndexId(), true, + NULL, lengthof(scanKey), scanKey); + + HeapTuple heapTuple = NULL; + List *shardgroups = NIL; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + Datum datumArray[Natts_pg_dist_shardgroup] = { 0 }; + bool isNullArray[Natts_pg_dist_shardgroup] = { 0 }; + heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); + shardgroupForm = (Form_pg_dist_shardgroup) GETSTRUCT(heapTuple); + + Shardgroup *shardgroup = palloc0(sizeof(Shardgroup)); + shardgroup->shardgroupId = shardgroupForm->shardgroupid; + shardgroup->colocationId = shardgroupForm->colocationid; + + /* load shard min/max values */ + Datum minValueTextDatum = datumArray[Anum_pg_dist_shardgroup_shardminvalue - 1]; + Datum maxValueTextDatum = datumArray[Anum_pg_dist_shardgroup_shardmaxvalue - 1]; + shardgroup->minShardValue = TextDatumGetCString(minValueTextDatum); + shardgroup->maxShardValue = TextDatumGetCString(maxValueTextDatum); + + + shardgroups = lappend(shardgroups, shardgroup); + } + + systable_endscan(scanDescriptor); + table_close(pgDistShardgroup, NoLock); + + return shardgroups; +} diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index 65c7958f7..8aeeb8856 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -274,6 +274,7 @@ extern Oid ForeignConstraintGetReferencedTableId(const char *queryString); extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, char **tableName); +extern char * IntegerToCStr(int32 value); extern text * IntegerToText(int32 value); /* Function declarations for generating metadata for shard and placement creation */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index e1a0a5965..e604e1725 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -253,6 +253,7 @@ extern Oid DistBackgroundTaskDependDependsOnIndexId(void); extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); extern Oid DistShardgroupPkeyId(void); +extern Oid DistShardgroupColocaionidIndexId(void); extern Oid DistPlacementShardidIndexId(void); extern Oid DistPlacementPlacementidIndexId(void); extern Oid DistColocationIndexId(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index c81c92e20..0aafc8692 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -75,8 +75,8 @@ typedef struct Shardgroup { int64 shardgroupId; uint32 colocationId; - Datum minShardValue; /* a shard's typed min value datum */ - Datum maxShardValue; /* a shard's typed max value datum */ + const char *minShardValue; + const char *maxShardValue; } Shardgroup; diff --git a/src/include/distributed/relay_utility.h b/src/include/distributed/relay_utility.h index f5a37da45..cead11ad1 100644 --- a/src/include/distributed/relay_utility.h +++ b/src/include/distributed/relay_utility.h @@ -23,6 +23,7 @@ /* Shard name and identifier related defines */ #define SHARD_NAME_SEPARATOR '_' #define INVALID_SHARD_ID 0 +#define INVALID_SHARDGROUP_ID 0 #define INVALID_PLACEMENT_ID 0 diff --git a/src/include/distributed/shardgroup.h b/src/include/distributed/shardgroup.h new file mode 100644 index 000000000..05dffebe5 --- /dev/null +++ b/src/include/distributed/shardgroup.h @@ -0,0 +1,11 @@ + +#ifndef CITUS_SHARDGROUP_H +#define CITUS_SHARDGROUP_H + +#include "postgres.h" + +#include "distributed/listutils.h" + +extern List * ShardgroupForColocationId(uint32 colocationId); + +#endif /*CITUS_SHARDGROUP_H */