mirror of https://github.com/citusdata/citus.git
remove shardgroups on colocation group removal
parent
acac3090ae
commit
9cacbeab7e
|
@ -141,6 +141,7 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount,
|
||||||
int replicationFactor,
|
int replicationFactor,
|
||||||
Oid distributionColumnType,
|
Oid distributionColumnType,
|
||||||
Oid distributionColumnCollation);
|
Oid distributionColumnCollation);
|
||||||
|
static char * ShardgroupDeleteCommandByColocationId(uint32 colocationId);
|
||||||
static char * ColocationGroupDeleteCommand(uint32 colocationId);
|
static char * ColocationGroupDeleteCommand(uint32 colocationId);
|
||||||
static char * RemoteTypeIdExpression(Oid typeId);
|
static char * RemoteTypeIdExpression(Oid typeId);
|
||||||
static char * RemoteCollationIdExpression(Oid colocationId);
|
static char * RemoteCollationIdExpression(Oid colocationId);
|
||||||
|
@ -169,6 +170,7 @@ PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata);
|
PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata);
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata);
|
PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata);
|
||||||
PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);
|
PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);
|
||||||
|
PG_FUNCTION_INFO_V1(citus_internal_delete_shardgroup_metadata);
|
||||||
|
|
||||||
|
|
||||||
static bool got_SIGTERM = false;
|
static bool got_SIGTERM = false;
|
||||||
|
@ -3852,6 +3854,30 @@ citus_internal_delete_colocation_metadata(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* citus_internal_delete_shardgroup_metadata is an internal UDF to
|
||||||
|
* delte rows from pg_dist_shardgroup.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
citus_internal_delete_shardgroup_metadata(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
CheckCitusVersion(ERROR);
|
||||||
|
EnsureSuperUser();
|
||||||
|
|
||||||
|
int colocationId = PG_GETARG_INT32(0);
|
||||||
|
|
||||||
|
if (!ShouldSkipMetadataChecks())
|
||||||
|
{
|
||||||
|
/* this UDF is not allowed allowed for executing as a separate command */
|
||||||
|
EnsureCoordinatorInitiatedOperation();
|
||||||
|
}
|
||||||
|
|
||||||
|
DeleteShardgroupForColocationIdLocally(colocationId);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
|
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
|
||||||
*/
|
*/
|
||||||
|
@ -3969,6 +3995,14 @@ RemoteCollationIdExpression(Oid colocationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void
|
||||||
|
SyncDeleteShardgroupForColocationIdToNodes(uint32 colocationId)
|
||||||
|
{
|
||||||
|
char *command = ShardgroupDeleteCommandByColocationId(colocationId);
|
||||||
|
SendCommandToWorkersWithMetadataViaSuperUser(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SyncDeleteColocationGroupToNodes deletes a pg_dist_colocation record from workers.
|
* SyncDeleteColocationGroupToNodes deletes a pg_dist_colocation record from workers.
|
||||||
*/
|
*/
|
||||||
|
@ -3985,6 +4019,20 @@ SyncDeleteColocationGroupToNodes(uint32 colocationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static char *
|
||||||
|
ShardgroupDeleteCommandByColocationId(uint32 colocationId)
|
||||||
|
{
|
||||||
|
StringInfoData deleteCommand = { 0 };
|
||||||
|
initStringInfo(&deleteCommand);
|
||||||
|
|
||||||
|
appendStringInfo(&deleteCommand,
|
||||||
|
"SELECT pg_catalog.citus_internal_delete_shardgroup_metadata(colocationid => %d)",
|
||||||
|
colocationId);
|
||||||
|
|
||||||
|
return deleteCommand.data;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ColocationGroupDeleteCommand returns a command for deleting a colocation group.
|
* ColocationGroupDeleteCommand returns a command for deleting a colocation group.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -61,3 +61,4 @@ DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(
|
||||||
);
|
);
|
||||||
|
|
||||||
#include "udfs/citus_internal_add_shardgroup_metadata/11.3-1.sql"
|
#include "udfs/citus_internal_add_shardgroup_metadata/11.3-1.sql"
|
||||||
|
#include "udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql"
|
||||||
|
|
8
src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql
generated
Normal file
8
src/backend/distributed/sql/udfs/citus_internal_delete_shardgroup_metadata/11.3-1.sql
generated
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(colocationid int)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C
|
||||||
|
STRICT
|
||||||
|
AS 'MODULE_PATHNAME';
|
||||||
|
|
||||||
|
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(int) IS
|
||||||
|
'deletes a shardgroups from pg_dist_shardgroup based on the colocation id they belong to';
|
|
@ -0,0 +1,8 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(colocationid int)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C
|
||||||
|
STRICT
|
||||||
|
AS 'MODULE_PATHNAME';
|
||||||
|
|
||||||
|
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shardgroup_metadata(int) IS
|
||||||
|
'deletes a shardgroups from pg_dist_shardgroup based on the colocation id they belong to';
|
|
@ -39,6 +39,7 @@
|
||||||
#include "utils/fmgroids.h"
|
#include "utils/fmgroids.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
#include "distributed/pg_dist_shardgroup.h"
|
||||||
|
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
|
@ -50,6 +51,7 @@ static bool HashPartitionedShardIntervalsEqual(ShardInterval *leftShardInterval,
|
||||||
static int CompareShardPlacementsByNode(const void *leftElement,
|
static int CompareShardPlacementsByNode(const void *leftElement,
|
||||||
const void *rightElement);
|
const void *rightElement);
|
||||||
static void DeleteColocationGroup(uint32 colocationId);
|
static void DeleteColocationGroup(uint32 colocationId);
|
||||||
|
static void DeleteShardgroupForColocationId(uint32 colocationId);
|
||||||
static uint32 CreateColocationGroupForRelation(Oid sourceRelationId);
|
static uint32 CreateColocationGroupForRelation(Oid sourceRelationId);
|
||||||
static void BreakColocation(Oid sourceRelationId);
|
static void BreakColocation(Oid sourceRelationId);
|
||||||
|
|
||||||
|
@ -1250,12 +1252,67 @@ DeleteColocationGroupIfNoTablesBelong(uint32 colocationId)
|
||||||
|
|
||||||
if (colocatedTableCount == 0)
|
if (colocatedTableCount == 0)
|
||||||
{
|
{
|
||||||
|
DeleteShardgroupForColocationId(colocationId);
|
||||||
DeleteColocationGroup(colocationId);
|
DeleteColocationGroup(colocationId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
DeleteShardgroupForColocationId(uint32 colocationId)
|
||||||
|
{
|
||||||
|
DeleteShardgroupForColocationIdLocally(colocationId);
|
||||||
|
SyncDeleteShardgroupForColocationIdToNodes(colocationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void
|
||||||
|
DeleteShardgroupForColocationIdLocally(uint32 colocationId)
|
||||||
|
{
|
||||||
|
ScanKeyData scanKey[1] = { 0 };
|
||||||
|
bool indexOK = false;
|
||||||
|
|
||||||
|
Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock);
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_shardgroup_colocationid,
|
||||||
|
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId));
|
||||||
|
|
||||||
|
/* TODO add index to find shardgroups by colocationid */
|
||||||
|
SysScanDesc scanDescriptor = systable_beginscan(pgDistShardgroup, InvalidOid, indexOK,
|
||||||
|
NULL, lengthof(scanKey), scanKey);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* simple_heap_delete() expects that the caller has at least an
|
||||||
|
* AccessShareLock on primary key index.
|
||||||
|
*
|
||||||
|
* XXX: This does not seem required, do we really need to acquire this lock?
|
||||||
|
* Postgres doesn't acquire such locks on indexes before deleting catalog tuples.
|
||||||
|
* Linking here the reasons we added this lock acquirement:
|
||||||
|
* https://github.com/citusdata/citus/pull/2851#discussion_r306569462
|
||||||
|
* https://github.com/citusdata/citus/pull/2855#discussion_r313628554
|
||||||
|
* https://github.com/citusdata/citus/issues/1890
|
||||||
|
*/
|
||||||
|
Relation replicaIndex =
|
||||||
|
index_open(RelationGetPrimaryKeyIndex(pgDistShardgroup), AccessShareLock);
|
||||||
|
|
||||||
|
/* for all records found delete from heap */
|
||||||
|
HeapTuple heapTuple = NULL;
|
||||||
|
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||||
|
{
|
||||||
|
simple_heap_delete(pgDistShardgroup, &(heapTuple->t_self));
|
||||||
|
|
||||||
|
CitusInvalidateRelcacheByRelid(DistShardgroupRelationId());
|
||||||
|
}
|
||||||
|
|
||||||
|
CommandCounterIncrement();
|
||||||
|
table_close(replicaIndex, AccessShareLock);
|
||||||
|
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
table_close(pgDistShardgroup, NoLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DeleteColocationGroup deletes the colocation group from pg_dist_colocation
|
* DeleteColocationGroup deletes the colocation group from pg_dist_colocation
|
||||||
* throughout the cluster.
|
* throughout the cluster.
|
||||||
|
|
|
@ -49,6 +49,7 @@ extern void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colo
|
||||||
extern void DeleteColocationGroupIfNoTablesBelong(uint32 colocationId);
|
extern void DeleteColocationGroupIfNoTablesBelong(uint32 colocationId);
|
||||||
extern List * ColocationGroupTableList(uint32 colocationId, uint32 count);
|
extern List * ColocationGroupTableList(uint32 colocationId, uint32 count);
|
||||||
extern void DeleteColocationGroupLocally(uint32 colocationId);
|
extern void DeleteColocationGroupLocally(uint32 colocationId);
|
||||||
|
extern void DeleteShardgroupForColocationIdLocally(uint32 colocationId);
|
||||||
extern uint32 FindColocateWithColocationId(Oid relationId, char replicationModel,
|
extern uint32 FindColocateWithColocationId(Oid relationId, char replicationModel,
|
||||||
Oid distributionColumnType,
|
Oid distributionColumnType,
|
||||||
Oid distributionColumnCollation,
|
Oid distributionColumnCollation,
|
||||||
|
|
|
@ -113,6 +113,7 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount,
|
||||||
int replicationFactor,
|
int replicationFactor,
|
||||||
Oid distributionColumType,
|
Oid distributionColumType,
|
||||||
Oid distributionColumnCollation);
|
Oid distributionColumnCollation);
|
||||||
|
extern void SyncDeleteShardgroupForColocationIdToNodes(uint32 colocationId);
|
||||||
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
|
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
|
||||||
|
|
||||||
#define DELETE_ALL_NODES "DELETE FROM pg_dist_node"
|
#define DELETE_ALL_NODES "DELETE FROM pg_dist_node"
|
||||||
|
|
Loading…
Reference in New Issue