mirror of https://github.com/citusdata/citus.git
Replace direct inserts in csql's \stage by serverside functions.
\stage so far directly inserted into pg_dist_shard and pg_dist_shard_placement. That makes it hard to do effective permission checks. Thus move the inserts into two C functions. These two new functions aren't the nicest abstraction. But as we are planning to obsolete \stage, it doesn't seem worthwhile to refactor the client-side code of \stage to allow the use of master_create_empty_shard() et al.pull/471/head
parent
12a246de37
commit
bf87e08331
|
@ -1,3 +1,24 @@
|
|||
/* citus--5.0--5.0-1.sql */
|
||||
|
||||
-- Currently nothing to do here
|
||||
CREATE FUNCTION pg_catalog.master_stage_shard_row(logicalrelid oid,
|
||||
shardid bigint,
|
||||
shardstorage "char",
|
||||
shardminvalue text,
|
||||
shardmaxvalue text)
|
||||
RETURNS VOID
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$master_stage_shard_row$$;
|
||||
COMMENT ON FUNCTION pg_catalog.master_stage_shard_row(oid, bigint, "char", text, text)
|
||||
IS 'deprecated function to insert a row into pg_dist_shard';
|
||||
|
||||
CREATE FUNCTION pg_catalog.master_stage_shard_placement_row(shardid int8,
|
||||
shardstate int4,
|
||||
shardlength int8,
|
||||
nodename text,
|
||||
nodeport int4)
|
||||
RETURNS VOID
|
||||
STRICT
|
||||
LANGUAGE C
|
||||
AS 'MODULE_PATHNAME', $$master_stage_shard_placement_row$$;
|
||||
COMMENT ON FUNCTION pg_catalog.master_stage_shard_placement_row(int8, int4, int8, text, int4)
|
||||
IS 'deprecated function to insert a row into pg_dist_shard_placement';
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
#include "distributed/worker_manager.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "parser/scansup.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "utils/acl.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/datum.h"
|
||||
|
@ -44,6 +45,11 @@
|
|||
static uint64 * AllocateUint64(uint64 value);
|
||||
|
||||
|
||||
/* exports for SQL callable functions */
|
||||
PG_FUNCTION_INFO_V1(master_stage_shard_row);
|
||||
PG_FUNCTION_INFO_V1(master_stage_shard_placement_row);
|
||||
|
||||
|
||||
/*
|
||||
* LoadShardIntervalList returns a list of shard intervals related for a given
|
||||
* distributed table. The function returns an empty list if no shards can be
|
||||
|
@ -647,3 +653,138 @@ EnsureTableOwner(Oid relationId)
|
|||
get_rel_name(relationId));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* master_stage_shard_row() inserts a row into pg_dist_shard, after performing
|
||||
* basic permission checks.
|
||||
*
|
||||
* TODO: This function only exists for csql's \stage, and should not otherwise
|
||||
* be used. Once \stage is removed, it'll be removed too.
|
||||
*/
|
||||
Datum
|
||||
master_stage_shard_row(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid distributedRelationId = InvalidOid;
|
||||
uint64 shardId = 0;
|
||||
char storageType = 0;
|
||||
text *shardMinValue = NULL;
|
||||
text *shardMaxValue = NULL;
|
||||
Relation relation;
|
||||
|
||||
/*
|
||||
* Have to check arguments for NULLness as it can't be declared STRICT
|
||||
* because of min/max arguments, which have to be NULLable for new shards.
|
||||
*/
|
||||
if (PG_ARGISNULL(0))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("relation cannot be null")));
|
||||
}
|
||||
else if (PG_ARGISNULL(1))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("shard cannot be null")));
|
||||
}
|
||||
else if (PG_ARGISNULL(2))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("storage type cannot be null")));
|
||||
}
|
||||
|
||||
distributedRelationId = PG_GETARG_OID(0);
|
||||
shardId = PG_GETARG_INT64(1);
|
||||
storageType = PG_GETARG_CHAR(2);
|
||||
|
||||
if (!PG_ARGISNULL(3))
|
||||
{
|
||||
shardMinValue = PG_GETARG_TEXT_P(3);
|
||||
}
|
||||
|
||||
if (!PG_ARGISNULL(4))
|
||||
{
|
||||
shardMaxValue = PG_GETARG_TEXT_P(4);
|
||||
}
|
||||
|
||||
relation = heap_open(distributedRelationId, RowExclusiveLock);
|
||||
|
||||
/*
|
||||
* Check permissions on relation. Note we require ACL_INSERT and not owner
|
||||
* rights - it'd be worse for security to require every user performing
|
||||
* data loads to be made a table owner - besides being more complex to set
|
||||
* up.
|
||||
*/
|
||||
EnsureTablePermissions(distributedRelationId, ACL_INSERT);
|
||||
|
||||
/* and finally actually insert the row */
|
||||
InsertShardRow(distributedRelationId, shardId, storageType,
|
||||
shardMinValue, shardMaxValue);
|
||||
|
||||
heap_close(relation, NoLock);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* master_stage_shard_placement_row() inserts a row into
|
||||
* pg_dist_shard_placment, after performing some basic checks.
|
||||
*
|
||||
* TODO: This function only exists for csql's \stage, and should not otherwise
|
||||
* be used. Once \stage is removed, it'll be removed too.
|
||||
*/
|
||||
Datum
|
||||
master_stage_shard_placement_row(PG_FUNCTION_ARGS)
|
||||
{
|
||||
uint64 shardId = PG_GETARG_INT64(0);
|
||||
int32 shardState = PG_GETARG_INT32(1);
|
||||
int32 shardLength = PG_GETARG_INT64(2);
|
||||
char *nodeName = text_to_cstring(PG_GETARG_TEXT_P(3));
|
||||
int nodePort = PG_GETARG_INT32(4);
|
||||
|
||||
Oid distributedRelationId = InvalidOid;
|
||||
Relation relation = NULL;
|
||||
|
||||
Relation pgDistShard = heap_open(DistShardRelationId(), RowExclusiveLock);
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
/* Lookup which table the shardid belongs to */
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
|
||||
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
|
||||
|
||||
scanDescriptor = systable_beginscan(pgDistShard,
|
||||
DistShardShardidIndexId(), true,
|
||||
NULL, scanKeyCount, scanKey);
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
if (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_dist_shard pgDistShardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple);
|
||||
distributedRelationId = pgDistShardForm->logicalrelid;
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not find valid entry for shard "
|
||||
UINT64_FORMAT, shardId)));
|
||||
}
|
||||
systable_endscan(scanDescriptor);
|
||||
|
||||
relation = heap_open(distributedRelationId, RowExclusiveLock);
|
||||
|
||||
/*
|
||||
* Check permissions on relation. Note we require ACL_INSERT and not owner
|
||||
* rights - it'd be worse for security to require every user performing
|
||||
* data loads to be made a table owner - besides being more complex to set
|
||||
* up.
|
||||
*/
|
||||
EnsureTablePermissions(distributedRelationId, ACL_INSERT);
|
||||
|
||||
/* finally insert placement */
|
||||
InsertShardPlacementRow(shardId, shardState, shardLength, nodeName, nodePort);
|
||||
|
||||
heap_close(relation, NoLock);
|
||||
heap_close(pgDistShard, NoLock);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
|
|
@ -39,13 +39,11 @@
|
|||
"SELECT * FROM master_get_round_robin_candidate_nodes($1::int8)"
|
||||
|
||||
#define MASTER_INSERT_SHARD_ROW \
|
||||
"INSERT INTO pg_dist_shard " \
|
||||
"(logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES " \
|
||||
"($1::oid, $2::int8, $3::char, $4::text, $5::text)"
|
||||
"SELECT master_stage_shard_row(\
|
||||
$1::oid, $2::int8, $3::\"char\", $4::text, $5::text)"
|
||||
#define MASTER_INSERT_PLACEMENT_ROW \
|
||||
"INSERT INTO pg_dist_shard_placement " \
|
||||
"(shardid, shardstate, shardlength, nodename, nodeport) VALUES " \
|
||||
"($1::int8, $2::int4, $3::int8, $4::text, $5::int4)"
|
||||
"SELECT master_stage_shard_placement_row(\
|
||||
$1::int8, $2::int4, $3::int8, $4::text, $5::int4)"
|
||||
|
||||
/* Column names used to identify response fields as returned from the master. */
|
||||
#define LOGICAL_RELID_FIELD "logical_relid"
|
||||
|
|
Loading…
Reference in New Issue