diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 391444856..81f6a0604 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -154,7 +154,8 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount, static char * ColocationGroupDeleteCommand(uint32 colocationId); static char * RemoteSchemaIdExpressionById(Oid schemaId); static char * RemoteSchemaIdExpressionByName(char *schemaName); -static char * RemoteTypeIdExpression(Oid typeId); +static char * GetRemoteTypeName(Oid typeId); +static char * GetRemoteTypeNamespace(Oid typeId); static char * RemoteCollationIdExpression(Oid colocationId); static char * RemoteTableIdExpression(Oid relationId); @@ -4192,13 +4193,53 @@ ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicatio { StringInfo insertColocationCommand = makeStringInfo(); + /* + * Get type name and schema separately to defer type resolution. + * This approach matches how SendColocationMetadataCommands handles types. + */ + char *typeName = GetRemoteTypeName(distributionColumnType); + char *typeSchemaName = GetRemoteTypeNamespace(distributionColumnType); + appendStringInfo(insertColocationCommand, - "SELECT citus_internal.add_colocation_metadata(" - "%d, %d, %d, %s, %s)", + "WITH colocation_data(" + "colocationid, shardcount, replicationfactor, " + "typeschema, typename, collationid) " + "AS (VALUES (%d, %d, %d, ", colocationId, shardCount, - replicationFactor, - RemoteTypeIdExpression(distributionColumnType), + replicationFactor); + + if (typeSchemaName != NULL && typeName != NULL) + { + /* Use quote_identifier so the schema name can be cast to regnamespace */ + appendStringInfo(insertColocationCommand, + "%s, %s, ", + quote_literal_cstr(quote_identifier(typeSchemaName)), + quote_literal_cstr(typeName)); + } + else if (typeName != NULL) + { + appendStringInfo(insertColocationCommand, + "NULL, %s, ", + quote_literal_cstr(typeName)); + } + else + { + appendStringInfo(insertColocationCommand, + "NULL, NULL, "); + } + + appendStringInfo(insertColocationCommand, + "%s)) " + "SELECT citus_internal.add_colocation_metadata(" + "colocationid, shardcount, replicationfactor, " + "coalesce(t.oid, 0), collationid) " + "FROM colocation_data " + "LEFT JOIN pg_type t ON (" + "typename = t.typname " + "AND (typeschema IS NULL OR " + "t.typnamespace = " + "(SELECT oid FROM pg_namespace WHERE nspname = typeschema)))", RemoteCollationIdExpression(distributionColumnCollation)); return insertColocationCommand->data; @@ -4206,37 +4247,61 @@ ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicatio /* - * RemoteTypeIdExpression returns an expression in text form that can - * be used to obtain the OID of a type on a different node when included - * in a query string. + * GetRemoteTypeName returns the unqualified name of a type. + * Returns NULL for InvalidOid. */ static char * -RemoteTypeIdExpression(Oid typeId) +GetRemoteTypeName(Oid typeId) { - /* by default, use 0 (InvalidOid) */ - char *expression = "0"; - - /* we also have pg_dist_colocation entries for reference tables */ - if (typeId != InvalidOid) + if (typeId == InvalidOid) { - char *typeName = format_type_extended(typeId, -1, - FORMAT_TYPE_FORCE_QUALIFY | - FORMAT_TYPE_ALLOW_INVALID); - - /* format_type_extended returns ??? in case of an unknown type */ - if (strcmp(typeName, "???") != 0) - { - StringInfo regtypeExpression = makeStringInfo(); - - appendStringInfo(regtypeExpression, - "%s::regtype", - quote_literal_cstr(typeName)); - - expression = regtypeExpression->data; - } + return NULL; } - return expression; + HeapTuple typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeId)); + if (!HeapTupleIsValid(typeTuple)) + { + return NULL; + } + + Form_pg_type typeForm = (Form_pg_type) GETSTRUCT(typeTuple); + char *typeName = pstrdup(NameStr(typeForm->typname)); + + ReleaseSysCache(typeTuple); + return typeName; +} + + +/* + * GetRemoteTypeNamespace returns the schema name of a type. + * Returns NULL for InvalidOid or types in pg_catalog. + */ +static char * +GetRemoteTypeNamespace(Oid typeId) +{ + if (typeId == InvalidOid) + { + return NULL; + } + + HeapTuple typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeId)); + if (!HeapTupleIsValid(typeTuple)) + { + return NULL; + } + + Form_pg_type typeForm = (Form_pg_type) GETSTRUCT(typeTuple); + Oid typeNamespace = typeForm->typnamespace; + + ReleaseSysCache(typeTuple); + + /* Don't include schema for pg_catalog types for backward compatibility */ + if (typeNamespace == PG_CATALOG_NAMESPACE) + { + return NULL; + } + + return get_namespace_name(typeNamespace); } @@ -4953,19 +5018,52 @@ SendColocationMetadataCommands(MetadataSyncContext *context) StringInfo colocationGroupCreateCommand = makeStringInfo(); appendStringInfo(colocationGroupCreateCommand, "WITH colocation_group_data (colocationid, shardcount, " - "replicationfactor, distributioncolumntype, " + "replicationfactor, distributioncolumntypeschema, " + "distributioncolumntypename, " "distributioncolumncollationname, " "distributioncolumncollationschema) AS (VALUES "); Form_pg_dist_colocation colocationForm = (Form_pg_dist_colocation) GETSTRUCT(nextTuple); + /* + * Get the type name and schema separately to defer type resolution. + * This is necessary when the type (e.g., a domain) is defined in a + * non-public schema that may not exist on the worker yet. + */ + char *typeName = + GetRemoteTypeName(colocationForm->distributioncolumntype); + char *typeSchemaName = + GetRemoteTypeNamespace(colocationForm->distributioncolumntype); + appendStringInfo(colocationGroupCreateCommand, - "(%d, %d, %d, %s, ", + "(%d, %d, %d, ", colocationForm->colocationid, colocationForm->shardcount, - colocationForm->replicationfactor, - RemoteTypeIdExpression(colocationForm->distributioncolumntype)); + colocationForm->replicationfactor); + + /* Add type schema and name */ + if (typeSchemaName != NULL && typeName != NULL) + { + /* Use quote_identifier so the schema name can be cast to regnamespace */ + appendStringInfo(colocationGroupCreateCommand, + "%s, %s, ", + quote_literal_cstr(quote_identifier(typeSchemaName)), + quote_literal_cstr(typeName)); + } + else if (typeName != NULL) + { + /* Type is in pg_catalog or no schema qualifier needed */ + appendStringInfo(colocationGroupCreateCommand, + "NULL, %s, ", + quote_literal_cstr(typeName)); + } + else + { + /* InvalidOid or unknown type */ + appendStringInfo(colocationGroupCreateCommand, + "NULL, NULL, "); + } /* * For collations, include the names in the VALUES section and then @@ -5001,14 +5099,25 @@ SendColocationMetadataCommands(MetadataSyncContext *context) "NULL, NULL)"); } + /* + * Use LEFT JOIN with pg_type to resolve the type OID at runtime. + * This defers type resolution until execution on the worker, allowing + * the type and its schema to be created first by dependency commands. + */ appendStringInfo(colocationGroupCreateCommand, ") SELECT citus_internal.add_colocation_metadata(" "colocationid, shardcount, replicationfactor, " - "distributioncolumntype, coalesce(c.oid, 0)) " - "FROM colocation_group_data d LEFT JOIN pg_collation c " + "coalesce(t.oid, 0), coalesce(c.oid, 0)) " + "FROM colocation_group_data d " + "LEFT JOIN pg_type t ON (" + "d.distributioncolumntypename = t.typname " + "AND (d.distributioncolumntypeschema IS NULL OR " + "t.typnamespace = (SELECT oid FROM pg_namespace WHERE " + "nspname = d.distributioncolumntypeschema))) " + "LEFT JOIN pg_collation c " "ON (d.distributioncolumncollationname = c.collname " - "AND d.distributioncolumncollationschema::regnamespace" - " = c.collnamespace)"); + "AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE " + "nspname = d.distributioncolumncollationschema))"); List *commandList = list_make1(colocationGroupCreateCommand->data); SendOrCollectCommandListToActivatedNodes(context, commandList); diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index e9b5b587e..ae47e2d8e 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -175,8 +175,8 @@ SELECT unnest(activate_node_snapshot()) order by 1; UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2 - WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) - WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (3, 1, 1, 0, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema)) + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (3, 1, 1, NULL, NULL, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema)) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; @@ -240,7 +240,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2 - WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema)) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; @@ -301,7 +301,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2 - WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema)) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; @@ -369,7 +369,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2 - WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema)) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; @@ -430,7 +430,7 @@ SELECT unnest(activate_node_snapshot()) order by 1; UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2 - WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema)) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; @@ -523,7 +523,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid; (5 rows) SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; - logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted + logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted --------------------------------------------------------------------- mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varnullingrels (b) :varlevelsup 0 :varreturningtype 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f (1 row) @@ -662,7 +662,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid; (5 rows) SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text; - logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted + logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted --------------------------------------------------------------------- mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varnullingrels (b) :varlevelsup 0 :varreturningtype 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f (1 row) @@ -2023,8 +2023,8 @@ SELECT unnest(activate_node_snapshot()) order by 1; UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2 UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2 - WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10009, 1, -1, 0, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) - WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10010, 4, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace) + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10009, 1, -1, NULL, NULL, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema)) + WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10010, 4, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema)) WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; diff --git a/src/test/regress/expected/multi_metadata_sync_domain.out b/src/test/regress/expected/multi_metadata_sync_domain.out new file mode 100644 index 000000000..816e41995 --- /dev/null +++ b/src/test/regress/expected/multi_metadata_sync_domain.out @@ -0,0 +1,231 @@ +-- +-- MULTI_METADATA_SYNC_DOMAIN +-- +-- Test that metadata sync works correctly with DOMAIN types in non-public schemas +-- +-- Create the initial cluster setup +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 9000000; +-- Store current sequence values and calculate restart values +SELECT nextval('pg_catalog.pg_dist_groupid_seq') - 1 AS last_group_id \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') - 1 AS last_node_id \gset +-- Remove the second worker node to start with +SELECT citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +-- Test 1: Domain in a non-public schema with special characters +CREATE SCHEMA "prepared statements"; +CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$'); +CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int); +SELECT create_distributed_table('dist_domain_nonpublic', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Insert some test data +INSERT INTO dist_domain_nonpublic VALUES ('test-1', 100), ('test-2', 200); +-- Reset sequences to avoid conflicts with other tests +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; +-- Now add the worker back - this is should not fail +-- The metadata sync should now work +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- cleanup before next test +DROP TABLE dist_domain_nonpublic CASCADE; +DROP DOMAIN "prepared statements".test_key CASCADE; +DROP SCHEMA "prepared statements" CASCADE; +SELECT citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +-- Test 2: Domain in public schema +CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0); +CREATE TABLE dist_domain_public(a positive_int, b text); +SELECT create_distributed_table('dist_domain_public', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Insert some test data +INSERT INTO dist_domain_public VALUES (1, 'one'), (2, 'two'); +-- Reset sequences to avoid conflicts with other tests +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; +-- Now add the worker back - this is should not fail +-- The metadata sync should now work +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- cleanup before next test +DROP TABLE dist_domain_public CASCADE; +DROP DOMAIN public.positive_int CASCADE; +SELECT citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +-- Test 3: Domain in a regular schema +CREATE SCHEMA test_schema; +CREATE DOMAIN test_schema.email_address AS text CHECK(VALUE ~ '^[^@]+@[^@]+\.[^@]+$'); +CREATE TABLE dist_domain_regular_schema(id int, email test_schema.email_address); +SELECT create_distributed_table('dist_domain_regular_schema', 'id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Insert some test data +INSERT INTO dist_domain_regular_schema VALUES (1, 'user@example.com'), (2, 'admin@test.org'); +-- Reset sequences to avoid conflicts with other tests +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; +-- Now add the worker back - this is should not fail +-- The metadata sync should now work +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +CREATE SCHEMA "prepared statements"; +CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$'); +CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int); +SELECT create_distributed_table('dist_domain_nonpublic', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0); +CREATE TABLE dist_domain_public(a positive_int, b text); +SELECT create_distributed_table('dist_domain_public', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- Insert some test data +INSERT INTO dist_domain_public VALUES (11, 'eleven'), (12, 'twelve'); +INSERT INTO dist_domain_regular_schema VALUES (11, 'user@example.com'), (12, 'admin@test.org'); +INSERT INTO dist_domain_nonpublic VALUES ('test-3', 110), ('test-4', 120); +-- Verify the colocation metadata exists +SELECT shardcount, replicationfactor, distributioncolumntype::regtype +FROM pg_dist_colocation +WHERE distributioncolumntype IN ( + '"prepared statements".test_key'::regtype, + 'public.positive_int'::regtype, + 'test_schema.email_address'::regtype +) +ORDER BY distributioncolumntype::regtype::text; + shardcount | replicationfactor | distributioncolumntype +--------------------------------------------------------------------- + 4 | 1 | "prepared statements".test_key + 4 | 1 | positive_int +(2 rows) + +-- Verify metadata was synced correctly by checking colocation on the worker +\c - - - :worker_2_port +SELECT shardcount, replicationfactor, distributioncolumntype::regtype +FROM pg_dist_colocation +WHERE distributioncolumntype IN ( + '"prepared statements".test_key'::regtype, + 'public.positive_int'::regtype, + 'test_schema.email_address'::regtype +) +ORDER BY distributioncolumntype::regtype::text; + shardcount | replicationfactor | distributioncolumntype +--------------------------------------------------------------------- + 4 | 1 | positive_int +(1 row) + +-- Verify the domains exist on the worker +SELECT typname, typnamespace::regnamespace +FROM pg_type +WHERE typname IN ('test_key', 'positive_int', 'email_address') +ORDER BY typname; + typname | typnamespace +--------------------------------------------------------------------- + email_address | test_schema + positive_int | public + test_key | "prepared statements" +(3 rows) + +-- Back to coordinator for cleanup +\c - - - :master_port +-- Test that queries still work after re-adding the worker +SELECT * FROM dist_domain_nonpublic ORDER BY a; + a | b +--------------------------------------------------------------------- + test-3 | 110 + test-4 | 120 +(2 rows) + +SELECT * FROM dist_domain_public ORDER BY a; + a | b +--------------------------------------------------------------------- + 11 | eleven + 12 | twelve +(2 rows) + +SELECT * FROM dist_domain_regular_schema ORDER BY id; + id | email +--------------------------------------------------------------------- + 1 | user@example.com + 2 | admin@test.org + 11 | user@example.com + 12 | admin@test.org +(4 rows) + +-- Test inserts still work +INSERT INTO dist_domain_regular_schema VALUES (1, 'test@domain.com'); +INSERT INTO dist_domain_regular_schema VALUES (2, 'info@citusdata.com'); +SELECT * FROM dist_domain_nonpublic ORDER BY a; + a | b +--------------------------------------------------------------------- + test-3 | 110 + test-4 | 120 +(2 rows) + +SELECT * FROM dist_domain_public ORDER BY a; + a | b +--------------------------------------------------------------------- + 11 | eleven + 12 | twelve +(2 rows) + +SELECT * FROM dist_domain_regular_schema ORDER BY id; + id | email +--------------------------------------------------------------------- + 1 | user@example.com + 1 | test@domain.com + 2 | admin@test.org + 2 | info@citusdata.com + 11 | user@example.com + 12 | admin@test.org +(6 rows) + +-- Cleanup +DROP TABLE dist_domain_nonpublic CASCADE; +DROP TABLE dist_domain_public CASCADE; +DROP TABLE dist_domain_regular_schema CASCADE; +DROP DOMAIN "prepared statements".test_key CASCADE; +DROP DOMAIN public.positive_int CASCADE; +DROP DOMAIN test_schema.email_address CASCADE; +DROP SCHEMA "prepared statements" CASCADE; +DROP SCHEMA test_schema CASCADE; diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 59fdae959..aeec907b2 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -269,9 +269,11 @@ test: multi_drop_extension # ---------- # multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers +# multi_metadata_sync_domain tests that metadata sync works correctly with DOMAIN types in non-public schemas # multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers # ---------- test: multi_metadata_sync +test: multi_metadata_sync_domain test: multi_unsupported_worker_operations test: grant_on_function_propagation diff --git a/src/test/regress/sql/multi_metadata_sync_domain.sql b/src/test/regress/sql/multi_metadata_sync_domain.sql new file mode 100644 index 000000000..9d82cbf34 --- /dev/null +++ b/src/test/regress/sql/multi_metadata_sync_domain.sql @@ -0,0 +1,145 @@ +-- +-- MULTI_METADATA_SYNC_DOMAIN +-- +-- Test that metadata sync works correctly with DOMAIN types in non-public schemas +-- + +-- Create the initial cluster setup +SET citus.shard_replication_factor TO 1; +SET citus.next_shard_id TO 9000000; +-- Store current sequence values and calculate restart values +SELECT nextval('pg_catalog.pg_dist_groupid_seq') - 1 AS last_group_id \gset +SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') - 1 AS last_node_id \gset +-- Remove the second worker node to start with +SELECT citus_remove_node('localhost', :worker_2_port); + +-- Test 1: Domain in a non-public schema with special characters +CREATE SCHEMA "prepared statements"; +CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$'); +CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int); +SELECT create_distributed_table('dist_domain_nonpublic', 'a'); + +-- Insert some test data +INSERT INTO dist_domain_nonpublic VALUES ('test-1', 100), ('test-2', 200); + +-- Reset sequences to avoid conflicts with other tests +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; +-- Now add the worker back - this is should not fail +-- The metadata sync should now work +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + +-- cleanup before next test +DROP TABLE dist_domain_nonpublic CASCADE; +DROP DOMAIN "prepared statements".test_key CASCADE; +DROP SCHEMA "prepared statements" CASCADE; + +SELECT citus_remove_node('localhost', :worker_2_port); + +-- Test 2: Domain in public schema +CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0); +CREATE TABLE dist_domain_public(a positive_int, b text); +SELECT create_distributed_table('dist_domain_public', 'a'); + +-- Insert some test data +INSERT INTO dist_domain_public VALUES (1, 'one'), (2, 'two'); + +-- Reset sequences to avoid conflicts with other tests +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; +-- Now add the worker back - this is should not fail +-- The metadata sync should now work +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + +-- cleanup before next test +DROP TABLE dist_domain_public CASCADE; +DROP DOMAIN public.positive_int CASCADE; + +SELECT citus_remove_node('localhost', :worker_2_port); + +-- Test 3: Domain in a regular schema +CREATE SCHEMA test_schema; +CREATE DOMAIN test_schema.email_address AS text CHECK(VALUE ~ '^[^@]+@[^@]+\.[^@]+$'); +CREATE TABLE dist_domain_regular_schema(id int, email test_schema.email_address); +SELECT create_distributed_table('dist_domain_regular_schema', 'id'); + +-- Insert some test data +INSERT INTO dist_domain_regular_schema VALUES (1, 'user@example.com'), (2, 'admin@test.org'); + + +-- Reset sequences to avoid conflicts with other tests +ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id; +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id; +-- Now add the worker back - this is should not fail +-- The metadata sync should now work +SELECT 1 FROM citus_add_node('localhost', :worker_2_port); + +CREATE SCHEMA "prepared statements"; +CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$'); +CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int); +SELECT create_distributed_table('dist_domain_nonpublic', 'a'); + +CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0); +CREATE TABLE dist_domain_public(a positive_int, b text); +SELECT create_distributed_table('dist_domain_public', 'a'); + +-- Insert some test data +INSERT INTO dist_domain_public VALUES (11, 'eleven'), (12, 'twelve'); +INSERT INTO dist_domain_regular_schema VALUES (11, 'user@example.com'), (12, 'admin@test.org'); +INSERT INTO dist_domain_nonpublic VALUES ('test-3', 110), ('test-4', 120); + +-- Verify the colocation metadata exists +SELECT shardcount, replicationfactor, distributioncolumntype::regtype +FROM pg_dist_colocation +WHERE distributioncolumntype IN ( + '"prepared statements".test_key'::regtype, + 'public.positive_int'::regtype, + 'test_schema.email_address'::regtype +) +ORDER BY distributioncolumntype::regtype::text; + +-- Verify metadata was synced correctly by checking colocation on the worker +\c - - - :worker_2_port +SELECT shardcount, replicationfactor, distributioncolumntype::regtype +FROM pg_dist_colocation +WHERE distributioncolumntype IN ( + '"prepared statements".test_key'::regtype, + 'public.positive_int'::regtype, + 'test_schema.email_address'::regtype +) +ORDER BY distributioncolumntype::regtype::text; + +-- Verify the domains exist on the worker +SELECT typname, typnamespace::regnamespace +FROM pg_type +WHERE typname IN ('test_key', 'positive_int', 'email_address') +ORDER BY typname; + +-- Back to coordinator for cleanup +\c - - - :master_port + +-- Test that queries still work after re-adding the worker +SELECT * FROM dist_domain_nonpublic ORDER BY a; +SELECT * FROM dist_domain_public ORDER BY a; +SELECT * FROM dist_domain_regular_schema ORDER BY id; + +-- Test inserts still work +INSERT INTO dist_domain_regular_schema VALUES (1, 'test@domain.com'); +INSERT INTO dist_domain_regular_schema VALUES (2, 'info@citusdata.com'); + +SELECT * FROM dist_domain_nonpublic ORDER BY a; +SELECT * FROM dist_domain_public ORDER BY a; +SELECT * FROM dist_domain_regular_schema ORDER BY id; + +-- Cleanup + +DROP TABLE dist_domain_nonpublic CASCADE; +DROP TABLE dist_domain_public CASCADE; +DROP TABLE dist_domain_regular_schema CASCADE; + +DROP DOMAIN "prepared statements".test_key CASCADE; +DROP DOMAIN public.positive_int CASCADE; +DROP DOMAIN test_schema.email_address CASCADE; + +DROP SCHEMA "prepared statements" CASCADE; +DROP SCHEMA test_schema CASCADE;