diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 541358ca5..c0dd16d94 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,8 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ + 6.2-1 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -129,6 +130,8 @@ $(EXTENSION)--6.1-16.sql: $(EXTENSION)--6.1-15.sql $(EXTENSION)--6.1-15--6.1-16. cat $^ > $@ $(EXTENSION)--6.1-17.sql: $(EXTENSION)--6.1-16.sql $(EXTENSION)--6.1-16--6.1-17.sql cat $^ > $@ +$(EXTENSION)--6.2-1.sql: $(EXTENSION)--6.1-17.sql $(EXTENSION)--6.1-17--6.2-1.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--5.0--5.0-1.sql b/src/backend/distributed/citus--5.0--5.0-1.sql index d1900078e..298510a21 100644 --- a/src/backend/distributed/citus--5.0--5.0-1.sql +++ b/src/backend/distributed/citus--5.0--5.0-1.sql @@ -1,29 +1,5 @@ /* citus--5.0--5.0-1.sql */ -CREATE FUNCTION pg_catalog.master_stage_shard_row(logicalrelid oid, - shardid bigint, - shardstorage "char", - shardminvalue text, - shardmaxvalue text) - RETURNS VOID - LANGUAGE C - AS 'MODULE_PATHNAME', $$master_stage_shard_row$$; -COMMENT ON FUNCTION pg_catalog.master_stage_shard_row(oid, bigint, "char", text, text) - IS 'deprecated function to insert a row into pg_dist_shard'; - -CREATE FUNCTION pg_catalog.master_stage_shard_placement_row(shardid int8, - shardstate int4, - shardlength int8, - nodename text, - nodeport int4) - RETURNS VOID - STRICT - 
LANGUAGE C - AS 'MODULE_PATHNAME', $$master_stage_shard_placement_row$$; -COMMENT ON FUNCTION pg_catalog.master_stage_shard_placement_row(int8, int4, int8, text, int4) - IS 'deprecated function to insert a row into pg_dist_shard_placement'; - - ALTER FUNCTION pg_catalog.citus_drop_trigger() SECURITY DEFINER; GRANT SELECT ON pg_catalog.pg_dist_partition TO public; diff --git a/src/backend/distributed/citus--6.1-17--6.2-1.sql b/src/backend/distributed/citus--6.1-17--6.2-1.sql new file mode 100644 index 000000000..3508f3fe0 --- /dev/null +++ b/src/backend/distributed/citus--6.1-17--6.2-1.sql @@ -0,0 +1,11 @@ +/* citus--6.1-17--6.2-1.sql */ + +SET search_path = 'pg_catalog'; + +DROP FUNCTION IF EXISTS master_get_local_first_candidate_nodes(); +DROP FUNCTION IF EXISTS master_get_round_robin_candidate_nodes(bigint); + +DROP FUNCTION IF EXISTS master_stage_shard_row(oid, bigint, "char", text, text); +DROP FUNCTION IF EXISTS master_stage_shard_placement_row(int8, int4, int8, text, int4); + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 57d2534ec..daccbdeb5 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-17' +default_version = '6.2-1' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/citus.sql b/src/backend/distributed/citus.sql index b1b213b6f..b068a3bae 100644 --- a/src/backend/distributed/citus.sql +++ b/src/backend/distributed/citus.sql @@ -129,14 +129,6 @@ CREATE FUNCTION master_get_new_shardid() COMMENT ON FUNCTION master_get_new_shardid() IS 'fetch unique shardId'; -CREATE FUNCTION master_get_local_first_candidate_nodes(OUT node_name text, - OUT node_port bigint) - RETURNS SETOF record - LANGUAGE C STRICT ROWS 100 - AS 'MODULE_PATHNAME', $$master_get_local_first_candidate_nodes$$; -COMMENT ON FUNCTION master_get_local_first_candidate_nodes() - IS 'fetch set of 
candidate nodes for shard uploading choosing the local node first'; - CREATE FUNCTION master_create_empty_shard(text) RETURNS bigint LANGUAGE C STRICT @@ -174,15 +166,6 @@ CREATE FUNCTION master_get_active_worker_nodes(OUT node_name text, OUT node_port COMMENT ON FUNCTION master_get_active_worker_nodes() IS 'fetch set of active worker nodes'; -CREATE FUNCTION master_get_round_robin_candidate_nodes(shard_id bigint, - OUT node_name text, - OUT node_port bigint) - RETURNS SETOF record - LANGUAGE C STRICT ROWS 100 - AS 'MODULE_PATHNAME', $$master_get_round_robin_candidate_nodes$$; -COMMENT ON FUNCTION master_get_round_robin_candidate_nodes(shard_id bigint) - IS 'fetch set of candidate nodes for shard uploading in round-robin manner'; - CREATE FUNCTION master_create_distributed_table(table_name regclass, distribution_column text, distribution_method citus.distribution_type) diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 0e435993f..1ee0bf208 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -59,11 +59,6 @@ static ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc, HeapTuple heapTuple); -/* exports for SQL callable functions */ -PG_FUNCTION_INFO_V1(master_stage_shard_row); -PG_FUNCTION_INFO_V1(master_stage_shard_placement_row); - - /* * TableShardReplicationFactor returns the current replication factor of the * given relation by looking into shard placements. It errors out if there @@ -1020,143 +1015,6 @@ TableOwner(Oid relationId) } -/* - * master_stage_shard_row() inserts a row into pg_dist_shard, after performing - * basic permission checks. - * - * TODO: This function only exists for csql's \stage, and should not otherwise - * be used. Once \stage is removed, it'll be removed too. 
- */ -Datum -master_stage_shard_row(PG_FUNCTION_ARGS) -{ - Oid distributedRelationId = InvalidOid; - uint64 shardId = 0; - char storageType = 0; - text *shardMinValue = NULL; - text *shardMaxValue = NULL; - Relation relation; - - /* - * Have to check arguments for NULLness as it can't be declared STRICT - * because of min/max arguments, which have to be NULLable for new shards. - */ - if (PG_ARGISNULL(0)) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("relation cannot be null"))); - } - else if (PG_ARGISNULL(1)) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("shard cannot be null"))); - } - else if (PG_ARGISNULL(2)) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("storage type cannot be null"))); - } - - distributedRelationId = PG_GETARG_OID(0); - shardId = PG_GETARG_INT64(1); - storageType = PG_GETARG_CHAR(2); - - if (!PG_ARGISNULL(3)) - { - shardMinValue = PG_GETARG_TEXT_P(3); - } - - if (!PG_ARGISNULL(4)) - { - shardMaxValue = PG_GETARG_TEXT_P(4); - } - - relation = heap_open(distributedRelationId, RowExclusiveLock); - - /* - * Check permissions on relation. Note we require ACL_INSERT and not owner - * rights - it'd be worse for security to require every user performing - * data loads to be made a table owner - besides being more complex to set - * up. - */ - EnsureTablePermissions(distributedRelationId, ACL_INSERT); - - /* and finally actually insert the row */ - InsertShardRow(distributedRelationId, shardId, storageType, - shardMinValue, shardMaxValue); - - heap_close(relation, NoLock); - - PG_RETURN_VOID(); -} - - -/* - * master_stage_shard_placement_row() inserts a row into - * pg_dist_shard_placment, after performing some basic checks. - * - * TODO: This function only exists for csql's \stage, and should not otherwise - * be used. Once \stage is removed, it'll be removed too. 
- */ -Datum -master_stage_shard_placement_row(PG_FUNCTION_ARGS) -{ - uint64 shardId = PG_GETARG_INT64(0); - int32 shardState = PG_GETARG_INT32(1); - int32 shardLength = PG_GETARG_INT64(2); - char *nodeName = text_to_cstring(PG_GETARG_TEXT_P(3)); - int nodePort = PG_GETARG_INT32(4); - - Oid distributedRelationId = InvalidOid; - Relation relation = NULL; - - Relation pgDistShard = heap_open(DistShardRelationId(), RowExclusiveLock); - ScanKeyData scanKey[1]; - int scanKeyCount = 1; - SysScanDesc scanDescriptor = NULL; - HeapTuple heapTuple = NULL; - - /* Lookup which table the shardid belongs to */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, - BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); - - scanDescriptor = systable_beginscan(pgDistShard, - DistShardShardidIndexId(), true, - NULL, scanKeyCount, scanKey); - heapTuple = systable_getnext(scanDescriptor); - if (HeapTupleIsValid(heapTuple)) - { - Form_pg_dist_shard pgDistShardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); - distributedRelationId = pgDistShardForm->logicalrelid; - } - else - { - ereport(ERROR, (errmsg("could not find valid entry for shard " - UINT64_FORMAT, shardId))); - } - systable_endscan(scanDescriptor); - - relation = heap_open(distributedRelationId, RowExclusiveLock); - - /* - * Check permissions on relation. Note we require ACL_INSERT and not owner - * rights - it'd be worse for security to require every user performing - * data loads to be made a table owner - besides being more complex to set - * up. - */ - EnsureTablePermissions(distributedRelationId, ACL_INSERT); - - /* finally insert placement */ - InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardLength, - nodeName, nodePort); - - heap_close(relation, NoLock); - heap_close(pgDistShard, NoLock); - - PG_RETURN_VOID(); -} - - /* * TableReferenced function checks whether given table is referenced by another table * via foreign constraints. If it is referenced, this function returns true. 
To check diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 46610c407..7784f1ead 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -77,8 +77,6 @@ PG_FUNCTION_INFO_V1(master_get_table_metadata); PG_FUNCTION_INFO_V1(master_get_table_ddl_events); PG_FUNCTION_INFO_V1(master_get_new_shardid); PG_FUNCTION_INFO_V1(master_get_new_placementid); -PG_FUNCTION_INFO_V1(master_get_local_first_candidate_nodes); -PG_FUNCTION_INFO_V1(master_get_round_robin_candidate_nodes); PG_FUNCTION_INFO_V1(master_get_active_worker_nodes); @@ -366,182 +364,6 @@ GetNextPlacementId(void) } -/* - * master_get_local_first_candidate_nodes returns a set of candidate host names - * and port numbers on which to place new shards. The function makes sure to - * always allocate the first candidate node as the node the caller is connecting - * from; and allocates additional nodes until the shard replication factor is - * met. The function errors if the caller's remote node name is not found in the - * membership list, or if the number of available nodes falls short of the - * replication factor. 
- */ -Datum -master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS) -{ - FuncCallContext *functionContext = NULL; - uint32 desiredNodeCount = 0; - uint32 currentNodeCount = 0; - - if (SRF_IS_FIRSTCALL()) - { - MemoryContext oldContext = NULL; - TupleDesc tupleDescriptor = NULL; - uint32 liveNodeCount = 0; - bool hasOid = false; - - /* create a function context for cross-call persistence */ - functionContext = SRF_FIRSTCALL_INIT(); - - /* switch to memory context appropriate for multiple function calls */ - oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); - - functionContext->user_fctx = NIL; - functionContext->max_calls = ShardReplicationFactor; - - /* if enough live nodes, return an extra candidate node as backup */ - liveNodeCount = WorkerGetLiveNodeCount(); - if (liveNodeCount > ShardReplicationFactor) - { - functionContext->max_calls = ShardReplicationFactor + 1; - } - - /* - * This tuple descriptor must match the output parameters declared for - * the function in pg_proc. 
- */ - tupleDescriptor = CreateTemplateTupleDesc(CANDIDATE_NODE_FIELDS, hasOid); - TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "node_name", - TEXTOID, -1, 0); - TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "node_port", - INT8OID, -1, 0); - - functionContext->tuple_desc = BlessTupleDesc(tupleDescriptor); - - MemoryContextSwitchTo(oldContext); - } - - functionContext = SRF_PERCALL_SETUP(); - desiredNodeCount = functionContext->max_calls; - currentNodeCount = functionContext->call_cntr; - - if (currentNodeCount < desiredNodeCount) - { - MemoryContext oldContext = NULL; - List *currentNodeList = NIL; - WorkerNode *candidateNode = NULL; - Datum candidateDatum = 0; - - /* switch to memory context appropriate for multiple function calls */ - oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); - currentNodeList = functionContext->user_fctx; - - candidateNode = WorkerGetLocalFirstCandidateNode(currentNodeList); - if (candidateNode == NULL) - { - ereport(ERROR, (errmsg("could only find %u of %u required nodes", - currentNodeCount, desiredNodeCount))); - } - - currentNodeList = lappend(currentNodeList, candidateNode); - functionContext->user_fctx = currentNodeList; - - MemoryContextSwitchTo(oldContext); - - candidateDatum = WorkerNodeGetDatum(candidateNode, functionContext->tuple_desc); - - SRF_RETURN_NEXT(functionContext, candidateDatum); - } - else - { - SRF_RETURN_DONE(functionContext); - } -} - - -/* - * master_get_round_robin_candidate_nodes returns a set of candidate host names - * and port numbers on which to place new shards. The function uses the round - * robin policy to choose the nodes and tries to ensure that there is an even - * distribution of shards across the worker nodes. This function errors out if - * the number of available nodes falls short of the replication factor. 
- */ -Datum -master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS) -{ - uint64 shardId = PG_GETARG_INT64(0); - FuncCallContext *functionContext = NULL; - uint32 desiredNodeCount = 0; - uint32 currentNodeCount = 0; - - if (SRF_IS_FIRSTCALL()) - { - MemoryContext oldContext = NULL; - TupleDesc tupleDescriptor = NULL; - List *workerNodeList = NIL; - TypeFuncClass resultTypeClass = 0; - uint32 workerNodeCount = 0; - - /* create a function context for cross-call persistence */ - functionContext = SRF_FIRSTCALL_INIT(); - - /* switch to memory context appropriate for multiple function calls */ - oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); - - /* get the worker node list and sort it for determinism */ - workerNodeList = WorkerNodeList(); - workerNodeList = SortList(workerNodeList, CompareWorkerNodes); - - functionContext->user_fctx = workerNodeList; - functionContext->max_calls = ShardReplicationFactor; - - /* if we enough live nodes, return an extra candidate node as backup */ - workerNodeCount = (uint32) list_length(workerNodeList); - if (workerNodeCount > ShardReplicationFactor) - { - functionContext->max_calls = ShardReplicationFactor + 1; - } - - /* create tuple descriptor for return value */ - resultTypeClass = get_call_result_type(fcinfo, NULL, &tupleDescriptor); - if (resultTypeClass != TYPEFUNC_COMPOSITE) - { - ereport(ERROR, (errmsg("return type must be a row type"))); - } - - functionContext->tuple_desc = tupleDescriptor; - - MemoryContextSwitchTo(oldContext); - } - - functionContext = SRF_PERCALL_SETUP(); - desiredNodeCount = functionContext->max_calls; - currentNodeCount = functionContext->call_cntr; - - if (currentNodeCount < desiredNodeCount) - { - List *workerNodeList = functionContext->user_fctx; - WorkerNode *candidateNode = NULL; - Datum candidateDatum = 0; - - candidateNode = WorkerGetRoundRobinCandidateNode(workerNodeList, shardId, - currentNodeCount); - if (candidateNode == NULL) - { - ereport(ERROR, (errmsg("could 
only find %u of %u required nodes", - currentNodeCount, desiredNodeCount))); - } - - candidateDatum = WorkerNodeGetDatum(candidateNode, functionContext->tuple_desc); - - SRF_RETURN_NEXT(functionContext, candidateDatum); - } - else - { - SRF_RETURN_DONE(functionContext); - } -} - - /* * master_get_active_worker_nodes returns a set of active worker host names and * port numbers in deterministic order. Currently we assume that all worker diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 99dbc8aea..4e6819580 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -126,8 +126,6 @@ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); extern Datum master_get_table_ddl_events(PG_FUNCTION_ARGS); extern Datum master_get_new_shardid(PG_FUNCTION_ARGS); extern Datum master_get_new_placementid(PG_FUNCTION_ARGS); -extern Datum master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS); -extern Datum master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS); extern Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS); /* Function declarations to help with data staging and deletion */ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 294f70af7..4a52854aa 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -75,6 +75,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-14'; ALTER EXTENSION citus UPDATE TO '6.1-15'; ALTER EXTENSION citus UPDATE TO '6.1-16'; ALTER EXTENSION citus UPDATE TO '6.1-17'; +ALTER EXTENSION citus UPDATE TO '6.2-1'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_master_protocol.out b/src/test/regress/expected/multi_master_protocol.out index b02eba0e8..4a164c49d 100644 --- a/src/test/regress/expected/multi_master_protocol.out +++ 
b/src/test/regress/expected/multi_master_protocol.out @@ -25,20 +25,6 @@ SELECT * FROM master_get_new_shardid(); 740000 (1 row) -SELECT * FROM master_get_round_robin_candidate_nodes(1); - node_name | node_port ------------+----------- - localhost | 57638 - localhost | 57637 -(2 rows) - -SELECT * FROM master_get_round_robin_candidate_nodes(2); - node_name | node_port ------------+----------- - localhost | 57637 - localhost | 57638 -(2 rows) - SELECT * FROM master_get_active_worker_nodes(); node_name | node_port -----------+----------- diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 647cedde4..da41a523a 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -75,6 +75,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-14'; ALTER EXTENSION citus UPDATE TO '6.1-15'; ALTER EXTENSION citus UPDATE TO '6.1-16'; ALTER EXTENSION citus UPDATE TO '6.1-17'; +ALTER EXTENSION citus UPDATE TO '6.2-1'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_master_protocol.sql b/src/test/regress/sql/multi_master_protocol.sql index 982a9c8b8..108453d89 100644 --- a/src/test/regress/sql/multi_master_protocol.sql +++ b/src/test/regress/sql/multi_master_protocol.sql @@ -15,8 +15,4 @@ SELECT * FROM master_get_table_ddl_events('lineitem'); SELECT * FROM master_get_new_shardid(); -SELECT * FROM master_get_round_robin_candidate_nodes(1); - -SELECT * FROM master_get_round_robin_candidate_nodes(2); - SELECT * FROM master_get_active_worker_nodes();