diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 5ec6d6dd7..8c59aa199 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -22,6 +22,7 @@ #include "catalog/dependency.h" #include "catalog/index.h" #include "catalog/pg_am.h" +#include "catalog/pg_attrdef.h" #include "catalog/pg_attribute.h" #include "catalog/pg_enum.h" #include "catalog/pg_extension.h" @@ -50,6 +51,7 @@ #include "tcop/pquery.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -1696,52 +1698,39 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId) void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId) { - List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); - citusTableIdList = list_append_unique_oid(citusTableIdList, ownerRelationId); + Oid attrDefOid; + List *attrDefOids = GetAttrDefsFromSequence(seqOid); - Oid citusTableId = InvalidOid; - foreach_oid(citusTableId, citusTableIdList) + foreach_oid(attrDefOid, attrDefOids) { - List *seqInfoList = NIL; - GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, DEPENDENCY_AUTO); + ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid); - SequenceInfo *seqInfo = NULL; - foreach_ptr(seqInfo, seqInfoList) + /* + * If another distributed table is using the same sequence + * in one of its column defaults, make sure the types of the + * columns match. + * + * We skip non-distributed tables, but we need to check the current + * table as it might reference the same sequence multiple times. + */ + if (columnAddress.objectId != ownerRelationId && + !IsCitusTable(columnAddress.objectId)) { - AttrNumber currentAttnum = seqInfo->attributeNumber; - Oid currentSeqOid = seqInfo->sequenceOid; - - if (!seqInfo->isNextValDefault) - { - /* - * If a sequence is not on the nextval, we don't need any check. - * This is a dependent sequence via ALTER SEQUENCE .. OWNED BY col - */ - continue; - } - - /* - * If another distributed table is using the same sequence - * in one of its column defaults, make sure the types of the - * columns match - */ - if (currentSeqOid == seqOid) - { - Oid currentAttributeTypId = GetAttributeTypeOid(citusTableId, - currentAttnum); - if (attributeTypeId != currentAttributeTypId) - { - char *sequenceName = generate_qualified_relation_name( - seqOid); - char *citusTableName = - generate_qualified_relation_name(citusTableId); - ereport(ERROR, (errmsg( - "The sequence %s is already used for a different" - " type in column %d of the table %s", - sequenceName, currentAttnum, - citusTableName))); - } - } + continue; + } + Oid currentAttributeTypId = GetAttributeTypeOid(columnAddress.objectId, + columnAddress.objectSubId); + if (attributeTypeId != currentAttributeTypId) + { + char *sequenceName = generate_qualified_relation_name( + seqOid); + char *citusTableName = + generate_qualified_relation_name(columnAddress.objectId); + ereport(ERROR, (errmsg( + "The sequence %s is already used for a different" + " type in column %d of the table %s", + sequenceName, columnAddress.objectSubId, + citusTableName))); } } } diff --git a/src/backend/distributed/commands/sequence.c b/src/backend/distributed/commands/sequence.c index 4d838a882..cfb55faf7 100644 --- a/src/backend/distributed/commands/sequence.c +++ b/src/backend/distributed/commands/sequence.c @@ -14,6 +14,7 @@ #include "access/xact.h" #include "catalog/dependency.h" #include "catalog/namespace.h" +#include "catalog/pg_attrdef.h" #include "commands/defrem.h" #include "commands/extension.h" #include "nodes/makefuncs.h" @@ -507,22 +508,14 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString, static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char depType) { - List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); - Oid citusTableId = InvalidOid; - foreach_oid(citusTableId, citusTableIdList) + Oid relationId; + List *relations = GetDependentRelationsWithSequence(sequenceAddress->objectId, + depType); + foreach_oid(relationId, relations) { - List *seqInfoList = NIL; - GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, depType); - SequenceInfo *seqInfo = NULL; - foreach_ptr(seqInfo, seqInfoList) + if (IsCitusTable(relationId)) { - /* - * This sequence is used in a distributed table - */ - if (seqInfo->sequenceOid == sequenceAddress->objectId) - { - return citusTableId; - } + return relationId; } } diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index ac82d4e09..3913173e2 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -271,9 +271,24 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, * We allocate everything in the provided context so as to facilitate using * pfree on all runtime parameters when connections using these entries are * invalidated during config reloads. + * + * Also, when "host" is already provided in global parameters, we use hostname + * from the key as "hostaddr" instead of "host" to avoid host name lookup. In + * that case, the value for "host" becomes useful only if the authentication + * method requires it. */ + bool gotHostParamFromGlobalParams = false; + for (Size paramIndex = 0; paramIndex < ConnParams.size; paramIndex++) + { + if (strcmp(ConnParams.keywords[paramIndex], "host") == 0) + { + gotHostParamFromGlobalParams = true; + break; + } + } + const char *runtimeKeywords[] = { - "host", + gotHostParamFromGlobalParams ? "hostaddr" : "host", "port", "dbname", "user", diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 31d586e90..ef7c56dc7 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1637,6 +1637,74 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, } +/* + * GetDependentDependentRelationsWithSequence returns a list of oids of + * relations that have have a dependency on the given sequence. + * There are three types of dependencies: + * 1. direct auto (owned sequences), created using SERIAL or BIGSERIAL + * 2. indirect auto (through an AttrDef), created using DEFAULT nextval('..') + * 3. internal, created using GENERATED ALWAYS AS IDENTITY + * + * Depending on the passed deptype, we return the relations that have the + * given type(s): + * - DEPENDENCY_AUTO returns both 1 and 2 + * - DEPENDENCY_INTERNAL returns 3 + * + * The returned list can contain duplicates, as the same relation can have + * multiple dependencies on the sequence. + */ +List * +GetDependentRelationsWithSequence(Oid sequenceOid, char depType) +{ + List *relations = NIL; + ScanKeyData key[2]; + HeapTuple tup; + + Relation depRel = table_open(DependRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_classid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationRelationId)); + ScanKeyInit(&key[1], + Anum_pg_depend_objid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(sequenceOid)); + SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true, + NULL, lengthof(key), key); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); + + if ( + deprec->refclassid == RelationRelationId && + deprec->refobjsubid != 0 && + deprec->deptype == depType) + { + relations = lappend_oid(relations, deprec->refobjid); + } + } + + systable_endscan(scan); + + table_close(depRel, AccessShareLock); + + if (depType == DEPENDENCY_AUTO) + { + Oid attrDefOid; + List *attrDefOids = GetAttrDefsFromSequence(sequenceOid); + + foreach_oid(attrDefOid, attrDefOids) + { + ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid); + relations = lappend_oid(relations, columnAddress.objectId); + } + } + + return relations; +} + + /* * GetSequencesFromAttrDef returns a list of sequence OIDs that have * dependency with the given attrdefOid in pg_depend @@ -1682,6 +1750,90 @@ GetSequencesFromAttrDef(Oid attrdefOid) } +#if PG_VERSION_NUM < PG_VERSION_15 + +/* + * Given a pg_attrdef OID, return the relation OID and column number of + * the owning column (represented as an ObjectAddress for convenience). + * + * Returns InvalidObjectAddress if there is no such pg_attrdef entry. + */ +ObjectAddress +GetAttrDefaultColumnAddress(Oid attrdefoid) +{ + ObjectAddress result = InvalidObjectAddress; + ScanKeyData skey[1]; + HeapTuple tup; + + Relation attrdef = table_open(AttrDefaultRelationId, AccessShareLock); + ScanKeyInit(&skey[0], + Anum_pg_attrdef_oid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(attrdefoid)); + SysScanDesc scan = systable_beginscan(attrdef, AttrDefaultOidIndexId, true, + NULL, 1, skey); + + if (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_attrdef atdform = (Form_pg_attrdef) GETSTRUCT(tup); + + result.classId = RelationRelationId; + result.objectId = atdform->adrelid; + result.objectSubId = atdform->adnum; + } + + systable_endscan(scan); + table_close(attrdef, AccessShareLock); + + return result; +} + + +#endif + + +/* + * GetAttrDefsFromSequence returns a list of attrdef OIDs that have + * a dependency on the given sequence + */ +List * +GetAttrDefsFromSequence(Oid seqOid) +{ + List *attrDefsResult = NIL; + ScanKeyData key[2]; + HeapTuple tup; + + Relation depRel = table_open(DependRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_refclassid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationRelationId)); + ScanKeyInit(&key[1], + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(seqOid)); + SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true, + NULL, lengthof(key), key); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); + + if (deprec->classid == AttrDefaultRelationId && + deprec->deptype == DEPENDENCY_NORMAL) + { + attrDefsResult = lappend_oid(attrDefsResult, deprec->objid); + } + } + + systable_endscan(scan); + + table_close(depRel, AccessShareLock); + + return attrDefsResult; +} + + /* * GetDependentFunctionsWithRelation returns the dependent functions for the * given relation id. diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 45e212e8b..bd65fa60c 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -2929,6 +2929,7 @@ NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source) #if defined(ENABLE_GSS) && defined(ENABLE_SSPI) "gsslib", #endif + "host", "keepalives", "keepalives_count", "keepalives_idle", diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index cb111e16e..617eed705 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -130,8 +130,13 @@ extern List * IdentitySequenceDependencyCommandList(Oid targetRelationId); extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName); extern List * GetSequencesFromAttrDef(Oid attrdefOid); +#if PG_VERSION_NUM < PG_VERSION_15 +ObjectAddress GetAttrDefaultColumnAddress(Oid attrdefoid); +#endif +extern List * GetAttrDefsFromSequence(Oid seqOid); extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, AttrNumber attnum, char depType); +extern List * GetDependentRelationsWithSequence(Oid seqId, char depType); extern List * GetDependentFunctionsWithRelation(Oid relationId); extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum); extern void SetLocalEnableMetadataSync(bool state); diff --git a/src/test/regress/expected/node_conninfo_reload.out b/src/test/regress/expected/node_conninfo_reload.out index 785e3e1b1..3b33c54b2 100644 --- a/src/test/regress/expected/node_conninfo_reload.out +++ b/src/test/regress/expected/node_conninfo_reload.out @@ -520,5 +520,61 @@ show citus.node_conninfo; -- Should work again ALTER TABLE test ADD COLUMN e INT; +-- show that we allow providing "host" param via citus.node_conninfo +ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=nosuchhost'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- fails due to invalid host +SELECT COUNT(*)>=0 FROM test; +WARNING: connection to the remote node postgres@localhost:xxxxx failed with the following error: could not parse network address "localhost": Name or service not known +ERROR: connection to the remote node postgres@localhost:xxxxx failed with the following error: could not parse network address "localhost": Name or service not known +SELECT array_agg(nodeid) as updated_nodeids from pg_dist_node WHERE nodename = 'localhost' \gset +UPDATE pg_dist_node SET nodename = '127.0.0.1' WHERE nodeid = ANY(:'updated_nodeids'::int[]); +ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=localhost'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- works when hostaddr is specified in pg_dist_node after providing host in citus.node_conninfo +SELECT COUNT(*)>=0 FROM test; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- restore original nodenames into pg_dist_node +UPDATE pg_dist_node SET nodename = 'localhost' WHERE nodeid = ANY(:'updated_nodeids'::int[]); +-- reset it +ALTER SYSTEM RESET citus.node_conninfo; +select pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +select pg_sleep(0.1); -- wait for config reload to apply + pg_sleep +--------------------------------------------------------------------- + +(1 row) + DROP SCHEMA node_conninfo_reload CASCADE; NOTICE: drop cascades to table test diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index c9a85d523..01e57c469 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -1126,16 +1126,33 @@ sub RunVanillaTests system("mkdir", ("-p", "$pgregressOutputdir/sql")) == 0 or die "Could not create vanilla sql dir."; - $exitcode = system("$plainRegress", - ("--dlpath", $dlpath), - ("--inputdir", $pgregressInputdir), - ("--outputdir", $pgregressOutputdir), - ("--schedule", catfile("$pgregressInputdir", "parallel_schedule")), - ("--use-existing"), - ("--host","$host"), - ("--port","$masterPort"), - ("--user","$user"), - ("--dbname", "$dbName")); + if ($majorversion >= "16") + { + $exitcode = system("$plainRegress", + ("--dlpath", $dlpath), + ("--inputdir", $pgregressInputdir), + ("--outputdir", $pgregressOutputdir), + ("--expecteddir", $pgregressOutputdir), + ("--schedule", catfile("$pgregressInputdir", "parallel_schedule")), + ("--use-existing"), + ("--host","$host"), + ("--port","$masterPort"), + ("--user","$user"), + ("--dbname", "$dbName")); + } + else + { + $exitcode = system("$plainRegress", + ("--dlpath", $dlpath), + ("--inputdir", $pgregressInputdir), + ("--outputdir", $pgregressOutputdir), + ("--schedule", catfile("$pgregressInputdir", "parallel_schedule")), + ("--use-existing"), + ("--host","$host"), + ("--port","$masterPort"), + ("--user","$user"), + ("--dbname", "$dbName")); + } } if ($useMitmproxy) { diff --git a/src/test/regress/sql/node_conninfo_reload.sql b/src/test/regress/sql/node_conninfo_reload.sql index 42ba8c9b1..2faaaeeb1 100644 --- a/src/test/regress/sql/node_conninfo_reload.sql +++ b/src/test/regress/sql/node_conninfo_reload.sql @@ -205,4 +205,30 @@ show citus.node_conninfo; -- Should work again ALTER TABLE test ADD COLUMN e INT; +-- show that we allow providing "host" param via citus.node_conninfo +ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=nosuchhost'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +-- fails due to invalid host +SELECT COUNT(*)>=0 FROM test; + +SELECT array_agg(nodeid) as updated_nodeids from pg_dist_node WHERE nodename = 'localhost' \gset +UPDATE pg_dist_node SET nodename = '127.0.0.1' WHERE nodeid = ANY(:'updated_nodeids'::int[]); + +ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=localhost'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +-- works when hostaddr is specified in pg_dist_node after providing host in citus.node_conninfo +SELECT COUNT(*)>=0 FROM test; + +-- restore original nodenames into pg_dist_node +UPDATE pg_dist_node SET nodename = 'localhost' WHERE nodeid = ANY(:'updated_nodeids'::int[]); + +-- reset it +ALTER SYSTEM RESET citus.node_conninfo; +select pg_reload_conf(); +select pg_sleep(0.1); -- wait for config reload to apply + DROP SCHEMA node_conninfo_reload CASCADE;