Merge branch 'main' into speed-up-GetForeignKeyOids

pull/7578/head
Jelte Fennema-Nio 2024-04-15 17:55:05 +02:00 committed by GitHub
commit 6db3cc727d
16 changed files with 578 additions and 75 deletions

View File

@@ -22,6 +22,7 @@
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_enum.h"
#include "catalog/pg_extension.h"
@@ -50,6 +51,7 @@
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -1696,52 +1698,39 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId)
void
EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
citusTableIdList = list_append_unique_oid(citusTableIdList, ownerRelationId);
Oid attrDefOid;
List *attrDefOids = GetAttrDefsFromSequence(seqOid);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
foreach_oid(attrDefOid, attrDefOids)
{
List *seqInfoList = NIL;
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, DEPENDENCY_AUTO);
ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid);
SequenceInfo *seqInfo = NULL;
foreach_ptr(seqInfo, seqInfoList)
/*
* If another distributed table is using the same sequence
* in one of its column defaults, make sure the types of the
* columns match.
*
* We skip non-distributed tables, but we need to check the current
* table as it might reference the same sequence multiple times.
*/
if (columnAddress.objectId != ownerRelationId &&
!IsCitusTable(columnAddress.objectId))
{
AttrNumber currentAttnum = seqInfo->attributeNumber;
Oid currentSeqOid = seqInfo->sequenceOid;
if (!seqInfo->isNextValDefault)
{
/*
* If the sequence is not used in a nextval() default, we don't need any check.
* It is only a dependent sequence, e.g. via ALTER SEQUENCE .. OWNED BY col.
*/
continue;
}
/*
* If another distributed table is using the same sequence
* in one of its column defaults, make sure the types of the
* columns match
*/
if (currentSeqOid == seqOid)
{
Oid currentAttributeTypId = GetAttributeTypeOid(citusTableId,
currentAttnum);
if (attributeTypeId != currentAttributeTypId)
{
char *sequenceName = generate_qualified_relation_name(
seqOid);
char *citusTableName =
generate_qualified_relation_name(citusTableId);
ereport(ERROR, (errmsg(
"The sequence %s is already used for a different"
" type in column %d of the table %s",
sequenceName, currentAttnum,
citusTableName)));
}
}
continue;
}
Oid currentAttributeTypId = GetAttributeTypeOid(columnAddress.objectId,
columnAddress.objectSubId);
if (attributeTypeId != currentAttributeTypId)
{
char *sequenceName = generate_qualified_relation_name(
seqOid);
char *citusTableName =
generate_qualified_relation_name(columnAddress.objectId);
ereport(ERROR, (errmsg(
"The sequence %s is already used for a different"
" type in column %d of the table %s",
sequenceName, columnAddress.objectSubId,
citusTableName)));
}
}
}
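For intuition, a minimal SQL sketch of the situation this check rejects (table and sequence names are hypothetical, and the assumption is that the error surfaces when the second table is distributed): two tables default to nextval() of the same sequence but with different column types.
CREATE SEQUENCE shared_seq;
CREATE TABLE orders (key text, id bigint DEFAULT nextval('shared_seq'));
CREATE TABLE events (key text, id int DEFAULT nextval('shared_seq'));
SELECT create_distributed_table('orders', 'key');
-- distributing the second table is expected to fail, roughly:
--   ERROR: The sequence public.shared_seq is already used for a different
--          type in column 2 of the table public.orders
SELECT create_distributed_table('events', 'key');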

View File

@@ -1093,33 +1093,26 @@ List *
GetDependentFDWsToExtension(Oid extensionId)
{
List *extensionFDWs = NIL;
ScanKeyData key[3];
int scanKeyCount = 3;
ScanKeyData key[1];
HeapTuple tup;
Relation pgDepend = table_open(DependRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_depend_refclassid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(ExtensionRelationId));
ScanKeyInit(&key[1],
Anum_pg_depend_refobjid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(extensionId));
ScanKeyInit(&key[2],
Anum_pg_depend_classid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(ForeignDataWrapperRelationId));
SysScanDesc scan = systable_beginscan(pgDepend, InvalidOid, false,
NULL, scanKeyCount, key);
SysScanDesc scan = systable_beginscan(pgDepend, DependDependerIndexId, true,
NULL, lengthof(key), key);
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend pgDependEntry = (Form_pg_depend) GETSTRUCT(tup);
if (pgDependEntry->deptype == DEPENDENCY_EXTENSION)
if (pgDependEntry->deptype == DEPENDENCY_EXTENSION &&
pgDependEntry->refclassid == ExtensionRelationId &&
pgDependEntry->refobjid == extensionId)
{
extensionFDWs = lappend_oid(extensionFDWs, pgDependEntry->objid);
}
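For intuition, the rows this function collects can be approximated in plain SQL; the rewrite only changes how they are found (an index scan keyed on classid, with the remaining filters applied in the loop), not what is returned. A sketch, assuming postgres_fdw is installed purely as an example of an FDW-providing extension:
SELECT objid AS fdw_oid
FROM pg_depend
WHERE classid = 'pg_foreign_data_wrapper'::regclass
  AND refclassid = 'pg_extension'::regclass
  AND refobjid = (SELECT oid FROM pg_extension WHERE extname = 'postgres_fdw')
  AND deptype = 'e'; -- DEPENDENCY_EXTENSION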

View File

@@ -14,6 +14,7 @@
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "catalog/pg_attrdef.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "nodes/makefuncs.h"
@@ -507,22 +508,14 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
static Oid
SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char depType)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
Oid relationId;
List *relations = GetDependentRelationsWithSequence(sequenceAddress->objectId,
depType);
foreach_oid(relationId, relations)
{
List *seqInfoList = NIL;
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, depType);
SequenceInfo *seqInfo = NULL;
foreach_ptr(seqInfo, seqInfoList)
if (IsCitusTable(relationId))
{
/*
* This sequence is used in a distributed table
*/
if (seqInfo->sequenceOid == sequenceAddress->objectId)
{
return citusTableId;
}
return relationId;
}
}
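Roughly, the check now asks whether any relation depending on the sequence is a Citus table. A SQL approximation of the direct-dependency path, with :sequence_oid as a placeholder (it ignores the attrdef-based defaults and the deptype filter that GetDependentRelationsWithSequence also handles):
SELECT d.refobjid::regclass AS distributed_table
FROM pg_depend d
JOIN pg_dist_partition p ON p.logicalrelid = d.refobjid
WHERE d.classid = 'pg_class'::regclass
  AND d.objid = :sequence_oid
  AND d.refclassid = 'pg_class'::regclass
  AND d.refobjsubid <> 0
LIMIT 1;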

View File

@@ -271,9 +271,24 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
* We allocate everything in the provided context so as to facilitate using
* pfree on all runtime parameters when connections using these entries are
* invalidated during config reloads.
*
* Also, when "host" is already provided in global parameters, we use hostname
* from the key as "hostaddr" instead of "host" to avoid host name lookup. In
* that case, the value for "host" becomes useful only if the authentication
* method requires it.
*/
bool gotHostParamFromGlobalParams = false;
for (Size paramIndex = 0; paramIndex < ConnParams.size; paramIndex++)
{
if (strcmp(ConnParams.keywords[paramIndex], "host") == 0)
{
gotHostParamFromGlobalParams = true;
break;
}
}
const char *runtimeKeywords[] = {
"host",
gotHostParamFromGlobalParams ? "hostaddr" : "host",
"port",
"dbname",
"user",

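The regression test further down exercises this behaviour. A minimal sketch with illustrative host names and addresses, updating pg_dist_node directly as that test does:
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=worker-proxy.internal';
SELECT pg_reload_conf();
-- node names are now passed as hostaddr, so they must be numeric addresses
UPDATE pg_dist_node SET nodename = '10.0.0.5' WHERE nodename = 'worker-1.internal';
-- the "host" value from node_conninfo is then only consulted where the
-- authentication method still needs a host name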
View File

@@ -1637,6 +1637,74 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
}
/*
* GetDependentRelationsWithSequence returns a list of oids of
* relations that have a dependency on the given sequence.
* There are three types of dependencies:
* 1. direct auto (owned sequences), created using SERIAL or BIGSERIAL
* 2. indirect auto (through an AttrDef), created using DEFAULT nextval('..')
* 3. internal, created using GENERATED ALWAYS AS IDENTITY
*
* Depending on the passed deptype, we return the relations that have the
* given type(s):
* - DEPENDENCY_AUTO returns both 1 and 2
* - DEPENDENCY_INTERNAL returns 3
*
* The returned list can contain duplicates, as the same relation can have
* multiple dependencies on the sequence.
*/
List *
GetDependentRelationsWithSequence(Oid sequenceOid, char depType)
{
List *relations = NIL;
ScanKeyData key[2];
HeapTuple tup;
Relation depRel = table_open(DependRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_depend_classid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationRelationId));
ScanKeyInit(&key[1],
Anum_pg_depend_objid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(sequenceOid));
SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true,
NULL, lengthof(key), key);
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
if (
deprec->refclassid == RelationRelationId &&
deprec->refobjsubid != 0 &&
deprec->deptype == depType)
{
relations = lappend_oid(relations, deprec->refobjid);
}
}
systable_endscan(scan);
table_close(depRel, AccessShareLock);
if (depType == DEPENDENCY_AUTO)
{
Oid attrDefOid;
List *attrDefOids = GetAttrDefsFromSequence(sequenceOid);
foreach_oid(attrDefOid, attrDefOids)
{
ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid);
relations = lappend_oid(relations, columnAddress.objectId);
}
}
return relations;
}
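A hedged SQL illustration of the three dependency shapes listed in the comment above (table names are hypothetical); the query at the end shows the direct sequence-to-column rows that the scan above walks:
CREATE TABLE t_serial (id bigserial);                             -- 1: owned sequence, deptype 'a' (DEPENDENCY_AUTO)
CREATE SEQUENCE s2;
CREATE TABLE t_default (id bigint DEFAULT nextval('s2'));         -- 2: the column's pg_attrdef entry depends on s2, deptype 'n'
CREATE TABLE t_identity (id bigint GENERATED ALWAYS AS IDENTITY); -- 3: deptype 'i' (DEPENDENCY_INTERNAL)
-- direct sequence -> column dependencies (cases 1 and 3)
SELECT objid::regclass AS sequence, refobjid::regclass AS relation, refobjsubid AS attnum, deptype
FROM pg_depend
WHERE classid = 'pg_class'::regclass
  AND refclassid = 'pg_class'::regclass
  AND refobjsubid <> 0
  AND deptype IN ('a', 'i');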
/*
* GetSequencesFromAttrDef returns a list of sequence OIDs that have
* a dependency on the given attrdefOid in pg_depend
@@ -1682,6 +1750,90 @@ GetSequencesFromAttrDef(Oid attrdefOid)
}
#if PG_VERSION_NUM < PG_VERSION_15
/*
* Given a pg_attrdef OID, return the relation OID and column number of
* the owning column (represented as an ObjectAddress for convenience).
*
* Returns InvalidObjectAddress if there is no such pg_attrdef entry.
*/
ObjectAddress
GetAttrDefaultColumnAddress(Oid attrdefoid)
{
ObjectAddress result = InvalidObjectAddress;
ScanKeyData skey[1];
HeapTuple tup;
Relation attrdef = table_open(AttrDefaultRelationId, AccessShareLock);
ScanKeyInit(&skey[0],
Anum_pg_attrdef_oid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(attrdefoid));
SysScanDesc scan = systable_beginscan(attrdef, AttrDefaultOidIndexId, true,
NULL, 1, skey);
if (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_attrdef atdform = (Form_pg_attrdef) GETSTRUCT(tup);
result.classId = RelationRelationId;
result.objectId = atdform->adrelid;
result.objectSubId = atdform->adnum;
}
systable_endscan(scan);
table_close(attrdef, AccessShareLock);
return result;
}
#endif
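On PG 15+ this helper is provided by PostgreSQL itself; either way it boils down to a single pg_attrdef lookup, roughly (with :attrdef_oid as a placeholder):
SELECT adrelid AS relid, adnum AS attnum
FROM pg_attrdef
WHERE oid = :attrdef_oid;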
/*
* GetAttrDefsFromSequence returns a list of attrdef OIDs that have
* a dependency on the given sequence
*/
List *
GetAttrDefsFromSequence(Oid seqOid)
{
List *attrDefsResult = NIL;
ScanKeyData key[2];
HeapTuple tup;
Relation depRel = table_open(DependRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_depend_refclassid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationRelationId));
ScanKeyInit(&key[1],
Anum_pg_depend_refobjid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(seqOid));
SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true,
NULL, lengthof(key), key);
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
if (deprec->classid == AttrDefaultRelationId &&
deprec->deptype == DEPENDENCY_NORMAL)
{
attrDefsResult = lappend_oid(attrDefsResult, deprec->objid);
}
}
systable_endscan(scan);
table_close(depRel, AccessShareLock);
return attrDefsResult;
}
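And the pg_depend scan above is roughly equivalent to the following, with :sequence_oid as a placeholder:
SELECT objid AS attrdef_oid
FROM pg_depend
WHERE refclassid = 'pg_class'::regclass
  AND refobjid = :sequence_oid
  AND classid = 'pg_attrdef'::regclass
  AND deptype = 'n'; -- DEPENDENCY_NORMAL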
/*
* GetDependentFunctionsWithRelation returns the dependent functions for the
* given relation id.

View File

@@ -2929,6 +2929,7 @@ NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source)
#if defined(ENABLE_GSS) && defined(ENABLE_SSPI)
"gsslib",
#endif
"host",
"keepalives",
"keepalives_count",
"keepalives_idle",

View File

@@ -54,3 +54,4 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_internal_update_placement_metadata/12.2-1.sql"
#include "udfs/citus_internal_update_relation_colocation/12.2-1.sql"
#include "udfs/repl_origin_helper/12.2-1.sql"
#include "udfs/citus_finish_pg_upgrade/12.2-1.sql"

View File

@@ -54,3 +54,4 @@ DROP FUNCTION citus_internal.update_relation_colocation(oid, int);
DROP FUNCTION citus_internal.start_replication_origin_tracking();
DROP FUNCTION citus_internal.stop_replication_origin_tracking();
DROP FUNCTION citus_internal.is_replication_origin_tracking_active();
#include "../udfs/citus_finish_pg_upgrade/12.1-1.sql"

View File

@@ -0,0 +1,227 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_finish_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cppu$
DECLARE
table_name regclass;
command text;
trigger_name text;
BEGIN
IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN
EXECUTE $cmd$
-- disable propagation to prevent EnsureCoordinator errors
-- the aggregate created here does not depend on Citus extension (yet)
-- since we add the dependency with the next command
SET citus.enable_ddl_propagation TO OFF;
CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray);
COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray)
IS 'concatenate input arrays into a single array';
RESET citus.enable_ddl_propagation;
$cmd$;
ELSE
EXECUTE $cmd$
SET citus.enable_ddl_propagation TO OFF;
CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray);
COMMENT ON AGGREGATE array_cat_agg(anyarray)
IS 'concatenate input arrays into a single array';
RESET citus.enable_ddl_propagation;
$cmd$;
END IF;
--
-- Citus creates the array_cat_agg but because of a compatibility
-- issue between pg13-pg14, we drop and create it during upgrade.
-- And as Citus creates it, there needs to be a dependency to the
-- Citus extension, so we create that dependency here.
-- We are not using:
-- ALTER EXTENSION citus DROP/CREATE AGGREGATE array_cat_agg
-- because we don't have an easy way to check if the aggregate
-- exists with anyarray type or anycompatiblearray type.
INSERT INTO pg_depend
SELECT
'pg_proc'::regclass::oid as classid,
(SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'e' as deptype;
-- PG16 has its own any_value, so only create it pre PG16.
-- We can remove this part when we drop support for PG15
IF substring(current_Setting('server_version'), '\d+')::int < 16 THEN
EXECUTE $cmd$
-- disable propagation to prevent EnsureCoordinator errors
-- the aggregate created here does not depend on Citus extension (yet)
-- since we add the dependency with the next command
SET citus.enable_ddl_propagation TO OFF;
CREATE OR REPLACE FUNCTION pg_catalog.any_value_agg ( anyelement, anyelement )
RETURNS anyelement AS $$
SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END;
$$ LANGUAGE SQL STABLE;
CREATE AGGREGATE pg_catalog.any_value (
sfunc = pg_catalog.any_value_agg,
combinefunc = pg_catalog.any_value_agg,
basetype = anyelement,
stype = anyelement
);
COMMENT ON AGGREGATE pg_catalog.any_value(anyelement) IS
'Returns the value of any row in the group. It is mostly useful when you know there will be only 1 element.';
RESET citus.enable_ddl_propagation;
--
-- Citus creates the any_value aggregate but because of a compatibility
-- issue between pg15-pg16 -- any_value is created in PG16, we drop
-- and create it during upgrade IF upgraded version is less than 16.
-- And as Citus creates it, there needs to be a dependency to the
-- Citus extension, so we create that dependency here.
INSERT INTO pg_depend
SELECT
'pg_proc'::regclass::oid as classid,
(SELECT oid FROM pg_proc WHERE proname = 'any_value_agg') as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'e' as deptype;
INSERT INTO pg_depend
SELECT
'pg_proc'::regclass::oid as classid,
(SELECT oid FROM pg_proc WHERE proname = 'any_value') as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'e' as deptype;
$cmd$;
END IF;
--
-- restore citus catalog tables
--
INSERT INTO pg_catalog.pg_dist_partition SELECT * FROM public.pg_dist_partition;
-- if we are upgrading from PG14/PG15 to PG16+,
-- we need to regenerate the partkeys because they will include varnullingrels as well.
UPDATE pg_catalog.pg_dist_partition
SET partkey = column_name_to_column(pg_dist_partkeys_pre_16_upgrade.logicalrelid, col_name)
FROM public.pg_dist_partkeys_pre_16_upgrade
WHERE pg_dist_partkeys_pre_16_upgrade.logicalrelid = pg_dist_partition.logicalrelid;
DROP TABLE public.pg_dist_partkeys_pre_16_upgrade;
INSERT INTO pg_catalog.pg_dist_shard SELECT * FROM public.pg_dist_shard;
INSERT INTO pg_catalog.pg_dist_placement SELECT * FROM public.pg_dist_placement;
INSERT INTO pg_catalog.pg_dist_node_metadata SELECT * FROM public.pg_dist_node_metadata;
INSERT INTO pg_catalog.pg_dist_node SELECT * FROM public.pg_dist_node;
INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group;
INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction;
INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation;
INSERT INTO pg_catalog.pg_dist_cleanup SELECT * FROM public.pg_dist_cleanup;
INSERT INTO pg_catalog.pg_dist_schema SELECT schemaname::regnamespace, colocationid FROM public.pg_dist_schema;
-- enterprise catalog tables
INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
-- Temporarily disable trigger to check for validity of functions while
-- inserting. The current contents of the table might be invalid if one of
-- the functions was removed by the user without also removing the
-- rebalance strategy. Obviously that's not great, but it should be no
-- reason to fail the upgrade.
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT
name,
default_strategy,
shard_cost_function::regprocedure::regproc,
node_capacity_function::regprocedure::regproc,
shard_allowed_on_node_function::regprocedure::regproc,
default_threshold,
minimum_threshold,
improvement_threshold
FROM public.pg_dist_rebalance_strategy;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
--
-- drop backup tables
--
DROP TABLE public.pg_dist_authinfo;
DROP TABLE public.pg_dist_colocation;
DROP TABLE public.pg_dist_local_group;
DROP TABLE public.pg_dist_node;
DROP TABLE public.pg_dist_node_metadata;
DROP TABLE public.pg_dist_partition;
DROP TABLE public.pg_dist_placement;
DROP TABLE public.pg_dist_poolinfo;
DROP TABLE public.pg_dist_shard;
DROP TABLE public.pg_dist_transaction;
DROP TABLE public.pg_dist_rebalance_strategy;
DROP TABLE public.pg_dist_cleanup;
DROP TABLE public.pg_dist_schema;
--
-- reset sequences
--
PERFORM setval('pg_catalog.pg_dist_shardid_seq', (SELECT MAX(shardid)+1 AS max_shard_id FROM pg_dist_shard), false);
PERFORM setval('pg_catalog.pg_dist_placement_placementid_seq', (SELECT MAX(placementid)+1 AS max_placement_id FROM pg_dist_placement), false);
PERFORM setval('pg_catalog.pg_dist_groupid_seq', (SELECT MAX(groupid)+1 AS max_group_id FROM pg_dist_node), false);
PERFORM setval('pg_catalog.pg_dist_node_nodeid_seq', (SELECT MAX(nodeid)+1 AS max_node_id FROM pg_dist_node), false);
PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false);
PERFORM setval('pg_catalog.pg_dist_operationid_seq', (SELECT MAX(operation_id)+1 AS max_operation_id FROM pg_dist_cleanup), false);
PERFORM setval('pg_catalog.pg_dist_cleanup_recordid_seq', (SELECT MAX(record_id)+1 AS max_record_id FROM pg_dist_cleanup), false);
PERFORM setval('pg_catalog.pg_dist_clock_logical_seq', (SELECT last_value FROM public.pg_dist_clock_logical_seq), false);
DROP TABLE public.pg_dist_clock_logical_seq;
--
-- register triggers
--
FOR table_name IN SELECT logicalrelid FROM pg_catalog.pg_dist_partition JOIN pg_class ON (logicalrelid = oid) WHERE relkind <> 'f'
LOOP
trigger_name := 'truncate_trigger_' || table_name::oid;
command := 'create trigger ' || trigger_name || ' after truncate on ' || table_name || ' execute procedure pg_catalog.citus_truncate_trigger()';
EXECUTE command;
command := 'update pg_trigger set tgisinternal = true where tgname = ' || quote_literal(trigger_name);
EXECUTE command;
END LOOP;
--
-- set dependencies
--
INSERT INTO pg_depend
SELECT
'pg_class'::regclass::oid as classid,
p.logicalrelid::regclass::oid as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'n' as deptype
FROM pg_catalog.pg_dist_partition p;
-- set dependencies for columnar table access method
PERFORM columnar_internal.columnar_ensure_am_depends_catalog();
-- restore pg_dist_object from the stable identifiers
TRUNCATE pg_catalog.pg_dist_object;
INSERT INTO pg_catalog.pg_dist_object (classid, objid, objsubid, distribution_argument_index, colocationid)
SELECT
address.classid,
address.objid,
address.objsubid,
naming.distribution_argument_index,
naming.colocationid
FROM
public.pg_dist_object naming,
pg_catalog.pg_get_object_address(naming.type, naming.object_names, naming.object_args) address;
DROP TABLE public.pg_dist_object;
END;
$cppu$;
COMMENT ON FUNCTION pg_catalog.citus_finish_pg_upgrade()
IS 'perform tasks to restore citus settings from a location that has been prepared before pg_upgrade';

View File

@@ -128,6 +128,12 @@ BEGIN
INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
-- Temporarily disable trigger to check for validity of functions while
-- inserting. The current contents of the table might be invalid if one of
-- the functions was removed by the user without also removing the
-- rebalance strategy. Obviously that's not great, but it should be no
-- reason to fail the upgrade.
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT
name,
default_strategy,
@@ -138,6 +144,7 @@ BEGIN
minimum_threshold,
improvement_threshold
FROM public.pg_dist_rebalance_strategy;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
--
-- drop backup tables

View File

@@ -130,8 +130,13 @@ extern List * IdentitySequenceDependencyCommandList(Oid targetRelationId);
extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName);
extern List * GetSequencesFromAttrDef(Oid attrdefOid);
#if PG_VERSION_NUM < PG_VERSION_15
ObjectAddress GetAttrDefaultColumnAddress(Oid attrdefoid);
#endif
extern List * GetAttrDefsFromSequence(Oid seqOid);
extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
AttrNumber attnum, char depType);
extern List * GetDependentRelationsWithSequence(Oid seqId, char depType);
extern List * GetDependentFunctionsWithRelation(Oid relationId);
extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
extern void SetLocalEnableMetadataSync(bool state);

View File

@@ -520,5 +520,61 @@ show citus.node_conninfo;
-- Should work again
ALTER TABLE test ADD COLUMN e INT;
-- show that we allow providing "host" param via citus.node_conninfo
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=nosuchhost';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- fails due to invalid host
SELECT COUNT(*)>=0 FROM test;
WARNING: connection to the remote node postgres@localhost:xxxxx failed with the following error: could not parse network address "localhost": Name or service not known
ERROR: connection to the remote node postgres@localhost:xxxxx failed with the following error: could not parse network address "localhost": Name or service not known
SELECT array_agg(nodeid) as updated_nodeids from pg_dist_node WHERE nodename = 'localhost' \gset
UPDATE pg_dist_node SET nodename = '127.0.0.1' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=localhost';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- works when hostaddr is specified in pg_dist_node after providing host in citus.node_conninfo
SELECT COUNT(*)>=0 FROM test;
?column?
---------------------------------------------------------------------
t
(1 row)
-- restore original nodenames into pg_dist_node
UPDATE pg_dist_node SET nodename = 'localhost' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
-- reset it
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
select pg_sleep(0.1); -- wait for config reload to apply
pg_sleep
---------------------------------------------------------------------
(1 row)
DROP SCHEMA node_conninfo_reload CASCADE;
NOTICE: drop cascades to table test

View File

@@ -1,8 +1,9 @@
SELECT * FROM pg_catalog.pg_dist_rebalance_strategy ORDER BY name;
name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold | improvement_threshold
name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold | improvement_threshold
---------------------------------------------------------------------
by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01 | 0.5
by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0 | 0
custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0.3
(3 rows)
by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01 | 0.5
by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0 | 0
custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0.3
invalid_strategy | f | 1234567 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0.3
(4 rows)

View File

@@ -35,3 +35,23 @@ SELECT citus_set_default_rebalance_strategy('custom_strategy');
(1 row)
-- Disable the trigger temporarily to allow the invalid strategy to be added.
-- Normally an invalid strategy can end up in the table by deleting one of the
-- functions it depends on. But we do it directly in this test because we want to
-- have a consistent OID, so we get consistent test output.
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
SELECT citus_add_rebalance_strategy(
'invalid_strategy',
1234567,
'capacity_high_worker_1',
'only_worker_2',
0.5,
0.2,
0.3
);
citus_add_rebalance_strategy
---------------------------------------------------------------------
(1 row)
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;

View File

@@ -205,4 +205,30 @@ show citus.node_conninfo;
-- Should work again
ALTER TABLE test ADD COLUMN e INT;
-- show that we allow providing "host" param via citus.node_conninfo
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=nosuchhost';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- fails due to invalid host
SELECT COUNT(*)>=0 FROM test;
SELECT array_agg(nodeid) as updated_nodeids from pg_dist_node WHERE nodename = 'localhost' \gset
UPDATE pg_dist_node SET nodename = '127.0.0.1' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=localhost';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- works when hostaddr is specified in pg_dist_node after providing host in citus.node_conninfo
SELECT COUNT(*)>=0 FROM test;
-- restore original nodenames into pg_dist_node
UPDATE pg_dist_node SET nodename = 'localhost' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
-- reset it
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();
select pg_sleep(0.1); -- wait for config reload to apply
DROP SCHEMA node_conninfo_reload CASCADE;

View File

@@ -29,3 +29,19 @@ SELECT citus_add_rebalance_strategy(
0.3
);
SELECT citus_set_default_rebalance_strategy('custom_strategy');
-- Disable the trigger temporarily to allow the invalid strategy to be added.
-- Normally an invalid strategy can end up in the table by deleting one of the
-- functions it depends on. But we do it directly in this test because we want to
-- have a consistent OID, so we get consistent test output.
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
SELECT citus_add_rebalance_strategy(
'invalid_strategy',
1234567,
'capacity_high_worker_1',
'only_worker_2',
0.5,
0.2,
0.3
);
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;