Merge branch 'main' into use-index-get-dependent-fdws

pull/7574/head
Jelte Fennema-Nio 2024-04-15 14:25:41 +02:00 committed by GitHub
commit 4bdd977b3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 321 additions and 67 deletions

View File

@ -22,6 +22,7 @@
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_enum.h"
#include "catalog/pg_extension.h"
@ -50,6 +51,7 @@
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@ -1696,52 +1698,39 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId)
void
EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
citusTableIdList = list_append_unique_oid(citusTableIdList, ownerRelationId);
Oid attrDefOid;
List *attrDefOids = GetAttrDefsFromSequence(seqOid);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
foreach_oid(attrDefOid, attrDefOids)
{
List *seqInfoList = NIL;
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, DEPENDENCY_AUTO);
ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid);
SequenceInfo *seqInfo = NULL;
foreach_ptr(seqInfo, seqInfoList)
/*
* If another distributed table is using the same sequence
* in one of its column defaults, make sure the types of the
* columns match.
*
* We skip non-distributed tables, but we need to check the current
* table as it might reference the same sequence multiple times.
*/
if (columnAddress.objectId != ownerRelationId &&
!IsCitusTable(columnAddress.objectId))
{
AttrNumber currentAttnum = seqInfo->attributeNumber;
Oid currentSeqOid = seqInfo->sequenceOid;
if (!seqInfo->isNextValDefault)
{
/*
* If a sequence is not on the nextval, we don't need any check.
* This is a dependent sequence via ALTER SEQUENCE .. OWNED BY col
*/
continue;
}
/*
* If another distributed table is using the same sequence
* in one of its column defaults, make sure the types of the
* columns match
*/
if (currentSeqOid == seqOid)
{
Oid currentAttributeTypId = GetAttributeTypeOid(citusTableId,
currentAttnum);
if (attributeTypeId != currentAttributeTypId)
{
char *sequenceName = generate_qualified_relation_name(
seqOid);
char *citusTableName =
generate_qualified_relation_name(citusTableId);
ereport(ERROR, (errmsg(
"The sequence %s is already used for a different"
" type in column %d of the table %s",
sequenceName, currentAttnum,
citusTableName)));
}
}
continue;
}
Oid currentAttributeTypId = GetAttributeTypeOid(columnAddress.objectId,
columnAddress.objectSubId);
if (attributeTypeId != currentAttributeTypId)
{
char *sequenceName = generate_qualified_relation_name(
seqOid);
char *citusTableName =
generate_qualified_relation_name(columnAddress.objectId);
ereport(ERROR, (errmsg(
"The sequence %s is already used for a different"
" type in column %d of the table %s",
sequenceName, columnAddress.objectSubId,
citusTableName)));
}
}
}

View File

@ -14,6 +14,7 @@
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "catalog/pg_attrdef.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "nodes/makefuncs.h"
@ -507,22 +508,14 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
static Oid
SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char depType)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
Oid relationId;
List *relations = GetDependentRelationsWithSequence(sequenceAddress->objectId,
depType);
foreach_oid(relationId, relations)
{
List *seqInfoList = NIL;
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, depType);
SequenceInfo *seqInfo = NULL;
foreach_ptr(seqInfo, seqInfoList)
if (IsCitusTable(relationId))
{
/*
* This sequence is used in a distributed table
*/
if (seqInfo->sequenceOid == sequenceAddress->objectId)
{
return citusTableId;
}
return relationId;
}
}

View File

@ -271,9 +271,24 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
* We allocate everything in the provided context so as to facilitate using
* pfree on all runtime parameters when connections using these entries are
* invalidated during config reloads.
*
* Also, when "host" is already provided in global parameters, we use hostname
* from the key as "hostaddr" instead of "host" to avoid host name lookup. In
* that case, the value for "host" becomes useful only if the authentication
* method requires it.
*/
bool gotHostParamFromGlobalParams = false;
for (Size paramIndex = 0; paramIndex < ConnParams.size; paramIndex++)
{
if (strcmp(ConnParams.keywords[paramIndex], "host") == 0)
{
gotHostParamFromGlobalParams = true;
break;
}
}
const char *runtimeKeywords[] = {
"host",
gotHostParamFromGlobalParams ? "hostaddr" : "host",
"port",
"dbname",
"user",

View File

@ -1637,6 +1637,74 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
}
/*
* GetDependentDependentRelationsWithSequence returns a list of oids of
* relations that have have a dependency on the given sequence.
* There are three types of dependencies:
* 1. direct auto (owned sequences), created using SERIAL or BIGSERIAL
* 2. indirect auto (through an AttrDef), created using DEFAULT nextval('..')
* 3. internal, created using GENERATED ALWAYS AS IDENTITY
*
* Depending on the passed deptype, we return the relations that have the
* given type(s):
* - DEPENDENCY_AUTO returns both 1 and 2
* - DEPENDENCY_INTERNAL returns 3
*
* The returned list can contain duplicates, as the same relation can have
* multiple dependencies on the sequence.
*/
List *
GetDependentRelationsWithSequence(Oid sequenceOid, char depType)
{
List *relations = NIL;
ScanKeyData key[2];
HeapTuple tup;
Relation depRel = table_open(DependRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_depend_classid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationRelationId));
ScanKeyInit(&key[1],
Anum_pg_depend_objid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(sequenceOid));
SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true,
NULL, lengthof(key), key);
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
if (
deprec->refclassid == RelationRelationId &&
deprec->refobjsubid != 0 &&
deprec->deptype == depType)
{
relations = lappend_oid(relations, deprec->refobjid);
}
}
systable_endscan(scan);
table_close(depRel, AccessShareLock);
if (depType == DEPENDENCY_AUTO)
{
Oid attrDefOid;
List *attrDefOids = GetAttrDefsFromSequence(sequenceOid);
foreach_oid(attrDefOid, attrDefOids)
{
ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid);
relations = lappend_oid(relations, columnAddress.objectId);
}
}
return relations;
}
/*
* GetSequencesFromAttrDef returns a list of sequence OIDs that have
* dependency with the given attrdefOid in pg_depend
@ -1682,6 +1750,90 @@ GetSequencesFromAttrDef(Oid attrdefOid)
}
#if PG_VERSION_NUM < PG_VERSION_15
/*
* Given a pg_attrdef OID, return the relation OID and column number of
* the owning column (represented as an ObjectAddress for convenience).
*
* Returns InvalidObjectAddress if there is no such pg_attrdef entry.
*/
ObjectAddress
GetAttrDefaultColumnAddress(Oid attrdefoid)
{
ObjectAddress result = InvalidObjectAddress;
ScanKeyData skey[1];
HeapTuple tup;
Relation attrdef = table_open(AttrDefaultRelationId, AccessShareLock);
ScanKeyInit(&skey[0],
Anum_pg_attrdef_oid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(attrdefoid));
SysScanDesc scan = systable_beginscan(attrdef, AttrDefaultOidIndexId, true,
NULL, 1, skey);
if (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_attrdef atdform = (Form_pg_attrdef) GETSTRUCT(tup);
result.classId = RelationRelationId;
result.objectId = atdform->adrelid;
result.objectSubId = atdform->adnum;
}
systable_endscan(scan);
table_close(attrdef, AccessShareLock);
return result;
}
#endif
/*
* GetAttrDefsFromSequence returns a list of attrdef OIDs that have
* a dependency on the given sequence
*/
List *
GetAttrDefsFromSequence(Oid seqOid)
{
List *attrDefsResult = NIL;
ScanKeyData key[2];
HeapTuple tup;
Relation depRel = table_open(DependRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_depend_refclassid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationRelationId));
ScanKeyInit(&key[1],
Anum_pg_depend_refobjid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(seqOid));
SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true,
NULL, lengthof(key), key);
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
if (deprec->classid == AttrDefaultRelationId &&
deprec->deptype == DEPENDENCY_NORMAL)
{
attrDefsResult = lappend_oid(attrDefsResult, deprec->objid);
}
}
systable_endscan(scan);
table_close(depRel, AccessShareLock);
return attrDefsResult;
}
/*
* GetDependentFunctionsWithRelation returns the dependent functions for the
* given relation id.

View File

@ -2929,6 +2929,7 @@ NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source)
#if defined(ENABLE_GSS) && defined(ENABLE_SSPI)
"gsslib",
#endif
"host",
"keepalives",
"keepalives_count",
"keepalives_idle",

View File

@ -130,8 +130,13 @@ extern List * IdentitySequenceDependencyCommandList(Oid targetRelationId);
extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName);
extern List * GetSequencesFromAttrDef(Oid attrdefOid);
#if PG_VERSION_NUM < PG_VERSION_15
ObjectAddress GetAttrDefaultColumnAddress(Oid attrdefoid);
#endif
extern List * GetAttrDefsFromSequence(Oid seqOid);
extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
AttrNumber attnum, char depType);
extern List * GetDependentRelationsWithSequence(Oid seqId, char depType);
extern List * GetDependentFunctionsWithRelation(Oid relationId);
extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
extern void SetLocalEnableMetadataSync(bool state);

View File

@ -520,5 +520,61 @@ show citus.node_conninfo;
-- Should work again
ALTER TABLE test ADD COLUMN e INT;
-- show that we allow providing "host" param via citus.node_conninfo
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=nosuchhost';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- fails due to invalid host
SELECT COUNT(*)>=0 FROM test;
WARNING: connection to the remote node postgres@localhost:xxxxx failed with the following error: could not parse network address "localhost": Name or service not known
ERROR: connection to the remote node postgres@localhost:xxxxx failed with the following error: could not parse network address "localhost": Name or service not known
SELECT array_agg(nodeid) as updated_nodeids from pg_dist_node WHERE nodename = 'localhost' \gset
UPDATE pg_dist_node SET nodename = '127.0.0.1' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=localhost';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- works when hostaddr is specified in pg_dist_node after providing host in citus.node_conninfo
SELECT COUNT(*)>=0 FROM test;
?column?
---------------------------------------------------------------------
t
(1 row)
-- restore original nodenames into pg_dist_node
UPDATE pg_dist_node SET nodename = 'localhost' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
-- reset it
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
select pg_sleep(0.1); -- wait for config reload to apply
pg_sleep
---------------------------------------------------------------------
(1 row)
DROP SCHEMA node_conninfo_reload CASCADE;
NOTICE: drop cascades to table test

View File

@ -1126,16 +1126,33 @@ sub RunVanillaTests
system("mkdir", ("-p", "$pgregressOutputdir/sql")) == 0
or die "Could not create vanilla sql dir.";
$exitcode = system("$plainRegress",
("--dlpath", $dlpath),
("--inputdir", $pgregressInputdir),
("--outputdir", $pgregressOutputdir),
("--schedule", catfile("$pgregressInputdir", "parallel_schedule")),
("--use-existing"),
("--host","$host"),
("--port","$masterPort"),
("--user","$user"),
("--dbname", "$dbName"));
if ($majorversion >= "16")
{
$exitcode = system("$plainRegress",
("--dlpath", $dlpath),
("--inputdir", $pgregressInputdir),
("--outputdir", $pgregressOutputdir),
("--expecteddir", $pgregressOutputdir),
("--schedule", catfile("$pgregressInputdir", "parallel_schedule")),
("--use-existing"),
("--host","$host"),
("--port","$masterPort"),
("--user","$user"),
("--dbname", "$dbName"));
}
else
{
$exitcode = system("$plainRegress",
("--dlpath", $dlpath),
("--inputdir", $pgregressInputdir),
("--outputdir", $pgregressOutputdir),
("--schedule", catfile("$pgregressInputdir", "parallel_schedule")),
("--use-existing"),
("--host","$host"),
("--port","$masterPort"),
("--user","$user"),
("--dbname", "$dbName"));
}
}
if ($useMitmproxy) {

View File

@ -205,4 +205,30 @@ show citus.node_conninfo;
-- Should work again
ALTER TABLE test ADD COLUMN e INT;
-- show that we allow providing "host" param via citus.node_conninfo
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=nosuchhost';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- fails due to invalid host
SELECT COUNT(*)>=0 FROM test;
SELECT array_agg(nodeid) as updated_nodeids from pg_dist_node WHERE nodename = 'localhost' \gset
UPDATE pg_dist_node SET nodename = '127.0.0.1' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=localhost';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- works when hostaddr is specified in pg_dist_node after providing host in citus.node_conninfo
SELECT COUNT(*)>=0 FROM test;
-- restore original nodenames into pg_dist_node
UPDATE pg_dist_node SET nodename = 'localhost' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
-- reset it
ALTER SYSTEM RESET citus.node_conninfo;
select pg_reload_conf();
select pg_sleep(0.1); -- wait for config reload to apply
DROP SCHEMA node_conninfo_reload CASCADE;