Upgrade uncrustify from 0.68.1 to 0.82.0 (#8373)

This upgrade has changed some indentation levels, moved some
parameter names to the next line etc.
I also did some manual style changes to obey the 88-90 character per
line rule and avoid commas or semicolons in a new line.

Sister PRs https://github.com/citusdata/tools/pull/382
https://github.com/citusdata/the-process/pull/179
ihalatci-patch-1^2
Naisila Puka 2025-12-11 16:51:19 +03:00 committed by GitHub
parent b68023f91d
commit 75460fa23e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
111 changed files with 725 additions and 625 deletions

View File

@ -113,10 +113,10 @@ FROM base AS uncrustify-builder
RUN sudo apt update && sudo apt install -y cmake tree
WORKDIR /uncrustify
RUN curl -L https://github.com/uncrustify/uncrustify/archive/uncrustify-0.68.1.tar.gz | tar xz
WORKDIR /uncrustify/uncrustify-uncrustify-0.68.1/
RUN curl -L https://github.com/uncrustify/uncrustify/archive/uncrustify-0.82.0.tar.gz | tar xz
WORKDIR /uncrustify/uncrustify-uncrustify-0.82.0/
RUN mkdir build
WORKDIR /uncrustify/uncrustify-uncrustify-0.68.1/build/
WORKDIR /uncrustify/uncrustify-uncrustify-0.82.0/build/
RUN cmake ..
RUN MAKEFLAGS="-j $(nproc)" make -s

View File

@ -30,9 +30,9 @@ jobs:
fail_test_image_name: "ghcr.io/citusdata/failtester"
pgupgrade_image_name: "ghcr.io/citusdata/pgupgradetester"
style_checker_image_name: "ghcr.io/citusdata/stylechecker"
style_checker_tools_version: "0.8.18"
style_checker_tools_version: "0.8.33"
sql_snapshot_pg_version: "17.6"
image_suffix: "-va20872f"
image_suffix: "-ve4d3aa0"
pg15_version: '{ "major": "15", "full": "15.14" }'
pg16_version: '{ "major": "16", "full": "16.10" }'
pg17_version: '{ "major": "17", "full": "17.6" }'

View File

@ -11,9 +11,9 @@ tool. This tool uses `uncrustify` under the hood.
```bash
# Uncrustify changes the way it formats code every release a bit. To make sure
# everyone formats consistently we use version 0.68.1:
curl -L https://github.com/uncrustify/uncrustify/archive/uncrustify-0.68.1.tar.gz | tar xz
cd uncrustify-uncrustify-0.68.1/
# everyone formats consistently we use version 0.82.0:
curl -L https://github.com/uncrustify/uncrustify/archive/uncrustify-0.82.0.tar.gz | tar xz
cd uncrustify-uncrustify-0.82.0/
mkdir build
cd build
cmake ..

View File

@ -1330,10 +1330,10 @@ GetHighestUsedAddress(Relation rel)
Oid
ColumnarRelationId(Oid relid, RelFileLocator relfilelocator)
{
return OidIsValid(relid) ? relid : RelidByRelfilenumber(RelationTablespace_compat(
relfilelocator),
RelationPhysicalIdentifierNumber_compat(
relfilelocator));
return OidIsValid(relid) ? relid : RelidByRelfilenumber(RelationTablespace_compat
(relfilelocator),
RelationPhysicalIdentifierNumber_compat
(relfilelocator));
}

View File

@ -758,8 +758,10 @@ SnapshotMightSeeUnflushedStripes(Snapshot snapshot)
}
default:
{
return false;
}
}
}

View File

@ -547,7 +547,8 @@ ColumnarStorageTruncate(Relation rel, uint64 newDataReservation)
if (!ColumnarLogicalOffsetIsValid(newDataReservation))
{
elog(ERROR,
"attempted to truncate relation %d to invalid logical offset: " UINT64_FORMAT,
"attempted to truncate relation %d to "
"invalid logical offset: " UINT64_FORMAT,
rel->rd_id, newDataReservation);
}

View File

@ -2410,10 +2410,11 @@ ColumnarProcessUtility(PlannedStmt *pstmt,
}
default:
{
/* FALL THROUGH */
break;
}
}
if (columnarOptions != NIL && columnarRangeVar == NULL)
{

View File

@ -185,8 +185,8 @@ typedef struct TableConversionState
static TableConversionReturn * AlterDistributedTable(TableConversionParameters *params);
static TableConversionReturn * AlterTableSetAccessMethod(
TableConversionParameters *params);
static TableConversionReturn * AlterTableSetAccessMethod(TableConversionParameters *
params);
static TableConversionReturn * ConvertTable(TableConversionState *con);
static TableConversionReturn * ConvertTableInternal(TableConversionState *con);
static bool SwitchToSequentialAndLocalExecutionIfShardNameTooLong(char *relationName,
@ -215,7 +215,7 @@ static char * CreateWorkerChangeSequenceDependencyCommand(char *qualifiedSequece
static void ErrorIfMatViewSizeExceedsTheLimit(Oid matViewOid);
static char * CreateMaterializedViewDDLCommand(Oid matViewOid);
static char * GetAccessMethodForMatViewIfExists(Oid viewOid);
static bool WillRecreateForeignKeyToReferenceTable(Oid relationId,
static bool WillRecreateFKeyToReferenceTable(Oid relationId,
CascadeToColocatedOption cascadeOption);
static void WarningsForDroppingForeignKeysWithDistributedTables(Oid relationId);
static void ErrorIfUnsupportedCascadeObjects(Oid relationId);
@ -505,7 +505,8 @@ UndistributeTable(TableConversionParameters *params)
if (!params->bypassTenantCheck && IsTenantSchema(schemaId) &&
IsCitusTableType(params->relationId, SINGLE_SHARD_DISTRIBUTED))
{
EnsureUndistributeTenantTableSafe(params->relationId,
EnsureUndistributeTenantTableSafe(
params->relationId,
TenantOperationNames[TENANT_UNDISTRIBUTE_TABLE]);
}
@ -577,7 +578,7 @@ AlterDistributedTable(TableConversionParameters *params)
TableConversionState *con = CreateTableConversion(params);
CheckAlterDistributedTableConversionParameters(con);
if (WillRecreateForeignKeyToReferenceTable(con->relationId, con->cascadeToColocated))
if (WillRecreateFKeyToReferenceTable(con->relationId, con->cascadeToColocated))
{
ereport(DEBUG1, (errmsg("setting multi shard modify mode to sequential")));
SetLocalMultiShardModifyModeToSequential();
@ -2193,12 +2194,12 @@ GetAccessMethodForMatViewIfExists(Oid viewOid)
/*
* WillRecreateForeignKeyToReferenceTable checks if the table of relationId has any foreign
* WillRecreateFKeyToReferenceTable checks if the table of relationId has any foreign
* key to a reference table, if conversion will be cascaded to colocated table this function
* also checks if any of the colocated tables have a foreign key to a reference table too
*/
bool
WillRecreateForeignKeyToReferenceTable(Oid relationId,
WillRecreateFKeyToReferenceTable(Oid relationId,
CascadeToColocatedOption cascadeOption)
{
if (cascadeOption == CASCADE_TO_COLOCATED_NO ||

View File

@ -522,7 +522,7 @@ ExecuteCascadeOperationForRelationIdList(List *relationIdList,
* with the flag InTableTypeConversionFunctionCall set to true.
*/
void
ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(List *utilityCommandList)
ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(List *utilityCmdList)
{
bool oldValue = InTableTypeConversionFunctionCall;
InTableTypeConversionFunctionCall = true;
@ -531,7 +531,7 @@ ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(List *utilityCommandL
PG_TRY();
{
char *utilityCommand = NULL;
foreach_declared_ptr(utilityCommand, utilityCommandList)
foreach_declared_ptr(utilityCommand, utilityCmdList)
{
/*
* CREATE MATERIALIZED VIEW commands need to be parsed/transformed,
@ -566,10 +566,10 @@ ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(List *utilityCommandL
* ExecuteAndLogUtilityCommand function for each of them.
*/
void
ExecuteAndLogUtilityCommandList(List *utilityCommandList)
ExecuteAndLogUtilityCommandList(List *utilityCmdList)
{
char *utilityCommand = NULL;
foreach_declared_ptr(utilityCommand, utilityCommandList)
foreach_declared_ptr(utilityCommand, utilityCmdList)
{
ExecuteAndLogUtilityCommand(utilityCommand);
}

View File

@ -64,8 +64,8 @@ PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt, const char *queryS
return NIL;
}
if (ops->qualify && DistOpsValidityState(stmt, ops) ==
ShouldQualifyAfterLocalCreation)
if (ops->qualify &&
DistOpsValidityState(stmt, ops) == ShouldQualifyAfterLocalCreation)
{
/* qualify the statement after local creation */
ops->qualify(stmt);

View File

@ -702,7 +702,8 @@ EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
* given table. We should make those checks after local table conversion by acquiring locks to
* the relation because the distribution column can be modified in that period.
*/
Oid distributionColumnType = ColumnTypeIdForRelationColumnName(relationId,
Oid distributionColumnType = ColumnTypeIdForRelationColumnName(
relationId,
distributionColumnName);
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
@ -1108,8 +1109,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams)
{
if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED ||
tableType == RANGE_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED) !=
(distributedTableParams != NULL))
tableType == SINGLE_SHARD_DISTRIBUTED ||
tableType == RANGE_DISTRIBUTED) != (distributedTableParams != NULL))
{
ereport(ERROR, (errmsg("distributed table params must be provided "
"when creating a distributed table and must "

View File

@ -2165,9 +2165,11 @@ GetDistributeObjectOps(Node *node)
}
default:
{
return &Any_SecLabel;
}
}
}
case T_RenameStmt:
{

View File

@ -107,9 +107,9 @@ static void DistributeFunctionColocatedWithDistributedTable(RegProcedure funcOid
char *colocateWithTableName,
const ObjectAddress *
functionAddress);
static void DistributeFunctionColocatedWithSingleShardTable(const
ObjectAddress *functionAddress,
text *colocateWithText);
static void DistributeFunctionColocatedWithSingleShardTable(const ObjectAddress *
functionAddress, text *
colocateWithText);
static void DistributeFunctionColocatedWithReferenceTable(const
ObjectAddress *functionAddress);
static List * FilterDistributedFunctions(GrantStmt *grantStmt);
@ -1896,8 +1896,10 @@ ShouldAddFunctionSignature(FunctionParameterMode mode)
}
default:
{
return true;
}
}
}

View File

@ -96,6 +96,7 @@ PreprocessGrantStmt(Node *node, const char *queryString,
{
appendStringInfo(&privsString, "%s", priv->priv_name);
}
/*
* ALL can only be set alone.
* And ALL is not added as a keyword in priv_name by parser, but
@ -108,6 +109,7 @@ PreprocessGrantStmt(Node *node, const char *queryString,
/* this is used for column level only */
appendStringInfo(&privsString, "ALL");
}
/*
* Instead of relying only on the syntax check done by Postgres and
* adding an assert here, add a default ERROR if ALL is not first
@ -227,8 +229,8 @@ CollectGrantTableIdList(GrantStmt *grantStmt)
bool grantOnTableCommand = (grantStmt->targtype == ACL_TARGET_OBJECT &&
grantStmt->objtype == OBJECT_TABLE);
bool grantAllTablesOnSchemaCommand = (grantStmt->targtype ==
ACL_TARGET_ALL_IN_SCHEMA &&
bool grantAllTablesOnSchemaCommand = (grantStmt->targtype == ACL_TARGET_ALL_IN_SCHEMA
&&
grantStmt->objtype == OBJECT_TABLE);
/* we are only interested in table level grants */

View File

@ -64,8 +64,8 @@ static int GetNumberOfIndexParameters(IndexStmt *createIndexStatement);
static bool IndexAlreadyExists(IndexStmt *createIndexStatement);
static Oid CreateIndexStmtGetIndexId(IndexStmt *createIndexStatement);
static Oid CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement);
static void SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(
IndexStmt *createIndexStatement);
static void SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(IndexStmt *
createIndexStatement);
static char * GenerateLongestShardPartitionIndexName(IndexStmt *createIndexStatement);
static char * GenerateDefaultIndexName(IndexStmt *createIndexStatement);
static List * GenerateIndexParameters(IndexStmt *createIndexStatement);

View File

@ -1272,8 +1272,10 @@ ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result)
}
default:
{
Assert(false); /* there are no other options for this enum */
}
}
}
@ -2858,8 +2860,8 @@ ErrorIfCopyHasOnErrorLogVerbosity(CopyStmt *copyStatement)
{
if (strcmp(option->defname, "on_error") == 0)
{
ereport(ERROR, (errmsg(
"Citus does not support COPY FROM with ON_ERROR option.")));
ereport(ERROR, (errmsg("Citus does not support "
"COPY FROM with ON_ERROR option.")));
}
else if (strcmp(option->defname, "log_verbosity") == 0)
{
@ -2876,8 +2878,8 @@ ErrorIfCopyHasOnErrorLogVerbosity(CopyStmt *copyStatement)
*/
if (log_verbosity)
{
ereport(ERROR, (errmsg(
"Citus does not support COPY FROM with LOG_VERBOSITY option.")));
ereport(ERROR, (errmsg("Citus does not support "
"COPY FROM with LOG_VERBOSITY option.")));
}
#endif
}

View File

@ -149,7 +149,7 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
}
default:
{
/*
* Nodes that are not supported by Citus: we pass-through to the
* main PostgreSQL executor. Any Citus-supported RenameStmt
@ -157,6 +157,7 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
*/
return NIL;
}
}
bool isCitusRelation = IsCitusTable(tableRelationId);
if (!isCitusRelation)

View File

@ -462,8 +462,8 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
if (IsAnyObjectDistributed(addresses) || SequenceUsedInDistributedTable(address,
DEPENDENCY_INTERNAL))
{
ereport(ERROR, (errmsg(
"Altering a distributed sequence is currently not supported.")));
ereport(ERROR, (errmsg("Altering a distributed sequence "
"is currently not supported.")));
}
/*
@ -991,8 +991,8 @@ FilterDistributedSequences(GrantStmt *stmt)
{
bool grantOnSequenceCommand = (stmt->targtype == ACL_TARGET_OBJECT &&
stmt->objtype == OBJECT_SEQUENCE);
bool grantOnAllSequencesInSchemaCommand = (stmt->targtype ==
ACL_TARGET_ALL_IN_SCHEMA &&
bool grantOnAllSequencesInSchemaCommand = (stmt->targtype == ACL_TARGET_ALL_IN_SCHEMA
&&
stmt->objtype == OBJECT_SEQUENCE);
/* we are only interested in sequence level grants */
@ -1033,10 +1033,9 @@ FilterDistributedSequences(GrantStmt *stmt)
*/
if (list_member_oid(namespaceOidList, namespaceOid))
{
RangeVar *distributedSequence = makeRangeVar(get_namespace_name(
namespaceOid),
get_rel_name(
sequenceAddress->objectId),
RangeVar *distributedSequence = makeRangeVar(
get_namespace_name(namespaceOid),
get_rel_name(sequenceAddress->objectId),
-1);
grantSequenceList = lappend(grantSequenceList, distributedSequence);
}

View File

@ -237,8 +237,10 @@ AcquireCitusAdvisoryObjectClassLockGetOid(ObjectClass objectClass,
}
default:
{
elog(ERROR, "unsupported object class: %d", objectClass);
}
}
}
@ -270,6 +272,8 @@ AcquireCitusAdvisoryObjectClassLockCheckPrivileges(ObjectClass objectClass, Oid
}
default:
{
elog(ERROR, "unsupported object class: %d", objectClass);
}
}
}

View File

@ -81,23 +81,23 @@ static void ErrorIfAttachCitusTableToPgLocalTable(Oid parentRelationId,
Oid partitionRelationId);
static bool DeparserSupportsAlterTableAddColumn(AlterTableStmt *alterTableStatement,
AlterTableCmd *addColumnSubCommand);
static bool ATDefinesFKeyBetweenPostgresAndCitusLocalOrRef(
AlterTableStmt *alterTableStatement);
static bool ATDefinesFKeyBetweenPostgresAndCitusLocalOrRef(AlterTableStmt *
alterTableStatement);
static bool ShouldMarkConnectedRelationsNotAutoConverted(Oid leftRelationId,
Oid rightRelationId);
static bool RelationIdListContainsCitusTableType(List *relationIdList,
CitusTableType citusTableType);
static bool RelationIdListContainsPostgresTable(List *relationIdList);
static void ConvertPostgresLocalTablesToCitusLocalTables(
AlterTableStmt *alterTableStatement);
static void ConvertPostgresLocalTablesToCitusLocalTables(AlterTableStmt *
alterTableStatement);
static bool RangeVarListHasLocalRelationConvertedByUser(List *relationRangeVarList,
AlterTableStmt *
alterTableStatement);
static int CompareRangeVarsByOid(const void *leftElement, const void *rightElement);
static List * GetAlterTableAddFKeyRightRelationIdList(
AlterTableStmt *alterTableStatement);
static List * GetAlterTableAddFKeyRightRelationRangeVarList(
AlterTableStmt *alterTableStatement);
static List * GetAlterTableAddFKeyRightRelationIdList(AlterTableStmt *
alterTableStatement);
static List * GetAlterTableAddFKeyRightRelationRangeVarList(AlterTableStmt *
alterTableStatement);
static List * GetAlterTableAddFKeyConstraintList(AlterTableStmt *alterTableStatement);
static List * GetAlterTableCommandFKeyConstraintList(AlterTableCmd *command);
static List * GetRangeVarListFromFKeyConstraintList(List *fKeyConstraintList);
@ -1352,6 +1352,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
constraint);
}
}
/*
* When constraint->indexname is not NULL we are handling an
* ADD {PRIMARY KEY, UNIQUE} USING INDEX command. In this case
@ -1532,6 +1533,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
}
}
}
/*
* We check for ALTER COLUMN .. SET/DROP DEFAULT
* we should not propagate anything to shards
@ -2181,8 +2183,10 @@ AlterTableCommandTypeIsTrigger(AlterTableType alterTableType)
}
default:
{
return false;
}
}
}
@ -2719,6 +2723,7 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
}
}
}
/*
* We check for ALTER COLUMN .. SET DEFAULT nextval('user_defined_seq')
* we should make sure that the type of the column that uses
@ -2815,6 +2820,7 @@ FixAlterTableStmtIndexNames(AlterTableStmt *alterTableStatement)
FixPartitionShardIndexNames(relationId, parentIndexOid);
}
/*
* If this is an ALTER TABLE .. ATTACH PARTITION command
* we have wrong index names generated on indexes of shards of
@ -3425,8 +3431,8 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
if (commandList->length > 1 ||
columnConstraints->length > 1)
{
ereport(ERROR, (errcode(
ERRCODE_FEATURE_NOT_SUPPORTED),
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot execute ADD COLUMN .. DEFAULT nextval('..')"
" command with other subcommands/constraints"),
@ -3440,8 +3446,8 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
*/
if (!TableEmpty(relationId))
{
ereport(ERROR, (errcode(
ERRCODE_FEATURE_NOT_SUPPORTED),
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot add a column involving DEFAULT nextval('..') "
"because the table is not empty"),

View File

@ -1297,7 +1297,8 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
"partial failure, potentially leading to an inconsistent "
"state.\nIf the problematic command is a CREATE operation, "
"consider using the 'IF EXISTS' syntax to drop the object,"
"\nif applicable, and then re-attempt the original command.")));
"\nif applicable, and then re-attempt "
"the original command.")));
}
PG_RE_THROW();

View File

@ -475,8 +475,8 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
if (flags & OUTSIDE_TRANSACTION)
{
/* don't return connections that are used in transactions */
if (connection->remoteTransaction.transactionState !=
REMOTE_TRANS_NOT_STARTED)
if (connection->
remoteTransaction.transactionState != REMOTE_TRANS_NOT_STARTED)
{
continue;
}

View File

@ -191,8 +191,8 @@ static HTAB *ConnectionShardHash;
static MultiConnection * FindPlacementListConnection(int flags, List *placementAccessList,
const char *userName);
static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry(
ShardPlacement *placement);
static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry(ShardPlacement *
placement);
static bool CanUseExistingConnection(uint32 flags, const char *userName,
ConnectionReference *placementConnection);
static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection,

View File

@ -675,7 +675,8 @@ SharedConnectionStatsShmemInit(void)
ConnectionStatsSharedState->sharedConnectionHashTrancheId = LWLockNewTrancheId();
ConnectionStatsSharedState->sharedConnectionHashTrancheName =
"Shared Connection Tracking Hash Tranche";
LWLockRegisterTranche(ConnectionStatsSharedState->sharedConnectionHashTrancheId,
LWLockRegisterTranche(
ConnectionStatsSharedState->sharedConnectionHashTrancheId,
ConnectionStatsSharedState->sharedConnectionHashTrancheName);
LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,

View File

@ -28,7 +28,8 @@ static void AppendCreateExtensionStmtOptions(StringInfo buf, List *options);
static void AppendDropExtensionStmt(StringInfo buf, DropStmt *stmt);
static void AppendExtensionNameList(StringInfo buf, List *objects);
static void AppendAlterExtensionSchemaStmt(StringInfo buf,
AlterObjectSchemaStmt *alterExtensionSchemaStmt);
AlterObjectSchemaStmt *
alterExtensionSchemaStmt);
static void AppendAlterExtensionStmt(StringInfo buf,
AlterExtensionStmt *alterExtensionStmt);

View File

@ -290,8 +290,10 @@ GetDefElemActionString(DefElemAction action)
}
default:
{
return "";
}
}
}

View File

@ -118,9 +118,11 @@ ObjectTypeToKeyword(ObjectType objtype)
}
default:
{
elog(ERROR, "Unknown object type: %d", objtype);
return NULL;
}
}
}

View File

@ -242,8 +242,8 @@ AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"only simple column references are allowed in CREATE STATISTICS")));
errmsg("only simple column references are allowed "
"in CREATE STATISTICS")));
}
const char *columnName = quote_identifier(column->name);

View File

@ -536,9 +536,11 @@ GeneratedWhenStr(char generatedWhen)
}
default:
{
ereport(ERROR, (errmsg("unrecognized generated_when: %d",
generatedWhen)));
}
}
}

View File

@ -642,11 +642,11 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
xactProperties,
List *jobIdList,
bool localExecutionSupported);
static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel
static TransactionProperties DecideTaskListTransactionProperties(RowModifyLevel
modLevel,
List *taskList,
bool
exludeFromTransaction);
excludeFromTransaction);
static void StartDistributedExecution(DistributedExecution *execution);
static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
static void RunDistributedExecution(DistributedExecution *execution);
@ -711,8 +711,8 @@ static void PlacementExecutionReady(TaskPlacementExecution *placementExecution);
static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution *
shardCommandExecution);
static int GetEventSetSize(List *sessionList);
static bool ProcessSessionsWithFailedWaitEventSetOperations(
DistributedExecution *execution);
static bool ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *
execution);
static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
static void RebuildWaitEventSet(DistributedExecution *execution);
static void RebuildWaitEventSetForSessions(DistributedExecution *execution);
@ -842,7 +842,7 @@ AdaptiveExecutor(CitusScanState *scanState)
bool excludeFromXact = false;
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(
TransactionProperties xactProperties = DecideTaskListTransactionProperties(
distributedPlan->modLevel, taskList, excludeFromXact);
/*
@ -941,7 +941,7 @@ ExecuteUtilityTaskList(List *utilityTaskList, bool localExecutionSupported)
modLevel, utilityTaskList, MaxAdaptiveExecutorPoolSize, localExecutionSupported
);
executionParams->xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, utilityTaskList, false);
DecideTaskListTransactionProperties(modLevel, utilityTaskList, false);
executionParams->isUtilityCommand = true;
return ExecuteTaskListExtended(executionParams);
@ -963,7 +963,7 @@ ExecuteUtilityTaskListExtended(List *utilityTaskList, int poolSize,
bool excludeFromXact = false;
executionParams->xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, utilityTaskList,
DecideTaskListTransactionProperties(modLevel, utilityTaskList,
excludeFromXact);
executionParams->isUtilityCommand = true;
@ -984,7 +984,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList)
);
bool excludeFromXact = false;
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
executionParams->xactProperties = DecideTaskListTransactionProperties(
modLevel, taskList, excludeFromXact);
return ExecuteTaskListExtended(executionParams);
@ -1010,7 +1010,7 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
modLevel, taskList, targetPoolSize, localExecutionSupported
);
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
executionParams->xactProperties = DecideTaskListTransactionProperties(
modLevel, taskList, true);
return ExecuteTaskListExtended(executionParams);
}
@ -1032,7 +1032,7 @@ CreateDefaultExecutionParams(RowModifyLevel modLevel, List *taskList,
modLevel, taskList, targetPoolSize, localExecutionSupported
);
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
executionParams->xactProperties = DecideTaskListTransactionProperties(
modLevel, taskList, false);
executionParams->expectResults = expectResults;
executionParams->tupleDestination = tupleDest;
@ -1252,7 +1252,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
/*
* DecideTransactionPropertiesForTaskList decides whether to use remote transaction
* DecideTaskListTransactionProperties decides whether to use remote transaction
* blocks, whether to use 2PC for the given task list, and whether to error on any
* failure.
*
@ -1260,8 +1260,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
* errorOnAnyFailure, but not the other way around) we keep them in the same place.
*/
static TransactionProperties
DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, bool
exludeFromTransaction)
DecideTaskListTransactionProperties(RowModifyLevel modLevel, List *taskList, bool
excludeFromTransaction)
{
TransactionProperties xactProperties;
@ -1277,7 +1277,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList,
return xactProperties;
}
if (exludeFromTransaction)
if (excludeFromTransaction)
{
xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_DISALLOWED;
return xactProperties;
@ -2634,10 +2634,8 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
connectionFlags |= adaptiveConnectionManagementFlag;
/* open a new connection to the worker */
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
workerPool->nodeName,
workerPool->nodePort,
NULL, NULL);
MultiConnection *connection = StartNodeUserDatabaseConnection(
connectionFlags, workerPool->nodeName, workerPool->nodePort, NULL, NULL);
if (!connection)
{
/* connection can only be NULL for optional connections */

View File

@ -67,8 +67,8 @@ static void CitusPreExecScan(CitusScanState *scanState);
static bool ModifyJobNeedsEvaluation(Job *workerJob);
static void RegenerateTaskForFasthPathQuery(Job *workerJob);
static void RegenerateTaskListForInsert(Job *workerJob);
static DistributedPlan * CopyDistributedPlanWithoutCache(
DistributedPlan *originalDistributedPlan);
static DistributedPlan * CopyDistributedPlanWithoutCache(DistributedPlan *
originalDistributedPlan);
static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node);
static void EnsureForceDelegationDistributionKey(Job *job);

View File

@ -69,8 +69,8 @@ static List * WrapTasksForPartitioning(const char *resultIdPrefix,
bool binaryFormat);
static List * ExecutePartitionTaskList(List *partitionTaskList,
CitusTableCacheEntry *targetRelation);
static PartitioningTupleDest * CreatePartitioningTupleDest(
CitusTableCacheEntry *targetRelation);
static PartitioningTupleDest * CreatePartitioningTupleDest(CitusTableCacheEntry *
targetRelation);
static void PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber,
HeapTuple heapTuple, uint64 tupleLibpqSize);

View File

@ -66,7 +66,8 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
List *insertTargetList,
PlannedStmt *selectPlan,
EState *executorState,
char *intermediateResultIdPrefix);
char *
intermediateResultIdPrefix);
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);

View File

@ -38,11 +38,13 @@ static HTAB * ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetR
sourceTargetList,
PlannedStmt *
sourcePlan,
EState *executorState,
EState *
executorState,
char *
intermediateResultIdPrefix,
int
partitionColumnIndex);
partitionColumnIndex)
;
/*

View File

@ -180,9 +180,10 @@ static bool FollowExtAndInternalDependencies(ObjectAddressCollector *collector,
DependencyDefinition *definition);
static void ApplyAddToDependencyList(ObjectAddressCollector *collector,
DependencyDefinition *definition);
static void ApplyAddCitusDependedObjectsToDependencyList(
ObjectAddressCollector *collector,
DependencyDefinition *definition);
static void ApplyAddCitusDependedObjectsToDependencyList(ObjectAddressCollector *
collector,
DependencyDefinition *
definition);
static List * GetViewRuleReferenceDependencyList(Oid relationId);
static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector,
ObjectAddress target);

View File

@ -338,8 +338,8 @@ ShouldMarkRelationDistributed(Oid relationId)
bool ownedByExtension = IsTableOwnedByExtension(relationId);
bool alreadyDistributed = IsObjectDistributed(relationAddress);
bool hasUnsupportedDependency =
DeferErrorIfAnyObjectHasUnsupportedDependency(list_make1(relationAddress)) !=
NULL;
DeferErrorIfAnyObjectHasUnsupportedDependency(
list_make1(relationAddress)) != NULL;
bool hasCircularDependency =
DeferErrorIfCircularDependencyExists(relationAddress) != NULL;

View File

@ -823,8 +823,8 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
/* SELECT SUM(worker_partitioned_...) FROM VALUES (...) */
char *subqueryForPartitionedShards =
GenerateSizeQueryForRelationNameList(partitionedShardNames,
GetWorkerPartitionedSizeUDFNameBySizeQueryType(
sizeQueryType));
GetWorkerPartitionedSizeUDFNameBySizeQueryType
(sizeQueryType));
/* SELECT SUM(pg_..._size) FROM VALUES (...) */
char *subqueryForNonPartitionedShards =
@ -4266,10 +4266,9 @@ CancelTasksForJob(int64 jobid)
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(jobid));
const bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTaskJobIdTaskIdIndexId(),
indexOK, NULL,
lengthof(scanKey), scanKey);
SysScanDesc scanDescriptor = systable_beginscan(
pgDistBackgroundTasks, DistBackgroundTaskJobIdTaskIdIndexId(),
indexOK, NULL, lengthof(scanKey), scanKey);
List *runningTaskPids = NIL;
HeapTuple taskTuple = NULL;

View File

@ -76,7 +76,8 @@ static List * DropTaskList(Oid relationId, char *schemaName, char *relationName,
List *deletableShardIntervalList);
static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
const char *shardRelationName,
const char *dropShardPlacementCommand);
const char *
dropShardPlacementCommand);
static char * CreateDropShardPlacementCommand(const char *schemaName,
const char *shardRelationName,
char storageType);

View File

@ -78,7 +78,8 @@ static void GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_
indexForm,
List **
indexDDLEventList,
int indexFlags);
int
indexFlags);
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
static char * CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,

View File

@ -939,8 +939,8 @@ TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePo
* because we don't want to open a transaction block on remote nodes as DROP
* DATABASE commands cannot be run inside a transaction block.
*/
if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) !=
RESPONSE_OKAY)
if (ExecuteOptionalRemoteCommand(
connection, commandString, NULL) != RESPONSE_OKAY)
{
executeCommand = false;
break;

View File

@ -131,14 +131,16 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
uint32 colocationId);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreatePartitioningHierarchyForBlockingSplit(
List *shardGroupSplitIntervalListList,
static void CreatePartitioningHierarchyForBlockingSplit(List *
shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode);
static StringInfo CreateSplitShardReplicationSetupUDF(
List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList,
static StringInfo CreateSplitShardReplicationSetupUDF(List *
sourceColocatedShardIntervalList,
List *
shardGroupSplitIntervalListList,
List *destinationWorkerNodesList,
DistributionColumnMap *
distributionColumnOverrides);
@ -816,7 +818,7 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
ROW_MODIFY_NONE,
ddlTaskExecList,
MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */);
NULL /* jobIdList (ignored by API impl.) */);
}
@ -883,7 +885,7 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList,
MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */);
NULL /* jobIdList (ignored by API impl.) */);
}
@ -1880,8 +1882,9 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg(
"Failed to run worker_split_shard_replication_setup UDF. It should successfully execute "
" for splitting a shard in a non-blocking way. Please retry.")));
"Failed to run worker_split_shard_replication_setup UDF. "
"It should successfully execute for splitting a shard in "
"a non-blocking way. Please retry.")));
}
/* Get replication slot information */

View File

@ -2064,8 +2064,7 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, copyTaskList,
MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */
);
NULL /* jobIdList (ignored by API impl.) */);
}

View File

@ -471,8 +471,8 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
bool isBinaryCopy = localCopyOutState->binary;
bool shouldAddBinaryHeaders = (isBinaryCopy && localCopyOutState->fe_msgbuf->len ==
0);
bool shouldAddBinaryHeaders = (isBinaryCopy &&
localCopyOutState->fe_msgbuf->len == 0);
if (shouldAddBinaryHeaders)
{
AppendCopyBinaryHeaders(localCopyOutState);

View File

@ -71,8 +71,8 @@ worker_split_copy(PG_FUNCTION_ARGS)
if (arrayHasNull)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg(
"pg_catalog.split_copy_info array cannot contain null values")));
errmsg("pg_catalog.split_copy_info array "
"cannot contain null values")));
}
const int slice_ndim = 0;

View File

@ -85,8 +85,8 @@ int PlannerLevel = 0;
static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable);
static PlannedStmt * CreateDistributedPlannedStmt(
DistributedPlanningContext *planContext);
static PlannedStmt * CreateDistributedPlannedStmt(DistributedPlanningContext *
planContext);
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
DistributedPlanningContext
*planContext);
@ -125,14 +125,14 @@ static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
Const *resultFormatConst);
static List * OuterPlanParamsList(PlannerInfo *root);
static List * CopyPlanParamList(List *originalPlanParamList);
static void CreateAndPushPlannerRestrictionContext(
DistributedPlanningContext *planContext,
static void CreateAndPushPlannerRestrictionContext(DistributedPlanningContext *
planContext,
FastPathRestrictionContext *
fastPathContext);
static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext(
PlannerRestrictionContext *plannerRestrictionContext);
static void ResetPlannerRestrictionContext(PlannerRestrictionContext *
plannerRestrictionContext);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter);

View File

@ -828,10 +828,11 @@ IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId)
Assert(AllowedDistributionColumnValue.isActive);
Assert(ExecutorLevel > AllowedDistributionColumnValue.executorLevel);
ereport(DEBUG4, errmsg("Comparing saved:%s with Shard key: %s colocationid:%d:%d",
ereport(DEBUG4, errmsg(
"Comparing saved:%s with Shard key: %s colocationid:%d:%d",
pretty_format_node_dump(
nodeToString(
AllowedDistributionColumnValue.distributionColumnValue)),
nodeToString(AllowedDistributionColumnValue.
distributionColumnValue)),
pretty_format_node_dump(nodeToString(shardKey)),
AllowedDistributionColumnValue.colocationId, colocationId));

View File

@ -66,7 +66,8 @@ static bool InsertSelectHasRouterSelect(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
CitusTableCacheEntry *targetTableCacheEntry,
CitusTableCacheEntry *
targetTableCacheEntry,
ShardInterval *shardInterval,
PlannerRestrictionContext *
plannerRestrictionContext,
@ -1152,7 +1153,8 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
exprTypmod((Node *) newSubqueryTargetEntry->expr),
exprCollation((Node *) newSubqueryTargetEntry->expr),
0);
TargetEntry *newInsertTargetEntry = makeTargetEntry((Expr *) newInsertVar,
TargetEntry *newInsertTargetEntry = makeTargetEntry(
(Expr *) newInsertVar,
originalAttrNo,
oldInsertTargetEntry->resname,
oldInsertTargetEntry->resjunk);

View File

@ -67,7 +67,8 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
Query *query,
Node *quals,
List *targetList,
CmdType commandType);
CmdType
commandType);
static DistributedPlan * CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery,
Query *query,
@ -574,8 +575,8 @@ IsDistributionColumnInMergeSource(Expr *columnExpression, Query *query, bool
Var *distributionColumn = DistPartitionKey(relationId);
/* not all distributed tables have partition column */
if (distributionColumn != NULL && column->varattno ==
distributionColumn->varattno)
if (distributionColumn != NULL &&
column->varattno == distributionColumn->varattno)
{
isDistributionColumn = true;
}
@ -1045,7 +1046,8 @@ DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
PlannerRestrictionContext *plannerRestrictionContext)
{
ListCell *restrictionCell = NULL;
foreach(restrictionCell,
foreach(
restrictionCell,
plannerRestrictionContext->relationRestrictionContext->relationRestrictionList)
{
RelationRestriction *relationRestriction =
@ -1078,7 +1080,8 @@ DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
*/
static DeferredErrorMessage *
DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
PlannerRestrictionContext *plannerRestrictionContext,
PlannerRestrictionContext *
plannerRestrictionContext,
Oid targetRelationId)
{
List *distTablesList = NIL;
@ -1115,8 +1118,8 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
if (list_length(distTablesList) > 0 && list_length(localTablesList) > 0)
{
ereport(DEBUG1, (errmsg(
"A mix of distributed and local table, try repartitioning")));
ereport(DEBUG1, (errmsg("A mix of distributed and local table, "
"try repartitioning")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"A mix of distributed and citus-local table, "
"routable query is not possible", NULL, NULL);

View File

@ -271,7 +271,8 @@ static void AppendTargetEntryToGroupClause(TargetEntry *targetEntry,
static bool WorkerAggregateWalker(Node *node,
WorkerAggregateWalkerContext *walkerContext);
static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
WorkerAggregateWalkerContext *walkerContextry);
WorkerAggregateWalkerContext *
walkerContextry);
static AggregateType GetAggregateType(Aggref *aggregatExpression);
static Oid AggregateArgumentType(Aggref *aggregate);
static Expr * FirstAggregateArgument(Aggref *aggregate);
@ -293,16 +294,17 @@ static Const * MakeIntegerConst(int32 integerValue);
/* Local functions forward declarations for aggregate expression checks */
static bool HasNonDistributableAggregates(MultiNode *logicalPlanNode);
static bool CanPushDownExpression(Node *expression,
const ExtendedOpNodeProperties *extendedOpNodeProperties);
static DeferredErrorMessage * DeferErrorIfHasNonDistributableAggregates(
MultiNode *logicalPlanNode);
static DeferredErrorMessage * DeferErrorIfUnsupportedArrayAggregate(
Aggref *arrayAggregateExpression);
const ExtendedOpNodeProperties *
extendedOpNodeProperties);
static DeferredErrorMessage * DeferErrorIfHasNonDistributableAggregates(MultiNode *
logicalPlanNode);
static DeferredErrorMessage * DeferErrorIfUnsupportedArrayAggregate(Aggref *
arrayAggregateExpression);
static DeferredErrorMessage * DeferErrorIfUnsupportedJsonAggregate(AggregateType type,
Aggref *
aggregateExpression);
static DeferredErrorMessage * DeferErrorIfUnsupportedAggregateDistinct(
Aggref *aggregateExpression,
static DeferredErrorMessage * DeferErrorIfUnsupportedAggregateDistinct(Aggref *
aggregateExpression,
MultiNode *
logicalPlanNode);
static Var * AggregateDistinctColumn(Aggref *aggregateExpression);
@ -322,8 +324,8 @@ static bool HasOrderByAggregate(List *sortClauseList, List *targetList);
static bool HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList);
static bool HasOrderByComplexExpression(List *sortClauseList, List *targetList);
static bool HasOrderByHllType(List *sortClauseList, List *targetList);
static bool ShouldProcessDistinctOrderAndLimitForWorker(
ExtendedOpNodeProperties *extendedOpNodeProperties,
static bool ShouldProcessDistinctOrderAndLimitForWorker(ExtendedOpNodeProperties *
extendedOpNodeProperties,
bool pushingDownOriginalGrouping,
Node *havingQual);
static bool IsIndexInRange(const List *list, int index);
@ -5061,8 +5063,8 @@ HasOrderByHllType(List *sortClauseList, List *targetList)
* neither should ProcessLimitOrderByForWorkerQuery.
*/
static bool
ShouldProcessDistinctOrderAndLimitForWorker(
ExtendedOpNodeProperties *extendedOpNodeProperties,
ShouldProcessDistinctOrderAndLimitForWorker(ExtendedOpNodeProperties *
extendedOpNodeProperties,
bool pushingDownOriginalGrouping,
Node *havingQual)
{

View File

@ -153,8 +153,8 @@ static String * MakeDummyColumnString(int dummyColumnId);
static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError);
static List * GroupInsertValuesByShardId(List *insertValuesList);
static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
static DeferredErrorMessage * DeferErrorIfUnsupportedRouterPlannableSelectQuery(
Query *query);
static DeferredErrorMessage * DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *
query);
static DeferredErrorMessage * ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree);
static DeferredErrorMessage * ErrorIfQueryHasCTEWithSearchClause(Query *queryTree);
static bool ContainsSearchClauseWalker(Node *node, void *context);
@ -855,7 +855,8 @@ DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList)
"Modifying local tables with remote local tables is "
"not supported.",
NULL,
"Consider wrapping remote local table to a CTE, or subquery");
"Consider wrapping remote local table to a CTE, "
"or subquery");
}
return NULL;
}
@ -3151,8 +3152,8 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery,
FindShardInterval(inputDistributionKeyValue->constvalue, cache);
if (cachedShardInterval == NULL)
{
ereport(ERROR, (errmsg(
"could not find shardinterval to which to send the query")));
ereport(ERROR, (errmsg("could not find shardinterval to which to send "
"the query")));
}
if (outputPartitionValueConst != NULL)

View File

@ -107,8 +107,12 @@ static AttrNumber FindResnoForVarInTargetList(List *targetList, int varno, int v
static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo,
Relids relids);
static char * RecurringTypeDescription(RecurringTuplesType recurType);
static DeferredErrorMessage * DeferredErrorIfUnsupportedLateralSubquery(
PlannerInfo *plannerInfo, Relids recurringRelIds, Relids nonRecurringRelIds);
static DeferredErrorMessage * DeferredErrorIfUnsupportedLateralSubquery(PlannerInfo *
plannerInfo,
Relids
recurringRelIds,
Relids
nonRecurringRelIds);
static bool ContainsLateralSubquery(PlannerInfo *plannerInfo);
static Var * PartitionColumnForPushedDownSubquery(Query *query);
static bool ContainsReferencesToRelids(Query *query, Relids relids, int *foundRelid);
@ -790,8 +794,8 @@ FromClauseRecurringTupleType(Query *queryTree)
* such queries have lateral subqueries.
*/
static DeferredErrorMessage *
DeferredErrorIfUnsupportedRecurringTuplesJoin(
PlannerRestrictionContext *plannerRestrictionContext,
DeferredErrorIfUnsupportedRecurringTuplesJoin(PlannerRestrictionContext *
plannerRestrictionContext,
bool plannerPhase)
{
List *joinRestrictionList =

View File

@ -161,7 +161,8 @@ static void RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
RecursivePlanningContext *
recursivePlanningContext);
static bool RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
RecursivePlanningContext *context,
RecursivePlanningContext *
context,
bool chainedJoin);
static void RecursivelyPlanDistributedJoinNode(Node *node, Query *query,
RecursivePlanningContext *context);
@ -207,8 +208,8 @@ static bool CanPushdownRecurringOuterJoinOnOuterRTE(RangeTblEntry *rte);
static bool CanPushdownRecurringOuterJoinOnInnerVar(Var *innervar, RangeTblEntry *rte);
static bool CanPushdownRecurringOuterJoin(JoinExpr *joinExpr, Query *query);
#if PG_VERSION_NUM < PG_VERSION_17
static bool hasPseudoconstantQuals(
RelationRestrictionContext *relationRestrictionContext);
static bool hasPseudoconstantQuals(RelationRestrictionContext *
relationRestrictionContext);
#endif
/*
@ -2192,6 +2193,7 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
subquery->targetList = lappend(subquery->targetList, targetEntry);
}
}
/*
* If tupleDesc is NULL we have 2 different cases:
*
@ -2241,6 +2243,7 @@ TransformFunctionRTE(RangeTblEntry *rangeTblEntry)
columnType = list_nth_oid(rangeTblFunction->funccoltypes,
targetColumnIndex);
}
/* use the types in the function definition otherwise */
else
{
@ -2780,8 +2783,8 @@ CanPushdownRecurringOuterJoinOnInnerVar(Var *innerVar, RangeTblEntry *rte)
}
/* Check if the inner variable is part of the distribution column */
if (cacheEntry->partitionColumn && innerVar->varattno ==
cacheEntry->partitionColumn->varattno)
if (cacheEntry->partitionColumn &&
innerVar->varattno == cacheEntry->partitionColumn->varattno)
{
return true;
}
@ -2921,8 +2924,8 @@ CanPushdownRecurringOuterJoinExtended(JoinExpr *joinExpr, Query *query,
if (JoinTreeContainsLateral(joinExpr->rarg, query->rtable) || JoinTreeContainsLateral(
joinExpr->larg, query->rtable))
{
ereport(DEBUG5, (errmsg(
"Lateral join is not supported for pushdown in this path.")));
ereport(DEBUG5, (errmsg("Lateral join is not supported for pushdown "
"in this path.")));
return false;
}
@ -2983,6 +2986,7 @@ CanPushdownRecurringOuterJoinExtended(JoinExpr *joinExpr, Query *query,
return true;
}
}
/* the inner table is a subquery, extract the base relation referred in the qual */
else if (rte && rte->rtekind == RTE_SUBQUERY)
{

View File

@ -156,8 +156,9 @@ static bool AllDistributedRelationsInRestrictionContextColocated(
restrictionContext);
static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node);
static bool HasPlaceHolderVar(Node *node);
static JoinRestrictionContext * FilterJoinRestrictionContext(
JoinRestrictionContext *joinRestrictionContext, Relids
static JoinRestrictionContext * FilterJoinRestrictionContext(JoinRestrictionContext *
joinRestrictionContext,
Relids
queryRteIdentities);
static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
rangeTableArrayLength, Relids
@ -613,7 +614,8 @@ RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *restrictionCon
List *attributeEquivalenceList = GenerateAllAttributeEquivalences(restrictionContext);
return RestrictionEquivalenceForPartitionKeysViaEquivalences(restrictionContext,
return RestrictionEquivalenceForPartitionKeysViaEquivalences(
restrictionContext,
attributeEquivalenceList);
}
@ -1160,8 +1162,8 @@ GenerateCommonEquivalence(List *attributeEquivalenceList,
* with a single AttributeEquivalenceClassMember.
*/
static AttributeEquivalenceClass *
GenerateEquivalenceClassForRelationRestriction(
RelationRestrictionContext *relationRestrictionContext)
GenerateEquivalenceClassForRelationRestriction(RelationRestrictionContext *
relationRestrictionContext)
{
ListCell *relationRestrictionCell = NULL;
AttributeEquivalenceClassMember *eqMember = NULL;
@ -2071,8 +2073,8 @@ FindQueryContainingRTEIdentityInternal(Node *node,
* distributed relations in the given relation restrictions list are co-located.
*/
static bool
AllDistributedRelationsInRestrictionContextColocated(
RelationRestrictionContext *restrictionContext)
AllDistributedRelationsInRestrictionContextColocated(RelationRestrictionContext *
restrictionContext)
{
RelationRestriction *relationRestriction = NULL;
List *relationIdList = NIL;

View File

@ -1215,9 +1215,11 @@ AddPartitionKeyRestrictionToInstance(ClauseWalkerContext *context, OpExpr *opCla
}
default:
{
Assert(false);
}
}
}
prune->hasValidConstraint = true;
}

View File

@ -131,8 +131,8 @@ static void ExecuteRemainingPostLoadTableCommands(List *logicalRepTargetList);
static char * escape_param_str(const char *str);
static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
static void WaitForMiliseconds(long timeout);
static XLogRecPtr GetSubscriptionPosition(
GroupedLogicalRepTargets *groupedLogicalRepTargets);
static XLogRecPtr GetSubscriptionPosition(GroupedLogicalRepTargets *
groupedLogicalRepTargets);
static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode,
List *shardIntervals);

View File

@ -210,10 +210,12 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
default:
{
ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action));
}
}
#else
switch (change->action)
{
@ -245,10 +247,12 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
default:
{
ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action));
}
}
#endif
/* Current replication slot is not responsible for handling the change */
@ -318,10 +322,12 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
default:
{
ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action));
}
}
#else
switch (change->action)
{
@ -373,10 +379,12 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
default:
{
ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action));
}
}
#endif
}

View File

@ -539,7 +539,8 @@ StatCountersShmemInit(void)
bool sharedBackendStatsSlotArrayAlreadyInit = false;
SharedBackendStatsSlotArray = (BackendStatsSlot *)
ShmemInitStruct("Citus Shared Backend Stats Slot Array",
ShmemInitStruct(
"Citus Shared Backend Stats Slot Array",
SharedBackendStatsSlotArrayShmemSize(),
&sharedBackendStatsSlotArrayAlreadyInit);

View File

@ -34,8 +34,8 @@
#include "distributed/shard_rebalancer.h"
/* static declarations for json conversion */
static List * JsonArrayToShardPlacementTestInfoList(
ArrayType *shardPlacementJsonArrayObject);
static List * JsonArrayToShardPlacementTestInfoList(ArrayType *
shardPlacementJsonArrayObject);
static List * JsonArrayToWorkerTestInfoList(ArrayType *workerNodeJsonArrayObject);
static bool JsonFieldValueBoolDefault(Datum jsonDocument, const char *key,
bool defaultValue);

View File

@ -395,8 +395,8 @@ AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode)
DistributedTransactionId *currentTransactionId =
&currentBackendData.transactionId;
if (currentTransactionId->transactionNumber !=
transactionNode->transactionId.transactionNumber)
if (currentTransactionId->transactionNumber != transactionNode->transactionId.
transactionNumber)
{
continue;
}

View File

@ -634,8 +634,9 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList
* false.
*/
bool
SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection, List *commandList)
SendOptionalCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *
workerConnection, List *
commandList)
{
if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
{

View File

@ -376,12 +376,12 @@ ExtractAggregationValues(FunctionCallInfo fcinfo, int argumentIndex,
HeapTupleHeader tupleHeader =
DatumGetHeapTupleHeader(fcGetArgValue(fcinfo, argumentIndex));
if (HeapTupleHeaderGetNatts(tupleHeader) !=
aggregationArgumentContext->argumentCount ||
HeapTupleHeaderGetTypeId(tupleHeader) !=
aggregationArgumentContext->tupleDesc->tdtypeid ||
HeapTupleHeaderGetTypMod(tupleHeader) !=
aggregationArgumentContext->tupleDesc->tdtypmod)
if (HeapTupleHeaderGetNatts(
tupleHeader) != aggregationArgumentContext->argumentCount ||
HeapTupleHeaderGetTypeId(
tupleHeader) != aggregationArgumentContext->tupleDesc->tdtypeid ||
HeapTupleHeaderGetTypMod(
tupleHeader) != aggregationArgumentContext->tupleDesc->tdtypmod)
{
ereport(ERROR, (errmsg("worker_partial_agg_sfunc received "
"incompatible record")));
@ -817,8 +817,8 @@ coord_combine_agg_ffunc(PG_FUNCTION_ARGS)
if (!TypecheckCoordCombineAggReturnType(fcinfo, ffunc, box))
{
ereport(ERROR, (errmsg(
"coord_combine_agg_ffunc could not confirm type correctness")));
ereport(ERROR, (errmsg("coord_combine_agg_ffunc could not "
"confirm type correctness")));
}
if (ffunc == InvalidOid)

View File

@ -88,26 +88,26 @@ static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInf
bool *hadError);
static void UpdateDependingTasks(BackgroundTask *task);
static int64 CalculateBackoffDelay(int retryCount);
static bool NewExecutorExceedsCitusLimit(
QueueMonitorExecutionContext *queueMonitorExecutionContext);
static bool NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *
queueMonitorExecutionContext);
static bool NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
QueueMonitorExecutionContext *
queueMonitorExecutionContext);
static bool AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask,
QueueMonitorExecutionContext *
queueMonitorExecutionContext);
static void AssignRunnableTasks(
QueueMonitorExecutionContext *queueMonitorExecutionContext);
static void AssignRunnableTasks(QueueMonitorExecutionContext *
queueMonitorExecutionContext);
static List * GetRunningTaskEntries(HTAB *currentExecutors);
static shm_mq_result ReadFromExecutorQueue(
BackgroundExecutorHashEntry *backgroundExecutorHashEntry,
static shm_mq_result ReadFromExecutorQueue(BackgroundExecutorHashEntry *
backgroundExecutorHashEntry,
bool *hadError);
static void CheckAndResetLastWorkerAllocationFailure(
QueueMonitorExecutionContext *queueMonitorExecutionContext);
static TaskExecutionStatus TaskConcurrentCancelCheck(
TaskExecutionContext *taskExecutionContext);
static TaskExecutionStatus ConsumeExecutorQueue(
TaskExecutionContext *taskExecutionContext);
static void CheckAndResetLastWorkerAllocationFailure(QueueMonitorExecutionContext *
queueMonitorExecutionContext);
static TaskExecutionStatus TaskConcurrentCancelCheck(TaskExecutionContext *
taskExecutionContext);
static TaskExecutionStatus ConsumeExecutorQueue(TaskExecutionContext *
taskExecutionContext);
static void TaskHadError(TaskExecutionContext *taskExecutionContext);
static void TaskEnded(TaskExecutionContext *taskExecutionContext);
static void TerminateAllTaskExecutors(HTAB *currentExecutors);
@ -537,7 +537,8 @@ NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
*/
static bool
AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask,
QueueMonitorExecutionContext *queueMonitorExecutionContext)
QueueMonitorExecutionContext *
queueMonitorExecutionContext)
{
Assert(runnableTask && runnableTask->status == BACKGROUND_TASK_STATUS_RUNNABLE);
@ -649,8 +650,8 @@ GetRunningTaskEntries(HTAB *currentExecutors)
* It also resets the failure timestamp.
*/
static void
CheckAndResetLastWorkerAllocationFailure(
QueueMonitorExecutionContext *queueMonitorExecutionContext)
CheckAndResetLastWorkerAllocationFailure(QueueMonitorExecutionContext *
queueMonitorExecutionContext)
{
if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime > 0)
{

View File

@ -531,8 +531,8 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType,
continue;
}
if (colocationId == INVALID_COLOCATION_ID || colocationId >
colocationForm->colocationid)
if (colocationId == INVALID_COLOCATION_ID ||
colocationId > colocationForm->colocationid)
{
/*
* We assign the smallest colocation id among all the matches so that we
@ -1051,8 +1051,8 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
* Since we iterate over co-located tables, shard count of each table should be
* same and greater than shardIntervalIndex.
*/
Assert(cacheEntry->shardIntervalArrayLength ==
colocatedTableCacheEntry->shardIntervalArrayLength);
Assert(cacheEntry->shardIntervalArrayLength == colocatedTableCacheEntry->
shardIntervalArrayLength);
ShardInterval *colocatedShardInterval =
colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex];
@ -1122,8 +1122,8 @@ ColocatedNonPartitionShardIntervalList(ShardInterval *shardInterval)
* Since we iterate over co-located tables, shard count of each table should be
* same and greater than shardIntervalIndex.
*/
Assert(cacheEntry->shardIntervalArrayLength ==
colocatedTableCacheEntry->shardIntervalArrayLength);
Assert(cacheEntry->shardIntervalArrayLength == colocatedTableCacheEntry->
shardIntervalArrayLength);
ShardInterval *colocatedShardInterval =
colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex];

View File

@ -81,7 +81,8 @@ static List * GetRelationshipNodesForFKeyConnectedRelations(
static List * GetAllNeighboursList(ForeignConstraintRelationshipNode *relationshipNode);
static ForeignConstraintRelationshipNode * GetRelationshipNodeForRelationId(Oid
relationId,
bool *isFound);
bool *
isFound);
static void CreateForeignConstraintRelationshipGraph(void);
static bool IsForeignConstraintRelationshipGraphValid(void);
static List * GetNeighbourList(ForeignConstraintRelationshipNode *relationshipNode,
@ -177,8 +178,8 @@ ShouldUndistributeCitusLocalTable(Oid relationId)
* to given relation node via a foreign key relationhip graph.
*/
static List *
GetRelationshipNodesForFKeyConnectedRelations(
ForeignConstraintRelationshipNode *relationshipNode)
GetRelationshipNodesForFKeyConnectedRelations(ForeignConstraintRelationshipNode *
relationshipNode)
{
HTAB *oidVisitedMap = CreateSimpleHashSetWithName(Oid, "oid visited hash set");
@ -566,8 +567,8 @@ PopulateAdjacencyLists(void)
/* we just saw this edge, no need to add it twice */
if (currentFConstraintRelationshipEdge->referencingRelationOID ==
prevReferencingOid &&
currentFConstraintRelationshipEdge->referencedRelationOID ==
prevReferencedOid)
currentFConstraintRelationshipEdge->referencedRelationOID == prevReferencedOid
)
{
continue;
}

View File

@ -61,8 +61,12 @@ static void CreateFixPartitionShardIndexNames(Oid parentRelationId,
static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
List *indexIdList,
Oid partitionRelationId);
static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId);
static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(char *
qualifiedParentShardIndexName,
Oid
parentIndexId,
Oid
partitionRelationId);
static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid
partitionIndexId,
char *
@ -652,8 +656,10 @@ WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
* given partition. Otherwise, all the partitions are included.
*/
static List *
WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(
char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId)
WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(char *
qualifiedParentShardIndexName,
Oid parentIndexId, Oid
partitionRelationId)
{
List *commandList = NIL;

View File

@ -29,7 +29,8 @@ extern char * pg_get_serverdef_string(Oid tableRelationId);
extern char * pg_get_sequencedef_string(Oid sequenceRelid);
extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId);
extern char * pg_get_tableschemadef_string(Oid tableRelationId,
IncludeSequenceDefaults includeSequenceDefaults,
IncludeSequenceDefaults
includeSequenceDefaults,
IncludeIdentities includeIdentityDefaults,
char *accessMethod);
extern void EnsureRelationKindSupported(Oid relationId);

View File

@ -48,7 +48,8 @@ extern void SwitchToSequentialAndLocalExecutionIfRelationNameTooLong(Oid relatio
extern void SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(Oid
parentRelationId,
Oid
partitionRelationId);
partitionRelationId)
;
/* DistOpsOperationType to be used in DistributeObjectOps */
typedef enum DistOpsOperationType
@ -560,13 +561,15 @@ extern List * PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryStr
processUtilityContext);
extern List * PostprocessAlterSequenceSchemaStmt(Node *node, const char *queryString);
extern List * PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
ProcessUtilityContext
processUtilityContext);
extern List * PostprocessAlterSequenceOwnerStmt(Node *node, const char *queryString);
extern List * PreprocessAlterSequencePersistenceStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessSequenceAlterTableStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessDropSequenceStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern List * SequenceDropStmtObjectAddress(Node *stmt, bool missing_ok, bool
@ -639,7 +642,8 @@ extern void PrepareAlterTableStmtForConstraint(AlterTableStmt *alterTableStateme
extern List * PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
ProcessUtilityContext
processUtilityContext);
extern List * PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);
extern void SkipForeignKeyValidationIfConstraintIsFkey(AlterTableStmt *alterTableStmt,
@ -789,8 +793,8 @@ extern List * PostprocessAlterTriggerDependsStmt(Node *node, const char *querySt
extern List * PreprocessAlterTriggerDependsStmt(Node *node, const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern void AlterTriggerDependsEventExtendNames(
AlterObjectDependsStmt *alterTriggerDependsStmt,
extern void AlterTriggerDependsEventExtendNames(AlterObjectDependsStmt *
alterTriggerDependsStmt,
char *schemaName, uint64 shardId);
extern void ErrorOutForTriggerIfNotSupported(Oid relationId);
extern void ErrorIfRelationHasUnsupportedTrigger(Oid relationId);
@ -834,8 +838,8 @@ extern bool RelationIdListHasReferenceTable(List *relationIdList);
extern List * GetFKeyCreationCommandsForRelationIdList(List *relationIdList);
extern void DropRelationForeignKeys(Oid relationId, int flags);
extern void SetLocalEnableLocalReferenceForeignKeys(bool state);
extern void ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(
List *utilityCommandList);
extern void ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(List *
utilityCmdList);
extern void ExecuteAndLogUtilityCommandList(List *ddlCommandList);
extern void ExecuteAndLogUtilityCommand(const char *commandString);
extern void ExecuteForeignKeyCreateCommandList(List *ddlCommandList,

View File

@ -63,7 +63,8 @@
#define WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA \
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)"
#define WORKER_APPLY_INTER_SHARD_DDL_COMMAND \
"SELECT worker_apply_inter_shard_ddl_command (" UINT64_FORMAT ", %s, " UINT64_FORMAT \
"SELECT worker_apply_inter_shard_ddl_command (" UINT64_FORMAT \
", %s, " UINT64_FORMAT \
", %s, %s)"
#define SHARD_RANGE_QUERY "SELECT min(%s), max(%s) FROM %s"
#define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size(%s)"
@ -225,7 +226,8 @@ extern uint64 GetNextShardId(void);
extern uint64 GetNextPlacementId(void);
extern Oid ResolveRelationId(text *relationName, bool missingOk);
extern List * GetFullTableCreationCommands(Oid relationId,
IncludeSequenceDefaults includeSequenceDefaults,
IncludeSequenceDefaults
includeSequenceDefaults,
IncludeIdentities includeIdentityDefaults,
bool creatingShellTableOnRemoteNode);
extern List * GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes,

View File

@ -40,8 +40,9 @@ typedef struct ExtendedOpNodeProperties
} ExtendedOpNodeProperties;
extern ExtendedOpNodeProperties BuildExtendedOpNodeProperties(
MultiExtendedOp *extendedOpNode, bool hasNonDistributableAggregates);
extern ExtendedOpNodeProperties BuildExtendedOpNodeProperties(MultiExtendedOp *
extendedOpNode, bool
hasNonDistributableAggregates);
#endif /* EXTENDED_OP_NODE_UTILS_H_ */

View File

@ -112,8 +112,8 @@ extern List * PartitionTasklistResults(const char *resultIdPrefix, List *selectT
int partitionColumnIndex,
CitusTableCacheEntry *distributionScheme,
bool binaryFormat);
extern char * QueryStringForFragmentsTransfer(
NodeToNodeFragmentsTransfer *fragmentsTransfer);
extern char * QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *
fragmentsTransfer);
extern void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount,
Oid intervalTypeId, ArrayType **minValueArray,
ArrayType **maxValueArray);

View File

@ -160,7 +160,7 @@ typedef struct ListCellAndListWrapper
for (int var ## PositionDoNotUse = 0; \
(var ## PositionDoNotUse) < list_length(l) && \
(((var) = list_nth(l, var ## PositionDoNotUse)) || true); \
var ## PositionDoNotUse ++)
var ## PositionDoNotUse++)
/* utility functions declaration shared within this module */
extern List * SortList(List *pointerList,

View File

@ -210,7 +210,8 @@ extern ShardPlacement * ShardPlacementForFunctionColocatedWithDistTable(
DistObjectCacheEntry *procedure, List *argumentList, Var *partitionColumn,
CitusTableCacheEntry
*cacheEntry,
PlannedStmt *plan);
PlannedStmt *
plan);
extern bool CitusHasBeenLoaded(void);
extern bool CheckCitusVersion(int elevel);
extern bool CheckAvailableVersion(int elevel);

View File

@ -21,7 +21,8 @@ extern MultiConnection * StartPlacementConnection(uint32 flags,
struct ShardPlacement *placement,
const char *userName);
extern MultiConnection * GetConnectionIfPlacementAccessedInXact(int flags,
List *placementAccessList,
List *
placementAccessList,
const char *userName);
extern MultiConnection * StartPlacementListConnection(uint32 flags,
List *placementAccessList,

View File

@ -26,7 +26,8 @@ extern int ValuesMaterializationThreshold;
extern bool CanPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLimit);
extern bool ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
PlannerRestrictionContext *plannerRestrictionContext);
PlannerRestrictionContext *
plannerRestrictionContext);
extern bool JoinTreeContainsSubquery(Query *query);
extern bool IsNodeSubquery(Node *node);
extern bool HasEmptyJoinTree(Query *query);
@ -37,8 +38,8 @@ extern MultiNode * SubqueryMultiNodeTree(Query *originalQuery,
Query *queryTree,
PlannerRestrictionContext *
plannerRestrictionContext);
extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(
Query *originalQuery,
extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(Query *
originalQuery,
PlannerRestrictionContext
*
plannerRestrictionContext,

View File

@ -31,8 +31,8 @@ typedef struct RangeTblEntryIndex
Index rteIndex;
}RangeTblEntryIndex;
extern PlannerRestrictionContext * GetPlannerRestrictionContext(
RecursivePlanningContext *recursivePlanningContext);
extern PlannerRestrictionContext * GetPlannerRestrictionContext(RecursivePlanningContext *
recursivePlanningContext);
extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext,

View File

@ -41,8 +41,8 @@ extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
extern List * GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
PlannerRestrictionContext *
plannerRestrictionContext);
extern RelationRestriction * RelationRestrictionForRelation(
RangeTblEntry *rangeTableEntry,
extern RelationRestriction * RelationRestrictionForRelation(RangeTblEntry *
rangeTableEntry,
PlannerRestrictionContext *
plannerRestrictionContext);
extern JoinRestrictionContext * RemoveDuplicateJoinRestrictions(JoinRestrictionContext *

View File

@ -20,8 +20,8 @@ extern List * GenerateTaskListWithColocatedIntermediateResults(Oid targetRelatio
Query *
modifyQueryViaCoordinatorOrRepartition,
char *resultIdPrefix);
extern List * GenerateTaskListWithRedistributedResults(
Query *modifyQueryViaCoordinatorOrRepartition,
extern List * GenerateTaskListWithRedistributedResults(Query *
modifyQueryViaCoordinatorOrRepartition,
CitusTableCacheEntry *
targetRelation,
List **redistributedResults,

View File

@ -171,7 +171,8 @@ IsNodeWideObjectClass(ObjectClass objectClass)
* this assertion check based on latest supported major Postgres version.
*/
StaticAssertStmt(PG_MAJORVERSION_NUM <= 18,
"better to check if any of newly added ObjectClass'es are node-wide");
"better to check if any of newly added ObjectClass'es are node-wide")
;
switch (objectClass)
{
@ -187,8 +188,10 @@ IsNodeWideObjectClass(ObjectClass objectClass)
}
default:
{
return false;
}
}
}

View File

@ -38,7 +38,8 @@ typedef struct SortShardIntervalContext
extern ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray, int
shardCount, Oid collation,
FmgrInfo *shardIntervalSortCompareFunction);
FmgrInfo *
shardIntervalSortCompareFunction);
extern int CompareShardIntervals(const void *leftElement, const void *rightElement,
SortShardIntervalContext *sortContext);
extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement);

View File

@ -34,9 +34,10 @@ extern List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHa
List *
shardGroupSplitIntervalListList,
List *workersForPlacementList);
extern HTAB * CreateShardSplitInfoMapForPublication(
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
extern HTAB * CreateShardSplitInfoMapForPublication(List *
sourceColocatedShardIntervalList,
List *
shardGroupSplitIntervalListList,
List *destinationWorkerNodesList);
/* Functions to drop publisher-subscriber resources */

View File

@ -74,7 +74,8 @@ void StoreShardSplitSharedMemoryHandle(dsm_handle dsmHandle);
/* Functions for creating and accessing shared memory segments consisting shard split information */
extern ShardSplitInfoSMHeader * CreateSharedMemoryForShardSplitInfo(int
shardSplitInfoCount,
dsm_handle *dsmHandle);
dsm_handle *
dsmHandle);
extern void ReleaseSharedMemoryOfShardSplitInfo(void);
extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(void);

View File

@ -16,7 +16,8 @@
extern bool EnableBinaryProtocol;
extern DestReceiver * CreateShardCopyDestReceiver(EState *executorState,
List *destinationShardFullyQualifiedName,
List *
destinationShardFullyQualifiedName,
uint32_t destinationNodeId);
extern const char * CopyableColumnNamesFromRelationName(const char *schemaName, const

View File

@ -81,8 +81,10 @@ extern bool SendOptionalCommandListToWorkerOutsideTransactionWithConnection(
List *
commandList);
extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const
char *nodeName,
int32 nodePort,
char *
nodeName,
int32
nodePort,
const char *
nodeUser,
List *
@ -100,16 +102,17 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
int32 nodePort,
const char *nodeUser,
List *commandList);
extern void SendCommandListToWorkerOutsideTransactionWithConnection(
MultiConnection *workerConnection,
extern void SendCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *
workerConnection,
List *commandList);
extern void SendCommandListToWorkerListWithBareConnections(List *workerConnections,
List *commandList);
extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
List *workerNodeList,
extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *
workerNodeList,
const char *
nodeUser,
List *commandList);
List *
commandList);
extern void RemoveWorkerTransaction(const char *nodeName, int32 nodePort);
/* helper functions for worker transactions */

View File

@ -364,8 +364,10 @@ getObjectClass(const ObjectAddress *object)
}
case TransformRelationId:
{
return OCLASS_TRANSFORM;
}
}
/* shouldn't get here */
elog(ERROR, "unrecognized object class: %u", object->classId);