mirror of https://github.com/citusdata/citus.git
Define Some Utility Functions
This change declares two new functions: `master_update_table_statistics` updates the statistics of shards belong to the given table as well as its colocated tables. `get_colocated_shard_array` returns the ids of colocated shards of a given shard.pull/1545/head
parent
1961add6f9
commit
3061737712
|
@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
|||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
||||
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
|
||||
6.2-1 6.2-2 6.2-3 6.2-4 \
|
||||
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10
|
||||
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11
|
||||
|
||||
# All citus--*.sql files in the source directory
|
||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||
|
@ -159,6 +159,8 @@ $(EXTENSION)--7.0-9.sql: $(EXTENSION)--7.0-8.sql $(EXTENSION)--7.0-8--7.0-9.sql
|
|||
cat $^ > $@
|
||||
$(EXTENSION)--7.0-10.sql: $(EXTENSION)--7.0-9.sql $(EXTENSION)--7.0-9--7.0-10.sql
|
||||
cat $^ > $@
|
||||
$(EXTENSION)--7.0-11.sql: $(EXTENSION)--7.0-10.sql $(EXTENSION)--7.0-10--7.0-11.sql
|
||||
cat $^ > $@
|
||||
|
||||
NO_PGXS = 1
|
||||
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/* citus-7.0-10--7.0-11 */
|
||||
|
||||
SET search_path = 'pg_catalog';
|
||||
|
||||
CREATE OR REPLACE FUNCTION master_update_table_statistics(relation regclass)
|
||||
RETURNS VOID AS $$
|
||||
DECLARE
|
||||
colocated_tables regclass[];
|
||||
BEGIN
|
||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;
|
||||
|
||||
PERFORM
|
||||
master_update_shard_statistics(shardid)
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = ANY (colocated_tables);
|
||||
END;
|
||||
$$ LANGUAGE 'plpgsql';
|
||||
COMMENT ON FUNCTION master_update_table_statistics(regclass)
|
||||
IS 'updates shard statistics of the given table and its colocated tables';
|
||||
|
||||
CREATE OR REPLACE FUNCTION get_colocated_shard_array(bigint)
|
||||
RETURNS BIGINT[]
|
||||
LANGUAGE C STRICT
|
||||
AS 'citus', $$get_colocated_shard_array$$;
|
||||
COMMENT ON FUNCTION get_colocated_shard_array(bigint)
|
||||
IS 'returns the array of colocated shards of the given shard';
|
||||
|
||||
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '7.0-10'
|
||||
default_version = '7.0-11'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
|
||||
|
||||
|
@ -24,7 +25,6 @@ PG_FUNCTION_INFO_V1(get_table_colocation_id);
|
|||
PG_FUNCTION_INFO_V1(tables_colocated);
|
||||
PG_FUNCTION_INFO_V1(shards_colocated);
|
||||
PG_FUNCTION_INFO_V1(get_colocated_table_array);
|
||||
PG_FUNCTION_INFO_V1(get_colocated_shard_array);
|
||||
PG_FUNCTION_INFO_V1(find_shard_interval_index);
|
||||
|
||||
|
||||
|
@ -108,43 +108,6 @@ get_colocated_table_array(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* get_colocated_shards_array returns array of shards ids which are co-located with given
|
||||
* shard.
|
||||
*/
|
||||
Datum
|
||||
get_colocated_shard_array(PG_FUNCTION_ARGS)
|
||||
{
|
||||
uint32 shardId = PG_GETARG_UINT32(0);
|
||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
|
||||
ArrayType *colocatedShardsArrayType = NULL;
|
||||
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
|
||||
ListCell *colocatedShardCell = NULL;
|
||||
int colocatedShardCount = list_length(colocatedShardList);
|
||||
Datum *colocatedShardsDatumArray = palloc0(colocatedShardCount * sizeof(Datum));
|
||||
Oid arrayTypeId = OIDOID;
|
||||
int colocatedShardIndex = 0;
|
||||
|
||||
foreach(colocatedShardCell, colocatedShardList)
|
||||
{
|
||||
ShardInterval *colocatedShardInterval = (ShardInterval *) lfirst(
|
||||
colocatedShardCell);
|
||||
uint64 colocatedShardId = colocatedShardInterval->shardId;
|
||||
|
||||
Datum colocatedShardDatum = Int64GetDatum(colocatedShardId);
|
||||
|
||||
colocatedShardsDatumArray[colocatedShardIndex] = colocatedShardDatum;
|
||||
colocatedShardIndex++;
|
||||
}
|
||||
|
||||
colocatedShardsArrayType = DatumArrayToArrayType(colocatedShardsDatumArray,
|
||||
colocatedShardCount, arrayTypeId);
|
||||
|
||||
PG_RETURN_ARRAYTYPE_P(colocatedShardsArrayType);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* find_shard_interval_index finds index of given shard in sorted shard interval list.
|
||||
*/
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include <stddef.h>
|
||||
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
|
||||
#include "lib/stringinfo.h"
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "fmgr.h"
|
||||
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
|
||||
#include "utils/array.h"
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
#include "access/stratnum.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
|
|
|
@ -1,39 +0,0 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* test/src/test_helper_functions.c
|
||||
*
|
||||
* This file contains helper functions used in many Citus tests.
|
||||
*
|
||||
* Copyright (c) 2014-2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "c.h"
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
|
||||
#include "utils/array.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
||||
|
||||
/*
|
||||
* DatumArrayToArrayType converts the provided Datum array (of the specified
|
||||
* length and type) into an ArrayType suitable for returning from a UDF.
|
||||
*/
|
||||
ArrayType *
|
||||
DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId)
|
||||
{
|
||||
ArrayType *arrayObject = NULL;
|
||||
int16 typeLength = 0;
|
||||
bool typeByValue = false;
|
||||
char typeAlignment = 0;
|
||||
|
||||
get_typlenbyvalalign(datumTypeId, &typeLength, &typeByValue, &typeAlignment);
|
||||
arrayObject = construct_array(datumArray, datumCount, datumTypeId,
|
||||
typeLength, typeByValue, typeAlignment);
|
||||
|
||||
return arrayObject;
|
||||
}
|
|
@ -17,6 +17,7 @@
|
|||
#include "access/htup_details.h"
|
||||
#include "access/xact.h"
|
||||
#include "catalog/indexing.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "commands/sequence.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/listutils.h"
|
||||
|
@ -52,6 +53,7 @@ static void DeleteColocationGroup(uint32 colocationId);
|
|||
|
||||
/* exports for SQL callable functions */
|
||||
PG_FUNCTION_INFO_V1(mark_tables_colocated);
|
||||
PG_FUNCTION_INFO_V1(get_colocated_shard_array);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -91,6 +93,43 @@ mark_tables_colocated(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* get_colocated_shards_array returns array of shards ids which are co-located with given
|
||||
* shard.
|
||||
*/
|
||||
Datum
|
||||
get_colocated_shard_array(PG_FUNCTION_ARGS)
|
||||
{
|
||||
uint32 shardId = PG_GETARG_UINT32(0);
|
||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
|
||||
ArrayType *colocatedShardsArrayType = NULL;
|
||||
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
|
||||
ListCell *colocatedShardCell = NULL;
|
||||
int colocatedShardCount = list_length(colocatedShardList);
|
||||
Datum *colocatedShardsDatumArray = palloc0(colocatedShardCount * sizeof(Datum));
|
||||
Oid arrayTypeId = OIDOID;
|
||||
int colocatedShardIndex = 0;
|
||||
|
||||
foreach(colocatedShardCell, colocatedShardList)
|
||||
{
|
||||
ShardInterval *colocatedShardInterval = (ShardInterval *) lfirst(
|
||||
colocatedShardCell);
|
||||
uint64 colocatedShardId = colocatedShardInterval->shardId;
|
||||
|
||||
Datum colocatedShardDatum = Int64GetDatum(colocatedShardId);
|
||||
|
||||
colocatedShardsDatumArray[colocatedShardIndex] = colocatedShardDatum;
|
||||
colocatedShardIndex++;
|
||||
}
|
||||
|
||||
colocatedShardsArrayType = DatumArrayToArrayType(colocatedShardsDatumArray,
|
||||
colocatedShardCount, arrayTypeId);
|
||||
|
||||
PG_RETURN_ARRAYTYPE_P(colocatedShardsArrayType);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkTablesColocated puts both tables to same colocation group. If the
|
||||
* source table is in INVALID_COLOCATION_ID group, then it creates a new
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
#include "c.h"
|
||||
#include "port.h"
|
||||
|
||||
#include "utils/lsyscache.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "nodes/pg_list.h"
|
||||
#include "utils/memutils.h"
|
||||
|
@ -81,3 +82,23 @@ PointerArrayFromList(List *pointerList)
|
|||
|
||||
return pointerArray;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DatumArrayToArrayType converts the provided Datum array (of the specified
|
||||
* length and type) into an ArrayType suitable for returning from a UDF.
|
||||
*/
|
||||
ArrayType *
|
||||
DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId)
|
||||
{
|
||||
ArrayType *arrayObject = NULL;
|
||||
int16 typeLength = 0;
|
||||
bool typeByValue = false;
|
||||
char typeAlignment = 0;
|
||||
|
||||
get_typlenbyvalalign(datumTypeId, &typeLength, &typeByValue, &typeAlignment);
|
||||
arrayObject = construct_array(datumArray, datumCount, datumTypeId,
|
||||
typeLength, typeByValue, typeAlignment);
|
||||
|
||||
return arrayObject;
|
||||
}
|
||||
|
|
|
@ -16,12 +16,14 @@
|
|||
#include "c.h"
|
||||
|
||||
#include "nodes/pg_list.h"
|
||||
#include "utils/array.h"
|
||||
|
||||
|
||||
/* utility functions declaration shared within this module */
|
||||
extern List * SortList(List *pointerList,
|
||||
int (*ComparisonFunction)(const void *, const void *));
|
||||
extern void ** PointerArrayFromList(List *pointerList);
|
||||
|
||||
extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount,
|
||||
Oid datumTypeId);
|
||||
|
||||
#endif /* CITUS_LISTUTILS_H */
|
||||
|
|
|
@ -28,8 +28,6 @@
|
|||
|
||||
|
||||
/* function declarations for generic test functions */
|
||||
extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount,
|
||||
Oid datumTypeId);
|
||||
extern void SetConnectionStatus(PGconn *connection, ConnStatusType status);
|
||||
|
||||
/* fake FDW for use in tests */
|
||||
|
|
|
@ -48,10 +48,6 @@ CREATE FUNCTION shards_colocated(bigint, bigint)
|
|||
RETURNS bool
|
||||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
CREATE FUNCTION get_colocated_shard_array(bigint)
|
||||
RETURNS BIGINT[]
|
||||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
CREATE FUNCTION find_shard_interval_index(bigint)
|
||||
RETURNS int
|
||||
AS 'citus'
|
||||
|
|
|
@ -120,6 +120,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-7';
|
|||
ALTER EXTENSION citus UPDATE TO '7.0-8';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-9';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-10';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-11';
|
||||
-- show running version
|
||||
SHOW citus.version;
|
||||
citus.version
|
||||
|
|
|
@ -57,11 +57,6 @@ CREATE FUNCTION shards_colocated(bigint, bigint)
|
|||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
CREATE FUNCTION get_colocated_shard_array(bigint)
|
||||
RETURNS BIGINT[]
|
||||
AS 'citus'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
CREATE FUNCTION find_shard_interval_index(bigint)
|
||||
RETURNS int
|
||||
AS 'citus'
|
||||
|
|
|
@ -120,6 +120,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-7';
|
|||
ALTER EXTENSION citus UPDATE TO '7.0-8';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-9';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-10';
|
||||
ALTER EXTENSION citus UPDATE TO '7.0-11';
|
||||
|
||||
-- show running version
|
||||
SHOW citus.version;
|
||||
|
|
Loading…
Reference in New Issue