From 3586aab17a7ff2fef8f336b6528376552a5d4c2c Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Mon, 15 Apr 2024 12:51:11 +0300 Subject: [PATCH 1/4] Allow providing "host" parameter via citus.node_conninfo (#7541) And when that is the case, directly use it as "host" parameter for the connections between nodes and use the "hostname" provided in pg_dist_node / pg_dist_poolinfo as "hostaddr" to avoid host name lookup. This is to avoid allowing dns resolution (and / or setting up DNS names for each host in the cluster). This already works currently when using IPs in the hostname. The only use of setting host is that you can then use sslmode=verify-full and it will validate that the hostname matches the certificate provided by the node you're connecting too. It would be more flexible to make this a per-node setting, but that requires SQL changes. And we'd like to backport this change, and backporting such a sql change would be quite hard while backporting this change would be very easy. And in many setups, a different hostname for TLS validation is actually not needed. The reason for that is query-from-any node: With query-from-any-node all nodes usually have a certificate that is valid for the same "cluster hostname", either using a wildcard cert or a Subject Alternative Name (SAN). Because if you load balance across nodes you don't know which node you're connecting to, but you still want TLS validation to do it's job. So with this change you can use this same "cluster hostname" for TLS validation within the cluster. Obviously this means you don't validate that you're connecting to a particular node, just that you're connecting to one of the nodes in the cluster, but that should be fine from a security perspective (in most cases). Note to self: This change requires updating https://docs.citusdata.com/en/latest/develop/api_guc.html#citus-node-conninfo-text. DESCRIPTION: Allows overwriting host name for all inter-node connections by supporting "host" parameter in citus.node_conninfo --- .../connection/connection_configuration.c | 17 +++++- src/backend/distributed/shared_library_init.c | 1 + .../regress/expected/node_conninfo_reload.out | 56 +++++++++++++++++++ src/test/regress/sql/node_conninfo_reload.sql | 26 +++++++++ 4 files changed, 99 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index ac82d4e09..3913173e2 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -271,9 +271,24 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, * We allocate everything in the provided context so as to facilitate using * pfree on all runtime parameters when connections using these entries are * invalidated during config reloads. + * + * Also, when "host" is already provided in global parameters, we use hostname + * from the key as "hostaddr" instead of "host" to avoid host name lookup. In + * that case, the value for "host" becomes useful only if the authentication + * method requires it. */ + bool gotHostParamFromGlobalParams = false; + for (Size paramIndex = 0; paramIndex < ConnParams.size; paramIndex++) + { + if (strcmp(ConnParams.keywords[paramIndex], "host") == 0) + { + gotHostParamFromGlobalParams = true; + break; + } + } + const char *runtimeKeywords[] = { - "host", + gotHostParamFromGlobalParams ? "hostaddr" : "host", "port", "dbname", "user", diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 45e212e8b..bd65fa60c 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -2929,6 +2929,7 @@ NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source) #if defined(ENABLE_GSS) && defined(ENABLE_SSPI) "gsslib", #endif + "host", "keepalives", "keepalives_count", "keepalives_idle", diff --git a/src/test/regress/expected/node_conninfo_reload.out b/src/test/regress/expected/node_conninfo_reload.out index 785e3e1b1..3b33c54b2 100644 --- a/src/test/regress/expected/node_conninfo_reload.out +++ b/src/test/regress/expected/node_conninfo_reload.out @@ -520,5 +520,61 @@ show citus.node_conninfo; -- Should work again ALTER TABLE test ADD COLUMN e INT; +-- show that we allow providing "host" param via citus.node_conninfo +ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=nosuchhost'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- fails due to invalid host +SELECT COUNT(*)>=0 FROM test; +WARNING: connection to the remote node postgres@localhost:xxxxx failed with the following error: could not parse network address "localhost": Name or service not known +ERROR: connection to the remote node postgres@localhost:xxxxx failed with the following error: could not parse network address "localhost": Name or service not known +SELECT array_agg(nodeid) as updated_nodeids from pg_dist_node WHERE nodename = 'localhost' \gset +UPDATE pg_dist_node SET nodename = '127.0.0.1' WHERE nodeid = ANY(:'updated_nodeids'::int[]); +ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=localhost'; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +-- works when hostaddr is specified in pg_dist_node after providing host in citus.node_conninfo +SELECT COUNT(*)>=0 FROM test; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- restore original nodenames into pg_dist_node +UPDATE pg_dist_node SET nodename = 'localhost' WHERE nodeid = ANY(:'updated_nodeids'::int[]); +-- reset it +ALTER SYSTEM RESET citus.node_conninfo; +select pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +select pg_sleep(0.1); -- wait for config reload to apply + pg_sleep +--------------------------------------------------------------------- + +(1 row) + DROP SCHEMA node_conninfo_reload CASCADE; NOTICE: drop cascades to table test diff --git a/src/test/regress/sql/node_conninfo_reload.sql b/src/test/regress/sql/node_conninfo_reload.sql index 42ba8c9b1..2faaaeeb1 100644 --- a/src/test/regress/sql/node_conninfo_reload.sql +++ b/src/test/regress/sql/node_conninfo_reload.sql @@ -205,4 +205,30 @@ show citus.node_conninfo; -- Should work again ALTER TABLE test ADD COLUMN e INT; +-- show that we allow providing "host" param via citus.node_conninfo +ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=nosuchhost'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +-- fails due to invalid host +SELECT COUNT(*)>=0 FROM test; + +SELECT array_agg(nodeid) as updated_nodeids from pg_dist_node WHERE nodename = 'localhost' \gset +UPDATE pg_dist_node SET nodename = '127.0.0.1' WHERE nodeid = ANY(:'updated_nodeids'::int[]); + +ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=localhost'; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); + +-- works when hostaddr is specified in pg_dist_node after providing host in citus.node_conninfo +SELECT COUNT(*)>=0 FROM test; + +-- restore original nodenames into pg_dist_node +UPDATE pg_dist_node SET nodename = 'localhost' WHERE nodeid = ANY(:'updated_nodeids'::int[]); + +-- reset it +ALTER SYSTEM RESET citus.node_conninfo; +select pg_reload_conf(); +select pg_sleep(0.1); -- wait for config reload to apply + DROP SCHEMA node_conninfo_reload CASCADE; From 381f31756e6de0e0522aeaa489fe671df0ddf731 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Mon, 15 Apr 2024 12:28:11 +0200 Subject: [PATCH 2/4] Speed up EnsureSequenceTypeSupported (#7575) DESCRIPTION: Fix performance issue when creating distributed tables and many already exist EnsureSequenceTypeSupported was doing an O(number of distributed tables) operation. This can become very slow with lots of Citus tables, which now happens much more frequently in practice due to schema based sharding. Partially addresses #7022 --- .../commands/create_distributed_table.c | 73 +++++++--------- .../distributed/metadata/metadata_sync.c | 84 +++++++++++++++++++ src/include/distributed/metadata_sync.h | 4 + 3 files changed, 119 insertions(+), 42 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 5ec6d6dd7..8c59aa199 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -22,6 +22,7 @@ #include "catalog/dependency.h" #include "catalog/index.h" #include "catalog/pg_am.h" +#include "catalog/pg_attrdef.h" #include "catalog/pg_attribute.h" #include "catalog/pg_enum.h" #include "catalog/pg_extension.h" @@ -50,6 +51,7 @@ #include "tcop/pquery.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -1696,52 +1698,39 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId) void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId) { - List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); - citusTableIdList = list_append_unique_oid(citusTableIdList, ownerRelationId); + Oid attrDefOid; + List *attrDefOids = GetAttrDefsFromSequence(seqOid); - Oid citusTableId = InvalidOid; - foreach_oid(citusTableId, citusTableIdList) + foreach_oid(attrDefOid, attrDefOids) { - List *seqInfoList = NIL; - GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, DEPENDENCY_AUTO); + ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid); - SequenceInfo *seqInfo = NULL; - foreach_ptr(seqInfo, seqInfoList) + /* + * If another distributed table is using the same sequence + * in one of its column defaults, make sure the types of the + * columns match. + * + * We skip non-distributed tables, but we need to check the current + * table as it might reference the same sequence multiple times. + */ + if (columnAddress.objectId != ownerRelationId && + !IsCitusTable(columnAddress.objectId)) { - AttrNumber currentAttnum = seqInfo->attributeNumber; - Oid currentSeqOid = seqInfo->sequenceOid; - - if (!seqInfo->isNextValDefault) - { - /* - * If a sequence is not on the nextval, we don't need any check. - * This is a dependent sequence via ALTER SEQUENCE .. OWNED BY col - */ - continue; - } - - /* - * If another distributed table is using the same sequence - * in one of its column defaults, make sure the types of the - * columns match - */ - if (currentSeqOid == seqOid) - { - Oid currentAttributeTypId = GetAttributeTypeOid(citusTableId, - currentAttnum); - if (attributeTypeId != currentAttributeTypId) - { - char *sequenceName = generate_qualified_relation_name( - seqOid); - char *citusTableName = - generate_qualified_relation_name(citusTableId); - ereport(ERROR, (errmsg( - "The sequence %s is already used for a different" - " type in column %d of the table %s", - sequenceName, currentAttnum, - citusTableName))); - } - } + continue; + } + Oid currentAttributeTypId = GetAttributeTypeOid(columnAddress.objectId, + columnAddress.objectSubId); + if (attributeTypeId != currentAttributeTypId) + { + char *sequenceName = generate_qualified_relation_name( + seqOid); + char *citusTableName = + generate_qualified_relation_name(columnAddress.objectId); + ereport(ERROR, (errmsg( + "The sequence %s is already used for a different" + " type in column %d of the table %s", + sequenceName, columnAddress.objectSubId, + citusTableName))); } } } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 31d586e90..9e44a01f9 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1682,6 +1682,90 @@ GetSequencesFromAttrDef(Oid attrdefOid) } +#if PG_VERSION_NUM < PG_VERSION_15 + +/* + * Given a pg_attrdef OID, return the relation OID and column number of + * the owning column (represented as an ObjectAddress for convenience). + * + * Returns InvalidObjectAddress if there is no such pg_attrdef entry. + */ +ObjectAddress +GetAttrDefaultColumnAddress(Oid attrdefoid) +{ + ObjectAddress result = InvalidObjectAddress; + ScanKeyData skey[1]; + HeapTuple tup; + + Relation attrdef = table_open(AttrDefaultRelationId, AccessShareLock); + ScanKeyInit(&skey[0], + Anum_pg_attrdef_oid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(attrdefoid)); + SysScanDesc scan = systable_beginscan(attrdef, AttrDefaultOidIndexId, true, + NULL, 1, skey); + + if (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_attrdef atdform = (Form_pg_attrdef) GETSTRUCT(tup); + + result.classId = RelationRelationId; + result.objectId = atdform->adrelid; + result.objectSubId = atdform->adnum; + } + + systable_endscan(scan); + table_close(attrdef, AccessShareLock); + + return result; +} + + +#endif + + +/* + * GetAttrDefsFromSequence returns a list of attrdef OIDs that have + * a dependency on the given sequence + */ +List * +GetAttrDefsFromSequence(Oid seqOid) +{ + List *attrDefsResult = NIL; + ScanKeyData key[2]; + HeapTuple tup; + + Relation depRel = table_open(DependRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_refclassid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationRelationId)); + ScanKeyInit(&key[1], + Anum_pg_depend_refobjid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(seqOid)); + SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true, + NULL, lengthof(key), key); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); + + if (deprec->classid == AttrDefaultRelationId && + deprec->deptype == DEPENDENCY_NORMAL) + { + attrDefsResult = lappend_oid(attrDefsResult, deprec->objid); + } + } + + systable_endscan(scan); + + table_close(depRel, AccessShareLock); + + return attrDefsResult; +} + + /* * GetDependentFunctionsWithRelation returns the dependent functions for the * given relation id. diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index cb111e16e..d0b760758 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -130,6 +130,10 @@ extern List * IdentitySequenceDependencyCommandList(Oid targetRelationId); extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName); extern List * GetSequencesFromAttrDef(Oid attrdefOid); +#if PG_VERSION_NUM < PG_VERSION_15 +ObjectAddress GetAttrDefaultColumnAddress(Oid attrdefoid); +#endif +extern List * GetAttrDefsFromSequence(Oid seqOid); extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, AttrNumber attnum, char depType); extern List * GetDependentFunctionsWithRelation(Oid relationId); From cdf51da45842a41304a6a2ce9d878f21c8ec0782 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Mon, 15 Apr 2024 14:01:55 +0200 Subject: [PATCH 3/4] Speed up SequenceUsedInDistributedTable (#7579) DESCRIPTION: Fix performance issue when creating distributed tables if many already exist This builds on the work to speed up EnsureSequenceTypeSupported, and now does something similar for SequenceUsedInDistributedTable. SequenceUsedInDistributedTable had a similar O(number of citus tables) operation. This fixes that and speeds up creation of distributed tables significantly when many distributed tables already exist. Fixes #7022 --- src/backend/distributed/commands/sequence.c | 21 ++---- .../distributed/metadata/metadata_sync.c | 68 +++++++++++++++++++ src/include/distributed/metadata_sync.h | 1 + 3 files changed, 76 insertions(+), 14 deletions(-) diff --git a/src/backend/distributed/commands/sequence.c b/src/backend/distributed/commands/sequence.c index 4d838a882..cfb55faf7 100644 --- a/src/backend/distributed/commands/sequence.c +++ b/src/backend/distributed/commands/sequence.c @@ -14,6 +14,7 @@ #include "access/xact.h" #include "catalog/dependency.h" #include "catalog/namespace.h" +#include "catalog/pg_attrdef.h" #include "commands/defrem.h" #include "commands/extension.h" #include "nodes/makefuncs.h" @@ -507,22 +508,14 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString, static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char depType) { - List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); - Oid citusTableId = InvalidOid; - foreach_oid(citusTableId, citusTableIdList) + Oid relationId; + List *relations = GetDependentRelationsWithSequence(sequenceAddress->objectId, + depType); + foreach_oid(relationId, relations) { - List *seqInfoList = NIL; - GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, depType); - SequenceInfo *seqInfo = NULL; - foreach_ptr(seqInfo, seqInfoList) + if (IsCitusTable(relationId)) { - /* - * This sequence is used in a distributed table - */ - if (seqInfo->sequenceOid == sequenceAddress->objectId) - { - return citusTableId; - } + return relationId; } } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 9e44a01f9..ef7c56dc7 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1637,6 +1637,74 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, } +/* + * GetDependentDependentRelationsWithSequence returns a list of oids of + * relations that have have a dependency on the given sequence. + * There are three types of dependencies: + * 1. direct auto (owned sequences), created using SERIAL or BIGSERIAL + * 2. indirect auto (through an AttrDef), created using DEFAULT nextval('..') + * 3. internal, created using GENERATED ALWAYS AS IDENTITY + * + * Depending on the passed deptype, we return the relations that have the + * given type(s): + * - DEPENDENCY_AUTO returns both 1 and 2 + * - DEPENDENCY_INTERNAL returns 3 + * + * The returned list can contain duplicates, as the same relation can have + * multiple dependencies on the sequence. + */ +List * +GetDependentRelationsWithSequence(Oid sequenceOid, char depType) +{ + List *relations = NIL; + ScanKeyData key[2]; + HeapTuple tup; + + Relation depRel = table_open(DependRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_depend_classid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationRelationId)); + ScanKeyInit(&key[1], + Anum_pg_depend_objid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(sequenceOid)); + SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true, + NULL, lengthof(key), key); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); + + if ( + deprec->refclassid == RelationRelationId && + deprec->refobjsubid != 0 && + deprec->deptype == depType) + { + relations = lappend_oid(relations, deprec->refobjid); + } + } + + systable_endscan(scan); + + table_close(depRel, AccessShareLock); + + if (depType == DEPENDENCY_AUTO) + { + Oid attrDefOid; + List *attrDefOids = GetAttrDefsFromSequence(sequenceOid); + + foreach_oid(attrDefOid, attrDefOids) + { + ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid); + relations = lappend_oid(relations, columnAddress.objectId); + } + } + + return relations; +} + + /* * GetSequencesFromAttrDef returns a list of sequence OIDs that have * dependency with the given attrdefOid in pg_depend diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index d0b760758..617eed705 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -136,6 +136,7 @@ ObjectAddress GetAttrDefaultColumnAddress(Oid attrdefoid); extern List * GetAttrDefsFromSequence(Oid seqOid); extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, AttrNumber attnum, char depType); +extern List * GetDependentRelationsWithSequence(Oid seqId, char depType); extern List * GetDependentFunctionsWithRelation(Oid relationId); extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum); extern void SetLocalEnableMetadataSync(bool state); From 16604a6601d8445c334d44b32ec6cf825c61839f Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Mon, 15 Apr 2024 14:42:56 +0200 Subject: [PATCH 4/4] Use an index to get FDWs that depend on extensions (#7574) DESCRIPTION: Fix performance issue when distributing a table that depends on an extension When the database contains many objects this function would show up in profiles because it was doing a sequence scan on pg_depend. And with many objects pg_depend can get very large. This starts using an index scan to only look for rows containing FDWs, of which there are expected to be very few (often even zero). --- src/backend/distributed/commands/extension.c | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index 2ead0c58a..8d4c6431b 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -1093,33 +1093,26 @@ List * GetDependentFDWsToExtension(Oid extensionId) { List *extensionFDWs = NIL; - ScanKeyData key[3]; - int scanKeyCount = 3; + ScanKeyData key[1]; HeapTuple tup; Relation pgDepend = table_open(DependRelationId, AccessShareLock); ScanKeyInit(&key[0], - Anum_pg_depend_refclassid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(ExtensionRelationId)); - ScanKeyInit(&key[1], - Anum_pg_depend_refobjid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(extensionId)); - ScanKeyInit(&key[2], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(ForeignDataWrapperRelationId)); - SysScanDesc scan = systable_beginscan(pgDepend, InvalidOid, false, - NULL, scanKeyCount, key); + SysScanDesc scan = systable_beginscan(pgDepend, DependDependerIndexId, true, + NULL, lengthof(key), key); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_depend pgDependEntry = (Form_pg_depend) GETSTRUCT(tup); - if (pgDependEntry->deptype == DEPENDENCY_EXTENSION) + if (pgDependEntry->deptype == DEPENDENCY_EXTENSION && + pgDependEntry->refclassid == ExtensionRelationId && + pgDependEntry->refobjid == extensionId) { extensionFDWs = lappend_oid(extensionFDWs, pgDependEntry->objid); }