pull/8363/merge
Muhammad Usama 2025-12-05 15:56:06 +03:00 committed by GitHub
commit 3490084127
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 535 additions and 48 deletions

View File

@ -154,7 +154,8 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount,
static char * ColocationGroupDeleteCommand(uint32 colocationId);
static char * RemoteSchemaIdExpressionById(Oid schemaId);
static char * RemoteSchemaIdExpressionByName(char *schemaName);
static char * RemoteTypeIdExpression(Oid typeId);
static char * GetRemoteTypeName(Oid typeId);
static char * GetRemoteTypeNamespace(Oid typeId);
static char * RemoteCollationIdExpression(Oid colocationId);
static char * RemoteTableIdExpression(Oid relationId);
@ -4192,13 +4193,53 @@ ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicatio
{
StringInfo insertColocationCommand = makeStringInfo();
/*
* Get type name and schema separately to defer type resolution.
* This approach matches how SendColocationMetadataCommands handles types.
*/
char *typeName = GetRemoteTypeName(distributionColumnType);
char *typeSchemaName = GetRemoteTypeNamespace(distributionColumnType);
appendStringInfo(insertColocationCommand,
"SELECT citus_internal.add_colocation_metadata("
"%d, %d, %d, %s, %s)",
"WITH colocation_data("
"colocationid, shardcount, replicationfactor, "
"typeschema, typename, collationid) "
"AS (VALUES (%d, %d, %d, ",
colocationId,
shardCount,
replicationFactor,
RemoteTypeIdExpression(distributionColumnType),
replicationFactor);
if (typeSchemaName != NULL && typeName != NULL)
{
/* Use quote_identifier so the schema name can be cast to regnamespace */
appendStringInfo(insertColocationCommand,
"%s, %s, ",
quote_literal_cstr(quote_identifier(typeSchemaName)),
quote_literal_cstr(typeName));
}
else if (typeName != NULL)
{
appendStringInfo(insertColocationCommand,
"NULL, %s, ",
quote_literal_cstr(typeName));
}
else
{
appendStringInfo(insertColocationCommand,
"NULL, NULL, ");
}
appendStringInfo(insertColocationCommand,
"%s)) "
"SELECT citus_internal.add_colocation_metadata("
"colocationid, shardcount, replicationfactor, "
"coalesce(t.oid, 0), collationid) "
"FROM colocation_data "
"LEFT JOIN pg_type t ON ("
"typename = t.typname "
"AND (typeschema IS NULL OR "
"t.typnamespace = "
"(SELECT oid FROM pg_namespace WHERE nspname = typeschema)))",
RemoteCollationIdExpression(distributionColumnCollation));
return insertColocationCommand->data;
@ -4206,37 +4247,61 @@ ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicatio
/*
* RemoteTypeIdExpression returns an expression in text form that can
* be used to obtain the OID of a type on a different node when included
* in a query string.
* GetRemoteTypeName returns the unqualified name of a type.
* Returns NULL for InvalidOid.
*/
static char *
RemoteTypeIdExpression(Oid typeId)
GetRemoteTypeName(Oid typeId)
{
/* by default, use 0 (InvalidOid) */
char *expression = "0";
/* we also have pg_dist_colocation entries for reference tables */
if (typeId != InvalidOid)
if (typeId == InvalidOid)
{
char *typeName = format_type_extended(typeId, -1,
FORMAT_TYPE_FORCE_QUALIFY |
FORMAT_TYPE_ALLOW_INVALID);
/* format_type_extended returns ??? in case of an unknown type */
if (strcmp(typeName, "???") != 0)
{
StringInfo regtypeExpression = makeStringInfo();
appendStringInfo(regtypeExpression,
"%s::regtype",
quote_literal_cstr(typeName));
expression = regtypeExpression->data;
}
return NULL;
}
return expression;
HeapTuple typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeId));
if (!HeapTupleIsValid(typeTuple))
{
return NULL;
}
Form_pg_type typeForm = (Form_pg_type) GETSTRUCT(typeTuple);
char *typeName = pstrdup(NameStr(typeForm->typname));
ReleaseSysCache(typeTuple);
return typeName;
}
/*
* GetRemoteTypeNamespace returns the schema name of a type.
* Returns NULL for InvalidOid or types in pg_catalog.
*/
static char *
GetRemoteTypeNamespace(Oid typeId)
{
if (typeId == InvalidOid)
{
return NULL;
}
HeapTuple typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeId));
if (!HeapTupleIsValid(typeTuple))
{
return NULL;
}
Form_pg_type typeForm = (Form_pg_type) GETSTRUCT(typeTuple);
Oid typeNamespace = typeForm->typnamespace;
ReleaseSysCache(typeTuple);
/* Don't include schema for pg_catalog types for backward compatibility */
if (typeNamespace == PG_CATALOG_NAMESPACE)
{
return NULL;
}
return get_namespace_name(typeNamespace);
}
@ -4953,19 +5018,52 @@ SendColocationMetadataCommands(MetadataSyncContext *context)
StringInfo colocationGroupCreateCommand = makeStringInfo();
appendStringInfo(colocationGroupCreateCommand,
"WITH colocation_group_data (colocationid, shardcount, "
"replicationfactor, distributioncolumntype, "
"replicationfactor, distributioncolumntypeschema, "
"distributioncolumntypename, "
"distributioncolumncollationname, "
"distributioncolumncollationschema) AS (VALUES ");
Form_pg_dist_colocation colocationForm =
(Form_pg_dist_colocation) GETSTRUCT(nextTuple);
/*
* Get the type name and schema separately to defer type resolution.
* This is necessary when the type (e.g., a domain) is defined in a
* non-public schema that may not exist on the worker yet.
*/
char *typeName =
GetRemoteTypeName(colocationForm->distributioncolumntype);
char *typeSchemaName =
GetRemoteTypeNamespace(colocationForm->distributioncolumntype);
appendStringInfo(colocationGroupCreateCommand,
"(%d, %d, %d, %s, ",
"(%d, %d, %d, ",
colocationForm->colocationid,
colocationForm->shardcount,
colocationForm->replicationfactor,
RemoteTypeIdExpression(colocationForm->distributioncolumntype));
colocationForm->replicationfactor);
/* Add type schema and name */
if (typeSchemaName != NULL && typeName != NULL)
{
/* Use quote_identifier so the schema name can be cast to regnamespace */
appendStringInfo(colocationGroupCreateCommand,
"%s, %s, ",
quote_literal_cstr(quote_identifier(typeSchemaName)),
quote_literal_cstr(typeName));
}
else if (typeName != NULL)
{
/* Type is in pg_catalog or no schema qualifier needed */
appendStringInfo(colocationGroupCreateCommand,
"NULL, %s, ",
quote_literal_cstr(typeName));
}
else
{
/* InvalidOid or unknown type */
appendStringInfo(colocationGroupCreateCommand,
"NULL, NULL, ");
}
/*
* For collations, include the names in the VALUES section and then
@ -5001,14 +5099,25 @@ SendColocationMetadataCommands(MetadataSyncContext *context)
"NULL, NULL)");
}
/*
* Use LEFT JOIN with pg_type to resolve the type OID at runtime.
* This defers type resolution until execution on the worker, allowing
* the type and its schema to be created first by dependency commands.
*/
appendStringInfo(colocationGroupCreateCommand,
") SELECT citus_internal.add_colocation_metadata("
"colocationid, shardcount, replicationfactor, "
"distributioncolumntype, coalesce(c.oid, 0)) "
"FROM colocation_group_data d LEFT JOIN pg_collation c "
"coalesce(t.oid, 0), coalesce(c.oid, 0)) "
"FROM colocation_group_data d "
"LEFT JOIN pg_type t ON ("
"d.distributioncolumntypename = t.typname "
"AND (d.distributioncolumntypeschema IS NULL OR "
"t.typnamespace = (SELECT oid FROM pg_namespace WHERE "
"nspname = d.distributioncolumntypeschema))) "
"LEFT JOIN pg_collation c "
"ON (d.distributioncolumncollationname = c.collname "
"AND d.distributioncolumncollationschema::regnamespace"
" = c.collnamespace)");
"AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE "
"nspname = d.distributioncolumncollationschema))");
List *commandList = list_make1(colocationGroupCreateCommand->data);
SendOrCollectCommandListToActivatedNodes(context, commandList);

View File

@ -175,8 +175,8 @@ SELECT unnest(activate_node_snapshot()) order by 1;
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (3, 1, 1, 0, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (3, 1, 1, NULL, NULL, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
@ -240,7 +240,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
@ -301,7 +301,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
@ -369,7 +369,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
@ -430,7 +430,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
@ -523,7 +523,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
(5 rows)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
---------------------------------------------------------------------
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varnullingrels (b) :varlevelsup 0 :varreturningtype 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f
(1 row)
@ -662,7 +662,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
(5 rows)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
---------------------------------------------------------------------
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varnullingrels (b) :varlevelsup 0 :varreturningtype 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f
(1 row)
@ -2023,8 +2023,8 @@ SELECT unnest(activate_node_snapshot()) order by 1;
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10009, 1, -1, 0, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10010, 4, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10009, 1, -1, NULL, NULL, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10010, 4, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;

View File

@ -0,0 +1,231 @@
--
-- MULTI_METADATA_SYNC_DOMAIN
--
-- Test that metadata sync works correctly with DOMAIN types in non-public schemas
--
-- Create the initial cluster setup
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 9000000;
-- Store current sequence values and calculate restart values
SELECT nextval('pg_catalog.pg_dist_groupid_seq') - 1 AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') - 1 AS last_node_id \gset
-- Remove the second worker node to start with
SELECT citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- Test 1: Domain in a non-public schema with special characters
CREATE SCHEMA "prepared statements";
CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$');
CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int);
SELECT create_distributed_table('dist_domain_nonpublic', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Insert some test data
INSERT INTO dist_domain_nonpublic VALUES ('test-1', 100), ('test-2', 200);
-- Reset sequences to avoid conflicts with other tests
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
-- Now add the worker back - this is should not fail
-- The metadata sync should now work
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- cleanup before next test
DROP TABLE dist_domain_nonpublic CASCADE;
DROP DOMAIN "prepared statements".test_key CASCADE;
DROP SCHEMA "prepared statements" CASCADE;
SELECT citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- Test 2: Domain in public schema
CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0);
CREATE TABLE dist_domain_public(a positive_int, b text);
SELECT create_distributed_table('dist_domain_public', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Insert some test data
INSERT INTO dist_domain_public VALUES (1, 'one'), (2, 'two');
-- Reset sequences to avoid conflicts with other tests
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
-- Now add the worker back - this is should not fail
-- The metadata sync should now work
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- cleanup before next test
DROP TABLE dist_domain_public CASCADE;
DROP DOMAIN public.positive_int CASCADE;
SELECT citus_remove_node('localhost', :worker_2_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
-- Test 3: Domain in a regular schema
CREATE SCHEMA test_schema;
CREATE DOMAIN test_schema.email_address AS text CHECK(VALUE ~ '^[^@]+@[^@]+\.[^@]+$');
CREATE TABLE dist_domain_regular_schema(id int, email test_schema.email_address);
SELECT create_distributed_table('dist_domain_regular_schema', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Insert some test data
INSERT INTO dist_domain_regular_schema VALUES (1, 'user@example.com'), (2, 'admin@test.org');
-- Reset sequences to avoid conflicts with other tests
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
-- Now add the worker back - this is should not fail
-- The metadata sync should now work
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE SCHEMA "prepared statements";
CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$');
CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int);
SELECT create_distributed_table('dist_domain_nonpublic', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0);
CREATE TABLE dist_domain_public(a positive_int, b text);
SELECT create_distributed_table('dist_domain_public', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- Insert some test data
INSERT INTO dist_domain_public VALUES (11, 'eleven'), (12, 'twelve');
INSERT INTO dist_domain_regular_schema VALUES (11, 'user@example.com'), (12, 'admin@test.org');
INSERT INTO dist_domain_nonpublic VALUES ('test-3', 110), ('test-4', 120);
-- Verify the colocation metadata exists
SELECT shardcount, replicationfactor, distributioncolumntype::regtype
FROM pg_dist_colocation
WHERE distributioncolumntype IN (
'"prepared statements".test_key'::regtype,
'public.positive_int'::regtype,
'test_schema.email_address'::regtype
)
ORDER BY distributioncolumntype::regtype::text;
shardcount | replicationfactor | distributioncolumntype
---------------------------------------------------------------------
4 | 1 | "prepared statements".test_key
4 | 1 | positive_int
(2 rows)
-- Verify metadata was synced correctly by checking colocation on the worker
\c - - - :worker_2_port
SELECT shardcount, replicationfactor, distributioncolumntype::regtype
FROM pg_dist_colocation
WHERE distributioncolumntype IN (
'"prepared statements".test_key'::regtype,
'public.positive_int'::regtype,
'test_schema.email_address'::regtype
)
ORDER BY distributioncolumntype::regtype::text;
shardcount | replicationfactor | distributioncolumntype
---------------------------------------------------------------------
4 | 1 | positive_int
(1 row)
-- Verify the domains exist on the worker
SELECT typname, typnamespace::regnamespace
FROM pg_type
WHERE typname IN ('test_key', 'positive_int', 'email_address')
ORDER BY typname;
typname | typnamespace
---------------------------------------------------------------------
email_address | test_schema
positive_int | public
test_key | "prepared statements"
(3 rows)
-- Back to coordinator for cleanup
\c - - - :master_port
-- Test that queries still work after re-adding the worker
SELECT * FROM dist_domain_nonpublic ORDER BY a;
a | b
---------------------------------------------------------------------
test-3 | 110
test-4 | 120
(2 rows)
SELECT * FROM dist_domain_public ORDER BY a;
a | b
---------------------------------------------------------------------
11 | eleven
12 | twelve
(2 rows)
SELECT * FROM dist_domain_regular_schema ORDER BY id;
id | email
---------------------------------------------------------------------
1 | user@example.com
2 | admin@test.org
11 | user@example.com
12 | admin@test.org
(4 rows)
-- Test inserts still work
INSERT INTO dist_domain_regular_schema VALUES (1, 'test@domain.com');
INSERT INTO dist_domain_regular_schema VALUES (2, 'info@citusdata.com');
SELECT * FROM dist_domain_nonpublic ORDER BY a;
a | b
---------------------------------------------------------------------
test-3 | 110
test-4 | 120
(2 rows)
SELECT * FROM dist_domain_public ORDER BY a;
a | b
---------------------------------------------------------------------
11 | eleven
12 | twelve
(2 rows)
SELECT * FROM dist_domain_regular_schema ORDER BY id;
id | email
---------------------------------------------------------------------
1 | user@example.com
1 | test@domain.com
2 | admin@test.org
2 | info@citusdata.com
11 | user@example.com
12 | admin@test.org
(6 rows)
-- Cleanup
DROP TABLE dist_domain_nonpublic CASCADE;
DROP TABLE dist_domain_public CASCADE;
DROP TABLE dist_domain_regular_schema CASCADE;
DROP DOMAIN "prepared statements".test_key CASCADE;
DROP DOMAIN public.positive_int CASCADE;
DROP DOMAIN test_schema.email_address CASCADE;
DROP SCHEMA "prepared statements" CASCADE;
DROP SCHEMA test_schema CASCADE;

View File

@ -269,9 +269,11 @@ test: multi_drop_extension
# ----------
# multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers
# multi_metadata_sync_domain tests that metadata sync works correctly with DOMAIN types in non-public schemas
# multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers
# ----------
test: multi_metadata_sync
test: multi_metadata_sync_domain
test: multi_unsupported_worker_operations
test: grant_on_function_propagation

View File

@ -0,0 +1,145 @@
--
-- MULTI_METADATA_SYNC_DOMAIN
--
-- Test that metadata sync works correctly with DOMAIN types in non-public schemas
--
-- Create the initial cluster setup
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 9000000;
-- Store current sequence values and calculate restart values
SELECT nextval('pg_catalog.pg_dist_groupid_seq') - 1 AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') - 1 AS last_node_id \gset
-- Remove the second worker node to start with
SELECT citus_remove_node('localhost', :worker_2_port);
-- Test 1: Domain in a non-public schema with special characters
CREATE SCHEMA "prepared statements";
CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$');
CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int);
SELECT create_distributed_table('dist_domain_nonpublic', 'a');
-- Insert some test data
INSERT INTO dist_domain_nonpublic VALUES ('test-1', 100), ('test-2', 200);
-- Reset sequences to avoid conflicts with other tests
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
-- Now add the worker back - this is should not fail
-- The metadata sync should now work
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
-- cleanup before next test
DROP TABLE dist_domain_nonpublic CASCADE;
DROP DOMAIN "prepared statements".test_key CASCADE;
DROP SCHEMA "prepared statements" CASCADE;
SELECT citus_remove_node('localhost', :worker_2_port);
-- Test 2: Domain in public schema
CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0);
CREATE TABLE dist_domain_public(a positive_int, b text);
SELECT create_distributed_table('dist_domain_public', 'a');
-- Insert some test data
INSERT INTO dist_domain_public VALUES (1, 'one'), (2, 'two');
-- Reset sequences to avoid conflicts with other tests
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
-- Now add the worker back - this is should not fail
-- The metadata sync should now work
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
-- cleanup before next test
DROP TABLE dist_domain_public CASCADE;
DROP DOMAIN public.positive_int CASCADE;
SELECT citus_remove_node('localhost', :worker_2_port);
-- Test 3: Domain in a regular schema
CREATE SCHEMA test_schema;
CREATE DOMAIN test_schema.email_address AS text CHECK(VALUE ~ '^[^@]+@[^@]+\.[^@]+$');
CREATE TABLE dist_domain_regular_schema(id int, email test_schema.email_address);
SELECT create_distributed_table('dist_domain_regular_schema', 'id');
-- Insert some test data
INSERT INTO dist_domain_regular_schema VALUES (1, 'user@example.com'), (2, 'admin@test.org');
-- Reset sequences to avoid conflicts with other tests
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
-- Now add the worker back - this is should not fail
-- The metadata sync should now work
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
CREATE SCHEMA "prepared statements";
CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$');
CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int);
SELECT create_distributed_table('dist_domain_nonpublic', 'a');
CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0);
CREATE TABLE dist_domain_public(a positive_int, b text);
SELECT create_distributed_table('dist_domain_public', 'a');
-- Insert some test data
INSERT INTO dist_domain_public VALUES (11, 'eleven'), (12, 'twelve');
INSERT INTO dist_domain_regular_schema VALUES (11, 'user@example.com'), (12, 'admin@test.org');
INSERT INTO dist_domain_nonpublic VALUES ('test-3', 110), ('test-4', 120);
-- Verify the colocation metadata exists
SELECT shardcount, replicationfactor, distributioncolumntype::regtype
FROM pg_dist_colocation
WHERE distributioncolumntype IN (
'"prepared statements".test_key'::regtype,
'public.positive_int'::regtype,
'test_schema.email_address'::regtype
)
ORDER BY distributioncolumntype::regtype::text;
-- Verify metadata was synced correctly by checking colocation on the worker
\c - - - :worker_2_port
SELECT shardcount, replicationfactor, distributioncolumntype::regtype
FROM pg_dist_colocation
WHERE distributioncolumntype IN (
'"prepared statements".test_key'::regtype,
'public.positive_int'::regtype,
'test_schema.email_address'::regtype
)
ORDER BY distributioncolumntype::regtype::text;
-- Verify the domains exist on the worker
SELECT typname, typnamespace::regnamespace
FROM pg_type
WHERE typname IN ('test_key', 'positive_int', 'email_address')
ORDER BY typname;
-- Back to coordinator for cleanup
\c - - - :master_port
-- Test that queries still work after re-adding the worker
SELECT * FROM dist_domain_nonpublic ORDER BY a;
SELECT * FROM dist_domain_public ORDER BY a;
SELECT * FROM dist_domain_regular_schema ORDER BY id;
-- Test inserts still work
INSERT INTO dist_domain_regular_schema VALUES (1, 'test@domain.com');
INSERT INTO dist_domain_regular_schema VALUES (2, 'info@citusdata.com');
SELECT * FROM dist_domain_nonpublic ORDER BY a;
SELECT * FROM dist_domain_public ORDER BY a;
SELECT * FROM dist_domain_regular_schema ORDER BY id;
-- Cleanup
DROP TABLE dist_domain_nonpublic CASCADE;
DROP TABLE dist_domain_public CASCADE;
DROP TABLE dist_domain_regular_schema CASCADE;
DROP DOMAIN "prepared statements".test_key CASCADE;
DROP DOMAIN public.positive_int CASCADE;
DROP DOMAIN test_schema.email_address CASCADE;
DROP SCHEMA "prepared statements" CASCADE;
DROP SCHEMA test_schema CASCADE;