Naisila Puka 2025-12-07 20:55:32 +00:00 committed by GitHub
commit 5b05d44a69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
109 changed files with 662 additions and 545 deletions

View File

@ -113,10 +113,10 @@ FROM base AS uncrustify-builder
RUN sudo apt update && sudo apt install -y cmake tree RUN sudo apt update && sudo apt install -y cmake tree
WORKDIR /uncrustify WORKDIR /uncrustify
RUN curl -L https://github.com/uncrustify/uncrustify/archive/uncrustify-0.68.1.tar.gz | tar xz RUN curl -L https://github.com/uncrustify/uncrustify/archive/uncrustify-0.82.0.tar.gz | tar xz
WORKDIR /uncrustify/uncrustify-uncrustify-0.68.1/ WORKDIR /uncrustify/uncrustify-uncrustify-0.82.0/
RUN mkdir build RUN mkdir build
WORKDIR /uncrustify/uncrustify-uncrustify-0.68.1/build/ WORKDIR /uncrustify/uncrustify-uncrustify-0.82.0/build/
RUN cmake .. RUN cmake ..
RUN MAKEFLAGS="-j $(nproc)" make -s RUN MAKEFLAGS="-j $(nproc)" make -s

View File

@ -32,7 +32,7 @@ jobs:
style_checker_image_name: "ghcr.io/citusdata/stylechecker" style_checker_image_name: "ghcr.io/citusdata/stylechecker"
style_checker_tools_version: "0.8.18" style_checker_tools_version: "0.8.18"
sql_snapshot_pg_version: "17.6" sql_snapshot_pg_version: "17.6"
image_suffix: "-va20872f" image_suffix: "-dev-7ba2db1"
pg15_version: '{ "major": "15", "full": "15.14" }' pg15_version: '{ "major": "15", "full": "15.14" }'
pg16_version: '{ "major": "16", "full": "16.10" }' pg16_version: '{ "major": "16", "full": "16.10" }'
pg17_version: '{ "major": "17", "full": "17.6" }' pg17_version: '{ "major": "17", "full": "17.6" }'

View File

@ -11,9 +11,9 @@ tool. This tool uses `uncrustify` under the hood.
```bash ```bash
# Uncrustify changes the way it formats code every release a bit. To make sure # Uncrustify changes the way it formats code every release a bit. To make sure
# everyone formats consistently we use version 0.68.1: # everyone formats consistently we use version 0.82.0:
curl -L https://github.com/uncrustify/uncrustify/archive/uncrustify-0.68.1.tar.gz | tar xz curl -L https://github.com/uncrustify/uncrustify/archive/uncrustify-0.82.0.tar.gz | tar xz
cd uncrustify-uncrustify-0.68.1/ cd uncrustify-uncrustify-0.82.0/
mkdir build mkdir build
cd build cd build
cmake .. cmake ..

View File

@ -72,9 +72,9 @@
#define COLUMNAR_RELOPTION_NAMESPACE "columnar" #define COLUMNAR_RELOPTION_NAMESPACE "columnar"
#define SLOW_METADATA_ACCESS_WARNING \ #define SLOW_METADATA_ACCESS_WARNING \
"Metadata index %s is not available, this might mean slower read/writes " \ "Metadata index %s is not available, this might mean slower read/writes " \
"on columnar tables. This is expected during Postgres upgrades and not " \ "on columnar tables. This is expected during Postgres upgrades and not " \
"expected otherwise." "expected otherwise."
typedef struct typedef struct
{ {
@ -1332,7 +1332,8 @@ ColumnarRelationId(Oid relid, RelFileLocator relfilelocator)
{ {
return OidIsValid(relid) ? relid : RelidByRelfilenumber(RelationTablespace_compat( return OidIsValid(relid) ? relid : RelidByRelfilenumber(RelationTablespace_compat(
relfilelocator), relfilelocator),
RelationPhysicalIdentifierNumber_compat( RelationPhysicalIdentifierNumber_compat
(
relfilelocator)); relfilelocator));
} }

View File

@ -41,8 +41,8 @@
#include "distributed/listutils.h" #include "distributed/listutils.h"
#define UNEXPECTED_STRIPE_READ_ERR_MSG \ #define UNEXPECTED_STRIPE_READ_ERR_MSG \
"attempted to read an unexpected stripe while reading columnar " \ "attempted to read an unexpected stripe while reading columnar " \
"table %s, stripe with id=" UINT64_FORMAT " is not flushed" "table %s, stripe with id=" UINT64_FORMAT " is not flushed"
typedef struct ChunkGroupReadState typedef struct ChunkGroupReadState
{ {
@ -758,7 +758,9 @@ SnapshotMightSeeUnflushedStripes(Snapshot snapshot)
} }
default: default:
{
return false; return false;
}
} }
} }

View File

@ -547,7 +547,8 @@ ColumnarStorageTruncate(Relation rel, uint64 newDataReservation)
if (!ColumnarLogicalOffsetIsValid(newDataReservation)) if (!ColumnarLogicalOffsetIsValid(newDataReservation))
{ {
elog(ERROR, elog(ERROR,
"attempted to truncate relation %d to invalid logical offset: " UINT64_FORMAT, "attempted to truncate relation %d to invalid logical offset: " UINT64_FORMAT
,
rel->rd_id, newDataReservation); rel->rd_id, newDataReservation);
} }

View File

@ -2410,9 +2410,10 @@ ColumnarProcessUtility(PlannedStmt *pstmt,
} }
default: default:
{
/* FALL THROUGH */ /* FALL THROUGH */
break; break;
}
} }
if (columnarOptions != NIL && columnarRangeVar == NULL) if (columnarOptions != NIL && columnarRangeVar == NULL)

View File

@ -44,17 +44,17 @@
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#define SAVE_AND_PERSIST(c) \ #define SAVE_AND_PERSIST(c) \
do { \ do { \
Oid savedUserId = InvalidOid; \ Oid savedUserId = InvalidOid; \
int savedSecurityContext = 0; \ int savedSecurityContext = 0; \
LogicalClockShmem->clusterClockValue = *(c); \ LogicalClockShmem->clusterClockValue = *(c); \
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); \ GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); \
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); \ SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); \
DirectFunctionCall2(setval_oid, \ DirectFunctionCall2(setval_oid, \
ObjectIdGetDatum(DistClockLogicalSequenceId()), \ ObjectIdGetDatum(DistClockLogicalSequenceId()), \
Int64GetDatum((c)->logical)); \ Int64GetDatum((c)->logical)); \
SetUserIdAndSecContext(savedUserId, savedSecurityContext); \ SetUserIdAndSecContext(savedUserId, savedSecurityContext); \
} while (0) } while (0)
PG_FUNCTION_INFO_V1(citus_get_node_clock); PG_FUNCTION_INFO_V1(citus_get_node_clock);
PG_FUNCTION_INFO_V1(citus_internal_adjust_local_clock_to_remote); PG_FUNCTION_INFO_V1(citus_internal_adjust_local_clock_to_remote);

View File

@ -77,9 +77,9 @@
#define ALTER_TABLE_SET_ACCESS_METHOD 'm' #define ALTER_TABLE_SET_ACCESS_METHOD 'm'
#define UNDISTRIBUTE_TABLE_CASCADE_HINT \ #define UNDISTRIBUTE_TABLE_CASCADE_HINT \
"Use cascade option to undistribute all the relations involved in " \ "Use cascade option to undistribute all the relations involved in " \
"a foreign key relationship with %s by executing SELECT " \ "a foreign key relationship with %s by executing SELECT " \
"undistribute_table($$%s$$, cascade_via_foreign_keys=>true)" "undistribute_table($$%s$$, cascade_via_foreign_keys=>true)"
typedef TableConversionReturn *(*TableConversionFunction)(struct typedef TableConversionReturn *(*TableConversionFunction)(struct
@ -185,8 +185,8 @@ typedef struct TableConversionState
static TableConversionReturn * AlterDistributedTable(TableConversionParameters *params); static TableConversionReturn * AlterDistributedTable(TableConversionParameters *params);
static TableConversionReturn * AlterTableSetAccessMethod( static TableConversionReturn * AlterTableSetAccessMethod(TableConversionParameters *params
TableConversionParameters *params); );
static TableConversionReturn * ConvertTable(TableConversionState *con); static TableConversionReturn * ConvertTable(TableConversionState *con);
static TableConversionReturn * ConvertTableInternal(TableConversionState *con); static TableConversionReturn * ConvertTableInternal(TableConversionState *con);
static bool SwitchToSequentialAndLocalExecutionIfShardNameTooLong(char *relationName, static bool SwitchToSequentialAndLocalExecutionIfShardNameTooLong(char *relationName,
@ -216,7 +216,8 @@ static void ErrorIfMatViewSizeExceedsTheLimit(Oid matViewOid);
static char * CreateMaterializedViewDDLCommand(Oid matViewOid); static char * CreateMaterializedViewDDLCommand(Oid matViewOid);
static char * GetAccessMethodForMatViewIfExists(Oid viewOid); static char * GetAccessMethodForMatViewIfExists(Oid viewOid);
static bool WillRecreateForeignKeyToReferenceTable(Oid relationId, static bool WillRecreateForeignKeyToReferenceTable(Oid relationId,
CascadeToColocatedOption cascadeOption); CascadeToColocatedOption cascadeOption)
;
static void WarningsForDroppingForeignKeysWithDistributedTables(Oid relationId); static void WarningsForDroppingForeignKeysWithDistributedTables(Oid relationId);
static void ErrorIfUnsupportedCascadeObjects(Oid relationId); static void ErrorIfUnsupportedCascadeObjects(Oid relationId);
static List * WrapTableDDLCommands(List *commandStrings); static List * WrapTableDDLCommands(List *commandStrings);
@ -506,7 +507,8 @@ UndistributeTable(TableConversionParameters *params)
IsCitusTableType(params->relationId, SINGLE_SHARD_DISTRIBUTED)) IsCitusTableType(params->relationId, SINGLE_SHARD_DISTRIBUTED))
{ {
EnsureUndistributeTenantTableSafe(params->relationId, EnsureUndistributeTenantTableSafe(params->relationId,
TenantOperationNames[TENANT_UNDISTRIBUTE_TABLE]); TenantOperationNames[TENANT_UNDISTRIBUTE_TABLE])
;
} }
if (!params->cascadeViaForeignKeys) if (!params->cascadeViaForeignKeys)

View File

@ -64,8 +64,8 @@ PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt, const char *queryS
return NIL; return NIL;
} }
if (ops->qualify && DistOpsValidityState(stmt, ops) == if (ops->qualify && DistOpsValidityState(stmt, ops) == ShouldQualifyAfterLocalCreation
ShouldQualifyAfterLocalCreation) )
{ {
/* qualify the statement after local creation */ /* qualify the statement after local creation */
ops->qualify(stmt); ops->qualify(stmt);

View File

@ -703,7 +703,8 @@ EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
* the relation because the distribution column can be modified in that period. * the relation because the distribution column can be modified in that period.
*/ */
Oid distributionColumnType = ColumnTypeIdForRelationColumnName(relationId, Oid distributionColumnType = ColumnTypeIdForRelationColumnName(relationId,
distributionColumnName); distributionColumnName)
;
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
Oid colocateWithTableId = ResolveRelationId(colocateWithTableNameText, false); Oid colocateWithTableId = ResolveRelationId(colocateWithTableNameText, false);
@ -1108,8 +1109,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams) DistributedTableParams *distributedTableParams)
{ {
if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED || if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED ||
tableType == RANGE_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED) != tableType == RANGE_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED) != (
(distributedTableParams != NULL)) distributedTableParams != NULL))
{ {
ereport(ERROR, (errmsg("distributed table params must be provided " ereport(ERROR, (errmsg("distributed table params must be provided "
"when creating a distributed table and must " "when creating a distributed table and must "

View File

@ -2165,7 +2165,9 @@ GetDistributeObjectOps(Node *node)
} }
default: default:
{
return &Any_SecLabel; return &Any_SecLabel;
}
} }
} }

View File

@ -47,13 +47,13 @@
#define BehaviorIsRestrictOrNoAction(x) \ #define BehaviorIsRestrictOrNoAction(x) \
((x) == FKCONSTR_ACTION_NOACTION || (x) == FKCONSTR_ACTION_RESTRICT) ((x) == FKCONSTR_ACTION_NOACTION || (x) == FKCONSTR_ACTION_RESTRICT)
#define USE_CREATE_REFERENCE_TABLE_HINT \ #define USE_CREATE_REFERENCE_TABLE_HINT \
"You could use SELECT create_reference_table('%s') " \ "You could use SELECT create_reference_table('%s') " \
"to replicate the referenced table to all nodes or " \ "to replicate the referenced table to all nodes or " \
"consider dropping the foreign key" "consider dropping the foreign key"
typedef bool (*CheckRelationFunc)(Oid); typedef bool (*CheckRelationFunc)(Oid);

View File

@ -71,7 +71,7 @@
#define DISABLE_LOCAL_CHECK_FUNCTION_BODIES "SET LOCAL check_function_bodies TO off;" #define DISABLE_LOCAL_CHECK_FUNCTION_BODIES "SET LOCAL check_function_bodies TO off;"
#define RESET_CHECK_FUNCTION_BODIES "RESET check_function_bodies;" #define RESET_CHECK_FUNCTION_BODIES "RESET check_function_bodies;"
#define argumentStartsWith(arg, prefix) \ #define argumentStartsWith(arg, prefix) \
(strncmp(arg, prefix, strlen(prefix)) == 0) (strncmp(arg, prefix, strlen(prefix)) == 0)
/* forward declaration for helper functions*/ /* forward declaration for helper functions*/
static bool RecreateSameNonColocatedFunction(ObjectAddress functionAddress, static bool RecreateSameNonColocatedFunction(ObjectAddress functionAddress,
@ -108,7 +108,8 @@ static void DistributeFunctionColocatedWithDistributedTable(RegProcedure funcOid
const ObjectAddress * const ObjectAddress *
functionAddress); functionAddress);
static void DistributeFunctionColocatedWithSingleShardTable(const static void DistributeFunctionColocatedWithSingleShardTable(const
ObjectAddress *functionAddress, ObjectAddress *functionAddress
,
text *colocateWithText); text *colocateWithText);
static void DistributeFunctionColocatedWithReferenceTable(const static void DistributeFunctionColocatedWithReferenceTable(const
ObjectAddress *functionAddress); ObjectAddress *functionAddress);
@ -1896,7 +1897,9 @@ ShouldAddFunctionSignature(FunctionParameterMode mode)
} }
default: default:
{
return true; return true;
}
} }
} }

View File

@ -227,8 +227,8 @@ CollectGrantTableIdList(GrantStmt *grantStmt)
bool grantOnTableCommand = (grantStmt->targtype == ACL_TARGET_OBJECT && bool grantOnTableCommand = (grantStmt->targtype == ACL_TARGET_OBJECT &&
grantStmt->objtype == OBJECT_TABLE); grantStmt->objtype == OBJECT_TABLE);
bool grantAllTablesOnSchemaCommand = (grantStmt->targtype == bool grantAllTablesOnSchemaCommand = (grantStmt->targtype == ACL_TARGET_ALL_IN_SCHEMA
ACL_TARGET_ALL_IN_SCHEMA && &&
grantStmt->objtype == OBJECT_TABLE); grantStmt->objtype == OBJECT_TABLE);
/* we are only interested in table level grants */ /* we are only interested in table level grants */

View File

@ -64,8 +64,8 @@ static int GetNumberOfIndexParameters(IndexStmt *createIndexStatement);
static bool IndexAlreadyExists(IndexStmt *createIndexStatement); static bool IndexAlreadyExists(IndexStmt *createIndexStatement);
static Oid CreateIndexStmtGetIndexId(IndexStmt *createIndexStatement); static Oid CreateIndexStmtGetIndexId(IndexStmt *createIndexStatement);
static Oid CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement); static Oid CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement);
static void SwitchToSequentialAndLocalExecutionIfIndexNameTooLong( static void SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(IndexStmt *
IndexStmt *createIndexStatement); createIndexStatement);
static char * GenerateLongestShardPartitionIndexName(IndexStmt *createIndexStatement); static char * GenerateLongestShardPartitionIndexName(IndexStmt *createIndexStatement);
static char * GenerateDefaultIndexName(IndexStmt *createIndexStatement); static char * GenerateDefaultIndexName(IndexStmt *createIndexStatement);
static List * GenerateIndexParameters(IndexStmt *createIndexStatement); static List * GenerateIndexParameters(IndexStmt *createIndexStatement);

View File

@ -1272,7 +1272,9 @@ ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result)
} }
default: default:
{
Assert(false); /* there are no other options for this enum */ Assert(false); /* there are no other options for this enum */
}
} }
} }
@ -2468,7 +2470,7 @@ ProcessAppendToShardOption(Oid relationId, CopyStmt *copyStatement)
if (!IsCitusTableType(relationId, APPEND_DISTRIBUTED)) if (!IsCitusTableType(relationId, APPEND_DISTRIBUTED))
{ {
ereport(ERROR, (errmsg(APPEND_TO_SHARD_OPTION " is only valid for " ereport(ERROR, (errmsg(APPEND_TO_SHARD_OPTION " is only valid for "
"append-distributed tables"))); "append-distributed tables")));
} }
/* throws an error if shard does not exist */ /* throws an error if shard does not exist */
@ -2859,7 +2861,8 @@ ErrorIfCopyHasOnErrorLogVerbosity(CopyStmt *copyStatement)
if (strcmp(option->defname, "on_error") == 0) if (strcmp(option->defname, "on_error") == 0)
{ {
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(
"Citus does not support COPY FROM with ON_ERROR option."))); "Citus does not support COPY FROM with ON_ERROR option."))
);
} }
else if (strcmp(option->defname, "log_verbosity") == 0) else if (strcmp(option->defname, "log_verbosity") == 0)
{ {
@ -2877,7 +2880,8 @@ ErrorIfCopyHasOnErrorLogVerbosity(CopyStmt *copyStatement)
if (log_verbosity) if (log_verbosity)
{ {
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(
"Citus does not support COPY FROM with LOG_VERBOSITY option."))); "Citus does not support COPY FROM with LOG_VERBOSITY option.")
));
} }
#endif #endif
} }

View File

@ -35,13 +35,13 @@
#define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \ #define EXECUTE_COMMAND_ON_REMOTE_NODES_AS_USER \
"SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)" "SELECT citus_internal.execute_command_on_remote_nodes_as_user(%s, %s)"
#define START_MANAGEMENT_TRANSACTION \ #define START_MANAGEMENT_TRANSACTION \
"SELECT citus_internal.start_management_transaction('%lu')" "SELECT citus_internal.start_management_transaction('%lu')"
#define MARK_OBJECT_DISTRIBUTED \ #define MARK_OBJECT_DISTRIBUTED \
"SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)" "SELECT citus_internal.mark_object_distributed(%d, %s, %d, %s)"
#define UNMARK_OBJECT_DISTRIBUTED \ #define UNMARK_OBJECT_DISTRIBUTED \
"SELECT pg_catalog.citus_unmark_object_distributed(%d, %d, %d, %s)" "SELECT pg_catalog.citus_unmark_object_distributed(%d, %d, %d, %s)"
/* /*

View File

@ -149,13 +149,14 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
} }
default: default:
{
/* /*
* Nodes that are not supported by Citus: we pass-through to the * Nodes that are not supported by Citus: we pass-through to the
* main PostgreSQL executor. Any Citus-supported RenameStmt * main PostgreSQL executor. Any Citus-supported RenameStmt
* renameType must appear above in the switch, explicitly. * renameType must appear above in the switch, explicitly.
*/ */
return NIL; return NIL;
}
} }
bool isCitusRelation = IsCitusTable(tableRelationId); bool isCitusRelation = IsCitusTable(tableRelationId);

View File

@ -463,7 +463,8 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
DEPENDENCY_INTERNAL)) DEPENDENCY_INTERNAL))
{ {
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(
"Altering a distributed sequence is currently not supported."))); "Altering a distributed sequence is currently not supported.")
));
} }
/* /*
@ -991,8 +992,8 @@ FilterDistributedSequences(GrantStmt *stmt)
{ {
bool grantOnSequenceCommand = (stmt->targtype == ACL_TARGET_OBJECT && bool grantOnSequenceCommand = (stmt->targtype == ACL_TARGET_OBJECT &&
stmt->objtype == OBJECT_SEQUENCE); stmt->objtype == OBJECT_SEQUENCE);
bool grantOnAllSequencesInSchemaCommand = (stmt->targtype == bool grantOnAllSequencesInSchemaCommand = (stmt->targtype == ACL_TARGET_ALL_IN_SCHEMA
ACL_TARGET_ALL_IN_SCHEMA && &&
stmt->objtype == OBJECT_SEQUENCE); stmt->objtype == OBJECT_SEQUENCE);
/* we are only interested in sequence level grants */ /* we are only interested in sequence level grants */
@ -1036,7 +1037,8 @@ FilterDistributedSequences(GrantStmt *stmt)
RangeVar *distributedSequence = makeRangeVar(get_namespace_name( RangeVar *distributedSequence = makeRangeVar(get_namespace_name(
namespaceOid), namespaceOid),
get_rel_name( get_rel_name(
sequenceAddress->objectId), sequenceAddress->objectId
),
-1); -1);
grantSequenceList = lappend(grantSequenceList, distributedSequence); grantSequenceList = lappend(grantSequenceList, distributedSequence);
} }

View File

@ -237,7 +237,9 @@ AcquireCitusAdvisoryObjectClassLockGetOid(ObjectClass objectClass,
} }
default: default:
{
elog(ERROR, "unsupported object class: %d", objectClass); elog(ERROR, "unsupported object class: %d", objectClass);
}
} }
} }
@ -270,6 +272,8 @@ AcquireCitusAdvisoryObjectClassLockCheckPrivileges(ObjectClass objectClass, Oid
} }
default: default:
{
elog(ERROR, "unsupported object class: %d", objectClass); elog(ERROR, "unsupported object class: %d", objectClass);
}
} }
} }

View File

@ -50,7 +50,7 @@
#define DEFAULT_STATISTICS_TARGET -1 #define DEFAULT_STATISTICS_TARGET -1
#define ALTER_INDEX_COLUMN_SET_STATS_COMMAND \ #define ALTER_INDEX_COLUMN_SET_STATS_COMMAND \
"ALTER INDEX %s ALTER COLUMN %d SET STATISTICS %d" "ALTER INDEX %s ALTER COLUMN %d SET STATISTICS %d"
static char * GenerateAlterIndexColumnSetStatsCommand(char *indexNameWithSchema, static char * GenerateAlterIndexColumnSetStatsCommand(char *indexNameWithSchema,
int16 attnum, int16 attnum,

View File

@ -81,23 +81,23 @@ static void ErrorIfAttachCitusTableToPgLocalTable(Oid parentRelationId,
Oid partitionRelationId); Oid partitionRelationId);
static bool DeparserSupportsAlterTableAddColumn(AlterTableStmt *alterTableStatement, static bool DeparserSupportsAlterTableAddColumn(AlterTableStmt *alterTableStatement,
AlterTableCmd *addColumnSubCommand); AlterTableCmd *addColumnSubCommand);
static bool ATDefinesFKeyBetweenPostgresAndCitusLocalOrRef( static bool ATDefinesFKeyBetweenPostgresAndCitusLocalOrRef(AlterTableStmt *
AlterTableStmt *alterTableStatement); alterTableStatement);
static bool ShouldMarkConnectedRelationsNotAutoConverted(Oid leftRelationId, static bool ShouldMarkConnectedRelationsNotAutoConverted(Oid leftRelationId,
Oid rightRelationId); Oid rightRelationId);
static bool RelationIdListContainsCitusTableType(List *relationIdList, static bool RelationIdListContainsCitusTableType(List *relationIdList,
CitusTableType citusTableType); CitusTableType citusTableType);
static bool RelationIdListContainsPostgresTable(List *relationIdList); static bool RelationIdListContainsPostgresTable(List *relationIdList);
static void ConvertPostgresLocalTablesToCitusLocalTables( static void ConvertPostgresLocalTablesToCitusLocalTables(AlterTableStmt *
AlterTableStmt *alterTableStatement); alterTableStatement);
static bool RangeVarListHasLocalRelationConvertedByUser(List *relationRangeVarList, static bool RangeVarListHasLocalRelationConvertedByUser(List *relationRangeVarList,
AlterTableStmt * AlterTableStmt *
alterTableStatement); alterTableStatement);
static int CompareRangeVarsByOid(const void *leftElement, const void *rightElement); static int CompareRangeVarsByOid(const void *leftElement, const void *rightElement);
static List * GetAlterTableAddFKeyRightRelationIdList( static List * GetAlterTableAddFKeyRightRelationIdList(AlterTableStmt *alterTableStatement)
AlterTableStmt *alterTableStatement); ;
static List * GetAlterTableAddFKeyRightRelationRangeVarList( static List * GetAlterTableAddFKeyRightRelationRangeVarList(AlterTableStmt *
AlterTableStmt *alterTableStatement); alterTableStatement);
static List * GetAlterTableAddFKeyConstraintList(AlterTableStmt *alterTableStatement); static List * GetAlterTableAddFKeyConstraintList(AlterTableStmt *alterTableStatement);
static List * GetAlterTableCommandFKeyConstraintList(AlterTableCmd *command); static List * GetAlterTableCommandFKeyConstraintList(AlterTableCmd *command);
static List * GetRangeVarListFromFKeyConstraintList(List *fKeyConstraintList); static List * GetRangeVarListFromFKeyConstraintList(List *fKeyConstraintList);
@ -2181,7 +2181,9 @@ AlterTableCommandTypeIsTrigger(AlterTableType alterTableType)
} }
default: default:
{
return false; return false;
}
} }
} }
@ -3426,7 +3428,8 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
columnConstraints->length > 1) columnConstraints->length > 1)
{ {
ereport(ERROR, (errcode( ereport(ERROR, (errcode(
ERRCODE_FEATURE_NOT_SUPPORTED), ERRCODE_FEATURE_NOT_SUPPORTED)
,
errmsg( errmsg(
"cannot execute ADD COLUMN .. DEFAULT nextval('..')" "cannot execute ADD COLUMN .. DEFAULT nextval('..')"
" command with other subcommands/constraints"), " command with other subcommands/constraints"),
@ -3441,7 +3444,8 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
if (!TableEmpty(relationId)) if (!TableEmpty(relationId))
{ {
ereport(ERROR, (errcode( ereport(ERROR, (errcode(
ERRCODE_FEATURE_NOT_SUPPORTED), ERRCODE_FEATURE_NOT_SUPPORTED)
,
errmsg( errmsg(
"cannot add a column involving DEFAULT nextval('..') " "cannot add a column involving DEFAULT nextval('..') "
"because the table is not empty"), "because the table is not empty"),

View File

@ -1297,7 +1297,8 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
"partial failure, potentially leading to an inconsistent " "partial failure, potentially leading to an inconsistent "
"state.\nIf the problematic command is a CREATE operation, " "state.\nIf the problematic command is a CREATE operation, "
"consider using the 'IF EXISTS' syntax to drop the object," "consider using the 'IF EXISTS' syntax to drop the object,"
"\nif applicable, and then re-attempt the original command."))); "\nif applicable, and then re-attempt the original command.")
));
} }
PG_RE_THROW(); PG_RE_THROW();

View File

@ -475,8 +475,8 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
if (flags & OUTSIDE_TRANSACTION) if (flags & OUTSIDE_TRANSACTION)
{ {
/* don't return connections that are used in transactions */ /* don't return connections that are used in transactions */
if (connection->remoteTransaction.transactionState != if (connection->remoteTransaction.transactionState != REMOTE_TRANS_NOT_STARTED
REMOTE_TRANS_NOT_STARTED) )
{ {
continue; continue;
} }

View File

@ -191,8 +191,8 @@ static HTAB *ConnectionShardHash;
static MultiConnection * FindPlacementListConnection(int flags, List *placementAccessList, static MultiConnection * FindPlacementListConnection(int flags, List *placementAccessList,
const char *userName); const char *userName);
static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry( static ConnectionPlacementHashEntry * FindOrCreatePlacementEntry(ShardPlacement *placement
ShardPlacement *placement); );
static bool CanUseExistingConnection(uint32 flags, const char *userName, static bool CanUseExistingConnection(uint32 flags, const char *userName,
ConnectionReference *placementConnection); ConnectionReference *placementConnection);
static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection, static bool ConnectionAccessedDifferentPlacement(MultiConnection *connection,

View File

@ -676,7 +676,8 @@ SharedConnectionStatsShmemInit(void)
ConnectionStatsSharedState->sharedConnectionHashTrancheName = ConnectionStatsSharedState->sharedConnectionHashTrancheName =
"Shared Connection Tracking Hash Tranche"; "Shared Connection Tracking Hash Tranche";
LWLockRegisterTranche(ConnectionStatsSharedState->sharedConnectionHashTrancheId, LWLockRegisterTranche(ConnectionStatsSharedState->sharedConnectionHashTrancheId,
ConnectionStatsSharedState->sharedConnectionHashTrancheName); ConnectionStatsSharedState->sharedConnectionHashTrancheName)
;
LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock, LWLockInitialize(&ConnectionStatsSharedState->sharedConnectionHashLock,
ConnectionStatsSharedState->sharedConnectionHashTrancheId); ConnectionStatsSharedState->sharedConnectionHashTrancheId);

View File

@ -28,7 +28,8 @@ static void AppendCreateExtensionStmtOptions(StringInfo buf, List *options);
static void AppendDropExtensionStmt(StringInfo buf, DropStmt *stmt); static void AppendDropExtensionStmt(StringInfo buf, DropStmt *stmt);
static void AppendExtensionNameList(StringInfo buf, List *objects); static void AppendExtensionNameList(StringInfo buf, List *objects);
static void AppendAlterExtensionSchemaStmt(StringInfo buf, static void AppendAlterExtensionSchemaStmt(StringInfo buf,
AlterObjectSchemaStmt *alterExtensionSchemaStmt); AlterObjectSchemaStmt *alterExtensionSchemaStmt
);
static void AppendAlterExtensionStmt(StringInfo buf, static void AppendAlterExtensionStmt(StringInfo buf,
AlterExtensionStmt *alterExtensionStmt); AlterExtensionStmt *alterExtensionStmt);

View File

@ -290,7 +290,9 @@ GetDefElemActionString(DefElemAction action)
} }
default: default:
{
return ""; return "";
}
} }
} }

View File

@ -118,8 +118,10 @@ ObjectTypeToKeyword(ObjectType objtype)
} }
default: default:
{
elog(ERROR, "Unknown object type: %d", objtype); elog(ERROR, "Unknown object type: %d", objtype);
return NULL; return NULL;
}
} }
} }

View File

@ -243,7 +243,8 @@ AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg( errmsg(
"only simple column references are allowed in CREATE STATISTICS"))); "only simple column references are allowed in CREATE STATISTICS")
));
} }
const char *columnName = quote_identifier(column->name); const char *columnName = quote_identifier(column->name);

View File

@ -536,8 +536,10 @@ GeneratedWhenStr(char generatedWhen)
} }
default: default:
{
ereport(ERROR, (errmsg("unrecognized generated_when: %d", ereport(ERROR, (errmsg("unrecognized generated_when: %d",
generatedWhen))); generatedWhen)));
}
} }
} }

View File

@ -646,7 +646,8 @@ static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLev
modLevel, modLevel,
List *taskList, List *taskList,
bool bool
exludeFromTransaction); exludeFromTransaction)
;
static void StartDistributedExecution(DistributedExecution *execution); static void StartDistributedExecution(DistributedExecution *execution);
static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution); static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
static void RunDistributedExecution(DistributedExecution *execution); static void RunDistributedExecution(DistributedExecution *execution);
@ -711,8 +712,8 @@ static void PlacementExecutionReady(TaskPlacementExecution *placementExecution);
static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution * static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution *
shardCommandExecution); shardCommandExecution);
static int GetEventSetSize(List *sessionList); static int GetEventSetSize(List *sessionList);
static bool ProcessSessionsWithFailedWaitEventSetOperations( static bool ProcessSessionsWithFailedWaitEventSetOperations(DistributedExecution *
DistributedExecution *execution); execution);
static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution); static bool HasIncompleteConnectionEstablishment(DistributedExecution *execution);
static void RebuildWaitEventSet(DistributedExecution *execution); static void RebuildWaitEventSet(DistributedExecution *execution);
static void RebuildWaitEventSetForSessions(DistributedExecution *execution); static void RebuildWaitEventSetForSessions(DistributedExecution *execution);
@ -2635,8 +2636,10 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
/* open a new connection to the worker */ /* open a new connection to the worker */
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
workerPool->nodeName, workerPool->nodeName
workerPool->nodePort, ,
workerPool->nodePort
,
NULL, NULL); NULL, NULL);
if (!connection) if (!connection)
{ {

View File

@ -67,8 +67,8 @@ static void CitusPreExecScan(CitusScanState *scanState);
static bool ModifyJobNeedsEvaluation(Job *workerJob); static bool ModifyJobNeedsEvaluation(Job *workerJob);
static void RegenerateTaskForFasthPathQuery(Job *workerJob); static void RegenerateTaskForFasthPathQuery(Job *workerJob);
static void RegenerateTaskListForInsert(Job *workerJob); static void RegenerateTaskListForInsert(Job *workerJob);
static DistributedPlan * CopyDistributedPlanWithoutCache( static DistributedPlan * CopyDistributedPlanWithoutCache(DistributedPlan *
DistributedPlan *originalDistributedPlan); originalDistributedPlan);
static void CitusEndScan(CustomScanState *node); static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node); static void CitusReScan(CustomScanState *node);
static void EnsureForceDelegationDistributionKey(Job *job); static void EnsureForceDelegationDistributionKey(Job *job);

View File

@ -69,8 +69,8 @@ static List * WrapTasksForPartitioning(const char *resultIdPrefix,
bool binaryFormat); bool binaryFormat);
static List * ExecutePartitionTaskList(List *partitionTaskList, static List * ExecutePartitionTaskList(List *partitionTaskList,
CitusTableCacheEntry *targetRelation); CitusTableCacheEntry *targetRelation);
static PartitioningTupleDest * CreatePartitioningTupleDest( static PartitioningTupleDest * CreatePartitioningTupleDest(CitusTableCacheEntry *
CitusTableCacheEntry *targetRelation); targetRelation);
static void PartitioningTupleDestPutTuple(TupleDestination *self, Task *task, static void PartitioningTupleDestPutTuple(TupleDestination *self, Task *task,
int placementIndex, int queryNumber, int placementIndex, int queryNumber,
HeapTuple heapTuple, uint64 tupleLibpqSize); HeapTuple heapTuple, uint64 tupleLibpqSize);

View File

@ -66,7 +66,8 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
List *insertTargetList, List *insertTargetList,
PlannedStmt *selectPlan, PlannedStmt *selectPlan,
EState *executorState, EState *executorState,
char *intermediateResultIdPrefix); char *intermediateResultIdPrefix
);
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries); static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);

View File

@ -824,7 +824,7 @@ RecordNonDistTableAccessesForTask(Task *task)
* if we're wrong. * if we're wrong.
*/ */
ereport(ERROR, (errmsg("shard " UINT64_FORMAT " does not have any shard " ereport(ERROR, (errmsg("shard " UINT64_FORMAT " does not have any shard "
"placements", "placements",
task->anchorShardId))); task->anchorShardId)));
} }

View File

@ -38,11 +38,13 @@ static HTAB * ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetR
sourceTargetList, sourceTargetList,
PlannedStmt * PlannedStmt *
sourcePlan, sourcePlan,
EState *executorState, EState *executorState
,
char * char *
intermediateResultIdPrefix, intermediateResultIdPrefix,
int int
partitionColumnIndex); partitionColumnIndex)
;
/* /*

View File

@ -180,9 +180,10 @@ static bool FollowExtAndInternalDependencies(ObjectAddressCollector *collector,
DependencyDefinition *definition); DependencyDefinition *definition);
static void ApplyAddToDependencyList(ObjectAddressCollector *collector, static void ApplyAddToDependencyList(ObjectAddressCollector *collector,
DependencyDefinition *definition); DependencyDefinition *definition);
static void ApplyAddCitusDependedObjectsToDependencyList( static void ApplyAddCitusDependedObjectsToDependencyList(ObjectAddressCollector *collector
ObjectAddressCollector *collector, ,
DependencyDefinition *definition); DependencyDefinition *definition)
;
static List * GetViewRuleReferenceDependencyList(Oid relationId); static List * GetViewRuleReferenceDependencyList(Oid relationId);
static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector, static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector,
ObjectAddress target); ObjectAddress target);

View File

@ -338,8 +338,8 @@ ShouldMarkRelationDistributed(Oid relationId)
bool ownedByExtension = IsTableOwnedByExtension(relationId); bool ownedByExtension = IsTableOwnedByExtension(relationId);
bool alreadyDistributed = IsObjectDistributed(relationAddress); bool alreadyDistributed = IsObjectDistributed(relationAddress);
bool hasUnsupportedDependency = bool hasUnsupportedDependency =
DeferErrorIfAnyObjectHasUnsupportedDependency(list_make1(relationAddress)) != DeferErrorIfAnyObjectHasUnsupportedDependency(list_make1(relationAddress)) != NULL
NULL; ;
bool hasCircularDependency = bool hasCircularDependency =
DeferErrorIfCircularDependencyExists(relationAddress) != NULL; DeferErrorIfCircularDependencyExists(relationAddress) != NULL;

View File

@ -127,11 +127,11 @@ static bool SetFieldText(int attno, Datum values[], bool isnull[], bool replace[
static bool SetFieldNull(int attno, Datum values[], bool isnull[], bool replace[]); static bool SetFieldNull(int attno, Datum values[], bool isnull[], bool replace[]);
#define InitFieldValue(attno, values, isnull, initValue) \ #define InitFieldValue(attno, values, isnull, initValue) \
(void) SetFieldValue((attno), (values), (isnull), NULL, (initValue)) (void) SetFieldValue((attno), (values), (isnull), NULL, (initValue))
#define InitFieldText(attno, values, isnull, initValue) \ #define InitFieldText(attno, values, isnull, initValue) \
(void) SetFieldText((attno), (values), (isnull), NULL, (initValue)) (void) SetFieldText((attno), (values), (isnull), NULL, (initValue))
#define InitFieldNull(attno, values, isnull) \ #define InitFieldNull(attno, values, isnull) \
(void) SetFieldNull((attno), (values), (isnull), NULL) (void) SetFieldNull((attno), (values), (isnull), NULL)
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(citus_local_disk_space_stats); PG_FUNCTION_INFO_V1(citus_local_disk_space_stats);
@ -812,7 +812,6 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
{ {
partitionedShardNames = lappend(partitionedShardNames, quotedShardName); partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
} }
/* for non-partitioned tables, we will use Postgres' size functions */ /* for non-partitioned tables, we will use Postgres' size functions */
else else
{ {
@ -823,7 +822,8 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
/* SELECT SUM(worker_partitioned_...) FROM VALUES (...) */ /* SELECT SUM(worker_partitioned_...) FROM VALUES (...) */
char *subqueryForPartitionedShards = char *subqueryForPartitionedShards =
GenerateSizeQueryForRelationNameList(partitionedShardNames, GenerateSizeQueryForRelationNameList(partitionedShardNames,
GetWorkerPartitionedSizeUDFNameBySizeQueryType( GetWorkerPartitionedSizeUDFNameBySizeQueryType
(
sizeQueryType)); sizeQueryType));
/* SELECT SUM(pg_..._size) FROM VALUES (...) */ /* SELECT SUM(pg_..._size) FROM VALUES (...) */
@ -4267,7 +4267,8 @@ CancelTasksForJob(int64 jobid)
const bool indexOK = true; const bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTaskJobIdTaskIdIndexId(), DistBackgroundTaskJobIdTaskIdIndexId()
,
indexOK, NULL, indexOK, NULL,
lengthof(scanKey), scanKey); lengthof(scanKey), scanKey);

View File

@ -76,7 +76,8 @@ static List * DropTaskList(Oid relationId, char *schemaName, char *relationName,
List *deletableShardIntervalList); List *deletableShardIntervalList);
static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement, static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
const char *shardRelationName, const char *shardRelationName,
const char *dropShardPlacementCommand); const char *dropShardPlacementCommand
);
static char * CreateDropShardPlacementCommand(const char *schemaName, static char * CreateDropShardPlacementCommand(const char *schemaName,
const char *shardRelationName, const char *shardRelationName,
char storageType); char storageType);

View File

@ -78,7 +78,8 @@ static void GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_
indexForm, indexForm,
List ** List **
indexDDLEventList, indexDDLEventList,
int indexFlags); int indexFlags)
;
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor); static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
static char * CitusCreateAlterColumnarTableSet(char *qualifiedRelationName, static char * CitusCreateAlterColumnarTableSet(char *qualifiedRelationName,

View File

@ -939,8 +939,8 @@ TryDropDatabaseOutsideTransaction(char *databaseName, char *nodeName, int nodePo
* because we don't want to open a transaction block on remote nodes as DROP * because we don't want to open a transaction block on remote nodes as DROP
* DATABASE commands cannot be run inside a transaction block. * DATABASE commands cannot be run inside a transaction block.
*/ */
if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) != if (ExecuteOptionalRemoteCommand(connection, commandString, NULL) != RESPONSE_OKAY
RESPONSE_OKAY) )
{ {
executeCommand = false; executeCommand = false;
break; break;

View File

@ -131,17 +131,19 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList,
uint32 colocationId); uint32 colocationId);
static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *workersForPlacementList); List *workersForPlacementList);
static void CreatePartitioningHierarchyForBlockingSplit( static void CreatePartitioningHierarchyForBlockingSplit(List *
List *shardGroupSplitIntervalListList, shardGroupSplitIntervalListList,
List *workersForPlacementList); List *workersForPlacementList);
static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
List *workersForPlacementList); List *workersForPlacementList);
static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode); static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode);
static StringInfo CreateSplitShardReplicationSetupUDF( static StringInfo CreateSplitShardReplicationSetupUDF(List *
List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, sourceColocatedShardIntervalList,
List *destinationWorkerNodesList, List *
DistributionColumnMap * shardGroupSplitIntervalListList,
distributionColumnOverrides); List *destinationWorkerNodesList,
DistributionColumnMap *
distributionColumnOverrides);
static List * ParseReplicationSlotInfoFromResult(PGresult *result); static List * ParseReplicationSlotInfoFromResult(PGresult *result);
static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
@ -883,7 +885,8 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList, ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, splitCopyTaskList,
MaxAdaptiveExecutorPoolSize, MaxAdaptiveExecutorPoolSize,
NULL /* jobIdList (ignored by API implementation) */); NULL /* jobIdList (ignored by API implementation) */
);
} }
@ -1881,7 +1884,8 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg( errmsg(
"Failed to run worker_split_shard_replication_setup UDF. It should successfully execute " "Failed to run worker_split_shard_replication_setup UDF. It should successfully execute "
" for splitting a shard in a non-blocking way. Please retry."))); " for splitting a shard in a non-blocking way. Please retry.")
));
} }
/* Get replication slot information */ /* Get replication slot information */

View File

@ -471,8 +471,8 @@ WriteLocalTuple(TupleTableSlot *slot, ShardCopyDestReceiver *copyDest)
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
bool isBinaryCopy = localCopyOutState->binary; bool isBinaryCopy = localCopyOutState->binary;
bool shouldAddBinaryHeaders = (isBinaryCopy && localCopyOutState->fe_msgbuf->len == bool shouldAddBinaryHeaders = (isBinaryCopy && localCopyOutState->fe_msgbuf->len == 0)
0); ;
if (shouldAddBinaryHeaders) if (shouldAddBinaryHeaders)
{ {
AppendCopyBinaryHeaders(localCopyOutState); AppendCopyBinaryHeaders(localCopyOutState);

View File

@ -72,7 +72,8 @@ worker_split_copy(PG_FUNCTION_ARGS)
{ {
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg( errmsg(
"pg_catalog.split_copy_info array cannot contain null values"))); "pg_catalog.split_copy_info array cannot contain null values")
));
} }
const int slice_ndim = 0; const int slice_ndim = 0;

View File

@ -85,8 +85,8 @@ int PlannerLevel = 0;
static bool ListContainsDistributedTableRTE(List *rangeTableList, static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable); bool *maybeHasForeignDistributedTable);
static PlannedStmt * CreateDistributedPlannedStmt( static PlannedStmt * CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
DistributedPlanningContext *planContext); ;
static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, static PlannedStmt * InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
DistributedPlanningContext DistributedPlanningContext
*planContext); *planContext);
@ -125,14 +125,14 @@ static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
Const *resultFormatConst); Const *resultFormatConst);
static List * OuterPlanParamsList(PlannerInfo *root); static List * OuterPlanParamsList(PlannerInfo *root);
static List * CopyPlanParamList(List *originalPlanParamList); static List * CopyPlanParamList(List *originalPlanParamList);
static void CreateAndPushPlannerRestrictionContext( static void CreateAndPushPlannerRestrictionContext(DistributedPlanningContext *planContext
DistributedPlanningContext *planContext, ,
FastPathRestrictionContext * FastPathRestrictionContext *
fastPathContext); fastPathContext);
static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void); static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
static void PopPlannerRestrictionContext(void); static void PopPlannerRestrictionContext(void);
static void ResetPlannerRestrictionContext( static void ResetPlannerRestrictionContext(PlannerRestrictionContext *
PlannerRestrictionContext *plannerRestrictionContext); plannerRestrictionContext);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext); static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
int rteIdCounter); int rteIdCounter);

View File

@ -831,7 +831,8 @@ IsShardKeyValueAllowed(Const *shardKey, uint32 colocationId)
ereport(DEBUG4, errmsg("Comparing saved:%s with Shard key: %s colocationid:%d:%d", ereport(DEBUG4, errmsg("Comparing saved:%s with Shard key: %s colocationid:%d:%d",
pretty_format_node_dump( pretty_format_node_dump(
nodeToString( nodeToString(
AllowedDistributionColumnValue.distributionColumnValue)), AllowedDistributionColumnValue.distributionColumnValue)
),
pretty_format_node_dump(nodeToString(shardKey)), pretty_format_node_dump(nodeToString(shardKey)),
AllowedDistributionColumnValue.colocationId, colocationId)); AllowedDistributionColumnValue.colocationId, colocationId));

View File

@ -66,7 +66,8 @@ static bool InsertSelectHasRouterSelect(Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
static Task * RouterModifyTaskForShardInterval(Query *originalQuery, static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
CitusTableCacheEntry *targetTableCacheEntry, CitusTableCacheEntry *targetTableCacheEntry
,
ShardInterval *shardInterval, ShardInterval *shardInterval,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext, plannerRestrictionContext,
@ -1155,7 +1156,8 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
TargetEntry *newInsertTargetEntry = makeTargetEntry((Expr *) newInsertVar, TargetEntry *newInsertTargetEntry = makeTargetEntry((Expr *) newInsertVar,
originalAttrNo, originalAttrNo,
oldInsertTargetEntry->resname, oldInsertTargetEntry->resname,
oldInsertTargetEntry->resjunk); oldInsertTargetEntry->resjunk)
;
newInsertTargetlist = lappend(newInsertTargetlist, newInsertTargetEntry); newInsertTargetlist = lappend(newInsertTargetlist, newInsertTargetEntry);
resno++; resno++;

View File

@ -67,7 +67,8 @@ static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
Query *query, Query *query,
Node *quals, Node *quals,
List *targetList, List *targetList,
CmdType commandType); CmdType commandType
);
static DistributedPlan * CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, static DistributedPlan * CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery,
Query *query, Query *query,
@ -426,10 +427,10 @@ ErrorIfMergeHasUnsupportedTables(Oid targetRelationId, List *rangeTableList)
#if PG_VERSION_NUM >= PG_VERSION_18 #if PG_VERSION_NUM >= PG_VERSION_18
case RTE_GROUP: case RTE_GROUP:
#endif #endif
{ {
/* Skip them as base table(s) will be checked */ /* Skip them as base table(s) will be checked */
continue; continue;
} }
/* /*
* RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations, * RTE_NAMEDTUPLESTORE is typically used in ephmeral named relations,
@ -574,8 +575,8 @@ IsDistributionColumnInMergeSource(Expr *columnExpression, Query *query, bool
Var *distributionColumn = DistPartitionKey(relationId); Var *distributionColumn = DistPartitionKey(relationId);
/* not all distributed tables have partition column */ /* not all distributed tables have partition column */
if (distributionColumn != NULL && column->varattno == if (distributionColumn != NULL && column->varattno == distributionColumn->varattno
distributionColumn->varattno) )
{ {
isDistributionColumn = true; isDistributionColumn = true;
} }
@ -1046,7 +1047,8 @@ DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
{ {
ListCell *restrictionCell = NULL; ListCell *restrictionCell = NULL;
foreach(restrictionCell, foreach(restrictionCell,
plannerRestrictionContext->relationRestrictionContext->relationRestrictionList) plannerRestrictionContext->relationRestrictionContext->relationRestrictionList
)
{ {
RelationRestriction *relationRestriction = RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(restrictionCell); (RelationRestriction *) lfirst(restrictionCell);
@ -1078,7 +1080,8 @@ DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
*/ */
static DeferredErrorMessage * static DeferredErrorMessage *
DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList, DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
PlannerRestrictionContext *plannerRestrictionContext, PlannerRestrictionContext *plannerRestrictionContext
,
Oid targetRelationId) Oid targetRelationId)
{ {
List *distTablesList = NIL; List *distTablesList = NIL;
@ -1116,7 +1119,8 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
if (list_length(distTablesList) > 0 && list_length(localTablesList) > 0) if (list_length(distTablesList) > 0 && list_length(localTablesList) > 0)
{ {
ereport(DEBUG1, (errmsg( ereport(DEBUG1, (errmsg(
"A mix of distributed and local table, try repartitioning"))); "A mix of distributed and local table, try repartitioning")))
;
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"A mix of distributed and citus-local table, " "A mix of distributed and citus-local table, "
"routable query is not possible", NULL, NULL); "routable query is not possible", NULL, NULL);

View File

@ -271,7 +271,8 @@ static void AppendTargetEntryToGroupClause(TargetEntry *targetEntry,
static bool WorkerAggregateWalker(Node *node, static bool WorkerAggregateWalker(Node *node,
WorkerAggregateWalkerContext *walkerContext); WorkerAggregateWalkerContext *walkerContext);
static List * WorkerAggregateExpressionList(Aggref *originalAggregate, static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
WorkerAggregateWalkerContext *walkerContextry); WorkerAggregateWalkerContext *walkerContextry)
;
static AggregateType GetAggregateType(Aggref *aggregatExpression); static AggregateType GetAggregateType(Aggref *aggregatExpression);
static Oid AggregateArgumentType(Aggref *aggregate); static Oid AggregateArgumentType(Aggref *aggregate);
static Expr * FirstAggregateArgument(Aggref *aggregate); static Expr * FirstAggregateArgument(Aggref *aggregate);
@ -293,18 +294,19 @@ static Const * MakeIntegerConst(int32 integerValue);
/* Local functions forward declarations for aggregate expression checks */ /* Local functions forward declarations for aggregate expression checks */
static bool HasNonDistributableAggregates(MultiNode *logicalPlanNode); static bool HasNonDistributableAggregates(MultiNode *logicalPlanNode);
static bool CanPushDownExpression(Node *expression, static bool CanPushDownExpression(Node *expression,
const ExtendedOpNodeProperties *extendedOpNodeProperties); const ExtendedOpNodeProperties *extendedOpNodeProperties
static DeferredErrorMessage * DeferErrorIfHasNonDistributableAggregates( );
MultiNode *logicalPlanNode); static DeferredErrorMessage * DeferErrorIfHasNonDistributableAggregates(MultiNode *
static DeferredErrorMessage * DeferErrorIfUnsupportedArrayAggregate( logicalPlanNode);
Aggref *arrayAggregateExpression); static DeferredErrorMessage * DeferErrorIfUnsupportedArrayAggregate(Aggref *
arrayAggregateExpression);
static DeferredErrorMessage * DeferErrorIfUnsupportedJsonAggregate(AggregateType type, static DeferredErrorMessage * DeferErrorIfUnsupportedJsonAggregate(AggregateType type,
Aggref * Aggref *
aggregateExpression); aggregateExpression);
static DeferredErrorMessage * DeferErrorIfUnsupportedAggregateDistinct( static DeferredErrorMessage * DeferErrorIfUnsupportedAggregateDistinct(Aggref *
Aggref *aggregateExpression, aggregateExpression,
MultiNode * MultiNode *
logicalPlanNode); logicalPlanNode);
static Var * AggregateDistinctColumn(Aggref *aggregateExpression); static Var * AggregateDistinctColumn(Aggref *aggregateExpression);
static bool TablePartitioningSupportsDistinct(List *tableNodeList, static bool TablePartitioningSupportsDistinct(List *tableNodeList,
MultiExtendedOp *opNode, MultiExtendedOp *opNode,
@ -322,10 +324,10 @@ static bool HasOrderByAggregate(List *sortClauseList, List *targetList);
static bool HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList); static bool HasOrderByNonCommutativeAggregate(List *sortClauseList, List *targetList);
static bool HasOrderByComplexExpression(List *sortClauseList, List *targetList); static bool HasOrderByComplexExpression(List *sortClauseList, List *targetList);
static bool HasOrderByHllType(List *sortClauseList, List *targetList); static bool HasOrderByHllType(List *sortClauseList, List *targetList);
static bool ShouldProcessDistinctOrderAndLimitForWorker( static bool ShouldProcessDistinctOrderAndLimitForWorker(ExtendedOpNodeProperties *
ExtendedOpNodeProperties *extendedOpNodeProperties, extendedOpNodeProperties,
bool pushingDownOriginalGrouping, bool pushingDownOriginalGrouping,
Node *havingQual); Node *havingQual);
static bool IsIndexInRange(const List *list, int index); static bool IsIndexInRange(const List *list, int index);
/* /*
@ -5061,10 +5063,10 @@ HasOrderByHllType(List *sortClauseList, List *targetList)
* neither should ProcessLimitOrderByForWorkerQuery. * neither should ProcessLimitOrderByForWorkerQuery.
*/ */
static bool static bool
ShouldProcessDistinctOrderAndLimitForWorker( ShouldProcessDistinctOrderAndLimitForWorker(ExtendedOpNodeProperties *
ExtendedOpNodeProperties *extendedOpNodeProperties, extendedOpNodeProperties,
bool pushingDownOriginalGrouping, bool pushingDownOriginalGrouping,
Node *havingQual) Node *havingQual)
{ {
if (extendedOpNodeProperties->pullUpIntermediateRows) if (extendedOpNodeProperties->pullUpIntermediateRows)
{ {

View File

@ -153,8 +153,8 @@ static String * MakeDummyColumnString(int dummyColumnId);
static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError);
static List * GroupInsertValuesByShardId(List *insertValuesList); static List * GroupInsertValuesByShardId(List *insertValuesList);
static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
static DeferredErrorMessage * DeferErrorIfUnsupportedRouterPlannableSelectQuery( static DeferredErrorMessage * DeferErrorIfUnsupportedRouterPlannableSelectQuery(Query *
Query *query); query);
static DeferredErrorMessage * ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree); static DeferredErrorMessage * ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree);
static DeferredErrorMessage * ErrorIfQueryHasCTEWithSearchClause(Query *queryTree); static DeferredErrorMessage * ErrorIfQueryHasCTEWithSearchClause(Query *queryTree);
static bool ContainsSearchClauseWalker(Node *node, void *context); static bool ContainsSearchClauseWalker(Node *node, void *context);
@ -855,7 +855,8 @@ DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList)
"Modifying local tables with remote local tables is " "Modifying local tables with remote local tables is "
"not supported.", "not supported.",
NULL, NULL,
"Consider wrapping remote local table to a CTE, or subquery"); "Consider wrapping remote local table to a CTE, or subquery")
;
} }
return NULL; return NULL;
} }
@ -3152,7 +3153,8 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery,
if (cachedShardInterval == NULL) if (cachedShardInterval == NULL)
{ {
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(
"could not find shardinterval to which to send the query"))); "could not find shardinterval to which to send the query")
));
} }
if (outputPartitionValueConst != NULL) if (outputPartitionValueConst != NULL)

View File

@ -107,8 +107,12 @@ static AttrNumber FindResnoForVarInTargetList(List *targetList, int varno, int v
static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo, static bool RelationInfoContainsOnlyRecurringTuples(PlannerInfo *plannerInfo,
Relids relids); Relids relids);
static char * RecurringTypeDescription(RecurringTuplesType recurType); static char * RecurringTypeDescription(RecurringTuplesType recurType);
static DeferredErrorMessage * DeferredErrorIfUnsupportedLateralSubquery( static DeferredErrorMessage * DeferredErrorIfUnsupportedLateralSubquery(PlannerInfo *
PlannerInfo *plannerInfo, Relids recurringRelIds, Relids nonRecurringRelIds); plannerInfo,
Relids
recurringRelIds,
Relids
nonRecurringRelIds);
static bool ContainsLateralSubquery(PlannerInfo *plannerInfo); static bool ContainsLateralSubquery(PlannerInfo *plannerInfo);
static Var * PartitionColumnForPushedDownSubquery(Query *query); static Var * PartitionColumnForPushedDownSubquery(Query *query);
static bool ContainsReferencesToRelids(Query *query, Relids relids, int *foundRelid); static bool ContainsReferencesToRelids(Query *query, Relids relids, int *foundRelid);
@ -790,9 +794,9 @@ FromClauseRecurringTupleType(Query *queryTree)
* such queries have lateral subqueries. * such queries have lateral subqueries.
*/ */
static DeferredErrorMessage * static DeferredErrorMessage *
DeferredErrorIfUnsupportedRecurringTuplesJoin( DeferredErrorIfUnsupportedRecurringTuplesJoin(PlannerRestrictionContext *
PlannerRestrictionContext *plannerRestrictionContext, plannerRestrictionContext,
bool plannerPhase) bool plannerPhase)
{ {
List *joinRestrictionList = List *joinRestrictionList =
plannerRestrictionContext->joinRestrictionContext->joinRestrictionList; plannerRestrictionContext->joinRestrictionContext->joinRestrictionList;

View File

@ -161,7 +161,8 @@ static void RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
RecursivePlanningContext * RecursivePlanningContext *
recursivePlanningContext); recursivePlanningContext);
static bool RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query, static bool RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
RecursivePlanningContext *context, RecursivePlanningContext *context
,
bool chainedJoin); bool chainedJoin);
static void RecursivelyPlanDistributedJoinNode(Node *node, Query *query, static void RecursivelyPlanDistributedJoinNode(Node *node, Query *query,
RecursivePlanningContext *context); RecursivePlanningContext *context);
@ -207,8 +208,8 @@ static bool CanPushdownRecurringOuterJoinOnOuterRTE(RangeTblEntry *rte);
static bool CanPushdownRecurringOuterJoinOnInnerVar(Var *innervar, RangeTblEntry *rte); static bool CanPushdownRecurringOuterJoinOnInnerVar(Var *innervar, RangeTblEntry *rte);
static bool CanPushdownRecurringOuterJoin(JoinExpr *joinExpr, Query *query); static bool CanPushdownRecurringOuterJoin(JoinExpr *joinExpr, Query *query);
#if PG_VERSION_NUM < PG_VERSION_17 #if PG_VERSION_NUM < PG_VERSION_17
static bool hasPseudoconstantQuals( static bool hasPseudoconstantQuals(RelationRestrictionContext *relationRestrictionContext)
RelationRestrictionContext *relationRestrictionContext); ;
#endif #endif
/* /*
@ -2780,8 +2781,8 @@ CanPushdownRecurringOuterJoinOnInnerVar(Var *innerVar, RangeTblEntry *rte)
} }
/* Check if the inner variable is part of the distribution column */ /* Check if the inner variable is part of the distribution column */
if (cacheEntry->partitionColumn && innerVar->varattno == if (cacheEntry->partitionColumn && innerVar->varattno == cacheEntry->partitionColumn->
cacheEntry->partitionColumn->varattno) varattno)
{ {
return true; return true;
} }
@ -2922,7 +2923,8 @@ CanPushdownRecurringOuterJoinExtended(JoinExpr *joinExpr, Query *query,
joinExpr->larg, query->rtable)) joinExpr->larg, query->rtable))
{ {
ereport(DEBUG5, (errmsg( ereport(DEBUG5, (errmsg(
"Lateral join is not supported for pushdown in this path."))); "Lateral join is not supported for pushdown in this path.")))
;
return false; return false;
} }

View File

@ -156,9 +156,10 @@ static bool AllDistributedRelationsInRestrictionContextColocated(
restrictionContext); restrictionContext);
static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node); static bool IsNotSafeRestrictionToRecursivelyPlan(Node *node);
static bool HasPlaceHolderVar(Node *node); static bool HasPlaceHolderVar(Node *node);
static JoinRestrictionContext * FilterJoinRestrictionContext( static JoinRestrictionContext * FilterJoinRestrictionContext(JoinRestrictionContext *
JoinRestrictionContext *joinRestrictionContext, Relids joinRestrictionContext,
queryRteIdentities); Relids
queryRteIdentities);
static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int static bool RangeTableArrayContainsAnyRTEIdentities(RangeTblEntry **rangeTableEntries, int
rangeTableArrayLength, Relids rangeTableArrayLength, Relids
queryRteIdentities); queryRteIdentities);
@ -614,7 +615,8 @@ RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *restrictionCon
List *attributeEquivalenceList = GenerateAllAttributeEquivalences(restrictionContext); List *attributeEquivalenceList = GenerateAllAttributeEquivalences(restrictionContext);
return RestrictionEquivalenceForPartitionKeysViaEquivalences(restrictionContext, return RestrictionEquivalenceForPartitionKeysViaEquivalences(restrictionContext,
attributeEquivalenceList); attributeEquivalenceList)
;
} }
@ -1160,8 +1162,8 @@ GenerateCommonEquivalence(List *attributeEquivalenceList,
* with a single AttributeEquivalenceClassMember. * with a single AttributeEquivalenceClassMember.
*/ */
static AttributeEquivalenceClass * static AttributeEquivalenceClass *
GenerateEquivalenceClassForRelationRestriction( GenerateEquivalenceClassForRelationRestriction(RelationRestrictionContext *
RelationRestrictionContext *relationRestrictionContext) relationRestrictionContext)
{ {
ListCell *relationRestrictionCell = NULL; ListCell *relationRestrictionCell = NULL;
AttributeEquivalenceClassMember *eqMember = NULL; AttributeEquivalenceClassMember *eqMember = NULL;
@ -2071,8 +2073,8 @@ FindQueryContainingRTEIdentityInternal(Node *node,
* distributed relations in the given relation restrictions list are co-located. * distributed relations in the given relation restrictions list are co-located.
*/ */
static bool static bool
AllDistributedRelationsInRestrictionContextColocated( AllDistributedRelationsInRestrictionContextColocated(RelationRestrictionContext *
RelationRestrictionContext *restrictionContext) restrictionContext)
{ {
RelationRestriction *relationRestriction = NULL; RelationRestriction *relationRestriction = NULL;
List *relationIdList = NIL; List *relationIdList = NIL;

View File

@ -1215,7 +1215,9 @@ AddPartitionKeyRestrictionToInstance(ClauseWalkerContext *context, OpExpr *opCla
} }
default: default:
{
Assert(false); Assert(false);
}
} }
} }

View File

@ -131,8 +131,8 @@ static void ExecuteRemainingPostLoadTableCommands(List *logicalRepTargetList);
static char * escape_param_str(const char *str); static char * escape_param_str(const char *str);
static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command);
static void WaitForMiliseconds(long timeout); static void WaitForMiliseconds(long timeout);
static XLogRecPtr GetSubscriptionPosition( static XLogRecPtr GetSubscriptionPosition(GroupedLogicalRepTargets *
GroupedLogicalRepTargets *groupedLogicalRepTargets); groupedLogicalRepTargets);
static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode, static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode,
List *shardIntervals); List *shardIntervals);

View File

@ -210,9 +210,11 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */ /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
default: default:
{
ereport(ERROR, errmsg( ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action)); change->action));
}
} }
#else #else
switch (change->action) switch (change->action)
@ -245,9 +247,11 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */ /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
default: default:
{
ereport(ERROR, errmsg( ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action)); change->action));
}
} }
#endif #endif
@ -318,9 +322,11 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */ /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
default: default:
{
ereport(ERROR, errmsg( ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action)); change->action));
}
} }
#else #else
switch (change->action) switch (change->action)
@ -373,9 +379,11 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */ /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */
default: default:
{
ereport(ERROR, errmsg( ereport(ERROR, errmsg(
"Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE",
change->action)); change->action));
}
} }
#endif #endif
} }

View File

@ -132,15 +132,15 @@ ReadColumnarOptions_type extern_ReadColumnarOptions = NULL;
* module. * module.
*/ */
#define DEFINE_COLUMNAR_PASSTHROUGH_FUNC(funcname) \ #define DEFINE_COLUMNAR_PASSTHROUGH_FUNC(funcname) \
static PGFunction CppConcat(extern_, funcname); \ static PGFunction CppConcat(extern_, funcname); \
PG_FUNCTION_INFO_V1(funcname); \ PG_FUNCTION_INFO_V1(funcname); \
Datum funcname(PG_FUNCTION_ARGS) \ Datum funcname(PG_FUNCTION_ARGS) \
{ \ { \
return CppConcat(extern_, funcname)(fcinfo); \ return CppConcat(extern_, funcname)(fcinfo); \
} }
#define INIT_COLUMNAR_SYMBOL(typename, funcname) \ #define INIT_COLUMNAR_SYMBOL(typename, funcname) \
CppConcat(extern_, funcname) = \ CppConcat(extern_, funcname) = \
(typename) (void *) lookup_external_function(handle, # funcname) (typename) (void *) lookup_external_function(handle, # funcname)
#define CDC_DECODER_DYNAMIC_LIB_PATH "$libdir/citus_decoders:$libdir" #define CDC_DECODER_DYNAMIC_LIB_PATH "$libdir/citus_decoders:$libdir"

View File

@ -541,7 +541,8 @@ StatCountersShmemInit(void)
SharedBackendStatsSlotArray = (BackendStatsSlot *) SharedBackendStatsSlotArray = (BackendStatsSlot *)
ShmemInitStruct("Citus Shared Backend Stats Slot Array", ShmemInitStruct("Citus Shared Backend Stats Slot Array",
SharedBackendStatsSlotArrayShmemSize(), SharedBackendStatsSlotArrayShmemSize(),
&sharedBackendStatsSlotArrayAlreadyInit); &sharedBackendStatsSlotArrayAlreadyInit)
;
bool sharedSavedBackendStatsHashLockAlreadyInit = false; bool sharedSavedBackendStatsHashLockAlreadyInit = false;
SharedSavedBackendStatsHashLock = ShmemInitStruct( SharedSavedBackendStatsHashLock = ShmemInitStruct(

View File

@ -35,9 +35,9 @@
#define ALTER_CURRENT_PROCESS_ID \ #define ALTER_CURRENT_PROCESS_ID \
"ALTER SYSTEM SET citus.isolation_test_session_process_id TO %d" "ALTER SYSTEM SET citus.isolation_test_session_process_id TO %d"
#define ALTER_CURRENT_WORKER_PROCESS_ID \ #define ALTER_CURRENT_WORKER_PROCESS_ID \
"ALTER SYSTEM SET citus.isolation_test_session_remote_process_id TO %ld" "ALTER SYSTEM SET citus.isolation_test_session_remote_process_id TO %ld"
#define GET_PROCESS_ID "SELECT process_id FROM get_current_transaction_id()" #define GET_PROCESS_ID "SELECT process_id FROM get_current_transaction_id()"

View File

@ -34,8 +34,8 @@
#include "distributed/shard_rebalancer.h" #include "distributed/shard_rebalancer.h"
/* static declarations for json conversion */ /* static declarations for json conversion */
static List * JsonArrayToShardPlacementTestInfoList( static List * JsonArrayToShardPlacementTestInfoList(ArrayType *
ArrayType *shardPlacementJsonArrayObject); shardPlacementJsonArrayObject);
static List * JsonArrayToWorkerTestInfoList(ArrayType *workerNodeJsonArrayObject); static List * JsonArrayToWorkerTestInfoList(ArrayType *workerNodeJsonArrayObject);
static bool JsonFieldValueBoolDefault(Datum jsonDocument, const char *key, static bool JsonFieldValueBoolDefault(Datum jsonDocument, const char *key,
bool defaultValue); bool defaultValue);

View File

@ -395,8 +395,8 @@ AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode)
DistributedTransactionId *currentTransactionId = DistributedTransactionId *currentTransactionId =
&currentBackendData.transactionId; &currentBackendData.transactionId;
if (currentTransactionId->transactionNumber != if (currentTransactionId->transactionNumber != transactionNode->transactionId.
transactionNode->transactionId.transactionNumber) transactionNumber)
{ {
continue; continue;
} }

View File

@ -58,7 +58,7 @@
#include "distributed/worker_log_messages.h" #include "distributed/worker_log_messages.h"
#define COMMIT_MANAGEMENT_COMMAND_2PC \ #define COMMIT_MANAGEMENT_COMMAND_2PC \
"SELECT citus_internal.commit_management_command_2pc()" "SELECT citus_internal.commit_management_command_2pc()"
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE; CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;

View File

@ -634,8 +634,9 @@ SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList
* false. * false.
*/ */
bool bool
SendOptionalCommandListToWorkerOutsideTransactionWithConnection( SendOptionalCommandListToWorkerOutsideTransactionWithConnection(MultiConnection *
MultiConnection *workerConnection, List *commandList) workerConnection, List *
commandList)
{ {
if (PQstatus(workerConnection->pgConn) != CONNECTION_OK) if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
{ {

View File

@ -376,12 +376,12 @@ ExtractAggregationValues(FunctionCallInfo fcinfo, int argumentIndex,
HeapTupleHeader tupleHeader = HeapTupleHeader tupleHeader =
DatumGetHeapTupleHeader(fcGetArgValue(fcinfo, argumentIndex)); DatumGetHeapTupleHeader(fcGetArgValue(fcinfo, argumentIndex));
if (HeapTupleHeaderGetNatts(tupleHeader) != if (HeapTupleHeaderGetNatts(tupleHeader) != aggregationArgumentContext->
aggregationArgumentContext->argumentCount || argumentCount ||
HeapTupleHeaderGetTypeId(tupleHeader) != HeapTupleHeaderGetTypeId(tupleHeader) != aggregationArgumentContext->
aggregationArgumentContext->tupleDesc->tdtypeid || tupleDesc->tdtypeid ||
HeapTupleHeaderGetTypMod(tupleHeader) != HeapTupleHeaderGetTypMod(tupleHeader) != aggregationArgumentContext->
aggregationArgumentContext->tupleDesc->tdtypmod) tupleDesc->tdtypmod)
{ {
ereport(ERROR, (errmsg("worker_partial_agg_sfunc received " ereport(ERROR, (errmsg("worker_partial_agg_sfunc received "
"incompatible record"))); "incompatible record")));
@ -818,7 +818,8 @@ coord_combine_agg_ffunc(PG_FUNCTION_ARGS)
if (!TypecheckCoordCombineAggReturnType(fcinfo, ffunc, box)) if (!TypecheckCoordCombineAggReturnType(fcinfo, ffunc, box))
{ {
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(
"coord_combine_agg_ffunc could not confirm type correctness"))); "coord_combine_agg_ffunc could not confirm type correctness"))
);
} }
if (ffunc == InvalidOid) if (ffunc == InvalidOid)

View File

@ -88,26 +88,26 @@ static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInf
bool *hadError); bool *hadError);
static void UpdateDependingTasks(BackgroundTask *task); static void UpdateDependingTasks(BackgroundTask *task);
static int64 CalculateBackoffDelay(int retryCount); static int64 CalculateBackoffDelay(int retryCount);
static bool NewExecutorExceedsCitusLimit( static bool NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *
QueueMonitorExecutionContext *queueMonitorExecutionContext); queueMonitorExecutionContext);
static bool NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle, static bool NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
QueueMonitorExecutionContext * QueueMonitorExecutionContext *
queueMonitorExecutionContext); queueMonitorExecutionContext);
static bool AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask, static bool AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask,
QueueMonitorExecutionContext * QueueMonitorExecutionContext *
queueMonitorExecutionContext); queueMonitorExecutionContext);
static void AssignRunnableTasks( static void AssignRunnableTasks(QueueMonitorExecutionContext *queueMonitorExecutionContext
QueueMonitorExecutionContext *queueMonitorExecutionContext); );
static List * GetRunningTaskEntries(HTAB *currentExecutors); static List * GetRunningTaskEntries(HTAB *currentExecutors);
static shm_mq_result ReadFromExecutorQueue( static shm_mq_result ReadFromExecutorQueue(BackgroundExecutorHashEntry *
BackgroundExecutorHashEntry *backgroundExecutorHashEntry, backgroundExecutorHashEntry,
bool *hadError); bool *hadError);
static void CheckAndResetLastWorkerAllocationFailure( static void CheckAndResetLastWorkerAllocationFailure(QueueMonitorExecutionContext *
QueueMonitorExecutionContext *queueMonitorExecutionContext); queueMonitorExecutionContext);
static TaskExecutionStatus TaskConcurrentCancelCheck( static TaskExecutionStatus TaskConcurrentCancelCheck(TaskExecutionContext *
TaskExecutionContext *taskExecutionContext); taskExecutionContext);
static TaskExecutionStatus ConsumeExecutorQueue( static TaskExecutionStatus ConsumeExecutorQueue(TaskExecutionContext *taskExecutionContext
TaskExecutionContext *taskExecutionContext); );
static void TaskHadError(TaskExecutionContext *taskExecutionContext); static void TaskHadError(TaskExecutionContext *taskExecutionContext);
static void TaskEnded(TaskExecutionContext *taskExecutionContext); static void TaskEnded(TaskExecutionContext *taskExecutionContext);
static void TerminateAllTaskExecutors(HTAB *currentExecutors); static void TerminateAllTaskExecutors(HTAB *currentExecutors);
@ -537,7 +537,8 @@ NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
*/ */
static bool static bool
AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask, AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask,
QueueMonitorExecutionContext *queueMonitorExecutionContext) QueueMonitorExecutionContext *queueMonitorExecutionContext
)
{ {
Assert(runnableTask && runnableTask->status == BACKGROUND_TASK_STATUS_RUNNABLE); Assert(runnableTask && runnableTask->status == BACKGROUND_TASK_STATUS_RUNNABLE);
@ -649,8 +650,8 @@ GetRunningTaskEntries(HTAB *currentExecutors)
* It also resets the failure timestamp. * It also resets the failure timestamp.
*/ */
static void static void
CheckAndResetLastWorkerAllocationFailure( CheckAndResetLastWorkerAllocationFailure(QueueMonitorExecutionContext *
QueueMonitorExecutionContext *queueMonitorExecutionContext) queueMonitorExecutionContext)
{ {
if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime > 0) if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime > 0)
{ {

View File

@ -34,57 +34,57 @@ CitusSetTag(Node *node, int tag)
#define DECLARE_FROM_AND_NEW_NODE(nodeTypeName) \ #define DECLARE_FROM_AND_NEW_NODE(nodeTypeName) \
nodeTypeName *newnode = \ nodeTypeName *newnode = \
(nodeTypeName *) CitusSetTag((Node *) target_node, T_ ## nodeTypeName); \ (nodeTypeName *) CitusSetTag((Node *) target_node, T_ ## nodeTypeName); \
nodeTypeName *from = (nodeTypeName *) source_node nodeTypeName *from = (nodeTypeName *) source_node
/* Copy a simple scalar field (int, float, bool, enum, etc) */ /* Copy a simple scalar field (int, float, bool, enum, etc) */
#define COPY_SCALAR_FIELD(fldname) \ #define COPY_SCALAR_FIELD(fldname) \
(newnode->fldname = from->fldname) (newnode->fldname = from->fldname)
/* Copy a field that is a pointer to some kind of Node or Node tree */ /* Copy a field that is a pointer to some kind of Node or Node tree */
#define COPY_NODE_FIELD(fldname) \ #define COPY_NODE_FIELD(fldname) \
(newnode->fldname = copyObject(from->fldname)) (newnode->fldname = copyObject(from->fldname))
/* Copy a field that is a pointer to a C string, or perhaps NULL */ /* Copy a field that is a pointer to a C string, or perhaps NULL */
#define COPY_STRING_FIELD(fldname) \ #define COPY_STRING_FIELD(fldname) \
(newnode->fldname = from->fldname ? pstrdup(from->fldname) : (char *) NULL) (newnode->fldname = from->fldname ? pstrdup(from->fldname) : (char *) NULL)
/* Copy a node array. Target array is also allocated. */ /* Copy a node array. Target array is also allocated. */
#define COPY_NODE_ARRAY(fldname, type, count) \ #define COPY_NODE_ARRAY(fldname, type, count) \
do { \ do { \
int i = 0; \ int i = 0; \
newnode->fldname = (type **) palloc(count * sizeof(type *)); \ newnode->fldname = (type **) palloc(count * sizeof(type *)); \
for (i = 0; i < count; ++i) \ for (i = 0; i < count; ++i) \
{ \ { \
newnode->fldname[i] = copyObject(from->fldname[i]); \ newnode->fldname[i] = copyObject(from->fldname[i]); \
} \
} \ } \
} \ while (0)
while (0)
/* Copy a scalar array. Target array is also allocated. */ /* Copy a scalar array. Target array is also allocated. */
#define COPY_SCALAR_ARRAY(fldname, type, count) \ #define COPY_SCALAR_ARRAY(fldname, type, count) \
do { \ do { \
int i = 0; \ int i = 0; \
newnode->fldname = (type *) palloc(count * sizeof(type)); \ newnode->fldname = (type *) palloc(count * sizeof(type)); \
for (i = 0; i < count; ++i) \ for (i = 0; i < count; ++i) \
{ \ { \
newnode->fldname[i] = from->fldname[i]; \ newnode->fldname[i] = from->fldname[i]; \
} \
} \ } \
} \ while (0)
while (0)
#define COPY_STRING_LIST(fldname) \ #define COPY_STRING_LIST(fldname) \
do { \ do { \
char *curString = NULL; \ char *curString = NULL; \
List *newList = NIL; \ List *newList = NIL; \
foreach_declared_ptr(curString, from->fldname) { \ foreach_declared_ptr(curString, from->fldname) { \
char *newString = curString ? pstrdup(curString) : (char *) NULL; \ char *newString = curString ? pstrdup(curString) : (char *) NULL; \
newList = lappend(newList, newString); \ newList = lappend(newList, newString); \
} \
newnode->fldname = newList; \
} \ } \
newnode->fldname = newList; \ while (0)
} \
while (0)
static void CopyTaskQuery(Task *newnode, Task *from); static void CopyTaskQuery(Task *newnode, Task *from);

View File

@ -531,8 +531,8 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType,
continue; continue;
} }
if (colocationId == INVALID_COLOCATION_ID || colocationId > if (colocationId == INVALID_COLOCATION_ID || colocationId > colocationForm->
colocationForm->colocationid) colocationid)
{ {
/* /*
* We assign the smallest colocation id among all the matches so that we * We assign the smallest colocation id among all the matches so that we
@ -1051,8 +1051,8 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
* Since we iterate over co-located tables, shard count of each table should be * Since we iterate over co-located tables, shard count of each table should be
* same and greater than shardIntervalIndex. * same and greater than shardIntervalIndex.
*/ */
Assert(cacheEntry->shardIntervalArrayLength == Assert(cacheEntry->shardIntervalArrayLength == colocatedTableCacheEntry->
colocatedTableCacheEntry->shardIntervalArrayLength); shardIntervalArrayLength);
ShardInterval *colocatedShardInterval = ShardInterval *colocatedShardInterval =
colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex]; colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex];
@ -1122,8 +1122,8 @@ ColocatedNonPartitionShardIntervalList(ShardInterval *shardInterval)
* Since we iterate over co-located tables, shard count of each table should be * Since we iterate over co-located tables, shard count of each table should be
* same and greater than shardIntervalIndex. * same and greater than shardIntervalIndex.
*/ */
Assert(cacheEntry->shardIntervalArrayLength == Assert(cacheEntry->shardIntervalArrayLength == colocatedTableCacheEntry->
colocatedTableCacheEntry->shardIntervalArrayLength); shardIntervalArrayLength);
ShardInterval *colocatedShardInterval = ShardInterval *colocatedShardInterval =
colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex]; colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex];

View File

@ -43,7 +43,7 @@
#define ENABLE_SSL_QUERY "ALTER SYSTEM SET ssl TO on;" #define ENABLE_SSL_QUERY "ALTER SYSTEM SET ssl TO on;"
#define RESET_CITUS_NODE_CONNINFO \ #define RESET_CITUS_NODE_CONNINFO \
"ALTER SYSTEM SET citus.node_conninfo TO 'sslmode=prefer';" "ALTER SYSTEM SET citus.node_conninfo TO 'sslmode=prefer';"
#define CITUS_AUTO_SSL_COMMON_NAME "citus-auto-ssl" #define CITUS_AUTO_SSL_COMMON_NAME "citus-auto-ssl"
#define X509_SUBJECT_COMMON_NAME "CN" #define X509_SUBJECT_COMMON_NAME "CN"
@ -65,7 +65,7 @@
"ECDHE-RSA-AES128-SHA256:" \ "ECDHE-RSA-AES128-SHA256:" \
"ECDHE-RSA-AES256-SHA384" "ECDHE-RSA-AES256-SHA384"
#define SET_CITUS_SSL_CIPHERS_QUERY \ #define SET_CITUS_SSL_CIPHERS_QUERY \
"ALTER SYSTEM SET ssl_ciphers TO '" CITUS_DEFAULT_SSL_CIPHERS "';" "ALTER SYSTEM SET ssl_ciphers TO '" CITUS_DEFAULT_SSL_CIPHERS "';"
/* forward declaration of helper functions */ /* forward declaration of helper functions */

View File

@ -81,7 +81,8 @@ static List * GetRelationshipNodesForFKeyConnectedRelations(
static List * GetAllNeighboursList(ForeignConstraintRelationshipNode *relationshipNode); static List * GetAllNeighboursList(ForeignConstraintRelationshipNode *relationshipNode);
static ForeignConstraintRelationshipNode * GetRelationshipNodeForRelationId(Oid static ForeignConstraintRelationshipNode * GetRelationshipNodeForRelationId(Oid
relationId, relationId,
bool *isFound); bool *isFound)
;
static void CreateForeignConstraintRelationshipGraph(void); static void CreateForeignConstraintRelationshipGraph(void);
static bool IsForeignConstraintRelationshipGraphValid(void); static bool IsForeignConstraintRelationshipGraphValid(void);
static List * GetNeighbourList(ForeignConstraintRelationshipNode *relationshipNode, static List * GetNeighbourList(ForeignConstraintRelationshipNode *relationshipNode,
@ -177,8 +178,8 @@ ShouldUndistributeCitusLocalTable(Oid relationId)
* to given relation node via a foreign key relationhip graph. * to given relation node via a foreign key relationhip graph.
*/ */
static List * static List *
GetRelationshipNodesForFKeyConnectedRelations( GetRelationshipNodesForFKeyConnectedRelations(ForeignConstraintRelationshipNode *
ForeignConstraintRelationshipNode *relationshipNode) relationshipNode)
{ {
HTAB *oidVisitedMap = CreateSimpleHashSetWithName(Oid, "oid visited hash set"); HTAB *oidVisitedMap = CreateSimpleHashSetWithName(Oid, "oid visited hash set");
@ -566,8 +567,8 @@ PopulateAdjacencyLists(void)
/* we just saw this edge, no need to add it twice */ /* we just saw this edge, no need to add it twice */
if (currentFConstraintRelationshipEdge->referencingRelationOID == if (currentFConstraintRelationshipEdge->referencingRelationOID ==
prevReferencingOid && prevReferencingOid &&
currentFConstraintRelationshipEdge->referencedRelationOID == currentFConstraintRelationshipEdge->referencedRelationOID == prevReferencedOid
prevReferencedOid) )
{ {
continue; continue;
} }

View File

@ -61,8 +61,12 @@ static void CreateFixPartitionShardIndexNames(Oid parentRelationId,
static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
List *indexIdList, List *indexIdList,
Oid partitionRelationId); Oid partitionRelationId);
static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( static List * WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(char *
char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId); qualifiedParentShardIndexName,
Oid
parentIndexId,
Oid
partitionRelationId);
static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid static List * WorkerFixPartitionShardIndexNamesCommandListForPartitionIndex(Oid
partitionIndexId, partitionIndexId,
char * char *
@ -652,8 +656,10 @@ WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
* given partition. Otherwise, all the partitions are included. * given partition. Otherwise, all the partitions are included.
*/ */
static List * static List *
WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex( WorkerFixPartitionShardIndexNamesCommandListForParentShardIndex(char *
char *qualifiedParentShardIndexName, Oid parentIndexId, Oid partitionRelationId) qualifiedParentShardIndexName,
Oid parentIndexId, Oid
partitionRelationId)
{ {
List *commandList = NIL; List *commandList = NIL;

View File

@ -813,7 +813,7 @@ CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement,
appendStringInfo(queryString, appendStringInfo(queryString,
"SELECT pg_catalog.citus_copy_shard_placement(" "SELECT pg_catalog.citus_copy_shard_placement("
UINT64_FORMAT ", %d, %d, " UINT64_FORMAT ", %d, %d, "
"transfer_mode := %s)", "transfer_mode := %s)",
sourceShardPlacement->shardId, sourceShardPlacement->shardId,
sourceShardPlacement->nodeId, sourceShardPlacement->nodeId,
workerNode->nodeId, workerNode->nodeId,

View File

@ -50,7 +50,7 @@
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#define LOCK_RELATION_IF_EXISTS \ #define LOCK_RELATION_IF_EXISTS \
"SELECT pg_catalog.lock_relation_if_exists(%s, %s);" "SELECT pg_catalog.lock_relation_if_exists(%s, %s);"
/* static definition and declarations */ /* static definition and declarations */
struct LockModeToStringType struct LockModeToStringType

View File

@ -29,7 +29,7 @@
* to the following interval: [FirstOffsetNumber, MaxHeapTuplesPerPage]. * to the following interval: [FirstOffsetNumber, MaxHeapTuplesPerPage].
*/ */
#define VALID_ITEMPOINTER_OFFSETS \ #define VALID_ITEMPOINTER_OFFSETS \
((uint64) (MaxHeapTuplesPerPage - FirstOffsetNumber + 1)) ((uint64) (MaxHeapTuplesPerPage - FirstOffsetNumber + 1))
/* /*
* Number of valid ItemPointer BlockNumber's for "row number" <> "ItemPointer" * Number of valid ItemPointer BlockNumber's for "row number" <> "ItemPointer"

View File

@ -23,6 +23,6 @@
#define ACLCHECK_OBJECT_TABLE OBJECT_TABLE #define ACLCHECK_OBJECT_TABLE OBJECT_TABLE
#define ExplainPropertyLong(qlabel, value, es) \ #define ExplainPropertyLong(qlabel, value, es) \
ExplainPropertyInteger(qlabel, NULL, value, es) ExplainPropertyInteger(qlabel, NULL, value, es)
#endif /* COLUMNAR_COMPAT_H */ #endif /* COLUMNAR_COMPAT_H */

View File

@ -15,11 +15,11 @@
* arguments are allowed to be NULL. * arguments are allowed to be NULL.
*/ */
#define PG_ENSURE_ARGNOTNULL(argIndex, argName) \ #define PG_ENSURE_ARGNOTNULL(argIndex, argName) \
if (PG_ARGISNULL(argIndex)) \ if (PG_ARGISNULL(argIndex)) \
{ \ { \
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), \ ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), \
errmsg("%s cannot be NULL", argName))); \ errmsg("%s cannot be NULL", argName))); \
} }
/* /*
* PG_GETARG_TEXT_TO_CSTRING is the same as PG_GETARG_TEXT_P, but instead of * PG_GETARG_TEXT_TO_CSTRING is the same as PG_GETARG_TEXT_P, but instead of
@ -27,7 +27,7 @@
* the argument is not NULL. * the argument is not NULL.
*/ */
#define PG_GETARG_TEXT_TO_CSTRING(argIndex) \ #define PG_GETARG_TEXT_TO_CSTRING(argIndex) \
text_to_cstring(PG_GETARG_TEXT_P(argIndex)) text_to_cstring(PG_GETARG_TEXT_P(argIndex))
/* /*
* PG_GETARG_TEXT_TO_CSTRING_OR_NULL is the same as PG_GETARG_TEXT_TO_CSTRING, * PG_GETARG_TEXT_TO_CSTRING_OR_NULL is the same as PG_GETARG_TEXT_TO_CSTRING,
@ -35,14 +35,14 @@
* return a NULL pointer. * return a NULL pointer.
*/ */
#define PG_GETARG_TEXT_TO_CSTRING_OR_NULL(argIndex) \ #define PG_GETARG_TEXT_TO_CSTRING_OR_NULL(argIndex) \
PG_ARGISNULL(argIndex) ? NULL : PG_GETARG_TEXT_TO_CSTRING(argIndex) PG_ARGISNULL(argIndex) ? NULL : PG_GETARG_TEXT_TO_CSTRING(argIndex)
/* /*
* PG_GETARG_NAME_OR_NULL is the same as PG_GETARG_NAME, but it supports the * PG_GETARG_NAME_OR_NULL is the same as PG_GETARG_NAME, but it supports the
* case where the argument is NULL. In this case it will return a NULL pointer. * case where the argument is NULL. In this case it will return a NULL pointer.
*/ */
#define PG_GETARG_NAME_OR_NULL(argIndex) \ #define PG_GETARG_NAME_OR_NULL(argIndex) \
PG_ARGISNULL(argIndex) ? NULL : PG_GETARG_NAME(argIndex) PG_ARGISNULL(argIndex) ? NULL : PG_GETARG_NAME(argIndex)
/* /*
* PG_GETARG_FLOAT4_OR is the same as PG_GETARG_FLOAT4, but it supports the * PG_GETARG_FLOAT4_OR is the same as PG_GETARG_FLOAT4, but it supports the
@ -50,4 +50,4 @@
* fallback. * fallback.
*/ */
#define PG_GETARG_FLOAT4_OR_DEFAULT(argIndex, fallback) \ #define PG_GETARG_FLOAT4_OR_DEFAULT(argIndex, fallback) \
PG_ARGISNULL(argIndex) ? (fallback) : PG_GETARG_FLOAT4(argIndex) PG_ARGISNULL(argIndex) ? (fallback) : PG_GETARG_FLOAT4(argIndex)

View File

@ -35,7 +35,7 @@ extern void RegisterNodes(void);
#define READFUNC_ARGS struct ExtensibleNode *node #define READFUNC_ARGS struct ExtensibleNode *node
#define OUTFUNC_ARGS StringInfo str, const struct ExtensibleNode *raw_node #define OUTFUNC_ARGS StringInfo str, const struct ExtensibleNode *raw_node
#define COPYFUNC_ARGS struct ExtensibleNode *target_node, const struct \ #define COPYFUNC_ARGS struct ExtensibleNode *target_node, const struct \
ExtensibleNode *source_node ExtensibleNode *source_node
extern void ReadUnsupportedCitusNode(READFUNC_ARGS); extern void ReadUnsupportedCitusNode(READFUNC_ARGS);

View File

@ -29,7 +29,8 @@ extern char * pg_get_serverdef_string(Oid tableRelationId);
extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern char * pg_get_sequencedef_string(Oid sequenceRelid);
extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId);
extern char * pg_get_tableschemadef_string(Oid tableRelationId, extern char * pg_get_tableschemadef_string(Oid tableRelationId,
IncludeSequenceDefaults includeSequenceDefaults, IncludeSequenceDefaults includeSequenceDefaults
,
IncludeIdentities includeIdentityDefaults, IncludeIdentities includeIdentityDefaults,
char *accessMethod); char *accessMethod);
extern void EnsureRelationKindSupported(Oid relationId); extern void EnsureRelationKindSupported(Oid relationId);

View File

@ -48,7 +48,8 @@ extern void SwitchToSequentialAndLocalExecutionIfRelationNameTooLong(Oid relatio
extern void SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(Oid extern void SwitchToSequentialAndLocalExecutionIfPartitionNameTooLong(Oid
parentRelationId, parentRelationId,
Oid Oid
partitionRelationId); partitionRelationId)
;
/* DistOpsOperationType to be used in DistributeObjectOps */ /* DistOpsOperationType to be used in DistributeObjectOps */
typedef enum DistOpsOperationType typedef enum DistOpsOperationType
@ -560,13 +561,15 @@ extern List * PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryStr
processUtilityContext); processUtilityContext);
extern List * PostprocessAlterSequenceSchemaStmt(Node *node, const char *queryString); extern List * PostprocessAlterSequenceSchemaStmt(Node *node, const char *queryString);
extern List * PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString, extern List * PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext
);
extern List * PostprocessAlterSequenceOwnerStmt(Node *node, const char *queryString); extern List * PostprocessAlterSequenceOwnerStmt(Node *node, const char *queryString);
extern List * PreprocessAlterSequencePersistenceStmt(Node *node, const char *queryString, extern List * PreprocessAlterSequencePersistenceStmt(Node *node, const char *queryString,
ProcessUtilityContext ProcessUtilityContext
processUtilityContext); processUtilityContext);
extern List * PreprocessSequenceAlterTableStmt(Node *node, const char *queryString, extern List * PreprocessSequenceAlterTableStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext
);
extern List * PreprocessDropSequenceStmt(Node *node, const char *queryString, extern List * PreprocessDropSequenceStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext);
extern List * SequenceDropStmtObjectAddress(Node *stmt, bool missing_ok, bool extern List * SequenceDropStmtObjectAddress(Node *stmt, bool missing_ok, bool
@ -639,7 +642,8 @@ extern void PrepareAlterTableStmtForConstraint(AlterTableStmt *alterTableStateme
extern List * PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, extern List * PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext);
extern List * PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString, extern List * PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext)
;
extern List * PreprocessAlterTableSchemaStmt(Node *node, const char *queryString, extern List * PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext); ProcessUtilityContext processUtilityContext);
extern void SkipForeignKeyValidationIfConstraintIsFkey(AlterTableStmt *alterTableStmt, extern void SkipForeignKeyValidationIfConstraintIsFkey(AlterTableStmt *alterTableStmt,
@ -789,9 +793,9 @@ extern List * PostprocessAlterTriggerDependsStmt(Node *node, const char *querySt
extern List * PreprocessAlterTriggerDependsStmt(Node *node, const char *queryString, extern List * PreprocessAlterTriggerDependsStmt(Node *node, const char *queryString,
ProcessUtilityContext ProcessUtilityContext
processUtilityContext); processUtilityContext);
extern void AlterTriggerDependsEventExtendNames( extern void AlterTriggerDependsEventExtendNames(AlterObjectDependsStmt *
AlterObjectDependsStmt *alterTriggerDependsStmt, alterTriggerDependsStmt,
char *schemaName, uint64 shardId); char *schemaName, uint64 shardId);
extern void ErrorOutForTriggerIfNotSupported(Oid relationId); extern void ErrorOutForTriggerIfNotSupported(Oid relationId);
extern void ErrorIfRelationHasUnsupportedTrigger(Oid relationId); extern void ErrorIfRelationHasUnsupportedTrigger(Oid relationId);
extern List * PreprocessDropTriggerStmt(Node *node, const char *queryString, extern List * PreprocessDropTriggerStmt(Node *node, const char *queryString,
@ -834,8 +838,9 @@ extern bool RelationIdListHasReferenceTable(List *relationIdList);
extern List * GetFKeyCreationCommandsForRelationIdList(List *relationIdList); extern List * GetFKeyCreationCommandsForRelationIdList(List *relationIdList);
extern void DropRelationForeignKeys(Oid relationId, int flags); extern void DropRelationForeignKeys(Oid relationId, int flags);
extern void SetLocalEnableLocalReferenceForeignKeys(bool state); extern void SetLocalEnableLocalReferenceForeignKeys(bool state);
extern void ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI( extern void ExecuteAndLogUtilityCommandListInTableTypeConversionViaSPI(List *
List *utilityCommandList); utilityCommandList)
;
extern void ExecuteAndLogUtilityCommandList(List *ddlCommandList); extern void ExecuteAndLogUtilityCommandList(List *ddlCommandList);
extern void ExecuteAndLogUtilityCommand(const char *commandString); extern void ExecuteAndLogUtilityCommand(const char *commandString);
extern void ExecuteForeignKeyCreateCommandList(List *ddlCommandList, extern void ExecuteForeignKeyCreateCommandList(List *ddlCommandList,

View File

@ -59,12 +59,13 @@
/* Remote call definitions to help with data staging and deletion */ /* Remote call definitions to help with data staging and deletion */
#define WORKER_APPLY_SHARD_DDL_COMMAND \ #define WORKER_APPLY_SHARD_DDL_COMMAND \
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s, %s)" "SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s, %s)"
#define WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA \ #define WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA \
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)" "SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)"
#define WORKER_APPLY_INTER_SHARD_DDL_COMMAND \ #define WORKER_APPLY_INTER_SHARD_DDL_COMMAND \
"SELECT worker_apply_inter_shard_ddl_command (" UINT64_FORMAT ", %s, " UINT64_FORMAT \ "SELECT worker_apply_inter_shard_ddl_command (" UINT64_FORMAT \
", %s, %s)" ", %s, " UINT64_FORMAT \
", %s, %s)"
#define SHARD_RANGE_QUERY "SELECT min(%s), max(%s) FROM %s" #define SHARD_RANGE_QUERY "SELECT min(%s), max(%s) FROM %s"
#define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size(%s)" #define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size(%s)"
#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)" #define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)"
@ -225,7 +226,8 @@ extern uint64 GetNextShardId(void);
extern uint64 GetNextPlacementId(void); extern uint64 GetNextPlacementId(void);
extern Oid ResolveRelationId(text *relationName, bool missingOk); extern Oid ResolveRelationId(text *relationName, bool missingOk);
extern List * GetFullTableCreationCommands(Oid relationId, extern List * GetFullTableCreationCommands(Oid relationId,
IncludeSequenceDefaults includeSequenceDefaults, IncludeSequenceDefaults includeSequenceDefaults
,
IncludeIdentities includeIdentityDefaults, IncludeIdentities includeIdentityDefaults,
bool creatingShellTableOnRemoteNode); bool creatingShellTableOnRemoteNode);
extern List * GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes, extern List * GetPostLoadTableCreationCommands(Oid relationId, bool includeIndexes,

View File

@ -251,7 +251,7 @@ extern PlannedStmt * distributed_planner(Query *parse,
* in distributed queries * in distributed queries
*/ */
#define LOCAL_TABLE_SUBQUERY_CTE_HINT \ #define LOCAL_TABLE_SUBQUERY_CTE_HINT \
"Use CTE's or subqueries to select from local tables and use them in joins" "Use CTE's or subqueries to select from local tables and use them in joins"
extern List * ExtractRangeTableEntryList(Query *query); extern List * ExtractRangeTableEntryList(Query *query);
extern bool NeedsDistributedPlanning(Query *query); extern bool NeedsDistributedPlanning(Query *query);

View File

@ -18,10 +18,10 @@
#define NOT_SUPPORTED_IN_COMMUNITY(name) \ #define NOT_SUPPORTED_IN_COMMUNITY(name) \
PG_FUNCTION_INFO_V1(name); \ PG_FUNCTION_INFO_V1(name); \
Datum name(PG_FUNCTION_ARGS) { \ Datum name(PG_FUNCTION_ARGS) { \
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
errmsg(# name "() is only supported on Citus Enterprise"))); \ errmsg(# name "() is only supported on Citus Enterprise"))); \
} }
#endif #endif

View File

@ -37,8 +37,8 @@ typedef struct DeferredErrorMessage
* serialized/copied/deserialized, i.e. can be embedded in plans and such. * serialized/copied/deserialized, i.e. can be embedded in plans and such.
*/ */
#define DeferredError(code, message, detail, hint) \ #define DeferredError(code, message, detail, hint) \
DeferredErrorInternal(code, message, detail, hint, __FILE__, __LINE__, \ DeferredErrorInternal(code, message, detail, hint, __FILE__, __LINE__, \
PG_FUNCNAME_MACRO) PG_FUNCNAME_MACRO)
DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, DeferredErrorMessage * DeferredErrorInternal(int code, const char *message,
const char *detail, const char *hint, const char *detail, const char *hint,
@ -54,21 +54,21 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message,
*/ */
#ifdef HAVE__BUILTIN_CONSTANT_P #ifdef HAVE__BUILTIN_CONSTANT_P
#define RaiseDeferredError(error, elevel) \ #define RaiseDeferredError(error, elevel) \
do { \ do { \
RaiseDeferredErrorInternal(error, elevel); \ RaiseDeferredErrorInternal(error, elevel); \
if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \ if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \
pg_unreachable(); } \ pg_unreachable(); } \
} \ } \
while (0) while (0)
#else /* !HAVE_BUILTIN_CONSTANT_P */ #else /* !HAVE_BUILTIN_CONSTANT_P */
#define RaiseDeferredError(error, elevel) \ #define RaiseDeferredError(error, elevel) \
do { \ do { \
const int elevel_ = (elevel); \ const int elevel_ = (elevel); \
RaiseDeferredErrorInternal(error, elevel_); \ RaiseDeferredErrorInternal(error, elevel_); \
if (elevel_ >= ERROR) { \ if (elevel_ >= ERROR) { \
pg_unreachable(); } \ pg_unreachable(); } \
} \ } \
while (0) while (0)
#endif /* HAVE_BUILTIN_CONSTANT_P */ #endif /* HAVE_BUILTIN_CONSTANT_P */
void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel); void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel);

View File

@ -40,8 +40,9 @@ typedef struct ExtendedOpNodeProperties
} ExtendedOpNodeProperties; } ExtendedOpNodeProperties;
extern ExtendedOpNodeProperties BuildExtendedOpNodeProperties( extern ExtendedOpNodeProperties BuildExtendedOpNodeProperties(MultiExtendedOp *
MultiExtendedOp *extendedOpNode, bool hasNonDistributableAggregates); extendedOpNode, bool
hasNonDistributableAggregates);
#endif /* EXTENDED_OP_NODE_UTILS_H_ */ #endif /* EXTENDED_OP_NODE_UTILS_H_ */

View File

@ -20,21 +20,21 @@
* padding bytes. This is necessary to use a type as a hash key with tag_hash. * padding bytes. This is necessary to use a type as a hash key with tag_hash.
*/ */
#define assert_valid_hash_key2(type, field1, field2) \ #define assert_valid_hash_key2(type, field1, field2) \
StaticAssertDecl( \ StaticAssertDecl( \
sizeof(type) == sizeof(((type) { 0 }).field1) \ sizeof(type) == sizeof(((type) { 0 }).field1) \
+ sizeof(((type) { 0 }).field2), \ + sizeof(((type) { 0 }).field2), \
# type " has padding bytes, but is used as a hash key in a simple hash"); # type " has padding bytes, but is used as a hash key in a simple hash");
/* /*
* assert_valid_hash_key3 checks if a type that contains 3 fields contains no * assert_valid_hash_key3 checks if a type that contains 3 fields contains no
* padding bytes. This is necessary to use a type as a hash key with tag_hash. * padding bytes. This is necessary to use a type as a hash key with tag_hash.
*/ */
#define assert_valid_hash_key3(type, field1, field2, field3) \ #define assert_valid_hash_key3(type, field1, field2, field3) \
StaticAssertDecl( \ StaticAssertDecl( \
sizeof(type) == sizeof(((type) { 0 }).field1) \ sizeof(type) == sizeof(((type) { 0 }).field1) \
+ sizeof(((type) { 0 }).field2) \ + sizeof(((type) { 0 }).field2) \
+ sizeof(((type) { 0 }).field3), \ + sizeof(((type) { 0 }).field3), \
# type " has padding bytes, but is used as a hash key in a simple hash"); # type " has padding bytes, but is used as a hash key in a simple hash");
extern void hash_delete_all(HTAB *htab); extern void hash_delete_all(HTAB *htab);
@ -44,10 +44,10 @@ extern void hash_delete_all(HTAB *htab);
*/ */
#define foreach_htab(var, status, htab) \ #define foreach_htab(var, status, htab) \
hash_seq_init((status), (htab)); \ hash_seq_init((status), (htab)); \
for ((var) = hash_seq_search(status); \ for ((var) = hash_seq_search(status); \
(var) != NULL; \ (var) != NULL; \
(var) = hash_seq_search(status)) (var) = hash_seq_search(status))
extern void foreach_htab_cleanup(void *var, HASH_SEQ_STATUS *status); extern void foreach_htab_cleanup(void *var, HASH_SEQ_STATUS *status);
@ -72,28 +72,28 @@ extern HTAB * CreateSimpleHashWithNameAndSizeInternal(Size keysize, Size entrysi
* returning undefined values. You can check this using assert_valid_hash_keyX. * returning undefined values. You can check this using assert_valid_hash_keyX.
*/ */
#define CreateSimpleHash(keyType, entryType) \ #define CreateSimpleHash(keyType, entryType) \
CreateSimpleHashWithNameAndSize(keyType, entryType, \ CreateSimpleHashWithNameAndSize(keyType, entryType, \
# entryType "Hash", 32) # entryType "Hash", 32)
/* /*
* Same as CreateSimpleHash but allows specifying the name * Same as CreateSimpleHash but allows specifying the name
*/ */
#define CreateSimpleHashWithName(keyType, entryType, name) \ #define CreateSimpleHashWithName(keyType, entryType, name) \
CreateSimpleHashWithNameAndSize(keyType, entryType, \ CreateSimpleHashWithNameAndSize(keyType, entryType, \
name, 32) name, 32)
/* /*
* CreateSimpleHashWithSize is the same as CreateSimpleHash, but allows * CreateSimpleHashWithSize is the same as CreateSimpleHash, but allows
* configuring of the amount of elements that initially fit in the hash table. * configuring of the amount of elements that initially fit in the hash table.
*/ */
#define CreateSimpleHashWithSize(keyType, entryType, size) \ #define CreateSimpleHashWithSize(keyType, entryType, size) \
CreateSimpleHashWithNameAndSize(keyType, entryType, \ CreateSimpleHashWithNameAndSize(keyType, entryType, \
# entryType "Hash", size) # entryType "Hash", size)
#define CreateSimpleHashWithNameAndSize(keyType, entryType, name, size) \ #define CreateSimpleHashWithNameAndSize(keyType, entryType, name, size) \
CreateSimpleHashWithNameAndSizeInternal(sizeof(keyType), \ CreateSimpleHashWithNameAndSizeInternal(sizeof(keyType), \
sizeof(entryType), \ sizeof(entryType), \
name, size) name, size)
/* /*
@ -101,8 +101,8 @@ extern HTAB * CreateSimpleHashWithNameAndSizeInternal(Size keysize, Size entrysi
* tag_hash and stores the values in the CurrentMemoryContext. * tag_hash and stores the values in the CurrentMemoryContext.
*/ */
#define CreateSimpleHashSet(keyType) \ #define CreateSimpleHashSet(keyType) \
CreateSimpleHashWithName(keyType, keyType, \ CreateSimpleHashWithName(keyType, keyType, \
# keyType "HashSet") # keyType "HashSet")
/* /*
* CreatesSimpleHashSetWithSize creates a hash set that hashes its values using * CreatesSimpleHashSetWithSize creates a hash set that hashes its values using
@ -110,7 +110,7 @@ extern HTAB * CreateSimpleHashWithNameAndSizeInternal(Size keysize, Size entrysi
* specifying its number of elements. * specifying its number of elements.
*/ */
#define CreateSimpleHashSetWithSize(keyType, size) \ #define CreateSimpleHashSetWithSize(keyType, size) \
CreateSimpleHashWithNameAndSize(keyType, keyType, # keyType "HashSet", size) CreateSimpleHashWithNameAndSize(keyType, keyType, # keyType "HashSet", size)
/* /*
* CreatesSimpleHashSetWithName creates a hash set that hashes its values using the * CreatesSimpleHashSetWithName creates a hash set that hashes its values using the
@ -118,7 +118,7 @@ extern HTAB * CreateSimpleHashWithNameAndSizeInternal(Size keysize, Size entrysi
* specifying its name. * specifying its name.
*/ */
#define CreateSimpleHashSetWithName(keyType, name) \ #define CreateSimpleHashSetWithName(keyType, name) \
CreateSimpleHashWithName(keyType, keyType, name) CreateSimpleHashWithName(keyType, keyType, name)
/* /*
* CreatesSimpleHashSetWithName creates a hash set that hashes its values using the * CreatesSimpleHashSetWithName creates a hash set that hashes its values using the
@ -126,7 +126,7 @@ extern HTAB * CreateSimpleHashWithNameAndSizeInternal(Size keysize, Size entrysi
* specifying its name and number of elements. * specifying its name and number of elements.
*/ */
#define CreateSimpleHashSetWithNameAndSize(keyType, name, size) \ #define CreateSimpleHashSetWithNameAndSize(keyType, name, size) \
CreateSimpleHashWithNameAndSize(keyType, keyType, name, size) CreateSimpleHashWithNameAndSize(keyType, keyType, name, size)
#endif #endif

View File

@ -112,8 +112,8 @@ extern List * PartitionTasklistResults(const char *resultIdPrefix, List *selectT
int partitionColumnIndex, int partitionColumnIndex,
CitusTableCacheEntry *distributionScheme, CitusTableCacheEntry *distributionScheme,
bool binaryFormat); bool binaryFormat);
extern char * QueryStringForFragmentsTransfer( extern char * QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *
NodeToNodeFragmentsTransfer *fragmentsTransfer); fragmentsTransfer);
extern void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount, extern void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount,
Oid intervalTypeId, ArrayType **minValueArray, Oid intervalTypeId, ArrayType **minValueArray,
ArrayType **maxValueArray); ArrayType **maxValueArray);

View File

@ -51,10 +51,10 @@ typedef struct ListCellAndListWrapper
* var is NULL. * var is NULL.
*/ */
#define foreach_declared_ptr(var, l) \ #define foreach_declared_ptr(var, l) \
for (ListCell *(var ## CellDoNotUse) = list_head(l); \ for (ListCell *(var ## CellDoNotUse) = list_head(l); \
(var ## CellDoNotUse) != NULL && \ (var ## CellDoNotUse) != NULL && \
(((var) = lfirst(var ## CellDoNotUse)) || true); \ (((var) = lfirst(var ## CellDoNotUse)) || true); \
var ## CellDoNotUse = lnext(l, var ## CellDoNotUse)) var ## CellDoNotUse = lnext(l, var ## CellDoNotUse))
/* /*
@ -64,10 +64,10 @@ typedef struct ListCellAndListWrapper
* For explanation of how it works see foreach_declared_ptr. * For explanation of how it works see foreach_declared_ptr.
*/ */
#define foreach_declared_int(var, l) \ #define foreach_declared_int(var, l) \
for (ListCell *(var ## CellDoNotUse) = list_head(l); \ for (ListCell *(var ## CellDoNotUse) = list_head(l); \
(var ## CellDoNotUse) != NULL && \ (var ## CellDoNotUse) != NULL && \
(((var) = lfirst_int(var ## CellDoNotUse)) || true); \ (((var) = lfirst_int(var ## CellDoNotUse)) || true); \
var ## CellDoNotUse = lnext(l, var ## CellDoNotUse)) var ## CellDoNotUse = lnext(l, var ## CellDoNotUse))
/* /*
@ -77,10 +77,10 @@ typedef struct ListCellAndListWrapper
* For explanation of how it works see foreach_declared_ptr. * For explanation of how it works see foreach_declared_ptr.
*/ */
#define foreach_declared_oid(var, l) \ #define foreach_declared_oid(var, l) \
for (ListCell *(var ## CellDoNotUse) = list_head(l); \ for (ListCell *(var ## CellDoNotUse) = list_head(l); \
(var ## CellDoNotUse) != NULL && \ (var ## CellDoNotUse) != NULL && \
(((var) = lfirst_oid(var ## CellDoNotUse)) || true); \ (((var) = lfirst_oid(var ## CellDoNotUse)) || true); \
var ## CellDoNotUse = lnext(l, var ## CellDoNotUse)) var ## CellDoNotUse = lnext(l, var ## CellDoNotUse))
/* /*
* forboth_ptr - * forboth_ptr -
@ -89,15 +89,15 @@ typedef struct ListCellAndListWrapper
* variables to store the pointer of each of the two cells in. * variables to store the pointer of each of the two cells in.
*/ */
#define forboth_ptr(var1, l1, var2, l2) \ #define forboth_ptr(var1, l1, var2, l2) \
for (ListCell *(var1 ## CellDoNotUse) = list_head(l1), \ for (ListCell *(var1 ## CellDoNotUse) = list_head(l1), \
*(var2 ## CellDoNotUse) = list_head(l2); \ *(var2 ## CellDoNotUse) = list_head(l2); \
(var1 ## CellDoNotUse) != NULL && \ (var1 ## CellDoNotUse) != NULL && \
(var2 ## CellDoNotUse) != NULL && \ (var2 ## CellDoNotUse) != NULL && \
(((var1) = lfirst(var1 ## CellDoNotUse)) || true) && \ (((var1) = lfirst(var1 ## CellDoNotUse)) || true) && \
(((var2) = lfirst(var2 ## CellDoNotUse)) || true); \ (((var2) = lfirst(var2 ## CellDoNotUse)) || true); \
var1 ## CellDoNotUse = lnext(l1, var1 ## CellDoNotUse), \ var1 ## CellDoNotUse = lnext(l1, var1 ## CellDoNotUse), \
var2 ## CellDoNotUse = lnext(l2, var2 ## CellDoNotUse) \ var2 ## CellDoNotUse = lnext(l2, var2 ## CellDoNotUse) \
) )
/* /*
* forboth_ptr_oid - * forboth_ptr_oid -
@ -107,15 +107,15 @@ typedef struct ListCellAndListWrapper
* variables to store the pointer and the Oid of each of the two cells in. * variables to store the pointer and the Oid of each of the two cells in.
*/ */
#define forboth_ptr_oid(var1, l1, var2, l2) \ #define forboth_ptr_oid(var1, l1, var2, l2) \
for (ListCell *(var1 ## CellDoNotUse) = list_head(l1), \ for (ListCell *(var1 ## CellDoNotUse) = list_head(l1), \
*(var2 ## CellDoNotUse) = list_head(l2); \ *(var2 ## CellDoNotUse) = list_head(l2); \
(var1 ## CellDoNotUse) != NULL && \ (var1 ## CellDoNotUse) != NULL && \
(var2 ## CellDoNotUse) != NULL && \ (var2 ## CellDoNotUse) != NULL && \
(((var1) = lfirst(var1 ## CellDoNotUse)) || true) && \ (((var1) = lfirst(var1 ## CellDoNotUse)) || true) && \
(((var2) = lfirst_oid(var2 ## CellDoNotUse)) || true); \ (((var2) = lfirst_oid(var2 ## CellDoNotUse)) || true); \
var1 ## CellDoNotUse = lnext(l1, var1 ## CellDoNotUse), \ var1 ## CellDoNotUse = lnext(l1, var1 ## CellDoNotUse), \
var2 ## CellDoNotUse = lnext(l2, var2 ## CellDoNotUse) \ var2 ## CellDoNotUse = lnext(l2, var2 ## CellDoNotUse) \
) )
/* /*
* forboth_int_oid - * forboth_int_oid -
@ -125,15 +125,15 @@ typedef struct ListCellAndListWrapper
* variables to store the int and the Oid of each of the two cells in. * variables to store the int and the Oid of each of the two cells in.
*/ */
#define forboth_int_oid(var1, l1, var2, l2) \ #define forboth_int_oid(var1, l1, var2, l2) \
for (ListCell *(var1 ## CellDoNotUse) = list_head(l1), \ for (ListCell *(var1 ## CellDoNotUse) = list_head(l1), \
*(var2 ## CellDoNotUse) = list_head(l2); \ *(var2 ## CellDoNotUse) = list_head(l2); \
(var1 ## CellDoNotUse) != NULL && \ (var1 ## CellDoNotUse) != NULL && \
(var2 ## CellDoNotUse) != NULL && \ (var2 ## CellDoNotUse) != NULL && \
(((var1) = lfirst_int(var1 ## CellDoNotUse)) || true) && \ (((var1) = lfirst_int(var1 ## CellDoNotUse)) || true) && \
(((var2) = lfirst_oid(var2 ## CellDoNotUse)) || true); \ (((var2) = lfirst_oid(var2 ## CellDoNotUse)) || true); \
var1 ## CellDoNotUse = lnext(l1, var1 ## CellDoNotUse), \ var1 ## CellDoNotUse = lnext(l1, var1 ## CellDoNotUse), \
var2 ## CellDoNotUse = lnext(l2, var2 ## CellDoNotUse) \ var2 ## CellDoNotUse = lnext(l2, var2 ## CellDoNotUse) \
) )
/* /*
* foreach_ptr_append - * foreach_ptr_append -
@ -157,10 +157,10 @@ typedef struct ListCellAndListWrapper
* - || true is used to always enter the loop even if var is NULL. * - || true is used to always enter the loop even if var is NULL.
*/ */
#define foreach_ptr_append(var, l) \ #define foreach_ptr_append(var, l) \
for (int var ## PositionDoNotUse = 0; \ for (int var ## PositionDoNotUse = 0; \
(var ## PositionDoNotUse) < list_length(l) && \ (var ## PositionDoNotUse) < list_length(l) && \
(((var) = list_nth(l, var ## PositionDoNotUse)) || true); \ (((var) = list_nth(l, var ## PositionDoNotUse)) || true); \
var ## PositionDoNotUse ++) var ## PositionDoNotUse++)
/* utility functions declaration shared within this module */ /* utility functions declaration shared within this module */
extern List * SortList(List *pointerList, extern List * SortList(List *pointerList,

View File

@ -23,10 +23,10 @@ extern bool IsLoggableLevel(int logLevel);
#undef ereport #undef ereport
#define ereport(elevel, rest) \ #define ereport(elevel, rest) \
do { \ do { \
int ereport_loglevel = elevel; \ int ereport_loglevel = elevel; \
(void) (ereport_loglevel); \ (void) (ereport_loglevel); \
ereport_domain(elevel, TEXTDOMAIN, rest); \ ereport_domain(elevel, TEXTDOMAIN, rest); \
} while (0) } while (0)
#endif /* LOG_UTILS_H */ #endif /* LOG_UTILS_H */

View File

@ -210,7 +210,8 @@ extern ShardPlacement * ShardPlacementForFunctionColocatedWithDistTable(
DistObjectCacheEntry *procedure, List *argumentList, Var *partitionColumn, DistObjectCacheEntry *procedure, List *argumentList, Var *partitionColumn,
CitusTableCacheEntry CitusTableCacheEntry
*cacheEntry, *cacheEntry,
PlannedStmt *plan); PlannedStmt *plan)
;
extern bool CitusHasBeenLoaded(void); extern bool CitusHasBeenLoaded(void);
extern bool CheckCitusVersion(int elevel); extern bool CheckCitusVersion(int elevel);
extern bool CheckAvailableVersion(int elevel); extern bool CheckAvailableVersion(int elevel);

View File

@ -189,16 +189,16 @@ extern void SendInterTableRelationshipCommands(MetadataSyncContext *context);
#define DELETE_ALL_COLOCATION "DELETE FROM pg_catalog.pg_dist_colocation" #define DELETE_ALL_COLOCATION "DELETE FROM pg_catalog.pg_dist_colocation"
#define DELETE_ALL_TENANT_SCHEMAS "DELETE FROM pg_catalog.pg_dist_schema" #define DELETE_ALL_TENANT_SCHEMAS "DELETE FROM pg_catalog.pg_dist_schema"
#define WORKER_DROP_ALL_SHELL_TABLES \ #define WORKER_DROP_ALL_SHELL_TABLES \
"CALL pg_catalog.worker_drop_all_shell_tables(%s)" "CALL pg_catalog.worker_drop_all_shell_tables(%s)"
#define CITUS_INTERNAL_MARK_NODE_NOT_SYNCED \ #define CITUS_INTERNAL_MARK_NODE_NOT_SYNCED \
"SELECT citus_internal.mark_node_not_synced(%d, %d)" "SELECT citus_internal.mark_node_not_synced(%d, %d)"
#define REMOVE_ALL_CITUS_TABLES_COMMAND \ #define REMOVE_ALL_CITUS_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition" "SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define BREAK_ALL_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \ #define BREAK_ALL_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
"SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition" "SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition"
#define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \ #define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);" "SELECT pg_catalog.worker_drop_sequence_dependency(%s);"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'" #define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
#define ENABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'on'" #define ENABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'on'"
@ -206,20 +206,20 @@ extern void SendInterTableRelationshipCommands(MetadataSyncContext *context);
#define ENABLE_METADATA_SYNC "SET citus.enable_metadata_sync TO 'on'" #define ENABLE_METADATA_SYNC "SET citus.enable_metadata_sync TO 'on'"
#define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s,%s)" #define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s,%s)"
#define UPSERT_PLACEMENT \ #define UPSERT_PLACEMENT \
"INSERT INTO pg_dist_placement " \ "INSERT INTO pg_dist_placement " \
"(shardid, shardstate, shardlength, " \ "(shardid, shardstate, shardlength, " \
"groupid, placementid) " \ "groupid, placementid) " \
"VALUES (" UINT64_FORMAT ", 1, " UINT64_FORMAT \ "VALUES (" UINT64_FORMAT ", 1, " UINT64_FORMAT \
", %d, " UINT64_FORMAT \ ", %d, " UINT64_FORMAT \
") " \ ") " \
"ON CONFLICT (shardid, groupid) DO UPDATE SET " \ "ON CONFLICT (shardid, groupid) DO UPDATE SET " \
"shardstate = EXCLUDED.shardstate, " \ "shardstate = EXCLUDED.shardstate, " \
"shardlength = EXCLUDED.shardlength, " \ "shardlength = EXCLUDED.shardlength, " \
"placementid = EXCLUDED.placementid" "placementid = EXCLUDED.placementid"
#define METADATA_SYNC_CHANNEL "metadata_sync" #define METADATA_SYNC_CHANNEL "metadata_sync"
#define WORKER_ADJUST_IDENTITY_COLUMN_SEQ_RANGES \ #define WORKER_ADJUST_IDENTITY_COLUMN_SEQ_RANGES \
"SELECT pg_catalog.worker_adjust_identity_column_seq_ranges(%s)" "SELECT pg_catalog.worker_adjust_identity_column_seq_ranges(%s)"
/* controlled via GUC */ /* controlled via GUC */
extern char *EnableManualMetadataChangesForUser; extern char *EnableManualMetadataChangesForUser;

View File

@ -40,7 +40,7 @@
#define WORKER_PARTITIONED_TABLE_SIZE_FUNCTION "worker_partitioned_table_size(%s)" #define WORKER_PARTITIONED_TABLE_SIZE_FUNCTION "worker_partitioned_table_size(%s)"
#define WORKER_PARTITIONED_RELATION_SIZE_FUNCTION "worker_partitioned_relation_size(%s)" #define WORKER_PARTITIONED_RELATION_SIZE_FUNCTION "worker_partitioned_relation_size(%s)"
#define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \ #define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \
"worker_partitioned_relation_total_size(%s)" "worker_partitioned_relation_total_size(%s)"
#define SHARD_SIZES_COLUMN_COUNT (2) #define SHARD_SIZES_COLUMN_COUNT (2)
@ -302,12 +302,12 @@ typedef struct BackgroundTask
} BackgroundTask; } BackgroundTask;
#define SET_NULLABLE_FIELD(ptr, field, value) \ #define SET_NULLABLE_FIELD(ptr, field, value) \
(ptr)->__nullable_storage.field = (value); \ (ptr)->__nullable_storage.field = (value); \
(ptr)->field = &((ptr)->__nullable_storage.field) (ptr)->field = &((ptr)->__nullable_storage.field)
#define UNSET_NULLABLE_FIELD(ptr, field) \ #define UNSET_NULLABLE_FIELD(ptr, field) \
(ptr)->field = NULL; \ (ptr)->field = NULL; \
memset_struct_0((ptr)->__nullable_storage.field) memset_struct_0((ptr)->__nullable_storage.field)
/* Size functions */ /* Size functions */
extern Datum citus_table_size(PG_FUNCTION_ARGS); extern Datum citus_table_size(PG_FUNCTION_ARGS);

View File

@ -20,8 +20,8 @@
/* Adaptive executor repartitioning related defines */ /* Adaptive executor repartitioning related defines */
#define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ", %s);" #define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ", %s);"
#define WORKER_REPARTITION_CLEANUP_QUERY "SELECT worker_repartition_cleanup (" \ #define WORKER_REPARTITION_CLEANUP_QUERY "SELECT worker_repartition_cleanup (" \
UINT64_FORMAT \ UINT64_FORMAT \
");" ");"
/* Enumeration that represents distributed executor types */ /* Enumeration that represents distributed executor types */

View File

@ -21,7 +21,8 @@ extern MultiConnection * StartPlacementConnection(uint32 flags,
struct ShardPlacement *placement, struct ShardPlacement *placement,
const char *userName); const char *userName);
extern MultiConnection * GetConnectionIfPlacementAccessedInXact(int flags, extern MultiConnection * GetConnectionIfPlacementAccessedInXact(int flags,
List *placementAccessList, List *placementAccessList
,
const char *userName); const char *userName);
extern MultiConnection * StartPlacementListConnection(uint32 flags, extern MultiConnection * StartPlacementListConnection(uint32 flags,
List *placementAccessList, List *placementAccessList,

View File

@ -26,7 +26,8 @@ extern int ValuesMaterializationThreshold;
extern bool CanPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLimit); extern bool CanPushdownSubquery(Query *subqueryTree, bool outerMostQueryHasLimit);
extern bool ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery, extern bool ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
PlannerRestrictionContext *plannerRestrictionContext); PlannerRestrictionContext *plannerRestrictionContext
);
extern bool JoinTreeContainsSubquery(Query *query); extern bool JoinTreeContainsSubquery(Query *query);
extern bool IsNodeSubquery(Node *node); extern bool IsNodeSubquery(Node *node);
extern bool HasEmptyJoinTree(Query *query); extern bool HasEmptyJoinTree(Query *query);
@ -37,12 +38,12 @@ extern MultiNode * SubqueryMultiNodeTree(Query *originalQuery,
Query *queryTree, Query *queryTree,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown( extern DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery
Query *originalQuery, ,
PlannerRestrictionContext PlannerRestrictionContext
* *
plannerRestrictionContext, plannerRestrictionContext,
bool plannerPhase); bool plannerPhase);
extern DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree, extern DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subqueryTree,
bool bool
outerMostQueryHasLimit); outerMostQueryHasLimit);

View File

@ -31,8 +31,8 @@ typedef struct RangeTblEntryIndex
Index rteIndex; Index rteIndex;
}RangeTblEntryIndex; }RangeTblEntryIndex;
extern PlannerRestrictionContext * GetPlannerRestrictionContext( extern PlannerRestrictionContext * GetPlannerRestrictionContext(RecursivePlanningContext *
RecursivePlanningContext *recursivePlanningContext); recursivePlanningContext);
extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery, extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext, plannerRestrictionContext,

View File

@ -41,10 +41,10 @@ extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
extern List * GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry, extern List * GetRestrictInfoListForRelation(RangeTblEntry *rangeTblEntry,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern RelationRestriction * RelationRestrictionForRelation( extern RelationRestriction * RelationRestrictionForRelation(RangeTblEntry *rangeTableEntry
RangeTblEntry *rangeTableEntry, ,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern JoinRestrictionContext * RemoveDuplicateJoinRestrictions(JoinRestrictionContext * extern JoinRestrictionContext * RemoveDuplicateJoinRestrictions(JoinRestrictionContext *
joinRestrictionContext); joinRestrictionContext);

View File

@ -20,12 +20,12 @@ extern List * GenerateTaskListWithColocatedIntermediateResults(Oid targetRelatio
Query * Query *
modifyQueryViaCoordinatorOrRepartition, modifyQueryViaCoordinatorOrRepartition,
char *resultIdPrefix); char *resultIdPrefix);
extern List * GenerateTaskListWithRedistributedResults( extern List * GenerateTaskListWithRedistributedResults(Query *
Query *modifyQueryViaCoordinatorOrRepartition, modifyQueryViaCoordinatorOrRepartition,
CitusTableCacheEntry * CitusTableCacheEntry *
targetRelation, targetRelation,
List **redistributedResults, List **redistributedResults,
bool useBinaryFormat); bool useBinaryFormat);
extern bool IsSupportedRedistributionTarget(Oid targetRelationId); extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
extern bool IsRedistributablePlan(Plan *selectPlan); extern bool IsRedistributablePlan(Plan *selectPlan);
extern bool HasMergeNotMatchedBySource(Query *query); extern bool HasMergeNotMatchedBySource(Query *query);

View File

@ -63,87 +63,87 @@ typedef enum CitusOperations
/* reuse advisory lock, but with different, unused field 4 (4)*/ /* reuse advisory lock, but with different, unused field 4 (4)*/
#define SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, db, shardid) \ #define SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, db, shardid) \
SET_LOCKTAG_ADVISORY(tag, \ SET_LOCKTAG_ADVISORY(tag, \
db, \ db, \
(uint32) ((shardid) >> 32), \ (uint32) ((shardid) >> 32), \
(uint32) (shardid), \ (uint32) (shardid), \
ADV_LOCKTAG_CLASS_CITUS_SHARD_METADATA) ADV_LOCKTAG_CLASS_CITUS_SHARD_METADATA)
#define SET_LOCKTAG_COLOCATED_SHARDS_METADATA_RESOURCE(tag, db, colocationId, \ #define SET_LOCKTAG_COLOCATED_SHARDS_METADATA_RESOURCE(tag, db, colocationId, \
shardIntervalIndex) \ shardIntervalIndex) \
SET_LOCKTAG_ADVISORY(tag, \ SET_LOCKTAG_ADVISORY(tag, \
db, \ db, \
(uint32) shardIntervalIndex, \ (uint32) shardIntervalIndex, \
(uint32) colocationId, \ (uint32) colocationId, \
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA) ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA)
/* reuse advisory lock, but with different, unused field 4 (5)*/ /* reuse advisory lock, but with different, unused field 4 (5)*/
#define SET_LOCKTAG_SHARD_RESOURCE(tag, db, shardid) \ #define SET_LOCKTAG_SHARD_RESOURCE(tag, db, shardid) \
SET_LOCKTAG_ADVISORY(tag, \ SET_LOCKTAG_ADVISORY(tag, \
db, \ db, \
(uint32) ((shardid) >> 32), \ (uint32) ((shardid) >> 32), \
(uint32) (shardid), \ (uint32) (shardid), \
ADV_LOCKTAG_CLASS_CITUS_SHARD) ADV_LOCKTAG_CLASS_CITUS_SHARD)
/* advisory lock for citus shard move/copy operations, /* advisory lock for citus shard move/copy operations,
* also it has the database hardcoded to MyDatabaseId, * also it has the database hardcoded to MyDatabaseId,
* to ensure the locks are local to each database */ * to ensure the locks are local to each database */
#define SET_LOCKTAG_SHARD_MOVE(tag, shardid) \ #define SET_LOCKTAG_SHARD_MOVE(tag, shardid) \
SET_LOCKTAG_ADVISORY(tag, \ SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \ MyDatabaseId, \
(uint32) ((shardid) >> 32), \ (uint32) ((shardid) >> 32), \
(uint32) (shardid), \ (uint32) (shardid), \
ADV_LOCKTAG_CLASS_CITUS_SHARD_MOVE) ADV_LOCKTAG_CLASS_CITUS_SHARD_MOVE)
/* reuse advisory lock, but with different, unused field 4 (7) /* reuse advisory lock, but with different, unused field 4 (7)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */ * are local to each database */
#define SET_LOCKTAG_REBALANCE_COLOCATION(tag, colocationOrTableId) \ #define SET_LOCKTAG_REBALANCE_COLOCATION(tag, colocationOrTableId) \
SET_LOCKTAG_ADVISORY(tag, \ SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \ MyDatabaseId, \
(uint32) ((colocationOrTableId) >> 32), \ (uint32) ((colocationOrTableId) >> 32), \
(uint32) (colocationOrTableId), \ (uint32) (colocationOrTableId), \
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION) ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION)
/* reuse advisory lock, but with different, unused field 4 (13) /* reuse advisory lock, but with different, unused field 4 (13)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */ * are local to each database */
#define SET_LOCKTAG_REBALANCE_PLACEMENT_COLOCATION(tag, colocationOrTableId) \ #define SET_LOCKTAG_REBALANCE_PLACEMENT_COLOCATION(tag, colocationOrTableId) \
SET_LOCKTAG_ADVISORY(tag, \ SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \ MyDatabaseId, \
(uint32) ((colocationOrTableId) >> 32), \ (uint32) ((colocationOrTableId) >> 32), \
(uint32) (colocationOrTableId), \ (uint32) (colocationOrTableId), \
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION) ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION)
/* advisory lock for citus operations, also it has the database hardcoded to MyDatabaseId, /* advisory lock for citus operations, also it has the database hardcoded to MyDatabaseId,
* to ensure the locks are local to each database */ * to ensure the locks are local to each database */
#define SET_LOCKTAG_CITUS_OPERATION(tag, operationId) \ #define SET_LOCKTAG_CITUS_OPERATION(tag, operationId) \
SET_LOCKTAG_ADVISORY(tag, \ SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \ MyDatabaseId, \
(uint32) 0, \ (uint32) 0, \
(uint32) operationId, \ (uint32) operationId, \
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS) ADV_LOCKTAG_CLASS_CITUS_OPERATIONS)
/* reuse advisory lock, but with different, unused field 4 (10) /* reuse advisory lock, but with different, unused field 4 (10)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */ * are local to each database */
#define SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId) \ #define SET_LOCKTAG_CLEANUP_OPERATION_ID(tag, operationId) \
SET_LOCKTAG_ADVISORY(tag, \ SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \ MyDatabaseId, \
(uint32) ((operationId) >> 32), \ (uint32) ((operationId) >> 32), \
(uint32) operationId, \ (uint32) operationId, \
ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID) ADV_LOCKTAG_CLASS_CITUS_CLEANUP_OPERATION_ID)
/* reuse advisory lock, but with different, unused field 4 (14) /* reuse advisory lock, but with different, unused field 4 (14)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */ * are local to each database */
#define SET_LOCKTAG_BACKGROUND_TASK(tag, taskId) \ #define SET_LOCKTAG_BACKGROUND_TASK(tag, taskId) \
SET_LOCKTAG_ADVISORY(tag, \ SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \ MyDatabaseId, \
(uint32) ((taskId) >> 32), \ (uint32) ((taskId) >> 32), \
(uint32) (taskId), \ (uint32) (taskId), \
ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK) ADV_LOCKTAG_CLASS_CITUS_BACKGROUND_TASK)
/* /*
* IsNodeWideObjectClass returns true if the given object class is node-wide, * IsNodeWideObjectClass returns true if the given object class is node-wide,
@ -171,7 +171,8 @@ IsNodeWideObjectClass(ObjectClass objectClass)
* this assertion check based on latest supported major Postgres version. * this assertion check based on latest supported major Postgres version.
*/ */
StaticAssertStmt(PG_MAJORVERSION_NUM <= 18, StaticAssertStmt(PG_MAJORVERSION_NUM <= 18,
"better to check if any of newly added ObjectClass'es are node-wide"); "better to check if any of newly added ObjectClass'es are node-wide")
;
switch (objectClass) switch (objectClass)
{ {
@ -182,12 +183,14 @@ IsNodeWideObjectClass(ObjectClass objectClass)
#if PG_VERSION_NUM >= PG_VERSION_16 #if PG_VERSION_NUM >= PG_VERSION_16
case OCLASS_ROLE_MEMBERSHIP: case OCLASS_ROLE_MEMBERSHIP:
#endif #endif
{ {
return true; return true;
} }
default: default:
{
return false; return false;
}
} }
} }
@ -202,12 +205,12 @@ IsNodeWideObjectClass(ObjectClass objectClass)
* not node-wide, and global if it is. * not node-wide, and global if it is.
*/ */
#define SET_LOCKTAG_GLOBAL_DDL_SERIALIZATION(tag, objectClass, oid) \ #define SET_LOCKTAG_GLOBAL_DDL_SERIALIZATION(tag, objectClass, oid) \
SET_LOCKTAG_ADVISORY(tag, \ SET_LOCKTAG_ADVISORY(tag, \
(uint32) (IsNodeWideObjectClass(objectClass) ? InvalidOid : \ (uint32) (IsNodeWideObjectClass(objectClass) ? InvalidOid : \
MyDatabaseId), \ MyDatabaseId), \
(uint32) objectClass, \ (uint32) objectClass, \
(uint32) oid, \ (uint32) oid, \
ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION) ADV_LOCKTAG_CLASS_CITUS_GLOBAL_DDL_SERIALIZATION)
/* /*
* DistLockConfigs are used to configure the locking behaviour of AcquireDistributedLockOnRelations * DistLockConfigs are used to configure the locking behaviour of AcquireDistributedLockOnRelations

Some files were not shown because too many files have changed in this diff Show More