Move CheckCitusVersion to the top of each function

Previously this was usually done after argument parsing. This can cause
SEGFAULTs if the number or type of arguments changes in a new version.
By checking that Citus version is correct before doing any argument
parsing we protect against these types of issues. Issues like this have
occurred in pg_auto_failover, so it's not just a theoretical issue.

The main reason why these calls were not at the top of functions is
really just historical. It was because in the past we didn't allow
statements before declarations. Thus having this check before the
argument parsing would have only been possible if we first declared all
variables.

In addition to moving existing CheckCitusVersion calls it also adds
these calls to rebalancer related functions (they were missing there).
pull/5013/head
Jelte Fennema 2021-06-01 11:39:14 +02:00
parent 98081557fb
commit b1cad26ebc
40 changed files with 193 additions and 185 deletions

View File

@ -219,11 +219,11 @@ PG_FUNCTION_INFO_V1(worker_change_sequence_dependency);
Datum
undistribute_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
bool cascadeViaForeignKeys = PG_GETARG_BOOL(1);
CheckCitusVersion(ERROR);
TableConversionParameters params = {
.relationId = relationId,
.cascadeViaForeignKeys = cascadeViaForeignKeys
@ -243,6 +243,8 @@ undistribute_table(PG_FUNCTION_ARGS)
Datum
alter_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
char *distributionColumn = NULL;
@ -280,9 +282,6 @@ alter_distributed_table(PG_FUNCTION_ARGS)
}
}
CheckCitusVersion(ERROR);
TableConversionParameters params = {
.relationId = relationId,
.distributionColumn = distributionColumn,
@ -305,13 +304,13 @@ alter_distributed_table(PG_FUNCTION_ARGS)
Datum
alter_table_set_access_method(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
text *accessMethodText = PG_GETARG_TEXT_P(1);
char *accessMethod = text_to_cstring(accessMethodText);
CheckCitusVersion(ERROR);
TableConversionParameters params = {
.relationId = relationId,
.accessMethod = accessMethod

View File

@ -142,12 +142,11 @@ PG_FUNCTION_INFO_V1(create_reference_table);
Datum
master_create_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
CheckCitusVersion(ERROR);
EnsureCitusTableCanBeCreated(relationId);
char *colocateWithTableName = NULL;
@ -189,6 +188,8 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
Datum
create_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
@ -221,8 +222,6 @@ create_distributed_table(PG_FUNCTION_ARGS)
shardCountIsStrict = true;
}
CheckCitusVersion(ERROR);
EnsureCitusTableCanBeCreated(relationId);
/* enable create_distributed_table on an empty node */
@ -271,6 +270,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
Datum
create_reference_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
char *colocateWithTableName = NULL;
@ -278,8 +278,6 @@ create_reference_table(PG_FUNCTION_ARGS)
bool viaDeprecatedAPI = false;
CheckCitusVersion(ERROR);
EnsureCitusTableCanBeCreated(relationId);
/* enable create_reference_table on an empty node */

View File

@ -59,6 +59,8 @@ master_drop_distributed_table_metadata(PG_FUNCTION_ARGS)
Datum
master_remove_partition_metadata(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
text *schemaNameText = PG_GETARG_TEXT_P(1);
text *tableNameText = PG_GETARG_TEXT_P(2);
@ -66,8 +68,6 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS)
char *schemaName = text_to_cstring(schemaNameText);
char *tableName = text_to_cstring(tableNameText);
CheckCitusVersion(ERROR);
/*
* The SQL_DROP trigger calls this function even for tables that are
* not distributed. In that case, silently ignore. This is not very
@ -97,6 +97,8 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS)
Datum
master_remove_distributed_table_metadata_from_workers(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
text *schemaNameText = PG_GETARG_TEXT_P(1);
text *tableNameText = PG_GETARG_TEXT_P(2);
@ -104,8 +106,6 @@ master_remove_distributed_table_metadata_from_workers(PG_FUNCTION_ARGS)
char *schemaName = text_to_cstring(schemaNameText);
char *tableName = text_to_cstring(tableNameText);
CheckCitusVersion(ERROR);
CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName);
MasterRemoveDistributedTableMetadataFromWorkers(relationId, schemaName, tableName);

View File

@ -174,10 +174,11 @@ TruncateTaskList(Oid relationId)
Datum
truncate_local_data_after_distributing_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
CheckCitusVersion(ERROR);
EnsureCoordinator();
Oid relationId = PG_GETARG_OID(0);
EnsureLocalTableCanBeTruncated(relationId);
TruncateStmt *truncateStmt = makeNode(TruncateStmt);

View File

@ -111,9 +111,9 @@ PG_FUNCTION_INFO_V1(citus_reserved_connection_stats);
Datum
citus_reserved_connection_stats(PG_FUNCTION_ARGS)
{
TupleDesc tupleDescriptor = NULL;
CheckCitusVersion(ERROR);
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
StoreAllReservedConnections(tupleStore, tupleDescriptor);

View File

@ -136,9 +136,9 @@ PG_FUNCTION_INFO_V1(citus_remote_connection_stats);
Datum
citus_remote_connection_stats(PG_FUNCTION_ARGS)
{
TupleDesc tupleDescriptor = NULL;
CheckCitusVersion(ERROR);
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
StoreAllRemoteConnectionStats(tupleStore, tupleDescriptor);

View File

@ -118,6 +118,8 @@ PG_FUNCTION_INFO_V1(fetch_intermediate_results);
Datum
broadcast_intermediate_result(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *resultIdText = PG_GETARG_TEXT_P(0);
char *resultIdString = text_to_cstring(resultIdText);
text *queryText = PG_GETARG_TEXT_P(1);
@ -125,8 +127,6 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS)
bool writeLocalFile = false;
ParamListInfo paramListInfo = NULL;
CheckCitusVersion(ERROR);
/*
* Make sure that this transaction has a distributed transaction ID.
*
@ -159,6 +159,8 @@ broadcast_intermediate_result(PG_FUNCTION_ARGS)
Datum
create_intermediate_result(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *resultIdText = PG_GETARG_TEXT_P(0);
char *resultIdString = text_to_cstring(resultIdText);
text *queryText = PG_GETARG_TEXT_P(1);
@ -167,8 +169,6 @@ create_intermediate_result(PG_FUNCTION_ARGS)
bool writeLocalFile = true;
ParamListInfo paramListInfo = NULL;
CheckCitusVersion(ERROR);
/*
* Make sure that this transaction has a distributed transaction ID.
*
@ -771,13 +771,13 @@ IntermediateResultSize(const char *resultId)
Datum
read_intermediate_result(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Datum resultId = PG_GETARG_DATUM(0);
Datum copyFormatOidDatum = PG_GETARG_DATUM(1);
Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum);
char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum);
CheckCitusVersion(ERROR);
ReadIntermediateResultsIntoFuncOutput(fcinfo, copyFormatLabel, &resultId, 1);
PG_RETURN_DATUM(0);
@ -794,14 +794,14 @@ read_intermediate_result(PG_FUNCTION_ARGS)
Datum
read_intermediate_result_array(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0);
Datum copyFormatOidDatum = PG_GETARG_DATUM(1);
Datum copyFormatLabelDatum = DirectFunctionCall1(enum_out, copyFormatOidDatum);
char *copyFormatLabel = DatumGetCString(copyFormatLabelDatum);
CheckCitusVersion(ERROR);
int32 resultCount = ArrayGetNItems(ARR_NDIM(resultIdObject), ARR_DIMS(
resultIdObject));
Datum *resultIdArray = DeconstructArrayObject(resultIdObject);
@ -874,6 +874,8 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat,
Datum
fetch_intermediate_results(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
ArrayType *resultIdObject = PG_GETARG_ARRAYTYPE_P(0);
Datum *resultIdArray = DeconstructArrayObject(resultIdObject);
int32 resultCount = ArrayObjectCount(resultIdObject);
@ -885,8 +887,6 @@ fetch_intermediate_results(PG_FUNCTION_ARGS)
int resultIndex = 0;
int64 totalBytesWritten = 0L;
CheckCitusVersion(ERROR);
if (resultCount == 0)
{
PG_RETURN_INT64(0);

View File

@ -107,6 +107,8 @@ PG_FUNCTION_INFO_V1(worker_partition_query_result);
Datum
worker_partition_query_result(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
ReturnSetInfo *resultInfo = (ReturnSetInfo *) fcinfo->resultinfo;
text *resultIdPrefixText = PG_GETARG_TEXT_P(0);
@ -136,8 +138,6 @@ worker_partition_query_result(PG_FUNCTION_ARGS)
bool binaryCopy = PG_GETARG_BOOL(6);
CheckCitusVersion(ERROR);
if (!IsMultiStatementTransaction())
{
ereport(ERROR, (errmsg("worker_partition_query_result can only be used in a "

View File

@ -2641,6 +2641,8 @@ SecondaryNodeRoleId(void)
Datum
citus_dist_partition_cache_invalidate(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
TriggerData *triggerData = (TriggerData *) fcinfo->context;
Oid oldLogicalRelationId = InvalidOid;
Oid newLogicalRelationId = InvalidOid;
@ -2651,8 +2653,6 @@ citus_dist_partition_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
if (RelationGetRelid(triggerData->tg_relation) != DistPartitionRelationId())
{
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
@ -2718,6 +2718,8 @@ master_dist_partition_cache_invalidate(PG_FUNCTION_ARGS)
Datum
citus_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
TriggerData *triggerData = (TriggerData *) fcinfo->context;
Oid oldLogicalRelationId = InvalidOid;
Oid newLogicalRelationId = InvalidOid;
@ -2728,8 +2730,6 @@ citus_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
if (RelationGetRelid(triggerData->tg_relation) != DistShardRelationId())
{
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
@ -2795,6 +2795,8 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
Datum
citus_dist_placement_cache_invalidate(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
TriggerData *triggerData = (TriggerData *) fcinfo->context;
Oid oldShardId = InvalidOid;
Oid newShardId = InvalidOid;
@ -2805,8 +2807,6 @@ citus_dist_placement_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
/*
* Before 7.0-2 this trigger is on pg_dist_shard_placement,
* ignore trigger in this scenario.
@ -2884,14 +2884,14 @@ master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS)
Datum
citus_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (!CALLED_AS_TRIGGER(fcinfo))
{
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));
@ -2919,14 +2919,14 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
Datum
citus_conninfo_cache_invalidate(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (!CALLED_AS_TRIGGER(fcinfo))
{
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
/* no-op in community edition */
PG_RETURN_DATUM(PointerGetDatum(NULL));
@ -2954,14 +2954,14 @@ master_dist_authinfo_cache_invalidate(PG_FUNCTION_ARGS)
Datum
citus_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (!CALLED_AS_TRIGGER(fcinfo))
{
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));
@ -2989,14 +2989,14 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
Datum
citus_dist_object_cache_invalidate(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (!CALLED_AS_TRIGGER(fcinfo))
{
ereport(ERROR, (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
CitusInvalidateRelcacheByRelid(DistObjectRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));

View File

@ -103,6 +103,8 @@ static bool got_SIGALRM = false;
Datum
start_metadata_sync_to_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
@ -126,10 +128,10 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
/* fail if metadata synchronization doesn't succeed */
bool raiseInterrupts = true;
CheckCitusVersion(ERROR);
EnsureCoordinator();
EnsureSuperUser();
EnsureModificationsCanRun();
CheckCitusVersion(ERROR);
PreventInTransactionBlock(true, "start_metadata_sync_to_node");
@ -185,14 +187,14 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
Datum
stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
EnsureSuperUser();
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
EnsureCoordinator();
EnsureSuperUser();
CheckCitusVersion(ERROR);
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort);

View File

@ -270,13 +270,13 @@ citus_shard_sizes(PG_FUNCTION_ARGS)
Datum
citus_total_relation_size(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
bool failOnError = PG_GETARG_BOOL(1);
SizeQueryType sizeQueryType = TOTAL_RELATION_SIZE;
CheckCitusVersion(ERROR);
if (CStoreTable(relationId))
{
sizeQueryType = CSTORE_TABLE_SIZE;
@ -301,12 +301,12 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
Datum
citus_table_size(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
bool failOnError = true;
SizeQueryType sizeQueryType = TABLE_SIZE;
CheckCitusVersion(ERROR);
if (CStoreTable(relationId))
{
sizeQueryType = CSTORE_TABLE_SIZE;
@ -331,12 +331,12 @@ citus_table_size(PG_FUNCTION_ARGS)
Datum
citus_relation_size(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
bool failOnError = true;
SizeQueryType sizeQueryType = RELATION_SIZE;
CheckCitusVersion(ERROR);
if (CStoreTable(relationId))
{
sizeQueryType = CSTORE_TABLE_SIZE;

View File

@ -161,6 +161,8 @@ DefaultNodeMetadata()
Datum
citus_set_coordinator_host(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
@ -173,8 +175,6 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS)
Name nodeClusterName = PG_GETARG_NAME(3);
nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
CheckCitusVersion(ERROR);
/* prevent concurrent modification */
LockRelationOid(DistNodeRelationId(), RowShareLock);
@ -219,6 +219,8 @@ citus_set_coordinator_host(PG_FUNCTION_ARGS)
Datum
citus_add_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
@ -227,8 +229,6 @@ citus_add_node(PG_FUNCTION_ARGS)
bool nodeAlreadyExists = false;
nodeMetadata.groupId = PG_GETARG_INT32(2);
CheckCitusVersion(ERROR);
/*
* During tests this function is called before nodeRole and nodeCluster have been
* created.
@ -288,6 +288,8 @@ master_add_node(PG_FUNCTION_ARGS)
Datum
citus_add_inactive_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
@ -299,8 +301,6 @@ citus_add_inactive_node(PG_FUNCTION_ARGS)
nodeMetadata.nodeRole = PG_GETARG_OID(3);
nodeMetadata.nodeCluster = NameStr(*nodeClusterName);
CheckCitusVersion(ERROR);
if (nodeMetadata.groupId == COORDINATOR_GROUP_ID)
{
ereport(ERROR, (errmsg("coordinator node cannot be added as inactive node")));
@ -331,6 +331,8 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
Datum
citus_add_secondary_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
@ -348,8 +350,6 @@ citus_add_secondary_node(PG_FUNCTION_ARGS)
nodeMetadata.nodeRole = SecondaryNodeRoleId();
nodeMetadata.isActive = true;
CheckCitusVersion(ERROR);
int nodeId = AddNodeMetadata(nodeNameString, nodePort, &nodeMetadata,
&nodeAlreadyExists);
TransactionModifiedNodeMetadata = true;
@ -380,11 +380,11 @@ master_add_secondary_node(PG_FUNCTION_ARGS)
Datum
citus_remove_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *nodeNameText = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
CheckCitusVersion(ERROR);
RemoveNodeFromCluster(text_to_cstring(nodeNameText), nodePort);
TransactionModifiedNodeMetadata = true;
@ -631,7 +631,6 @@ static WorkerNode *
ModifiableWorkerNode(const char *nodeName, int32 nodePort)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
@ -843,6 +842,8 @@ ActivateNode(char *nodeName, int nodePort)
Datum
citus_update_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
int32 nodeId = PG_GETARG_INT32(0);
text *newNodeName = PG_GETARG_TEXT_P(1);
@ -864,8 +865,6 @@ citus_update_node(PG_FUNCTION_ARGS)
List *placementList = NIL;
BackgroundWorkerHandle *handle = NULL;
CheckCitusVersion(ERROR);
WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
newNodePort);
if (workerNodeWithSameAddress != NULL)
@ -1077,10 +1076,10 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
Datum
get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
{
ShardInterval *shardInterval = NULL;
CheckCitusVersion(ERROR);
ShardInterval *shardInterval = NULL;
/*
* To have optional parameter as NULL, we defined this UDF as not strict, therefore
* we need to check all parameters for NULL values.

View File

@ -49,12 +49,12 @@ PG_FUNCTION_INFO_V1(citus_create_restore_point);
Datum
citus_create_restore_point(PG_FUNCTION_ARGS)
{
text *restoreNameText = PG_GETARG_TEXT_P(0);
CheckCitusVersion(ERROR);
EnsureSuperUser();
EnsureCoordinator();
text *restoreNameText = PG_GETARG_TEXT_P(0);
if (RecoveryInProgress())
{
ereport(ERROR,

View File

@ -71,14 +71,14 @@ static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
Datum
master_run_on_worker(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
bool parallelExecution = false;
StringInfo *nodeNameArray = NULL;
int *nodePortArray = NULL;
StringInfo *commandStringArray = NULL;
CheckCitusVersion(ERROR);
/* check to see if caller supports us returning a tuplestore */
if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
{

View File

@ -64,6 +64,9 @@ PG_FUNCTION_INFO_V1(master_create_worker_shards);
Datum
master_create_worker_shards(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
text *tableNameText = PG_GETARG_TEXT_P(0);
int32 shardCount = PG_GETARG_INT32(1);
int32 replicationFactor = PG_GETARG_INT32(2);
@ -74,9 +77,6 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
/* do not add any data */
bool useExclusiveConnections = false;
EnsureCoordinator();
CheckCitusVersion(ERROR);
/*
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created

View File

@ -109,6 +109,9 @@ PG_FUNCTION_INFO_V1(master_drop_sequences);
Datum
master_apply_delete_command(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
text *queryText = PG_GETARG_TEXT_P(0);
char *queryString = text_to_cstring(queryText);
List *deletableShardIntervalList = NIL;
@ -116,9 +119,6 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
Node *queryTreeNode = rawStmt->stmt;
EnsureCoordinator();
CheckCitusVersion(ERROR);
if (!IsA(queryTreeNode, DeleteStmt))
{
ereport(ERROR, (errmsg("query \"%s\" is not a delete statement",
@ -208,6 +208,8 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
Datum
citus_drop_all_shards(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
text *schemaNameText = PG_GETARG_TEXT_P(1);
text *relationNameText = PG_GETARG_TEXT_P(2);
@ -215,8 +217,6 @@ citus_drop_all_shards(PG_FUNCTION_ARGS)
char *schemaName = text_to_cstring(schemaNameText);
char *relationName = text_to_cstring(relationNameText);
CheckCitusVersion(ERROR);
/*
* The SQL_DROP trigger calls this function even for tables that are
* not distributed. In that case, silently ignore and return -1.

View File

@ -70,13 +70,13 @@ PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
Datum
master_modify_multiple_shards(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *queryText = PG_GETARG_TEXT_P(0);
char *queryString = text_to_cstring(queryText);
RawStmt *rawStmt = (RawStmt *) ParseTreeRawStmt(queryString);
Node *queryTreeNode = rawStmt->stmt;
CheckCitusVersion(ERROR);
if (!IsA(queryTreeNode, DeleteStmt) && !IsA(queryTreeNode, UpdateStmt))
{
ereport(ERROR, (errmsg("query \"%s\" is not a delete or update "

View File

@ -103,6 +103,8 @@ PG_FUNCTION_INFO_V1(master_stage_shard_placement_row);
Datum
master_get_table_metadata(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *relationName = PG_GETARG_TEXT_P(0);
Oid relationId = ResolveRelationId(relationName, false);
@ -112,8 +114,6 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
Datum values[TABLE_METADATA_FIELDS];
bool isNulls[TABLE_METADATA_FIELDS];
CheckCitusVersion(ERROR);
/* find partition tuple for partitioned relation */
CitusTableCacheEntry *partitionEntry = GetCitusTableCacheEntry(relationId);
@ -201,11 +201,11 @@ CStoreTable(Oid relationId)
Datum
master_get_table_ddl_events(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
FuncCallContext *functionContext = NULL;
ListCell *tableDDLEventCell = NULL;
CheckCitusVersion(ERROR);
/*
* On the very first call to this function, we first use the given relation
* name to get to the relation. We then recreate the list of DDL statements
@ -276,8 +276,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
Datum
master_get_new_shardid(PG_FUNCTION_ARGS)
{
EnsureCoordinator();
CheckCitusVersion(ERROR);
EnsureCoordinator();
uint64 shardId = GetNextShardId();
Datum shardIdDatum = Int64GetDatum(shardId);
@ -346,8 +346,8 @@ GetNextShardId()
Datum
master_get_new_placementid(PG_FUNCTION_ARGS)
{
EnsureCoordinator();
CheckCitusVersion(ERROR);
EnsureCoordinator();
uint64 placementId = GetNextPlacementId();
Datum placementIdDatum = Int64GetDatum(placementId);
@ -453,11 +453,11 @@ master_stage_shard_placement_row(PG_FUNCTION_ARGS)
Datum
citus_get_active_worker_nodes(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
FuncCallContext *functionContext = NULL;
uint32 workerNodeCount = 0;
CheckCitusVersion(ERROR);
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */

View File

@ -35,10 +35,10 @@ PG_FUNCTION_INFO_V1(time_partition_range);
Datum
time_partition_range(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
/* create tuple descriptor for return value */
TupleDesc metadataDescriptor = NULL;
TypeFuncClass resultTypeClass = get_call_result_type(fcinfo, NULL,

View File

@ -122,6 +122,9 @@ bool CheckAvailableSpaceBeforeMove = true;
Datum
citus_copy_shard_placement(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
int64 shardId = PG_GETARG_INT64(0);
text *sourceNodeNameText = PG_GETARG_TEXT_P(1);
int32 sourceNodePort = PG_GETARG_INT32(2);
@ -133,9 +136,6 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS)
char *sourceNodeName = text_to_cstring(sourceNodeNameText);
char *targetNodeName = text_to_cstring(targetNodeNameText);
EnsureCoordinator();
CheckCitusVersion(ERROR);
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
{
@ -283,6 +283,9 @@ CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes)
Datum
citus_move_shard_placement(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
int64 shardId = PG_GETARG_INT64(0);
char *sourceNodeName = text_to_cstring(PG_GETARG_TEXT_P(1));
int32 sourceNodePort = PG_GETARG_INT32(2);
@ -294,10 +297,6 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
ListCell *colocatedTableCell = NULL;
ListCell *colocatedShardCell = NULL;
CheckCitusVersion(ERROR);
EnsureCoordinator();
Oid relationId = RelationIdForShard(shardId);
ErrorIfMoveCitusLocalTable(relationId);
ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);

View File

@ -547,6 +547,7 @@ GetShardCost(uint64 shardId, void *voidContext)
Datum
citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 shardId = PG_GETARG_INT64(0);
bool missingOk = false;
ShardPlacement *shardPlacement = ActiveShardPlacement(shardId, missingOk);
@ -784,6 +785,7 @@ SetupRebalanceMonitor(List *placementUpdateList, Oid relationId)
Datum
rebalance_table_shards(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
List *relationIdList = NIL;
if (!PG_ARGISNULL(0))
{
@ -888,6 +890,7 @@ GetRebalanceStrategy(Name name)
Datum
citus_drain_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
PG_ENSURE_ARGNOTNULL(0, "nodename");
PG_ENSURE_ARGNOTNULL(1, "nodeport");
PG_ENSURE_ARGNOTNULL(2, "shard_transfer_mode");
@ -932,6 +935,7 @@ citus_drain_node(PG_FUNCTION_ARGS)
Datum
replicate_table_shards(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
uint32 shardReplicationFactor = PG_GETARG_INT32(1);
int32 maxShardCopies = PG_GETARG_INT32(2);
@ -986,6 +990,7 @@ master_drain_node(PG_FUNCTION_ARGS)
Datum
get_rebalance_table_shards_plan(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
List *relationIdList = NIL;
if (!PG_ARGISNULL(0))
{
@ -1067,6 +1072,7 @@ get_rebalance_table_shards_plan(PG_FUNCTION_ARGS)
Datum
get_rebalance_progress(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
List *segmentList = NIL;
ListCell *rebalanceMonitorCell = NULL;
TupleDesc tupdesc;
@ -2811,6 +2817,7 @@ pg_dist_rebalance_strategy_enterprise_check(PG_FUNCTION_ARGS)
Datum
citus_validate_rebalance_strategy_functions(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureShardCostUDF(PG_GETARG_OID(0));
EnsureNodeCapacityUDF(PG_GETARG_OID(1));
EnsureShardAllowedOnNodeUDF(PG_GETARG_OID(2));

View File

@ -61,10 +61,10 @@ isolate_tenant_to_new_shard(PG_FUNCTION_ARGS)
Datum
worker_hash(PG_FUNCTION_ARGS)
{
Datum valueDatum = PG_GETARG_DATUM(0);
CheckCitusVersion(ERROR);
Datum valueDatum = PG_GETARG_DATUM(0);
/* figure out hash function from the data type */
Oid valueDataType = get_fn_expr_argtype(fcinfo->flinfo, 0);
TypeCacheEntry *typeEntry = lookup_type_cache(valueDataType,

View File

@ -94,6 +94,8 @@ PG_FUNCTION_INFO_V1(citus_update_table_statistics);
Datum
master_create_empty_shard(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *relationNameText = PG_GETARG_TEXT_P(0);
char *relationName = text_to_cstring(relationNameText);
uint32 attemptableNodeCount = 0;
@ -108,8 +110,6 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
Oid relationId = ResolveRelationId(relationNameText, false);
char relationKind = get_rel_relkind(relationId);
CheckCitusVersion(ERROR);
EnsureTablePermissions(relationId, ACL_INSERT);
CheckDistributedTable(relationId);
@ -239,6 +239,8 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
Datum
master_append_table_to_shard(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 shardId = PG_GETARG_INT64(0);
text *sourceTableNameText = PG_GETARG_TEXT_P(1);
text *sourceNodeNameText = PG_GETARG_TEXT_P(2);
@ -249,8 +251,6 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
float4 shardFillLevel = 0.0;
CheckCitusVersion(ERROR);
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid relationId = shardInterval->relationId;
@ -359,10 +359,10 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
Datum
citus_update_shard_statistics(PG_FUNCTION_ARGS)
{
int64 shardId = PG_GETARG_INT64(0);
CheckCitusVersion(ERROR);
int64 shardId = PG_GETARG_INT64(0);
uint64 shardSize = UpdateShardStatistics(shardId);
PG_RETURN_INT64(shardSize);
@ -376,10 +376,10 @@ citus_update_shard_statistics(PG_FUNCTION_ARGS)
Datum
citus_update_table_statistics(PG_FUNCTION_ARGS)
{
Oid distributedTableId = PG_GETARG_OID(0);
CheckCitusVersion(ERROR);
Oid distributedTableId = PG_GETARG_OID(0);
UpdateTableStatistics(distributedTableId);
PG_RETURN_VOID();

View File

@ -901,13 +901,13 @@ AppendShardIdToName(char **name, uint64 shardId)
Datum
shard_name(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
int64 shardId = PG_GETARG_INT64(1);
char *qualifiedName = NULL;
CheckCitusVersion(ERROR);
if (shardId <= 0)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

View File

@ -39,6 +39,8 @@ PG_FUNCTION_INFO_V1(get_adjacency_list_wait_graph);
Datum
get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
TupleDesc tupleDescriptor = NULL;
HASH_SEQ_STATUS status;
@ -47,8 +49,6 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
Datum values[2];
bool isNulls[2];
CheckCitusVersion(ERROR);
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
WaitGraph *waitGraph = BuildGlobalWaitGraph();
HTAB *adjacencyList = BuildAdjacencyListsForWaitGraph(waitGraph);

View File

@ -77,11 +77,11 @@ drop_constraint_cascade_via_perform_deletion(PG_FUNCTION_ARGS)
Datum
get_referencing_relation_id_list(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
FuncCallContext *functionContext = NULL;
ListCell *foreignRelationCell = NULL;
CheckCitusVersion(ERROR);
/* for the first we call this UDF, we need to populate the result to return set */
if (SRF_IS_FIRSTCALL())
{
@ -136,11 +136,11 @@ get_referencing_relation_id_list(PG_FUNCTION_ARGS)
Datum
get_referenced_relation_id_list(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
FuncCallContext *functionContext = NULL;
ListCell *foreignRelationCell = NULL;
CheckCitusVersion(ERROR);
/* for the first we call this UDF, we need to populate the result to return set */
if (SRF_IS_FIRSTCALL())
{

View File

@ -30,11 +30,11 @@ PG_FUNCTION_INFO_V1(get_foreign_key_to_reference_table_commands);
Datum
get_foreign_key_to_reference_table_commands(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
FuncCallContext *functionContext = NULL;
ListCell *commandsCell = NULL;
CheckCitusVersion(ERROR);
/* for the first we call this UDF, we need to populate the result to return set */
if (SRF_IS_FIRSTCALL())
{

View File

@ -34,6 +34,8 @@ PG_FUNCTION_INFO_V1(store_intermediate_result_on_node);
Datum
store_intermediate_result_on_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *nodeNameText = PG_GETARG_TEXT_P(0);
char *nodeNameString = text_to_cstring(nodeNameText);
int nodePort = PG_GETARG_INT32(1);
@ -44,8 +46,6 @@ store_intermediate_result_on_node(PG_FUNCTION_ARGS)
bool writeLocalFile = false;
ParamListInfo paramListInfo = NULL;
CheckCitusVersion(ERROR);
WorkerNode *workerNode = FindWorkerNodeOrError(nodeNameString, nodePort);
/*

View File

@ -83,13 +83,13 @@ AllowNonIdleTransactionOnXactHandling(void)
Datum
start_session_level_connection_to_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *nodeName = PG_GETARG_TEXT_P(0);
uint32 nodePort = PG_GETARG_UINT32(1);
char *nodeNameString = text_to_cstring(nodeName);
int connectionFlags = 0;
CheckCitusVersion(ERROR);
if (singleConnection != NULL && (strcmp(singleConnection->hostname,
nodeNameString) != 0 ||
singleConnection->port != nodePort))

View File

@ -106,6 +106,8 @@ PG_FUNCTION_INFO_V1(get_all_active_transactions);
Datum
assign_distributed_transaction_id(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid userId = GetUserId();
/* prepare data before acquiring spinlock to protect against errors */
@ -113,8 +115,6 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
uint64 transactionNumber = PG_GETARG_INT64(1);
TimestampTz timestamp = PG_GETARG_TIMESTAMPTZ(2);
CheckCitusVersion(ERROR);
/* MyBackendData should always be avaliable, just out of paranoia */
if (!MyBackendData)
{
@ -166,14 +166,14 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
Datum
get_current_transaction_id(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
TupleDesc tupleDescriptor = NULL;
Datum values[5];
bool isNulls[5];
CheckCitusVersion(ERROR);
/* build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
{
@ -225,12 +225,13 @@ get_current_transaction_id(PG_FUNCTION_ARGS)
Datum
get_global_active_transactions(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
TupleDesc tupleDescriptor = NULL;
List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock);
List *connectionList = NIL;
StringInfo queryToSend = makeStringInfo();
CheckCitusVersion(ERROR);
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
appendStringInfo(queryToSend, GET_ACTIVE_TRANSACTION_QUERY);
@ -336,9 +337,9 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
Datum
get_all_active_transactions(PG_FUNCTION_ARGS)
{
TupleDesc tupleDescriptor = NULL;
CheckCitusVersion(ERROR);
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
StoreAllActiveTransactions(tupleStore, tupleDescriptor);

View File

@ -70,6 +70,9 @@ PG_FUNCTION_INFO_V1(update_distributed_table_colocation);
Datum
mark_tables_colocated(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
Oid sourceRelationId = PG_GETARG_OID(0);
ArrayType *relationIdArrayObject = PG_GETARG_ARRAYTYPE_P(1);
@ -80,8 +83,6 @@ mark_tables_colocated(PG_FUNCTION_ARGS)
"operation")));
}
CheckCitusVersion(ERROR);
EnsureCoordinator();
EnsureTableOwner(sourceRelationId);
Datum *relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject);
@ -108,11 +109,12 @@ mark_tables_colocated(PG_FUNCTION_ARGS)
Datum
update_distributed_table_colocation(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
Oid targetRelationId = PG_GETARG_OID(0);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(1);
CheckCitusVersion(ERROR);
EnsureCoordinator();
EnsureTableOwner(targetRelationId);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);

View File

@ -49,12 +49,12 @@ PG_FUNCTION_INFO_V1(column_to_column_name);
Datum
column_name_to_column(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
text *columnText = PG_GETARG_TEXT_P(1);
char *columnName = text_to_cstring(columnText);
CheckCitusVersion(ERROR);
Relation relation = relation_open(relationId, AccessShareLock);
Var *column = BuildDistributionKeyFromColumnName(relation, columnName);
@ -100,13 +100,13 @@ column_name_to_column_id(PG_FUNCTION_ARGS)
Datum
column_to_column_name(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
text *columnNodeText = PG_GETARG_TEXT_P(1);
char *columnNodeString = text_to_cstring(columnNodeText);
CheckCitusVersion(ERROR);
char *columnName = ColumnToColumnName(relationId, columnNodeString);
text *columnText = cstring_to_text(columnName);

View File

@ -97,11 +97,11 @@ PG_FUNCTION_INFO_V1(lock_relation_if_exists);
Datum
lock_shard_metadata(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
LOCKMODE lockMode = IntToLockMode(PG_GETARG_INT32(0));
ArrayType *shardIdArrayObject = PG_GETARG_ARRAYTYPE_P(1);
CheckCitusVersion(ERROR);
if (ARR_NDIM(shardIdArrayObject) == 0)
{
ereport(ERROR, (errmsg("no locks specified")));
@ -134,11 +134,11 @@ lock_shard_metadata(PG_FUNCTION_ARGS)
Datum
lock_shard_resources(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
LOCKMODE lockMode = IntToLockMode(PG_GETARG_INT32(0));
ArrayType *shardIdArrayObject = PG_GETARG_ARRAYTYPE_P(1);
CheckCitusVersion(ERROR);
if (ARR_NDIM(shardIdArrayObject) == 0)
{
ereport(ERROR, (errmsg("no locks specified")));

View File

@ -94,6 +94,8 @@ PG_FUNCTION_INFO_V1(master_expire_table_cache);
Datum
worker_fetch_partition_file(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 jobId = PG_GETARG_INT64(0);
uint32 partitionTaskId = PG_GETARG_UINT32(1);
uint32 partitionFileId = PG_GETARG_UINT32(2);
@ -115,8 +117,6 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS)
*/
bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
CheckCitusVersion(ERROR);
if (!taskDirectoryExists)
{
InitTaskDirectory(jobId, upstreamTaskId);
@ -383,6 +383,8 @@ CitusDeleteFile(const char *filename)
Datum
worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 shardId = PG_GETARG_INT64(0);
text *schemaNameText = PG_GETARG_TEXT_P(1);
text *ddlCommandText = PG_GETARG_TEXT_P(2);
@ -391,8 +393,6 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
const char *ddlCommand = text_to_cstring(ddlCommandText);
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
CheckCitusVersion(ERROR);
/* extend names in ddl command and apply extended command */
RelayEventExtendNames(ddlCommandNode, schemaName, shardId);
ProcessUtilityParseTree(ddlCommandNode, ddlCommand, PROCESS_UTILITY_QUERY, NULL,
@ -410,6 +410,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
Datum
worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 leftShardId = PG_GETARG_INT64(0);
text *leftShardSchemaNameText = PG_GETARG_TEXT_P(1);
uint64 rightShardId = PG_GETARG_INT64(2);
@ -421,8 +423,6 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
const char *ddlCommand = text_to_cstring(ddlCommandText);
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
CheckCitusVersion(ERROR);
/* extend names in ddl command and apply extended command */
RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId,
leftShardSchemaName, rightShardId,
@ -443,6 +443,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
Datum
worker_apply_sequence_command(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *commandText = PG_GETARG_TEXT_P(0);
Oid sequenceTypeId = PG_GETARG_OID(1);
const char *commandString = text_to_cstring(commandText);
@ -450,8 +452,6 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
NodeTag nodeType = nodeTag(commandNode);
CheckCitusVersion(ERROR);
if (nodeType != T_CreateSeqStmt)
{
ereport(ERROR,
@ -579,6 +579,8 @@ ParseTreeRawStmt(const char *ddlCommand)
Datum
worker_append_table_to_shard(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *shardQualifiedNameText = PG_GETARG_TEXT_P(0);
text *sourceQualifiedNameText = PG_GETARG_TEXT_P(1);
text *sourceNodeNameText = PG_GETARG_TEXT_P(2);
@ -596,8 +598,6 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
CheckCitusVersion(ERROR);
/* We extract schema names and table names from qualified names */
DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName);

View File

@ -49,15 +49,15 @@ PG_FUNCTION_INFO_V1(worker_drop_distributed_table);
Datum
worker_drop_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
text *relationName = PG_GETARG_TEXT_P(0);
Oid relationId = ResolveRelationId(relationName, true);
ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
char relationKind = '\0';
CheckCitusVersion(ERROR);
EnsureSuperUser();
if (!OidIsValid(relationId))
{
ereport(NOTICE, (errmsg("relation %s does not exist, skipping",

View File

@ -35,13 +35,13 @@ PG_FUNCTION_INFO_V1(worker_find_block_local_path);
Datum
worker_foreign_file_path(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
text *foreignTableName = PG_GETARG_TEXT_P(0);
text *foreignFilePath = NULL;
Oid relationId = ResolveRelationId(foreignTableName, false);
ForeignTable *foreignTable = GetForeignTable(relationId);
CheckCitusVersion(ERROR);
DefElem *option = NULL;
foreach_ptr(option, foreignTable->options)
{
@ -75,6 +75,8 @@ worker_foreign_file_path(PG_FUNCTION_ARGS)
Datum
worker_find_block_local_path(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
int64 blockId = PG_GETARG_INT64(0);
ArrayType *dataDirectoryObject = PG_GETARG_ARRAYTYPE_P(1);
@ -82,8 +84,6 @@ worker_find_block_local_path(PG_FUNCTION_ARGS)
(void) blockId;
(void) dataDirectoryObject;
CheckCitusVersion(ERROR);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("called function is currently unsupported")));

View File

@ -68,14 +68,14 @@ PG_FUNCTION_INFO_V1(worker_repartition_cleanup);
Datum
worker_create_schema(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 jobId = PG_GETARG_INT64(0);
text *ownerText = PG_GETARG_TEXT_P(1);
char *ownerString = TextDatumGetCString(ownerText);
StringInfo jobSchemaName = JobSchemaName(jobId);
CheckCitusVersion(ERROR);
bool schemaExists = JobSchemaExists(jobSchemaName);
if (!schemaExists)
{
@ -144,12 +144,12 @@ CreateJobSchema(StringInfo schemaName, char *schemaOwner)
Datum
worker_repartition_cleanup(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 jobId = PG_GETARG_INT64(0);
StringInfo jobDirectoryName = JobDirectoryName(jobId);
StringInfo jobSchemaName = JobSchemaName(jobId);
CheckCitusVersion(ERROR);
Oid schemaId = get_namespace_oid(jobSchemaName->data, false);
EnsureSchemaOwner(schemaId);
@ -173,6 +173,8 @@ worker_repartition_cleanup(PG_FUNCTION_ARGS)
Datum
worker_merge_files_into_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 jobId = PG_GETARG_INT64(0);
uint32 taskId = PG_GETARG_UINT32(1);
ArrayType *columnNameObject = PG_GETARG_ARRAYTYPE_P(2);
@ -189,8 +191,6 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
int32 columnNameCount = ArrayObjectCount(columnNameObject);
int32 columnTypeCount = ArrayObjectCount(columnTypeObject);
CheckCitusVersion(ERROR);
if (columnNameCount != columnTypeCount)
{
ereport(ERROR, (errmsg("column name array size: %d and type array size: %d"
@ -264,11 +264,11 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)
Datum
worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
ScanKey scanKey = NULL;
int scanKeyCount = 0;
CheckCitusVersion(ERROR);
Relation pgNamespace = table_open(NamespaceRelationId, AccessExclusiveLock);
TableScanDesc scanDescriptor = table_beginscan_catalog(pgNamespace, scanKeyCount,
scanKey);

View File

@ -109,6 +109,8 @@ PG_FUNCTION_INFO_V1(worker_hash_partition_table);
Datum
worker_range_partition_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 jobId = PG_GETARG_INT64(0);
uint32 taskId = PG_GETARG_UINT32(1);
text *filterQueryText = PG_GETARG_TEXT_P(2);
@ -130,8 +132,6 @@ worker_range_partition_table(PG_FUNCTION_ARGS)
/* first check that array element's and partition column's types match */
Oid splitPointType = ARR_ELEMTYPE(splitPointObject);
CheckCitusVersion(ERROR);
if (splitPointType != partitionColumnType)
{
ereport(ERROR, (errmsg("partition column type %u and split point type %u "
@ -188,6 +188,8 @@ worker_range_partition_table(PG_FUNCTION_ARGS)
Datum
worker_hash_partition_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
uint64 jobId = PG_GETARG_INT64(0);
uint32 taskId = PG_GETARG_UINT32(1);
text *filterQueryText = PG_GETARG_TEXT_P(2);
@ -209,8 +211,6 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
Datum *hashRangeArray = DeconstructArrayObject(hashRangeObject);
int32 partitionCount = ArrayObjectCount(hashRangeObject);
CheckCitusVersion(ERROR);
HashPartitionContext *partitionContext = palloc0(sizeof(HashPartitionContext));
partitionContext->syntheticShardIntervalArray =
SyntheticShardIntervalArrayForShardMinValues(hashRangeArray, partitionCount);

View File

@ -38,11 +38,11 @@ PG_FUNCTION_INFO_V1(relation_is_a_known_shard);
Datum
relation_is_a_known_shard(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
bool onlySearchPath = true;
CheckCitusVersion(ERROR);
PG_RETURN_BOOL(RelationIsAKnownShard(relationId, onlySearchPath));
}
@ -55,12 +55,12 @@ relation_is_a_known_shard(PG_FUNCTION_ARGS)
Datum
citus_table_is_visible(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
char relKind = '\0';
bool onlySearchPath = true;
CheckCitusVersion(ERROR);
/*
* We don't want to deal with not valid/existing relations
* as pg_table_is_visible does.

View File

@ -35,10 +35,10 @@ PG_FUNCTION_INFO_V1(worker_create_truncate_trigger);
Datum
worker_create_truncate_trigger(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
EnsureSuperUser();
CheckCitusVersion(ERROR);
EnsureSuperUser();
Oid relationId = PG_GETARG_OID(0);
/* Create the truncate trigger */
CreateTruncateTrigger(relationId);