mirror of https://github.com/citusdata/citus.git

Remove hacky group id check and unused UDFs

parent 697d1468fe
commit f3522763e5
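This commit drops the checkExistence flag from LookupNodeForGroup(): the lookup now always raises an ERROR when a shard placement points at a node group with no nodes, so callers no longer branch on a NULL result. It also removes the unused worker_drop_distributed_table_metadata_only() UDF (its C implementation, its SQL definition, and the REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND define) and trims redundant statements from the node-metadata regression test.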
@@ -241,7 +241,6 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
         if (IsCitusTable(relationId) && !IsTableOwnedByExtension(relationId))
         {
             /* skip table metadata creation when the Citus table is owned by an extension */
-            /* TODO: Check sequence next val type */
             List *commandList = NIL;
             List *tableDDLCommands = GetFullTableCreationCommands(relationId,
                                                                   WORKER_NEXTVAL_SEQUENCE_DEFAULTS);
@@ -2187,7 +2187,7 @@ LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName,
      * there were changes in pg_dist_node and we will get those invalidations
      * in LookupNodeForGroup.
      */
-    WorkerNode *workerNode = LookupNodeForGroup(taskPlacement->groupId, false);
+    WorkerNode *workerNode = LookupNodeForGroup(taskPlacement->groupId);
     *nodeName = workerNode->workerName;
     *nodePort = workerNode->workerPort;
 }
@@ -833,12 +833,7 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,

     ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
     int32 groupId = groupShardPlacement->groupId;
-    WorkerNode *workerNode = LookupNodeForGroup(groupId, true);
+    WorkerNode *workerNode = LookupNodeForGroup(groupId);

-    if (workerNode == NULL)
-    {
-        return NULL;
-    }
-
     /* copy everything into shardPlacement but preserve the header */
     CitusNode header = shardPlacement->type;
@@ -934,7 +929,7 @@ LookupNodeByNodeIdOrError(uint32 nodeId)
  * appropriate error message.
  */
 WorkerNode *
-LookupNodeForGroup(int32 groupId, bool checkExistence)
+LookupNodeForGroup(int32 groupId)
 {
     bool foundAnyNodes = false;

@@ -958,20 +953,10 @@ LookupNodeForGroup(int32 groupId, bool checkExistence)
     }

     if (!foundAnyNodes)
-    {
-        if (checkExistence)
-        {
-            ereport(NOTICE, (errmsg("there is a shard placement in node group %d but "
-                                    "there are no nodes in that group", groupId)));
-
-            return NULL;
-        }
-        else
-        {
-            ereport(ERROR, (errmsg("there is a shard placement in node group %d but "
-                                   "there are no nodes in that group", groupId)));
-        }
-    }
+    {
+        ereport(ERROR, (errmsg("there is a shard placement in node group %d but "
+                               "there are no nodes in that group", groupId)));
+    }

     switch (ReadFromSecondaries)
     {
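For illustration, a minimal caller-side sketch of the new contract (hypothetical code, not part of this commit; it mirrors the LookupTaskPlacementHostAndPort hunk above). After this change the lookup either returns a valid node or errors out inside the call, which is why the NULL checks in the surrounding hunks could be deleted:

/* sketch: the simplified lookup either succeeds or ereports(ERROR) internally */
WorkerNode *workerNode = LookupNodeForGroup(taskPlacement->groupId);

/* no NULL check needed before dereferencing the result */
*nodeName = workerNode->workerName;
*nodePort = workerNode->workerPort;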
@@ -1029,11 +1014,6 @@ ShardPlacementListIncludingOrphanedPlacements(uint64 shardId)
                                                              tableEntry,
                                                              shardIndex);

-        if (shardPlacement == NULL)
-        {
-            return NULL;
-        }
-
         placementList = lappend(placementList, shardPlacement);
     }

@@ -391,7 +391,7 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
          placementIndex++)
     {
         GroupShardPlacement *groupPlacement = &placementArray[placementIndex];
-        WorkerNode *worker = LookupNodeForGroup(groupPlacement->groupId, false);
+        WorkerNode *worker = LookupNodeForGroup(groupPlacement->groupId);
         ShardPlacement *placement = CitusMakeNode(ShardPlacement);
         placement->shardId = groupPlacement->shardId;
         placement->shardLength = groupPlacement->shardLength;
@@ -455,7 +455,7 @@ CoordinatorNodeIfAddedAsWorkerOrError()
 {
     ErrorIfCoordinatorNotAddedAsWorkerNode();

-    WorkerNode *coordinatorNode = LookupNodeForGroup(COORDINATOR_GROUP_ID, false);
+    WorkerNode *coordinatorNode = LookupNodeForGroup(COORDINATOR_GROUP_ID);

     WorkerNode *coordinatorNodeCopy = palloc0(sizeof(WorkerNode));
     *coordinatorNodeCopy = *coordinatorNode;
@@ -40,12 +40,6 @@ CREATE FUNCTION worker_drop_distributed_table_only(table_name text)
 COMMENT ON FUNCTION worker_drop_distributed_table_only(table_name text)
     IS 'drop the distributed table only without the metadata';

-CREATE FUNCTION worker_drop_distributed_table_metadata_only(table_oid oid)
-    RETURNS VOID
-    LANGUAGE C STRICT
-    AS 'MODULE_PATHNAME', $$worker_drop_distributed_table_metadata_only$$;
-COMMENT ON FUNCTION worker_drop_distributed_table_metadata_only(table_oid oid)
-    IS 'drops the metadata of the given table oid';
 -- Here we keep track of partitioned tables that exists before Citus 11
 -- where we need to call fix_all_partition_shard_index_names() before
 -- metadata is synced. Note that after citus-11, we automatically
@@ -35,7 +35,6 @@

 PG_FUNCTION_INFO_V1(worker_drop_distributed_table);
 PG_FUNCTION_INFO_V1(worker_drop_distributed_table_only);
-PG_FUNCTION_INFO_V1(worker_drop_distributed_table_metadata_only);
 PG_FUNCTION_INFO_V1(worker_drop_sequence_dependency);


@@ -235,49 +234,6 @@ worker_drop_distributed_table_only(PG_FUNCTION_ARGS)
 }


-/*
- * worker_drop_distributed_table_metadata_only removes the associated rows from pg_dist_partition,
- * pg_dist_shard and pg_dist_placement for the given relation.
- */
-Datum
-worker_drop_distributed_table_metadata_only(PG_FUNCTION_ARGS)
-{
-    CheckCitusVersion(ERROR);
-
-    Oid relationId = PG_GETARG_OID(0);
-
-    List *shardList = LoadShardList(relationId);
-
-    /* iterate over shardList to delete the corresponding rows */
-    uint64 *shardIdPointer = NULL;
-    foreach_ptr(shardIdPointer, shardList)
-    {
-        uint64 shardId = *shardIdPointer;
-
-        List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
-        if (shardPlacementList == NULL)
-        {
-            ereport(WARNING, (errmsg("placement for relation with oid %d does not exist, skipping", relationId)));
-        }
-
-        ShardPlacement *placement = NULL;
-        foreach_ptr(placement, shardPlacementList)
-        {
-            /* delete the row from pg_dist_placement */
-            DeleteShardPlacementRow(placement->placementId);
-        }
-
-        /* delete the row from pg_dist_shard */
-        DeleteShardRow(shardId);
-    }
-
-    /* delete the row from pg_dist_partition */
-    DeletePartitionRow(relationId);
-
-    PG_RETURN_VOID();
-}
-
-
 /*
  * worker_drop_sequence_dependency is a UDF that removes the dependency
  * of all the sequences for the given table.
@@ -211,7 +211,7 @@ extern bool HasAnyNodes(void);
 extern HTAB * GetWorkerNodeHash(void);
 extern WorkerNode * LookupNodeByNodeId(uint32 nodeId);
 extern WorkerNode * LookupNodeByNodeIdOrError(uint32 nodeId);
-extern WorkerNode * LookupNodeForGroup(int32 groupId, bool checkExistence);
+extern WorkerNode * LookupNodeForGroup(int32 groupId);

 /* namespace oids */
 extern Oid CitusCatalogNamespaceId(void);
@@ -77,8 +77,6 @@ extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
 #define DELETE_ALL_PARTITIONS "TRUNCATE pg_dist_partition CASCADE"
 #define REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND \
     "SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition"
-#define REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND \
-    "SELECT worker_drop_distributed_table_metadata_only(logicalrelid::oid) FROM pg_dist_partition"
 #define REMOVE_ALL_CITUS_TABLES_COMMAND \
     "SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
 #define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
@@ -120,16 +120,11 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port + 1);
 SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_2_port);
 SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port;

--- try to manipulate node metadata via privileged user
+-- show that non-admin role can not activate a node
 SET ROLE node_metadata_user;
 BEGIN;
 SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);
 SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
-SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
-SELECT 1 FROM master_add_node('localhost', :worker_2_port);
-SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_2_port);
-SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port;
-SELECT nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeport;
 ABORT;

 \c - postgres - :master_port
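With the redundant remove/add/update statements deleted, the trimmed test keeps only the add-inactive and activate attempts inside the aborted transaction, which is all that is needed to show that a non-admin role can not activate a node.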