mirror of https://github.com/citusdata/citus.git
Fix metadata sync failure with domain types in non-public schemas
When adding a worker node to a Citus cluster, metadata synchronization
would fail if any distributed table used a DOMAIN type defined in a
non-public schema as its distribution column. The error occurred because
the colocation metadata command tried to cast a schema-qualified type
name to regtype before the schema existed on the worker.
Problem:
During metadata synchronization, SendColocationMetadataCommands() would
generate SQL like:
WITH colocation_group_data (..., distributioncolumntype, ...) AS (
VALUES (..., '"prepared statements".test_key'::regtype, ...)
)
The ::regtype cast happened immediately in the VALUES clause, causing
PostgreSQL to try resolving the type before the query executed. Since
SendColocationMetadataCommands() runs before SendDependencyCreationCommands(),
the schema and domain didn't exist on the worker yet, resulting in:
ERROR: schema "prepared statements" does not exist
Solution:
Modified the metadata sync to defer type resolution using a LEFT JOIN
pattern, similar to how collations are handled.
Test case and expected output updates are also part of the commit
muusama/8191
parent
662b7248db
commit
21ccbf7561
|
|
@ -154,7 +154,8 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount,
|
||||||
static char * ColocationGroupDeleteCommand(uint32 colocationId);
|
static char * ColocationGroupDeleteCommand(uint32 colocationId);
|
||||||
static char * RemoteSchemaIdExpressionById(Oid schemaId);
|
static char * RemoteSchemaIdExpressionById(Oid schemaId);
|
||||||
static char * RemoteSchemaIdExpressionByName(char *schemaName);
|
static char * RemoteSchemaIdExpressionByName(char *schemaName);
|
||||||
static char * RemoteTypeIdExpression(Oid typeId);
|
static char * GetRemoteTypeName(Oid typeId);
|
||||||
|
static char * GetRemoteTypeNamespace(Oid typeId);
|
||||||
static char * RemoteCollationIdExpression(Oid colocationId);
|
static char * RemoteCollationIdExpression(Oid colocationId);
|
||||||
static char * RemoteTableIdExpression(Oid relationId);
|
static char * RemoteTableIdExpression(Oid relationId);
|
||||||
|
|
||||||
|
|
@ -4192,13 +4193,53 @@ ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicatio
|
||||||
{
|
{
|
||||||
StringInfo insertColocationCommand = makeStringInfo();
|
StringInfo insertColocationCommand = makeStringInfo();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Get type name and schema separately to defer type resolution.
|
||||||
|
* This approach matches how SendColocationMetadataCommands handles types.
|
||||||
|
*/
|
||||||
|
char *typeName = GetRemoteTypeName(distributionColumnType);
|
||||||
|
char *typeSchemaName = GetRemoteTypeNamespace(distributionColumnType);
|
||||||
|
|
||||||
appendStringInfo(insertColocationCommand,
|
appendStringInfo(insertColocationCommand,
|
||||||
"SELECT citus_internal.add_colocation_metadata("
|
"WITH colocation_data("
|
||||||
"%d, %d, %d, %s, %s)",
|
"colocationid, shardcount, replicationfactor, "
|
||||||
|
"typeschema, typename, collationid) "
|
||||||
|
"AS (VALUES (%d, %d, %d, ",
|
||||||
colocationId,
|
colocationId,
|
||||||
shardCount,
|
shardCount,
|
||||||
replicationFactor,
|
replicationFactor);
|
||||||
RemoteTypeIdExpression(distributionColumnType),
|
|
||||||
|
if (typeSchemaName != NULL && typeName != NULL)
|
||||||
|
{
|
||||||
|
/* Use quote_identifier so the schema name can be cast to regnamespace */
|
||||||
|
appendStringInfo(insertColocationCommand,
|
||||||
|
"%s, %s, ",
|
||||||
|
quote_literal_cstr(quote_identifier(typeSchemaName)),
|
||||||
|
quote_literal_cstr(typeName));
|
||||||
|
}
|
||||||
|
else if (typeName != NULL)
|
||||||
|
{
|
||||||
|
appendStringInfo(insertColocationCommand,
|
||||||
|
"NULL, %s, ",
|
||||||
|
quote_literal_cstr(typeName));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(insertColocationCommand,
|
||||||
|
"NULL, NULL, ");
|
||||||
|
}
|
||||||
|
|
||||||
|
appendStringInfo(insertColocationCommand,
|
||||||
|
"%s)) "
|
||||||
|
"SELECT citus_internal.add_colocation_metadata("
|
||||||
|
"colocationid, shardcount, replicationfactor, "
|
||||||
|
"coalesce(t.oid, 0), collationid) "
|
||||||
|
"FROM colocation_data "
|
||||||
|
"LEFT JOIN pg_type t ON ("
|
||||||
|
"typename = t.typname "
|
||||||
|
"AND (typeschema IS NULL OR "
|
||||||
|
"t.typnamespace = "
|
||||||
|
"(SELECT oid FROM pg_namespace WHERE nspname = typeschema)))",
|
||||||
RemoteCollationIdExpression(distributionColumnCollation));
|
RemoteCollationIdExpression(distributionColumnCollation));
|
||||||
|
|
||||||
return insertColocationCommand->data;
|
return insertColocationCommand->data;
|
||||||
|
|
@ -4206,37 +4247,61 @@ ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicatio
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RemoteTypeIdExpression returns an expression in text form that can
|
* GetRemoteTypeName returns the unqualified name of a type.
|
||||||
* be used to obtain the OID of a type on a different node when included
|
* Returns NULL for InvalidOid.
|
||||||
* in a query string.
|
|
||||||
*/
|
*/
|
||||||
static char *
|
static char *
|
||||||
RemoteTypeIdExpression(Oid typeId)
|
GetRemoteTypeName(Oid typeId)
|
||||||
{
|
{
|
||||||
/* by default, use 0 (InvalidOid) */
|
if (typeId == InvalidOid)
|
||||||
char *expression = "0";
|
|
||||||
|
|
||||||
/* we also have pg_dist_colocation entries for reference tables */
|
|
||||||
if (typeId != InvalidOid)
|
|
||||||
{
|
{
|
||||||
char *typeName = format_type_extended(typeId, -1,
|
return NULL;
|
||||||
FORMAT_TYPE_FORCE_QUALIFY |
|
|
||||||
FORMAT_TYPE_ALLOW_INVALID);
|
|
||||||
|
|
||||||
/* format_type_extended returns ??? in case of an unknown type */
|
|
||||||
if (strcmp(typeName, "???") != 0)
|
|
||||||
{
|
|
||||||
StringInfo regtypeExpression = makeStringInfo();
|
|
||||||
|
|
||||||
appendStringInfo(regtypeExpression,
|
|
||||||
"%s::regtype",
|
|
||||||
quote_literal_cstr(typeName));
|
|
||||||
|
|
||||||
expression = regtypeExpression->data;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return expression;
|
HeapTuple typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeId));
|
||||||
|
if (!HeapTupleIsValid(typeTuple))
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
Form_pg_type typeForm = (Form_pg_type) GETSTRUCT(typeTuple);
|
||||||
|
char *typeName = pstrdup(NameStr(typeForm->typname));
|
||||||
|
|
||||||
|
ReleaseSysCache(typeTuple);
|
||||||
|
return typeName;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetRemoteTypeNamespace returns the schema name of a type.
|
||||||
|
* Returns NULL for InvalidOid or types in pg_catalog.
|
||||||
|
*/
|
||||||
|
static char *
|
||||||
|
GetRemoteTypeNamespace(Oid typeId)
|
||||||
|
{
|
||||||
|
if (typeId == InvalidOid)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
HeapTuple typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeId));
|
||||||
|
if (!HeapTupleIsValid(typeTuple))
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
Form_pg_type typeForm = (Form_pg_type) GETSTRUCT(typeTuple);
|
||||||
|
Oid typeNamespace = typeForm->typnamespace;
|
||||||
|
|
||||||
|
ReleaseSysCache(typeTuple);
|
||||||
|
|
||||||
|
/* Don't include schema for pg_catalog types for backward compatibility */
|
||||||
|
if (typeNamespace == PG_CATALOG_NAMESPACE)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return get_namespace_name(typeNamespace);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -4953,19 +5018,52 @@ SendColocationMetadataCommands(MetadataSyncContext *context)
|
||||||
StringInfo colocationGroupCreateCommand = makeStringInfo();
|
StringInfo colocationGroupCreateCommand = makeStringInfo();
|
||||||
appendStringInfo(colocationGroupCreateCommand,
|
appendStringInfo(colocationGroupCreateCommand,
|
||||||
"WITH colocation_group_data (colocationid, shardcount, "
|
"WITH colocation_group_data (colocationid, shardcount, "
|
||||||
"replicationfactor, distributioncolumntype, "
|
"replicationfactor, distributioncolumntypeschema, "
|
||||||
|
"distributioncolumntypename, "
|
||||||
"distributioncolumncollationname, "
|
"distributioncolumncollationname, "
|
||||||
"distributioncolumncollationschema) AS (VALUES ");
|
"distributioncolumncollationschema) AS (VALUES ");
|
||||||
|
|
||||||
Form_pg_dist_colocation colocationForm =
|
Form_pg_dist_colocation colocationForm =
|
||||||
(Form_pg_dist_colocation) GETSTRUCT(nextTuple);
|
(Form_pg_dist_colocation) GETSTRUCT(nextTuple);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Get the type name and schema separately to defer type resolution.
|
||||||
|
* This is necessary when the type (e.g., a domain) is defined in a
|
||||||
|
* non-public schema that may not exist on the worker yet.
|
||||||
|
*/
|
||||||
|
char *typeName =
|
||||||
|
GetRemoteTypeName(colocationForm->distributioncolumntype);
|
||||||
|
char *typeSchemaName =
|
||||||
|
GetRemoteTypeNamespace(colocationForm->distributioncolumntype);
|
||||||
|
|
||||||
appendStringInfo(colocationGroupCreateCommand,
|
appendStringInfo(colocationGroupCreateCommand,
|
||||||
"(%d, %d, %d, %s, ",
|
"(%d, %d, %d, ",
|
||||||
colocationForm->colocationid,
|
colocationForm->colocationid,
|
||||||
colocationForm->shardcount,
|
colocationForm->shardcount,
|
||||||
colocationForm->replicationfactor,
|
colocationForm->replicationfactor);
|
||||||
RemoteTypeIdExpression(colocationForm->distributioncolumntype));
|
|
||||||
|
/* Add type schema and name */
|
||||||
|
if (typeSchemaName != NULL && typeName != NULL)
|
||||||
|
{
|
||||||
|
/* Use quote_identifier so the schema name can be cast to regnamespace */
|
||||||
|
appendStringInfo(colocationGroupCreateCommand,
|
||||||
|
"%s, %s, ",
|
||||||
|
quote_literal_cstr(quote_identifier(typeSchemaName)),
|
||||||
|
quote_literal_cstr(typeName));
|
||||||
|
}
|
||||||
|
else if (typeName != NULL)
|
||||||
|
{
|
||||||
|
/* Type is in pg_catalog or no schema qualifier needed */
|
||||||
|
appendStringInfo(colocationGroupCreateCommand,
|
||||||
|
"NULL, %s, ",
|
||||||
|
quote_literal_cstr(typeName));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* InvalidOid or unknown type */
|
||||||
|
appendStringInfo(colocationGroupCreateCommand,
|
||||||
|
"NULL, NULL, ");
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* For collations, include the names in the VALUES section and then
|
* For collations, include the names in the VALUES section and then
|
||||||
|
|
@ -5001,14 +5099,25 @@ SendColocationMetadataCommands(MetadataSyncContext *context)
|
||||||
"NULL, NULL)");
|
"NULL, NULL)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Use LEFT JOIN with pg_type to resolve the type OID at runtime.
|
||||||
|
* This defers type resolution until execution on the worker, allowing
|
||||||
|
* the type and its schema to be created first by dependency commands.
|
||||||
|
*/
|
||||||
appendStringInfo(colocationGroupCreateCommand,
|
appendStringInfo(colocationGroupCreateCommand,
|
||||||
") SELECT citus_internal.add_colocation_metadata("
|
") SELECT citus_internal.add_colocation_metadata("
|
||||||
"colocationid, shardcount, replicationfactor, "
|
"colocationid, shardcount, replicationfactor, "
|
||||||
"distributioncolumntype, coalesce(c.oid, 0)) "
|
"coalesce(t.oid, 0), coalesce(c.oid, 0)) "
|
||||||
"FROM colocation_group_data d LEFT JOIN pg_collation c "
|
"FROM colocation_group_data d "
|
||||||
|
"LEFT JOIN pg_type t ON ("
|
||||||
|
"d.distributioncolumntypename = t.typname "
|
||||||
|
"AND (d.distributioncolumntypeschema IS NULL OR "
|
||||||
|
"t.typnamespace = (SELECT oid FROM pg_namespace WHERE "
|
||||||
|
"nspname = d.distributioncolumntypeschema))) "
|
||||||
|
"LEFT JOIN pg_collation c "
|
||||||
"ON (d.distributioncolumncollationname = c.collname "
|
"ON (d.distributioncolumncollationname = c.collname "
|
||||||
"AND d.distributioncolumncollationschema::regnamespace"
|
"AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE "
|
||||||
" = c.collnamespace)");
|
"nspname = d.distributioncolumncollationschema))");
|
||||||
|
|
||||||
List *commandList = list_make1(colocationGroupCreateCommand->data);
|
List *commandList = list_make1(colocationGroupCreateCommand->data);
|
||||||
SendOrCollectCommandListToActivatedNodes(context, commandList);
|
SendOrCollectCommandListToActivatedNodes(context, commandList);
|
||||||
|
|
|
||||||
|
|
@ -175,8 +175,8 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
||||||
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
||||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
|
||||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (3, 1, 1, 0, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (3, 1, 1, NULL, NULL, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
|
|
@ -240,7 +240,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
||||||
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
||||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
|
|
@ -301,7 +301,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
||||||
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
||||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
|
|
@ -369,7 +369,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
||||||
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
||||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
|
|
@ -430,7 +430,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
||||||
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
||||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (2, 8, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
|
|
@ -523,7 +523,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text;
|
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
|
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varnullingrels (b) :varlevelsup 0 :varreturningtype 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f
|
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varnullingrels (b) :varlevelsup 0 :varreturningtype 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
@ -662,7 +662,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeid;
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text;
|
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx_testing_schema%' ORDER BY logicalrelid::text;
|
||||||
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
|
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varnullingrels (b) :varlevelsup 0 :varreturningtype 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f
|
mx_testing_schema.mx_test_table | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varnullingrels (b) :varlevelsup 0 :varreturningtype 0 :varnosyn 1 :varattnosyn 1 :location -1} | 2 | s | f
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
@ -2023,8 +2023,8 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
||||||
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET hasmetadata = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET isactive = TRUE WHERE nodeid = 2
|
||||||
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
UPDATE pg_dist_node SET metadatasynced = TRUE WHERE nodeid = 2
|
||||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10009, 1, -1, 0, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10009, 1, -1, NULL, NULL, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
|
||||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10010, 4, 1, 'integer'::regtype, NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntypeschema, distributioncolumntypename, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10010, 4, 1, NULL, 'int4', NULL, NULL)) SELECT citus_internal.add_colocation_metadata(colocationid, shardcount, replicationfactor, coalesce(t.oid, 0), coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_type t ON (d.distributioncolumntypename = t.typname AND (d.distributioncolumntypeschema IS NULL OR t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumntypeschema))) LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND c.collnamespace = (SELECT oid FROM pg_namespace WHERE nspname = d.distributioncolumncollationschema))
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal.add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,231 @@
|
||||||
|
--
|
||||||
|
-- MULTI_METADATA_SYNC_DOMAIN
|
||||||
|
--
|
||||||
|
-- Test that metadata sync works correctly with DOMAIN types in non-public schemas
|
||||||
|
--
|
||||||
|
-- Create the initial cluster setup
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.next_shard_id TO 9000000;
|
||||||
|
-- Store current sequence values and calculate restart values
|
||||||
|
SELECT nextval('pg_catalog.pg_dist_groupid_seq') - 1 AS last_group_id \gset
|
||||||
|
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') - 1 AS last_node_id \gset
|
||||||
|
-- Remove the second worker node to start with
|
||||||
|
SELECT citus_remove_node('localhost', :worker_2_port);
|
||||||
|
citus_remove_node
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test 1: Domain in a non-public schema with special characters
|
||||||
|
CREATE SCHEMA "prepared statements";
|
||||||
|
CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$');
|
||||||
|
CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int);
|
||||||
|
SELECT create_distributed_table('dist_domain_nonpublic', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Insert some test data
|
||||||
|
INSERT INTO dist_domain_nonpublic VALUES ('test-1', 100), ('test-2', 200);
|
||||||
|
-- Reset sequences to avoid conflicts with other tests
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
|
||||||
|
-- Now add the worker back - this is should not fail
|
||||||
|
-- The metadata sync should now work
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cleanup before next test
|
||||||
|
DROP TABLE dist_domain_nonpublic CASCADE;
|
||||||
|
DROP DOMAIN "prepared statements".test_key CASCADE;
|
||||||
|
DROP SCHEMA "prepared statements" CASCADE;
|
||||||
|
SELECT citus_remove_node('localhost', :worker_2_port);
|
||||||
|
citus_remove_node
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test 2: Domain in public schema
|
||||||
|
CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0);
|
||||||
|
CREATE TABLE dist_domain_public(a positive_int, b text);
|
||||||
|
SELECT create_distributed_table('dist_domain_public', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Insert some test data
|
||||||
|
INSERT INTO dist_domain_public VALUES (1, 'one'), (2, 'two');
|
||||||
|
-- Reset sequences to avoid conflicts with other tests
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
|
||||||
|
-- Now add the worker back - this is should not fail
|
||||||
|
-- The metadata sync should now work
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- cleanup before next test
|
||||||
|
DROP TABLE dist_domain_public CASCADE;
|
||||||
|
DROP DOMAIN public.positive_int CASCADE;
|
||||||
|
SELECT citus_remove_node('localhost', :worker_2_port);
|
||||||
|
citus_remove_node
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test 3: Domain in a regular schema
|
||||||
|
CREATE SCHEMA test_schema;
|
||||||
|
CREATE DOMAIN test_schema.email_address AS text CHECK(VALUE ~ '^[^@]+@[^@]+\.[^@]+$');
|
||||||
|
CREATE TABLE dist_domain_regular_schema(id int, email test_schema.email_address);
|
||||||
|
SELECT create_distributed_table('dist_domain_regular_schema', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Insert some test data
|
||||||
|
INSERT INTO dist_domain_regular_schema VALUES (1, 'user@example.com'), (2, 'admin@test.org');
|
||||||
|
-- Reset sequences to avoid conflicts with other tests
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
|
||||||
|
-- Now add the worker back - this is should not fail
|
||||||
|
-- The metadata sync should now work
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE SCHEMA "prepared statements";
|
||||||
|
CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$');
|
||||||
|
CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int);
|
||||||
|
SELECT create_distributed_table('dist_domain_nonpublic', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0);
|
||||||
|
CREATE TABLE dist_domain_public(a positive_int, b text);
|
||||||
|
SELECT create_distributed_table('dist_domain_public', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Insert some test data
|
||||||
|
INSERT INTO dist_domain_public VALUES (11, 'eleven'), (12, 'twelve');
|
||||||
|
INSERT INTO dist_domain_regular_schema VALUES (11, 'user@example.com'), (12, 'admin@test.org');
|
||||||
|
INSERT INTO dist_domain_nonpublic VALUES ('test-3', 110), ('test-4', 120);
|
||||||
|
-- Verify the colocation metadata exists
|
||||||
|
SELECT shardcount, replicationfactor, distributioncolumntype::regtype
|
||||||
|
FROM pg_dist_colocation
|
||||||
|
WHERE distributioncolumntype IN (
|
||||||
|
'"prepared statements".test_key'::regtype,
|
||||||
|
'public.positive_int'::regtype,
|
||||||
|
'test_schema.email_address'::regtype
|
||||||
|
)
|
||||||
|
ORDER BY distributioncolumntype::regtype::text;
|
||||||
|
shardcount | replicationfactor | distributioncolumntype
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4 | 1 | "prepared statements".test_key
|
||||||
|
4 | 1 | positive_int
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Verify metadata was synced correctly by checking colocation on the worker
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SELECT shardcount, replicationfactor, distributioncolumntype::regtype
|
||||||
|
FROM pg_dist_colocation
|
||||||
|
WHERE distributioncolumntype IN (
|
||||||
|
'"prepared statements".test_key'::regtype,
|
||||||
|
'public.positive_int'::regtype,
|
||||||
|
'test_schema.email_address'::regtype
|
||||||
|
)
|
||||||
|
ORDER BY distributioncolumntype::regtype::text;
|
||||||
|
shardcount | replicationfactor | distributioncolumntype
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4 | 1 | positive_int
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Verify the domains exist on the worker
|
||||||
|
SELECT typname, typnamespace::regnamespace
|
||||||
|
FROM pg_type
|
||||||
|
WHERE typname IN ('test_key', 'positive_int', 'email_address')
|
||||||
|
ORDER BY typname;
|
||||||
|
typname | typnamespace
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
email_address | test_schema
|
||||||
|
positive_int | public
|
||||||
|
test_key | "prepared statements"
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- Back to coordinator for cleanup
|
||||||
|
\c - - - :master_port
|
||||||
|
-- Test that queries still work after re-adding the worker
|
||||||
|
SELECT * FROM dist_domain_nonpublic ORDER BY a;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
test-3 | 110
|
||||||
|
test-4 | 120
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT * FROM dist_domain_public ORDER BY a;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
11 | eleven
|
||||||
|
12 | twelve
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT * FROM dist_domain_regular_schema ORDER BY id;
|
||||||
|
id | email
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | user@example.com
|
||||||
|
2 | admin@test.org
|
||||||
|
11 | user@example.com
|
||||||
|
12 | admin@test.org
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- Test inserts still work
|
||||||
|
INSERT INTO dist_domain_regular_schema VALUES (1, 'test@domain.com');
|
||||||
|
INSERT INTO dist_domain_regular_schema VALUES (2, 'info@citusdata.com');
|
||||||
|
SELECT * FROM dist_domain_nonpublic ORDER BY a;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
test-3 | 110
|
||||||
|
test-4 | 120
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT * FROM dist_domain_public ORDER BY a;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
11 | eleven
|
||||||
|
12 | twelve
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT * FROM dist_domain_regular_schema ORDER BY id;
|
||||||
|
id | email
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | user@example.com
|
||||||
|
1 | test@domain.com
|
||||||
|
2 | admin@test.org
|
||||||
|
2 | info@citusdata.com
|
||||||
|
11 | user@example.com
|
||||||
|
12 | admin@test.org
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
-- Cleanup
|
||||||
|
DROP TABLE dist_domain_nonpublic CASCADE;
|
||||||
|
DROP TABLE dist_domain_public CASCADE;
|
||||||
|
DROP TABLE dist_domain_regular_schema CASCADE;
|
||||||
|
DROP DOMAIN "prepared statements".test_key CASCADE;
|
||||||
|
DROP DOMAIN public.positive_int CASCADE;
|
||||||
|
DROP DOMAIN test_schema.email_address CASCADE;
|
||||||
|
DROP SCHEMA "prepared statements" CASCADE;
|
||||||
|
DROP SCHEMA test_schema CASCADE;
|
||||||
|
|
@ -269,9 +269,11 @@ test: multi_drop_extension
|
||||||
|
|
||||||
# ----------
|
# ----------
|
||||||
# multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers
|
# multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers
|
||||||
|
# multi_metadata_sync_domain tests that metadata sync works correctly with DOMAIN types in non-public schemas
|
||||||
# multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers
|
# multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers
|
||||||
# ----------
|
# ----------
|
||||||
test: multi_metadata_sync
|
test: multi_metadata_sync
|
||||||
|
test: multi_metadata_sync_domain
|
||||||
test: multi_unsupported_worker_operations
|
test: multi_unsupported_worker_operations
|
||||||
|
|
||||||
test: grant_on_function_propagation
|
test: grant_on_function_propagation
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,145 @@
|
||||||
|
--
|
||||||
|
-- MULTI_METADATA_SYNC_DOMAIN
|
||||||
|
--
|
||||||
|
-- Test that metadata sync works correctly with DOMAIN types in non-public schemas
|
||||||
|
--
|
||||||
|
|
||||||
|
-- Create the initial cluster setup
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.next_shard_id TO 9000000;
|
||||||
|
-- Store current sequence values and calculate restart values
|
||||||
|
SELECT nextval('pg_catalog.pg_dist_groupid_seq') - 1 AS last_group_id \gset
|
||||||
|
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') - 1 AS last_node_id \gset
|
||||||
|
-- Remove the second worker node to start with
|
||||||
|
SELECT citus_remove_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
-- Test 1: Domain in a non-public schema with special characters
|
||||||
|
CREATE SCHEMA "prepared statements";
|
||||||
|
CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$');
|
||||||
|
CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int);
|
||||||
|
SELECT create_distributed_table('dist_domain_nonpublic', 'a');
|
||||||
|
|
||||||
|
-- Insert some test data
|
||||||
|
INSERT INTO dist_domain_nonpublic VALUES ('test-1', 100), ('test-2', 200);
|
||||||
|
|
||||||
|
-- Reset sequences to avoid conflicts with other tests
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
|
||||||
|
-- Now add the worker back - this is should not fail
|
||||||
|
-- The metadata sync should now work
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
-- cleanup before next test
|
||||||
|
DROP TABLE dist_domain_nonpublic CASCADE;
|
||||||
|
DROP DOMAIN "prepared statements".test_key CASCADE;
|
||||||
|
DROP SCHEMA "prepared statements" CASCADE;
|
||||||
|
|
||||||
|
SELECT citus_remove_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
-- Test 2: Domain in public schema
|
||||||
|
CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0);
|
||||||
|
CREATE TABLE dist_domain_public(a positive_int, b text);
|
||||||
|
SELECT create_distributed_table('dist_domain_public', 'a');
|
||||||
|
|
||||||
|
-- Insert some test data
|
||||||
|
INSERT INTO dist_domain_public VALUES (1, 'one'), (2, 'two');
|
||||||
|
|
||||||
|
-- Reset sequences to avoid conflicts with other tests
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
|
||||||
|
-- Now add the worker back - this is should not fail
|
||||||
|
-- The metadata sync should now work
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
-- cleanup before next test
|
||||||
|
DROP TABLE dist_domain_public CASCADE;
|
||||||
|
DROP DOMAIN public.positive_int CASCADE;
|
||||||
|
|
||||||
|
SELECT citus_remove_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
-- Test 3: Domain in a regular schema
|
||||||
|
CREATE SCHEMA test_schema;
|
||||||
|
CREATE DOMAIN test_schema.email_address AS text CHECK(VALUE ~ '^[^@]+@[^@]+\.[^@]+$');
|
||||||
|
CREATE TABLE dist_domain_regular_schema(id int, email test_schema.email_address);
|
||||||
|
SELECT create_distributed_table('dist_domain_regular_schema', 'id');
|
||||||
|
|
||||||
|
-- Insert some test data
|
||||||
|
INSERT INTO dist_domain_regular_schema VALUES (1, 'user@example.com'), (2, 'admin@test.org');
|
||||||
|
|
||||||
|
|
||||||
|
-- Reset sequences to avoid conflicts with other tests
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
|
||||||
|
-- Now add the worker back - this is should not fail
|
||||||
|
-- The metadata sync should now work
|
||||||
|
SELECT 1 FROM citus_add_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
CREATE SCHEMA "prepared statements";
|
||||||
|
CREATE DOMAIN "prepared statements".test_key AS text CHECK(VALUE ~ '^test-\d$');
|
||||||
|
CREATE TABLE dist_domain_nonpublic(a "prepared statements".test_key, b int);
|
||||||
|
SELECT create_distributed_table('dist_domain_nonpublic', 'a');
|
||||||
|
|
||||||
|
CREATE DOMAIN public.positive_int AS int CHECK(VALUE > 0);
|
||||||
|
CREATE TABLE dist_domain_public(a positive_int, b text);
|
||||||
|
SELECT create_distributed_table('dist_domain_public', 'a');
|
||||||
|
|
||||||
|
-- Insert some test data
|
||||||
|
INSERT INTO dist_domain_public VALUES (11, 'eleven'), (12, 'twelve');
|
||||||
|
INSERT INTO dist_domain_regular_schema VALUES (11, 'user@example.com'), (12, 'admin@test.org');
|
||||||
|
INSERT INTO dist_domain_nonpublic VALUES ('test-3', 110), ('test-4', 120);
|
||||||
|
|
||||||
|
-- Verify the colocation metadata exists
|
||||||
|
SELECT shardcount, replicationfactor, distributioncolumntype::regtype
|
||||||
|
FROM pg_dist_colocation
|
||||||
|
WHERE distributioncolumntype IN (
|
||||||
|
'"prepared statements".test_key'::regtype,
|
||||||
|
'public.positive_int'::regtype,
|
||||||
|
'test_schema.email_address'::regtype
|
||||||
|
)
|
||||||
|
ORDER BY distributioncolumntype::regtype::text;
|
||||||
|
|
||||||
|
-- Verify metadata was synced correctly by checking colocation on the worker
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SELECT shardcount, replicationfactor, distributioncolumntype::regtype
|
||||||
|
FROM pg_dist_colocation
|
||||||
|
WHERE distributioncolumntype IN (
|
||||||
|
'"prepared statements".test_key'::regtype,
|
||||||
|
'public.positive_int'::regtype,
|
||||||
|
'test_schema.email_address'::regtype
|
||||||
|
)
|
||||||
|
ORDER BY distributioncolumntype::regtype::text;
|
||||||
|
|
||||||
|
-- Verify the domains exist on the worker
|
||||||
|
SELECT typname, typnamespace::regnamespace
|
||||||
|
FROM pg_type
|
||||||
|
WHERE typname IN ('test_key', 'positive_int', 'email_address')
|
||||||
|
ORDER BY typname;
|
||||||
|
|
||||||
|
-- Back to coordinator for cleanup
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
-- Test that queries still work after re-adding the worker
|
||||||
|
SELECT * FROM dist_domain_nonpublic ORDER BY a;
|
||||||
|
SELECT * FROM dist_domain_public ORDER BY a;
|
||||||
|
SELECT * FROM dist_domain_regular_schema ORDER BY id;
|
||||||
|
|
||||||
|
-- Test inserts still work
|
||||||
|
INSERT INTO dist_domain_regular_schema VALUES (1, 'test@domain.com');
|
||||||
|
INSERT INTO dist_domain_regular_schema VALUES (2, 'info@citusdata.com');
|
||||||
|
|
||||||
|
SELECT * FROM dist_domain_nonpublic ORDER BY a;
|
||||||
|
SELECT * FROM dist_domain_public ORDER BY a;
|
||||||
|
SELECT * FROM dist_domain_regular_schema ORDER BY id;
|
||||||
|
|
||||||
|
-- Cleanup
|
||||||
|
|
||||||
|
DROP TABLE dist_domain_nonpublic CASCADE;
|
||||||
|
DROP TABLE dist_domain_public CASCADE;
|
||||||
|
DROP TABLE dist_domain_regular_schema CASCADE;
|
||||||
|
|
||||||
|
DROP DOMAIN "prepared statements".test_key CASCADE;
|
||||||
|
DROP DOMAIN public.positive_int CASCADE;
|
||||||
|
DROP DOMAIN test_schema.email_address CASCADE;
|
||||||
|
|
||||||
|
DROP SCHEMA "prepared statements" CASCADE;
|
||||||
|
DROP SCHEMA test_schema CASCADE;
|
||||||
Loading…
Reference in New Issue