Disallow distributed functions with distribution arguments unless replication_model is streaming

pull/3129/head
Marco Slot 2019-10-23 04:40:48 +02:00
parent 29d45bd1b9
commit 067657af26
1 changed files with 19 additions and 1 deletions

View File

@ -342,6 +342,8 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName,
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
{
Oid colocatedTableId = InvalidOid;
/* check for default colocation group */
colocationId = ColocationId(ShardCount, ShardReplicationFactor,
distributionArgumentOid);
@ -356,6 +358,22 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName,
errhint("Provide a distributed table via \"colocate_with\" "
"option to create_distributed_function()")));
}
colocatedTableId = ColocatedTableId(colocationId);
if (colocatedTableId != InvalidOid)
{
EnsureFunctionCanBeColocatedWithTable(functionOid, distributionArgumentOid,
colocatedTableId);
}
else if (ReplicationModel == REPLICATION_MODEL_COORDINATOR)
{
/* streaming replication model is required for metadata syncing */
ereport(ERROR, (errmsg("cannot create a function with a distribution "
"argument when citus.replication_model is "
"'statement'"),
errhint("Set citus.replication_model to 'streaming' "
"before creating distributed tables")));
}
}
else
{
@ -412,7 +430,7 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp
"with distributed tables that are created using "
"streaming replication model."),
errhint("When distributing tables make sure that "
"\"citus.replication_model\" is set to \"streaming\"")));
"citus.replication_model = 'streaming'")));
}
/*