mirror of https://github.com/citusdata/citus.git
Merge branch 'main' into fix-upgrades-with-invalid-rebalance-strategies
commit
44cfe3730f
|
@ -22,6 +22,7 @@
|
|||
#include "catalog/dependency.h"
|
||||
#include "catalog/index.h"
|
||||
#include "catalog/pg_am.h"
|
||||
#include "catalog/pg_attrdef.h"
|
||||
#include "catalog/pg_attribute.h"
|
||||
#include "catalog/pg_enum.h"
|
||||
#include "catalog/pg_extension.h"
|
||||
|
@ -50,6 +51,7 @@
|
|||
#include "tcop/pquery.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/inval.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/memutils.h"
|
||||
|
@ -1696,52 +1698,39 @@ PropagatePrerequisiteObjectsForDistributedTable(Oid relationId)
|
|||
void
|
||||
EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId)
|
||||
{
|
||||
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
|
||||
citusTableIdList = list_append_unique_oid(citusTableIdList, ownerRelationId);
|
||||
Oid attrDefOid;
|
||||
List *attrDefOids = GetAttrDefsFromSequence(seqOid);
|
||||
|
||||
Oid citusTableId = InvalidOid;
|
||||
foreach_oid(citusTableId, citusTableIdList)
|
||||
foreach_oid(attrDefOid, attrDefOids)
|
||||
{
|
||||
List *seqInfoList = NIL;
|
||||
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, DEPENDENCY_AUTO);
|
||||
ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid);
|
||||
|
||||
SequenceInfo *seqInfo = NULL;
|
||||
foreach_ptr(seqInfo, seqInfoList)
|
||||
/*
|
||||
* If another distributed table is using the same sequence
|
||||
* in one of its column defaults, make sure the types of the
|
||||
* columns match.
|
||||
*
|
||||
* We skip non-distributed tables, but we need to check the current
|
||||
* table as it might reference the same sequence multiple times.
|
||||
*/
|
||||
if (columnAddress.objectId != ownerRelationId &&
|
||||
!IsCitusTable(columnAddress.objectId))
|
||||
{
|
||||
AttrNumber currentAttnum = seqInfo->attributeNumber;
|
||||
Oid currentSeqOid = seqInfo->sequenceOid;
|
||||
|
||||
if (!seqInfo->isNextValDefault)
|
||||
{
|
||||
/*
|
||||
* If a sequence is not on the nextval, we don't need any check.
|
||||
* This is a dependent sequence via ALTER SEQUENCE .. OWNED BY col
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* If another distributed table is using the same sequence
|
||||
* in one of its column defaults, make sure the types of the
|
||||
* columns match
|
||||
*/
|
||||
if (currentSeqOid == seqOid)
|
||||
{
|
||||
Oid currentAttributeTypId = GetAttributeTypeOid(citusTableId,
|
||||
currentAttnum);
|
||||
if (attributeTypeId != currentAttributeTypId)
|
||||
{
|
||||
char *sequenceName = generate_qualified_relation_name(
|
||||
seqOid);
|
||||
char *citusTableName =
|
||||
generate_qualified_relation_name(citusTableId);
|
||||
ereport(ERROR, (errmsg(
|
||||
"The sequence %s is already used for a different"
|
||||
" type in column %d of the table %s",
|
||||
sequenceName, currentAttnum,
|
||||
citusTableName)));
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Oid currentAttributeTypId = GetAttributeTypeOid(columnAddress.objectId,
|
||||
columnAddress.objectSubId);
|
||||
if (attributeTypeId != currentAttributeTypId)
|
||||
{
|
||||
char *sequenceName = generate_qualified_relation_name(
|
||||
seqOid);
|
||||
char *citusTableName =
|
||||
generate_qualified_relation_name(columnAddress.objectId);
|
||||
ereport(ERROR, (errmsg(
|
||||
"The sequence %s is already used for a different"
|
||||
" type in column %d of the table %s",
|
||||
sequenceName, columnAddress.objectSubId,
|
||||
citusTableName)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1093,33 +1093,26 @@ List *
|
|||
GetDependentFDWsToExtension(Oid extensionId)
|
||||
{
|
||||
List *extensionFDWs = NIL;
|
||||
ScanKeyData key[3];
|
||||
int scanKeyCount = 3;
|
||||
ScanKeyData key[1];
|
||||
HeapTuple tup;
|
||||
|
||||
Relation pgDepend = table_open(DependRelationId, AccessShareLock);
|
||||
|
||||
ScanKeyInit(&key[0],
|
||||
Anum_pg_depend_refclassid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(ExtensionRelationId));
|
||||
ScanKeyInit(&key[1],
|
||||
Anum_pg_depend_refobjid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(extensionId));
|
||||
ScanKeyInit(&key[2],
|
||||
Anum_pg_depend_classid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(ForeignDataWrapperRelationId));
|
||||
|
||||
SysScanDesc scan = systable_beginscan(pgDepend, InvalidOid, false,
|
||||
NULL, scanKeyCount, key);
|
||||
SysScanDesc scan = systable_beginscan(pgDepend, DependDependerIndexId, true,
|
||||
NULL, lengthof(key), key);
|
||||
|
||||
while (HeapTupleIsValid(tup = systable_getnext(scan)))
|
||||
{
|
||||
Form_pg_depend pgDependEntry = (Form_pg_depend) GETSTRUCT(tup);
|
||||
|
||||
if (pgDependEntry->deptype == DEPENDENCY_EXTENSION)
|
||||
if (pgDependEntry->deptype == DEPENDENCY_EXTENSION &&
|
||||
pgDependEntry->refclassid == ExtensionRelationId &&
|
||||
pgDependEntry->refobjid == extensionId)
|
||||
{
|
||||
extensionFDWs = lappend_oid(extensionFDWs, pgDependEntry->objid);
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "access/xact.h"
|
||||
#include "catalog/dependency.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_attrdef.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "commands/extension.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
|
@ -507,22 +508,14 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
|
|||
static Oid
|
||||
SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char depType)
|
||||
{
|
||||
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
|
||||
Oid citusTableId = InvalidOid;
|
||||
foreach_oid(citusTableId, citusTableIdList)
|
||||
Oid relationId;
|
||||
List *relations = GetDependentRelationsWithSequence(sequenceAddress->objectId,
|
||||
depType);
|
||||
foreach_oid(relationId, relations)
|
||||
{
|
||||
List *seqInfoList = NIL;
|
||||
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, depType);
|
||||
SequenceInfo *seqInfo = NULL;
|
||||
foreach_ptr(seqInfo, seqInfoList)
|
||||
if (IsCitusTable(relationId))
|
||||
{
|
||||
/*
|
||||
* This sequence is used in a distributed table
|
||||
*/
|
||||
if (seqInfo->sequenceOid == sequenceAddress->objectId)
|
||||
{
|
||||
return citusTableId;
|
||||
}
|
||||
return relationId;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -271,9 +271,24 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values,
|
|||
* We allocate everything in the provided context so as to facilitate using
|
||||
* pfree on all runtime parameters when connections using these entries are
|
||||
* invalidated during config reloads.
|
||||
*
|
||||
* Also, when "host" is already provided in global parameters, we use hostname
|
||||
* from the key as "hostaddr" instead of "host" to avoid host name lookup. In
|
||||
* that case, the value for "host" becomes useful only if the authentication
|
||||
* method requires it.
|
||||
*/
|
||||
bool gotHostParamFromGlobalParams = false;
|
||||
for (Size paramIndex = 0; paramIndex < ConnParams.size; paramIndex++)
|
||||
{
|
||||
if (strcmp(ConnParams.keywords[paramIndex], "host") == 0)
|
||||
{
|
||||
gotHostParamFromGlobalParams = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
const char *runtimeKeywords[] = {
|
||||
"host",
|
||||
gotHostParamFromGlobalParams ? "hostaddr" : "host",
|
||||
"port",
|
||||
"dbname",
|
||||
"user",
|
||||
|
|
|
@ -1637,6 +1637,74 @@ GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetDependentDependentRelationsWithSequence returns a list of oids of
|
||||
* relations that have have a dependency on the given sequence.
|
||||
* There are three types of dependencies:
|
||||
* 1. direct auto (owned sequences), created using SERIAL or BIGSERIAL
|
||||
* 2. indirect auto (through an AttrDef), created using DEFAULT nextval('..')
|
||||
* 3. internal, created using GENERATED ALWAYS AS IDENTITY
|
||||
*
|
||||
* Depending on the passed deptype, we return the relations that have the
|
||||
* given type(s):
|
||||
* - DEPENDENCY_AUTO returns both 1 and 2
|
||||
* - DEPENDENCY_INTERNAL returns 3
|
||||
*
|
||||
* The returned list can contain duplicates, as the same relation can have
|
||||
* multiple dependencies on the sequence.
|
||||
*/
|
||||
List *
|
||||
GetDependentRelationsWithSequence(Oid sequenceOid, char depType)
|
||||
{
|
||||
List *relations = NIL;
|
||||
ScanKeyData key[2];
|
||||
HeapTuple tup;
|
||||
|
||||
Relation depRel = table_open(DependRelationId, AccessShareLock);
|
||||
|
||||
ScanKeyInit(&key[0],
|
||||
Anum_pg_depend_classid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(RelationRelationId));
|
||||
ScanKeyInit(&key[1],
|
||||
Anum_pg_depend_objid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(sequenceOid));
|
||||
SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true,
|
||||
NULL, lengthof(key), key);
|
||||
while (HeapTupleIsValid(tup = systable_getnext(scan)))
|
||||
{
|
||||
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
|
||||
|
||||
if (
|
||||
deprec->refclassid == RelationRelationId &&
|
||||
deprec->refobjsubid != 0 &&
|
||||
deprec->deptype == depType)
|
||||
{
|
||||
relations = lappend_oid(relations, deprec->refobjid);
|
||||
}
|
||||
}
|
||||
|
||||
systable_endscan(scan);
|
||||
|
||||
table_close(depRel, AccessShareLock);
|
||||
|
||||
if (depType == DEPENDENCY_AUTO)
|
||||
{
|
||||
Oid attrDefOid;
|
||||
List *attrDefOids = GetAttrDefsFromSequence(sequenceOid);
|
||||
|
||||
foreach_oid(attrDefOid, attrDefOids)
|
||||
{
|
||||
ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid);
|
||||
relations = lappend_oid(relations, columnAddress.objectId);
|
||||
}
|
||||
}
|
||||
|
||||
return relations;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetSequencesFromAttrDef returns a list of sequence OIDs that have
|
||||
* dependency with the given attrdefOid in pg_depend
|
||||
|
@ -1682,6 +1750,90 @@ GetSequencesFromAttrDef(Oid attrdefOid)
|
|||
}
|
||||
|
||||
|
||||
#if PG_VERSION_NUM < PG_VERSION_15
|
||||
|
||||
/*
|
||||
* Given a pg_attrdef OID, return the relation OID and column number of
|
||||
* the owning column (represented as an ObjectAddress for convenience).
|
||||
*
|
||||
* Returns InvalidObjectAddress if there is no such pg_attrdef entry.
|
||||
*/
|
||||
ObjectAddress
|
||||
GetAttrDefaultColumnAddress(Oid attrdefoid)
|
||||
{
|
||||
ObjectAddress result = InvalidObjectAddress;
|
||||
ScanKeyData skey[1];
|
||||
HeapTuple tup;
|
||||
|
||||
Relation attrdef = table_open(AttrDefaultRelationId, AccessShareLock);
|
||||
ScanKeyInit(&skey[0],
|
||||
Anum_pg_attrdef_oid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(attrdefoid));
|
||||
SysScanDesc scan = systable_beginscan(attrdef, AttrDefaultOidIndexId, true,
|
||||
NULL, 1, skey);
|
||||
|
||||
if (HeapTupleIsValid(tup = systable_getnext(scan)))
|
||||
{
|
||||
Form_pg_attrdef atdform = (Form_pg_attrdef) GETSTRUCT(tup);
|
||||
|
||||
result.classId = RelationRelationId;
|
||||
result.objectId = atdform->adrelid;
|
||||
result.objectSubId = atdform->adnum;
|
||||
}
|
||||
|
||||
systable_endscan(scan);
|
||||
table_close(attrdef, AccessShareLock);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
* GetAttrDefsFromSequence returns a list of attrdef OIDs that have
|
||||
* a dependency on the given sequence
|
||||
*/
|
||||
List *
|
||||
GetAttrDefsFromSequence(Oid seqOid)
|
||||
{
|
||||
List *attrDefsResult = NIL;
|
||||
ScanKeyData key[2];
|
||||
HeapTuple tup;
|
||||
|
||||
Relation depRel = table_open(DependRelationId, AccessShareLock);
|
||||
|
||||
ScanKeyInit(&key[0],
|
||||
Anum_pg_depend_refclassid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(RelationRelationId));
|
||||
ScanKeyInit(&key[1],
|
||||
Anum_pg_depend_refobjid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(seqOid));
|
||||
SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true,
|
||||
NULL, lengthof(key), key);
|
||||
while (HeapTupleIsValid(tup = systable_getnext(scan)))
|
||||
{
|
||||
Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup);
|
||||
|
||||
if (deprec->classid == AttrDefaultRelationId &&
|
||||
deprec->deptype == DEPENDENCY_NORMAL)
|
||||
{
|
||||
attrDefsResult = lappend_oid(attrDefsResult, deprec->objid);
|
||||
}
|
||||
}
|
||||
|
||||
systable_endscan(scan);
|
||||
|
||||
table_close(depRel, AccessShareLock);
|
||||
|
||||
return attrDefsResult;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetDependentFunctionsWithRelation returns the dependent functions for the
|
||||
* given relation id.
|
||||
|
|
|
@ -2929,6 +2929,7 @@ NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source)
|
|||
#if defined(ENABLE_GSS) && defined(ENABLE_SSPI)
|
||||
"gsslib",
|
||||
#endif
|
||||
"host",
|
||||
"keepalives",
|
||||
"keepalives_count",
|
||||
"keepalives_idle",
|
||||
|
|
|
@ -130,8 +130,13 @@ extern List * IdentitySequenceDependencyCommandList(Oid targetRelationId);
|
|||
|
||||
extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName);
|
||||
extern List * GetSequencesFromAttrDef(Oid attrdefOid);
|
||||
#if PG_VERSION_NUM < PG_VERSION_15
|
||||
ObjectAddress GetAttrDefaultColumnAddress(Oid attrdefoid);
|
||||
#endif
|
||||
extern List * GetAttrDefsFromSequence(Oid seqOid);
|
||||
extern void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList,
|
||||
AttrNumber attnum, char depType);
|
||||
extern List * GetDependentRelationsWithSequence(Oid seqId, char depType);
|
||||
extern List * GetDependentFunctionsWithRelation(Oid relationId);
|
||||
extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
|
||||
extern void SetLocalEnableMetadataSync(bool state);
|
||||
|
|
|
@ -520,5 +520,61 @@ show citus.node_conninfo;
|
|||
|
||||
-- Should work again
|
||||
ALTER TABLE test ADD COLUMN e INT;
|
||||
-- show that we allow providing "host" param via citus.node_conninfo
|
||||
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=nosuchhost';
|
||||
SELECT pg_reload_conf();
|
||||
pg_reload_conf
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT pg_sleep(0.1);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- fails due to invalid host
|
||||
SELECT COUNT(*)>=0 FROM test;
|
||||
WARNING: connection to the remote node postgres@localhost:xxxxx failed with the following error: could not parse network address "localhost": Name or service not known
|
||||
ERROR: connection to the remote node postgres@localhost:xxxxx failed with the following error: could not parse network address "localhost": Name or service not known
|
||||
SELECT array_agg(nodeid) as updated_nodeids from pg_dist_node WHERE nodename = 'localhost' \gset
|
||||
UPDATE pg_dist_node SET nodename = '127.0.0.1' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
|
||||
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=localhost';
|
||||
SELECT pg_reload_conf();
|
||||
pg_reload_conf
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT pg_sleep(0.1);
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- works when hostaddr is specified in pg_dist_node after providing host in citus.node_conninfo
|
||||
SELECT COUNT(*)>=0 FROM test;
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- restore original nodenames into pg_dist_node
|
||||
UPDATE pg_dist_node SET nodename = 'localhost' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
|
||||
-- reset it
|
||||
ALTER SYSTEM RESET citus.node_conninfo;
|
||||
select pg_reload_conf();
|
||||
pg_reload_conf
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
select pg_sleep(0.1); -- wait for config reload to apply
|
||||
pg_sleep
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
DROP SCHEMA node_conninfo_reload CASCADE;
|
||||
NOTICE: drop cascades to table test
|
||||
|
|
|
@ -205,4 +205,30 @@ show citus.node_conninfo;
|
|||
-- Should work again
|
||||
ALTER TABLE test ADD COLUMN e INT;
|
||||
|
||||
-- show that we allow providing "host" param via citus.node_conninfo
|
||||
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=nosuchhost';
|
||||
SELECT pg_reload_conf();
|
||||
SELECT pg_sleep(0.1);
|
||||
|
||||
-- fails due to invalid host
|
||||
SELECT COUNT(*)>=0 FROM test;
|
||||
|
||||
SELECT array_agg(nodeid) as updated_nodeids from pg_dist_node WHERE nodename = 'localhost' \gset
|
||||
UPDATE pg_dist_node SET nodename = '127.0.0.1' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
|
||||
|
||||
ALTER SYSTEM SET citus.node_conninfo = 'sslmode=require host=localhost';
|
||||
SELECT pg_reload_conf();
|
||||
SELECT pg_sleep(0.1);
|
||||
|
||||
-- works when hostaddr is specified in pg_dist_node after providing host in citus.node_conninfo
|
||||
SELECT COUNT(*)>=0 FROM test;
|
||||
|
||||
-- restore original nodenames into pg_dist_node
|
||||
UPDATE pg_dist_node SET nodename = 'localhost' WHERE nodeid = ANY(:'updated_nodeids'::int[]);
|
||||
|
||||
-- reset it
|
||||
ALTER SYSTEM RESET citus.node_conninfo;
|
||||
select pg_reload_conf();
|
||||
select pg_sleep(0.1); -- wait for config reload to apply
|
||||
|
||||
DROP SCHEMA node_conninfo_reload CASCADE;
|
||||
|
|
Loading…
Reference in New Issue