mirror of https://github.com/citusdata/citus.git
Merge pull request #1255 from citusdata/remove-unused-func
Remove unused metadata functions. cr: @jasonmp85pull/1274/head
commit
f5fbf1e621
|
@ -9,7 +9,8 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
||||||
5.2-1 5.2-2 5.2-3 5.2-4 \
|
5.2-1 5.2-2 5.2-3 5.2-4 \
|
||||||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
||||||
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17
|
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
|
||||||
|
6.2-1
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -129,6 +130,8 @@ $(EXTENSION)--6.1-16.sql: $(EXTENSION)--6.1-15.sql $(EXTENSION)--6.1-15--6.1-16.
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--6.1-17.sql: $(EXTENSION)--6.1-16.sql $(EXTENSION)--6.1-16--6.1-17.sql
|
$(EXTENSION)--6.1-17.sql: $(EXTENSION)--6.1-16.sql $(EXTENSION)--6.1-16--6.1-17.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--6.2-1.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.2-1.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
|
|
|
@ -1,29 +1,5 @@
|
||||||
/* citus--5.0--5.0-1.sql */
|
/* citus--5.0--5.0-1.sql */
|
||||||
|
|
||||||
CREATE FUNCTION pg_catalog.master_stage_shard_row(logicalrelid oid,
|
|
||||||
shardid bigint,
|
|
||||||
shardstorage "char",
|
|
||||||
shardminvalue text,
|
|
||||||
shardmaxvalue text)
|
|
||||||
RETURNS VOID
|
|
||||||
LANGUAGE C
|
|
||||||
AS 'MODULE_PATHNAME', $$master_stage_shard_row$$;
|
|
||||||
COMMENT ON FUNCTION pg_catalog.master_stage_shard_row(oid, bigint, "char", text, text)
|
|
||||||
IS 'deprecated function to insert a row into pg_dist_shard';
|
|
||||||
|
|
||||||
CREATE FUNCTION pg_catalog.master_stage_shard_placement_row(shardid int8,
|
|
||||||
shardstate int4,
|
|
||||||
shardlength int8,
|
|
||||||
nodename text,
|
|
||||||
nodeport int4)
|
|
||||||
RETURNS VOID
|
|
||||||
STRICT
|
|
||||||
LANGUAGE C
|
|
||||||
AS 'MODULE_PATHNAME', $$master_stage_shard_placement_row$$;
|
|
||||||
COMMENT ON FUNCTION pg_catalog.master_stage_shard_placement_row(int8, int4, int8, text, int4)
|
|
||||||
IS 'deprecated function to insert a row into pg_dist_shard_placement';
|
|
||||||
|
|
||||||
|
|
||||||
ALTER FUNCTION pg_catalog.citus_drop_trigger() SECURITY DEFINER;
|
ALTER FUNCTION pg_catalog.citus_drop_trigger() SECURITY DEFINER;
|
||||||
|
|
||||||
GRANT SELECT ON pg_catalog.pg_dist_partition TO public;
|
GRANT SELECT ON pg_catalog.pg_dist_partition TO public;
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
/* citus--6.1-17--6.2-1.sql */
|
||||||
|
|
||||||
|
SET search_path = 'pg_catalog';
|
||||||
|
|
||||||
|
DROP FUNCTION IF EXISTS master_get_local_first_candidate_nodes();
|
||||||
|
DROP FUNCTION IF EXISTS master_get_round_robin_candidate_nodes();
|
||||||
|
|
||||||
|
DROP FUNCTION IF EXISTS master_stage_shard_row();
|
||||||
|
DROP FUNCTION IF EXISTS master_stage_shard_placement_row();
|
||||||
|
|
||||||
|
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '6.1-17'
|
default_version = '6.2-1'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -129,14 +129,6 @@ CREATE FUNCTION master_get_new_shardid()
|
||||||
COMMENT ON FUNCTION master_get_new_shardid()
|
COMMENT ON FUNCTION master_get_new_shardid()
|
||||||
IS 'fetch unique shardId';
|
IS 'fetch unique shardId';
|
||||||
|
|
||||||
CREATE FUNCTION master_get_local_first_candidate_nodes(OUT node_name text,
|
|
||||||
OUT node_port bigint)
|
|
||||||
RETURNS SETOF record
|
|
||||||
LANGUAGE C STRICT ROWS 100
|
|
||||||
AS 'MODULE_PATHNAME', $$master_get_local_first_candidate_nodes$$;
|
|
||||||
COMMENT ON FUNCTION master_get_local_first_candidate_nodes()
|
|
||||||
IS 'fetch set of candidate nodes for shard uploading choosing the local node first';
|
|
||||||
|
|
||||||
CREATE FUNCTION master_create_empty_shard(text)
|
CREATE FUNCTION master_create_empty_shard(text)
|
||||||
RETURNS bigint
|
RETURNS bigint
|
||||||
LANGUAGE C STRICT
|
LANGUAGE C STRICT
|
||||||
|
@ -174,15 +166,6 @@ CREATE FUNCTION master_get_active_worker_nodes(OUT node_name text, OUT node_port
|
||||||
COMMENT ON FUNCTION master_get_active_worker_nodes()
|
COMMENT ON FUNCTION master_get_active_worker_nodes()
|
||||||
IS 'fetch set of active worker nodes';
|
IS 'fetch set of active worker nodes';
|
||||||
|
|
||||||
CREATE FUNCTION master_get_round_robin_candidate_nodes(shard_id bigint,
|
|
||||||
OUT node_name text,
|
|
||||||
OUT node_port bigint)
|
|
||||||
RETURNS SETOF record
|
|
||||||
LANGUAGE C STRICT ROWS 100
|
|
||||||
AS 'MODULE_PATHNAME', $$master_get_round_robin_candidate_nodes$$;
|
|
||||||
COMMENT ON FUNCTION master_get_round_robin_candidate_nodes(shard_id bigint)
|
|
||||||
IS 'fetch set of candidate nodes for shard uploading in round-robin manner';
|
|
||||||
|
|
||||||
CREATE FUNCTION master_create_distributed_table(table_name regclass,
|
CREATE FUNCTION master_create_distributed_table(table_name regclass,
|
||||||
distribution_column text,
|
distribution_column text,
|
||||||
distribution_method citus.distribution_type)
|
distribution_method citus.distribution_type)
|
||||||
|
|
|
@ -59,11 +59,6 @@ static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
|
||||||
HeapTuple heapTuple);
|
HeapTuple heapTuple);
|
||||||
|
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
|
||||||
PG_FUNCTION_INFO_V1(master_stage_shard_row);
|
|
||||||
PG_FUNCTION_INFO_V1(master_stage_shard_placement_row);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TableShardReplicationFactor returns the current replication factor of the
|
* TableShardReplicationFactor returns the current replication factor of the
|
||||||
* given relation by looking into shard placements. It errors out if there
|
* given relation by looking into shard placements. It errors out if there
|
||||||
|
@ -1020,143 +1015,6 @@ TableOwner(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* master_stage_shard_row() inserts a row into pg_dist_shard, after performing
|
|
||||||
* basic permission checks.
|
|
||||||
*
|
|
||||||
* TODO: This function only exists for csql's \stage, and should not otherwise
|
|
||||||
* be used. Once \stage is removed, it'll be removed too.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
master_stage_shard_row(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
Oid distributedRelationId = InvalidOid;
|
|
||||||
uint64 shardId = 0;
|
|
||||||
char storageType = 0;
|
|
||||||
text *shardMinValue = NULL;
|
|
||||||
text *shardMaxValue = NULL;
|
|
||||||
Relation relation;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Have to check arguments for NULLness as it can't be declared STRICT
|
|
||||||
* because of min/max arguments, which have to be NULLable for new shards.
|
|
||||||
*/
|
|
||||||
if (PG_ARGISNULL(0))
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
||||||
errmsg("relation cannot be null")));
|
|
||||||
}
|
|
||||||
else if (PG_ARGISNULL(1))
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
||||||
errmsg("shard cannot be null")));
|
|
||||||
}
|
|
||||||
else if (PG_ARGISNULL(2))
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
||||||
errmsg("storage type cannot be null")));
|
|
||||||
}
|
|
||||||
|
|
||||||
distributedRelationId = PG_GETARG_OID(0);
|
|
||||||
shardId = PG_GETARG_INT64(1);
|
|
||||||
storageType = PG_GETARG_CHAR(2);
|
|
||||||
|
|
||||||
if (!PG_ARGISNULL(3))
|
|
||||||
{
|
|
||||||
shardMinValue = PG_GETARG_TEXT_P(3);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!PG_ARGISNULL(4))
|
|
||||||
{
|
|
||||||
shardMaxValue = PG_GETARG_TEXT_P(4);
|
|
||||||
}
|
|
||||||
|
|
||||||
relation = heap_open(distributedRelationId, RowExclusiveLock);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Check permissions on relation. Note we require ACL_INSERT and not owner
|
|
||||||
* rights - it'd be worse for security to require every user performing
|
|
||||||
* data loads to be made a table owner - besides being more complex to set
|
|
||||||
* up.
|
|
||||||
*/
|
|
||||||
EnsureTablePermissions(distributedRelationId, ACL_INSERT);
|
|
||||||
|
|
||||||
/* and finally actually insert the row */
|
|
||||||
InsertShardRow(distributedRelationId, shardId, storageType,
|
|
||||||
shardMinValue, shardMaxValue);
|
|
||||||
|
|
||||||
heap_close(relation, NoLock);
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* master_stage_shard_placement_row() inserts a row into
|
|
||||||
* pg_dist_shard_placment, after performing some basic checks.
|
|
||||||
*
|
|
||||||
* TODO: This function only exists for csql's \stage, and should not otherwise
|
|
||||||
* be used. Once \stage is removed, it'll be removed too.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
master_stage_shard_placement_row(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
uint64 shardId = PG_GETARG_INT64(0);
|
|
||||||
int32 shardState = PG_GETARG_INT32(1);
|
|
||||||
int32 shardLength = PG_GETARG_INT64(2);
|
|
||||||
char *nodeName = text_to_cstring(PG_GETARG_TEXT_P(3));
|
|
||||||
int nodePort = PG_GETARG_INT32(4);
|
|
||||||
|
|
||||||
Oid distributedRelationId = InvalidOid;
|
|
||||||
Relation relation = NULL;
|
|
||||||
|
|
||||||
Relation pgDistShard = heap_open(DistShardRelationId(), RowExclusiveLock);
|
|
||||||
ScanKeyData scanKey[1];
|
|
||||||
int scanKeyCount = 1;
|
|
||||||
SysScanDesc scanDescriptor = NULL;
|
|
||||||
HeapTuple heapTuple = NULL;
|
|
||||||
|
|
||||||
/* Lookup which table the shardid belongs to */
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid,
|
|
||||||
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
|
|
||||||
|
|
||||||
scanDescriptor = systable_beginscan(pgDistShard,
|
|
||||||
DistShardShardidIndexId(), true,
|
|
||||||
NULL, scanKeyCount, scanKey);
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
|
||||||
if (HeapTupleIsValid(heapTuple))
|
|
||||||
{
|
|
||||||
Form_pg_dist_shard pgDistShardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple);
|
|
||||||
distributedRelationId = pgDistShardForm->logicalrelid;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not find valid entry for shard "
|
|
||||||
UINT64_FORMAT, shardId)));
|
|
||||||
}
|
|
||||||
systable_endscan(scanDescriptor);
|
|
||||||
|
|
||||||
relation = heap_open(distributedRelationId, RowExclusiveLock);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Check permissions on relation. Note we require ACL_INSERT and not owner
|
|
||||||
* rights - it'd be worse for security to require every user performing
|
|
||||||
* data loads to be made a table owner - besides being more complex to set
|
|
||||||
* up.
|
|
||||||
*/
|
|
||||||
EnsureTablePermissions(distributedRelationId, ACL_INSERT);
|
|
||||||
|
|
||||||
/* finally insert placement */
|
|
||||||
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardLength,
|
|
||||||
nodeName, nodePort);
|
|
||||||
|
|
||||||
heap_close(relation, NoLock);
|
|
||||||
heap_close(pgDistShard, NoLock);
|
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TableReferenced function checks whether given table is referenced by another table
|
* TableReferenced function checks whether given table is referenced by another table
|
||||||
* via foreign constraints. If it is referenced, this function returns true. To check
|
* via foreign constraints. If it is referenced, this function returns true. To check
|
||||||
|
|
|
@ -77,8 +77,6 @@ PG_FUNCTION_INFO_V1(master_get_table_metadata);
|
||||||
PG_FUNCTION_INFO_V1(master_get_table_ddl_events);
|
PG_FUNCTION_INFO_V1(master_get_table_ddl_events);
|
||||||
PG_FUNCTION_INFO_V1(master_get_new_shardid);
|
PG_FUNCTION_INFO_V1(master_get_new_shardid);
|
||||||
PG_FUNCTION_INFO_V1(master_get_new_placementid);
|
PG_FUNCTION_INFO_V1(master_get_new_placementid);
|
||||||
PG_FUNCTION_INFO_V1(master_get_local_first_candidate_nodes);
|
|
||||||
PG_FUNCTION_INFO_V1(master_get_round_robin_candidate_nodes);
|
|
||||||
PG_FUNCTION_INFO_V1(master_get_active_worker_nodes);
|
PG_FUNCTION_INFO_V1(master_get_active_worker_nodes);
|
||||||
|
|
||||||
|
|
||||||
|
@ -366,182 +364,6 @@ GetNextPlacementId(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* master_get_local_first_candidate_nodes returns a set of candidate host names
|
|
||||||
* and port numbers on which to place new shards. The function makes sure to
|
|
||||||
* always allocate the first candidate node as the node the caller is connecting
|
|
||||||
* from; and allocates additional nodes until the shard replication factor is
|
|
||||||
* met. The function errors if the caller's remote node name is not found in the
|
|
||||||
* membership list, or if the number of available nodes falls short of the
|
|
||||||
* replication factor.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
FuncCallContext *functionContext = NULL;
|
|
||||||
uint32 desiredNodeCount = 0;
|
|
||||||
uint32 currentNodeCount = 0;
|
|
||||||
|
|
||||||
if (SRF_IS_FIRSTCALL())
|
|
||||||
{
|
|
||||||
MemoryContext oldContext = NULL;
|
|
||||||
TupleDesc tupleDescriptor = NULL;
|
|
||||||
uint32 liveNodeCount = 0;
|
|
||||||
bool hasOid = false;
|
|
||||||
|
|
||||||
/* create a function context for cross-call persistence */
|
|
||||||
functionContext = SRF_FIRSTCALL_INIT();
|
|
||||||
|
|
||||||
/* switch to memory context appropriate for multiple function calls */
|
|
||||||
oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
|
|
||||||
|
|
||||||
functionContext->user_fctx = NIL;
|
|
||||||
functionContext->max_calls = ShardReplicationFactor;
|
|
||||||
|
|
||||||
/* if enough live nodes, return an extra candidate node as backup */
|
|
||||||
liveNodeCount = WorkerGetLiveNodeCount();
|
|
||||||
if (liveNodeCount > ShardReplicationFactor)
|
|
||||||
{
|
|
||||||
functionContext->max_calls = ShardReplicationFactor + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This tuple descriptor must match the output parameters declared for
|
|
||||||
* the function in pg_proc.
|
|
||||||
*/
|
|
||||||
tupleDescriptor = CreateTemplateTupleDesc(CANDIDATE_NODE_FIELDS, hasOid);
|
|
||||||
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "node_name",
|
|
||||||
TEXTOID, -1, 0);
|
|
||||||
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "node_port",
|
|
||||||
INT8OID, -1, 0);
|
|
||||||
|
|
||||||
functionContext->tuple_desc = BlessTupleDesc(tupleDescriptor);
|
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
}
|
|
||||||
|
|
||||||
functionContext = SRF_PERCALL_SETUP();
|
|
||||||
desiredNodeCount = functionContext->max_calls;
|
|
||||||
currentNodeCount = functionContext->call_cntr;
|
|
||||||
|
|
||||||
if (currentNodeCount < desiredNodeCount)
|
|
||||||
{
|
|
||||||
MemoryContext oldContext = NULL;
|
|
||||||
List *currentNodeList = NIL;
|
|
||||||
WorkerNode *candidateNode = NULL;
|
|
||||||
Datum candidateDatum = 0;
|
|
||||||
|
|
||||||
/* switch to memory context appropriate for multiple function calls */
|
|
||||||
oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
|
|
||||||
currentNodeList = functionContext->user_fctx;
|
|
||||||
|
|
||||||
candidateNode = WorkerGetLocalFirstCandidateNode(currentNodeList);
|
|
||||||
if (candidateNode == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could only find %u of %u required nodes",
|
|
||||||
currentNodeCount, desiredNodeCount)));
|
|
||||||
}
|
|
||||||
|
|
||||||
currentNodeList = lappend(currentNodeList, candidateNode);
|
|
||||||
functionContext->user_fctx = currentNodeList;
|
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
|
|
||||||
candidateDatum = WorkerNodeGetDatum(candidateNode, functionContext->tuple_desc);
|
|
||||||
|
|
||||||
SRF_RETURN_NEXT(functionContext, candidateDatum);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
SRF_RETURN_DONE(functionContext);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* master_get_round_robin_candidate_nodes returns a set of candidate host names
|
|
||||||
* and port numbers on which to place new shards. The function uses the round
|
|
||||||
* robin policy to choose the nodes and tries to ensure that there is an even
|
|
||||||
* distribution of shards across the worker nodes. This function errors out if
|
|
||||||
* the number of available nodes falls short of the replication factor.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
uint64 shardId = PG_GETARG_INT64(0);
|
|
||||||
FuncCallContext *functionContext = NULL;
|
|
||||||
uint32 desiredNodeCount = 0;
|
|
||||||
uint32 currentNodeCount = 0;
|
|
||||||
|
|
||||||
if (SRF_IS_FIRSTCALL())
|
|
||||||
{
|
|
||||||
MemoryContext oldContext = NULL;
|
|
||||||
TupleDesc tupleDescriptor = NULL;
|
|
||||||
List *workerNodeList = NIL;
|
|
||||||
TypeFuncClass resultTypeClass = 0;
|
|
||||||
uint32 workerNodeCount = 0;
|
|
||||||
|
|
||||||
/* create a function context for cross-call persistence */
|
|
||||||
functionContext = SRF_FIRSTCALL_INIT();
|
|
||||||
|
|
||||||
/* switch to memory context appropriate for multiple function calls */
|
|
||||||
oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
|
|
||||||
|
|
||||||
/* get the worker node list and sort it for determinism */
|
|
||||||
workerNodeList = WorkerNodeList();
|
|
||||||
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
|
||||||
|
|
||||||
functionContext->user_fctx = workerNodeList;
|
|
||||||
functionContext->max_calls = ShardReplicationFactor;
|
|
||||||
|
|
||||||
/* if we enough live nodes, return an extra candidate node as backup */
|
|
||||||
workerNodeCount = (uint32) list_length(workerNodeList);
|
|
||||||
if (workerNodeCount > ShardReplicationFactor)
|
|
||||||
{
|
|
||||||
functionContext->max_calls = ShardReplicationFactor + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* create tuple descriptor for return value */
|
|
||||||
resultTypeClass = get_call_result_type(fcinfo, NULL, &tupleDescriptor);
|
|
||||||
if (resultTypeClass != TYPEFUNC_COMPOSITE)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("return type must be a row type")));
|
|
||||||
}
|
|
||||||
|
|
||||||
functionContext->tuple_desc = tupleDescriptor;
|
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
}
|
|
||||||
|
|
||||||
functionContext = SRF_PERCALL_SETUP();
|
|
||||||
desiredNodeCount = functionContext->max_calls;
|
|
||||||
currentNodeCount = functionContext->call_cntr;
|
|
||||||
|
|
||||||
if (currentNodeCount < desiredNodeCount)
|
|
||||||
{
|
|
||||||
List *workerNodeList = functionContext->user_fctx;
|
|
||||||
WorkerNode *candidateNode = NULL;
|
|
||||||
Datum candidateDatum = 0;
|
|
||||||
|
|
||||||
candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId,
|
|
||||||
currentNodeCount);
|
|
||||||
if (candidateNode == NULL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could only find %u of %u required nodes",
|
|
||||||
currentNodeCount, desiredNodeCount)));
|
|
||||||
}
|
|
||||||
|
|
||||||
candidateDatum = WorkerNodeGetDatum(candidateNode, functionContext->tuple_desc);
|
|
||||||
|
|
||||||
SRF_RETURN_NEXT(functionContext, candidateDatum);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
SRF_RETURN_DONE(functionContext);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* master_get_active_worker_nodes returns a set of active worker host names and
|
* master_get_active_worker_nodes returns a set of active worker host names and
|
||||||
* port numbers in deterministic order. Currently we assume that all worker
|
* port numbers in deterministic order. Currently we assume that all worker
|
||||||
|
|
|
@ -126,8 +126,6 @@ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);
|
||||||
extern Datum master_get_table_ddl_events(PG_FUNCTION_ARGS);
|
extern Datum master_get_table_ddl_events(PG_FUNCTION_ARGS);
|
||||||
extern Datum master_get_new_shardid(PG_FUNCTION_ARGS);
|
extern Datum master_get_new_shardid(PG_FUNCTION_ARGS);
|
||||||
extern Datum master_get_new_placementid(PG_FUNCTION_ARGS);
|
extern Datum master_get_new_placementid(PG_FUNCTION_ARGS);
|
||||||
extern Datum master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS);
|
|
||||||
extern Datum master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS);
|
|
||||||
extern Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS);
|
extern Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS);
|
||||||
|
|
||||||
/* Function declarations to help with data staging and deletion */
|
/* Function declarations to help with data staging and deletion */
|
||||||
|
|
|
@ -75,6 +75,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-14';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-15';
|
ALTER EXTENSION citus UPDATE TO '6.1-15';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-16';
|
ALTER EXTENSION citus UPDATE TO '6.1-16';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-17';
|
ALTER EXTENSION citus UPDATE TO '6.1-17';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.2-1';
|
||||||
-- ensure no objects were created outside pg_catalog
|
-- ensure no objects were created outside pg_catalog
|
||||||
SELECT COUNT(*)
|
SELECT COUNT(*)
|
||||||
FROM pg_depend AS pgd,
|
FROM pg_depend AS pgd,
|
||||||
|
|
|
@ -25,20 +25,6 @@ SELECT * FROM master_get_new_shardid();
|
||||||
740000
|
740000
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM master_get_round_robin_candidate_nodes(1);
|
|
||||||
node_name | node_port
|
|
||||||
-----------+-----------
|
|
||||||
localhost | 57638
|
|
||||||
localhost | 57637
|
|
||||||
(2 rows)
|
|
||||||
|
|
||||||
SELECT * FROM master_get_round_robin_candidate_nodes(2);
|
|
||||||
node_name | node_port
|
|
||||||
-----------+-----------
|
|
||||||
localhost | 57637
|
|
||||||
localhost | 57638
|
|
||||||
(2 rows)
|
|
||||||
|
|
||||||
SELECT * FROM master_get_active_worker_nodes();
|
SELECT * FROM master_get_active_worker_nodes();
|
||||||
node_name | node_port
|
node_name | node_port
|
||||||
-----------+-----------
|
-----------+-----------
|
||||||
|
|
|
@ -75,6 +75,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-14';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-15';
|
ALTER EXTENSION citus UPDATE TO '6.1-15';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-16';
|
ALTER EXTENSION citus UPDATE TO '6.1-16';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-17';
|
ALTER EXTENSION citus UPDATE TO '6.1-17';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.2-1';
|
||||||
|
|
||||||
-- ensure no objects were created outside pg_catalog
|
-- ensure no objects were created outside pg_catalog
|
||||||
SELECT COUNT(*)
|
SELECT COUNT(*)
|
||||||
|
|
|
@ -15,8 +15,4 @@ SELECT * FROM master_get_table_ddl_events('lineitem');
|
||||||
|
|
||||||
SELECT * FROM master_get_new_shardid();
|
SELECT * FROM master_get_new_shardid();
|
||||||
|
|
||||||
SELECT * FROM master_get_round_robin_candidate_nodes(1);
|
|
||||||
|
|
||||||
SELECT * FROM master_get_round_robin_candidate_nodes(2);
|
|
||||||
|
|
||||||
SELECT * FROM master_get_active_worker_nodes();
|
SELECT * FROM master_get_active_worker_nodes();
|
||||||
|
|
Loading…
Reference in New Issue