Merge branch 'main' into niupre/DeferredDropAndCleanup

niupre/TestDeferredDropAndCleanup
Nitish Upreti 2022-08-30 16:58:12 -07:00
commit 231b8ac719
320 changed files with 44879 additions and 6911 deletions


@@ -6,16 +6,22 @@ orbs:
parameters:
image_suffix:
type: string
default: '-v5579d00'
default: '-v643b0b7'
pg13_version:
type: string
default: '13.4'
default: '13.8'
pg14_version:
type: string
default: '14.0'
default: '14.5'
pg15_version:
type: string
default: '15beta3'
upgrade_pg_versions:
type: string
default: '13.4-14.0'
default: '13.8-14.5-15beta3'
style_checker_tools_version:
type: string
default: '0.8.18'
jobs:
build:
description: Build the citus extension
@@ -46,7 +52,7 @@ jobs:
check-style:
docker:
- image: 'citus/stylechecker:latest'
- image: 'citus/stylechecker:<< pipeline.parameters.style_checker_tools_version >><< pipeline.parameters.image_suffix >>'
steps:
- checkout
- run:
@@ -111,7 +117,6 @@ jobs:
image_tag:
description: 'docker image tag to use'
type: string
default: 12-13
docker:
- image: '<< parameters.image >>:<< parameters.image_tag >><< pipeline.parameters.image_suffix >>'
working_directory: /home/circleci/project
@@ -192,7 +197,6 @@ jobs:
image_tag:
description: 'docker image tag to use'
type: string
default: 12-13
docker:
- image: '<< parameters.image >>:<< parameters.image_tag >><< pipeline.parameters.image_suffix >>'
resource_class: xlarge
@@ -409,6 +413,15 @@ jobs:
- store_artifacts:
name: 'Save core dumps'
path: /tmp/core_dumps
- store_artifacts:
name: 'Save coordinator log'
path: src/test/regress/tmp_check/master/log
- store_artifacts:
name: 'Save worker1 log'
path: src/test/regress/tmp_check/worker.57637/log
- store_artifacts:
name: 'Save worker2 log'
path: src/test/regress/tmp_check/worker.57638/log
- codecov/upload:
flags: 'test_<< parameters.pg_major >>,<< parameters.make >>'
when: always
@@ -528,6 +541,10 @@ workflows:
name: build-14
pg_major: 14
image_tag: '<< pipeline.parameters.pg14_version >>'
- build:
name: build-15
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
- check-style
- check-sql-snapshots
@@ -767,6 +784,123 @@ workflows:
make: check-failure
requires: [build-14]
- test-citus:
name: 'test-15_check-split'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-split
requires: [build-15]
- test-citus:
name: 'test-15_check-enterprise'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-enterprise
requires: [build-15]
- test-citus:
name: 'test-15_check-enterprise-isolation'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-enterprise-isolation
requires: [build-15]
- test-citus:
name: 'test-15_check-enterprise-isolation-logicalrep-1'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-enterprise-isolation-logicalrep-1
requires: [build-15]
- test-citus:
name: 'test-15_check-enterprise-isolation-logicalrep-2'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-enterprise-isolation-logicalrep-2
requires: [build-15]
- test-citus:
name: 'test-15_check-enterprise-isolation-logicalrep-3'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-enterprise-isolation-logicalrep-3
requires: [build-15]
- test-citus:
name: 'test-15_check-enterprise-failure'
pg_major: 15
image: citus/failtester
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-enterprise-failure
requires: [build-15]
- test-citus:
name: 'test-15_check-multi'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-multi
requires: [build-15]
- test-citus:
name: 'test-15_check-multi-1'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-multi-1
requires: [build-15]
- test-citus:
name: 'test-15_check-mx'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-multi-mx
requires: [build-15]
- test-citus:
name: 'test-15_check-vanilla'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-vanilla
requires: [build-15]
- test-citus:
name: 'test-15_check-isolation'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-isolation
requires: [build-15]
- test-citus:
name: 'test-15_check-operations'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-operations
requires: [build-15]
- test-citus:
name: 'test-15_check-follower-cluster'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-follower-cluster
requires: [build-15]
- test-citus:
name: 'test-15_check-columnar'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-columnar
requires: [build-15]
- test-citus:
name: 'test-15_check-columnar-isolation'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-columnar-isolation
requires: [build-15]
- tap-test-citus:
name: 'test-15_tap-recovery'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
suite: recovery
requires: [build-15]
- tap-test-citus:
name: 'test-15_tap-columnar-freezing'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
suite: columnar_freezing
requires: [build-15]
- test-citus:
name: 'test-15_check-failure'
pg_major: 15
image: citus/failtester
image_tag: '<< pipeline.parameters.pg15_version >>'
make: check-failure
requires: [build-15]
- test-arbitrary-configs:
name: 'test-13_check-arbitrary-configs'
pg_major: 13
@@ -777,6 +911,11 @@ workflows:
pg_major: 14
image_tag: '<< pipeline.parameters.pg14_version >>'
requires: [build-14]
- test-arbitrary-configs:
name: 'test-15_check-arbitrary-configs'
pg_major: 15
image_tag: '<< pipeline.parameters.pg15_version >>'
requires: [build-15]
- test-pg-upgrade:
name: 'test-13-14_check-pg-upgrade'
@@ -785,6 +924,13 @@ workflows:
image_tag: '<< pipeline.parameters.upgrade_pg_versions >>'
requires: [build-13, build-14]
- test-pg-upgrade:
name: 'test-14-15_check-pg-upgrade'
old_pg_major: 14
new_pg_major: 15
image_tag: '<< pipeline.parameters.upgrade_pg_versions >>'
requires: [build-14, build-15]
- test-citus-upgrade:
name: test-13_check-citus-upgrade
pg_major: 13

.gitattributes vendored

@@ -27,6 +27,7 @@ configure -whitespace
src/backend/distributed/utils/citus_outfuncs.c -citus-style
src/backend/distributed/deparser/ruleutils_13.c -citus-style
src/backend/distributed/deparser/ruleutils_14.c -citus-style
src/backend/distributed/deparser/ruleutils_15.c -citus-style
src/backend/distributed/commands/index_pg_source.c -citus-style
src/include/distributed/citus_nodes.h -citus-style


@@ -1,3 +1,36 @@
### citus v10.2.8 (August 19, 2022) ###
* Fixes compilation warning caused by latest upgrade script changes
* Fixes compilation warning on PG13 + OpenSSL 3.0
### citus v11.0.6 (August 19, 2022) ###
* Fixes a bug that could cause failures in `CREATE ROLE` statement
* Fixes a bug that could cause failures in `create_distributed_table`
* Fixes a bug that prevents distributing tables that depend on sequences
* Fixes reference table lock contention
* Fixes upgrade paths for 11.0
### citus v10.2.7 (August 19, 2022) ###
* Fixes a bug that could cause failures in `INSERT INTO .. SELECT`
* Fixes a bug that could cause leaking files when materialized views are
refreshed
* Fixes an unexpected error for foreign tables when upgrading Postgres
* Fixes columnar freezing/wraparound bug
* Fixes reference table lock contention
* Prevents alter table functions from dropping extensions
### citus v11.0.5 (August 1, 2022) ###
* Avoids possible information leakage about existing users


@@ -13,6 +13,17 @@ why we ask this as well as instructions for how to proceed, see the
### Getting and building
[PostgreSQL documentation](https://www.postgresql.org/support/versioning/) has a
section on upgrade policy.
We always recommend that all users run the latest available minor release [for PostgreSQL] for whatever major version is in use.
We expect Citus users to honor this recommendation and use the latest available
PostgreSQL minor release. Failure to do so may result in failures in our test
suite. There are some known improvements in PG test architecture such as
[this commit](https://github.com/postgres/postgres/commit/3f323956128ff8589ce4d3a14e8b950837831803)
that are missing in earlier minor versions.
#### Mac
1. Install Xcode

configure vendored

@@ -2588,7 +2588,7 @@ fi
if test "$with_pg_version_check" = no; then
{ $as_echo "$as_me:${as_lineno-$LINENO}: building against PostgreSQL $version_num (skipped compatibility check)" >&5
$as_echo "$as_me: building against PostgreSQL $version_num (skipped compatibility check)" >&6;}
elif test "$version_num" != '13' -a "$version_num" != '14'; then
elif test "$version_num" != '13' -a "$version_num" != '14' -a "$version_num" != '15'; then
as_fn_error $? "Citus is not compatible with the detected PostgreSQL version ${version_num}." "$LINENO" 5
else
{ $as_echo "$as_me:${as_lineno-$LINENO}: building against PostgreSQL $version_num" >&5


@@ -80,7 +80,7 @@ AC_SUBST(with_pg_version_check)
if test "$with_pg_version_check" = no; then
AC_MSG_NOTICE([building against PostgreSQL $version_num (skipped compatibility check)])
elif test "$version_num" != '13' -a "$version_num" != '14'; then
elif test "$version_num" != '13' -a "$version_num" != '14' -a "$version_num" != '15'; then
AC_MSG_ERROR([Citus is not compatible with the detected PostgreSQL version ${version_num}.])
else
AC_MSG_NOTICE([building against PostgreSQL $version_num])


@@ -640,6 +640,8 @@ ConvertTable(TableConversionState *con)
Oid partitionRelationId = InvalidOid;
foreach_oid(partitionRelationId, partitionList)
{
char *tableQualifiedName = generate_qualified_relation_name(
partitionRelationId);
char *detachPartitionCommand = GenerateDetachPartitionCommand(
partitionRelationId);
char *attachPartitionCommand = GenerateAlterTableAttachPartitionCommand(
@@ -685,6 +687,19 @@ ConvertTable(TableConversionState *con)
foreignKeyCommands = list_concat(foreignKeyCommands,
partitionReturn->foreignKeyCommands);
}
/*
* If we are altering a partitioned distributed table with
* colocate_with => 'none', we override the con->colocateWith parameter
* with the first newly created partition table, so that the rest of the
* partitions and the partitioned table share the same colocation group.
*/
if (con->colocateWith != NULL && IsColocateWithNone(con->colocateWith))
{
con->colocateWith = tableQualifiedName;
}
}
}
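For illustration, a hedged SQL sketch of the behavior described in the comment above; the table name is hypothetical, and this assumes the conversion is driven through alter_distributed_table, one of the entry points into ConvertTable:

-- Re-colocating a partitioned distributed table away from its current group:
-- the first converted partition becomes the colocate_with anchor, so the
-- parent and all partitions land together in one new colocation group.
SELECT alter_distributed_table('events', colocate_with => 'none');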


@@ -63,15 +63,53 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati
const char *collname = NameStr(collationForm->collname);
bool collisdeterministic = collationForm->collisdeterministic;
char *collcollate;
char *collctype;
#if PG_VERSION_NUM >= PG_VERSION_15
/*
* In PG15, there is an added option to use ICU as the global locale
* provider. pg_collation has three locale-related fields: collcollate and
* collctype, which are libc-related fields, and a new one, colliculocale,
* which is the ICU-related field. Only the libc-related fields or the
* ICU-related field is set, never both.
*/
char *colliculocale;
bool isnull;
Datum datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collcollate,
&isnull);
Assert(!isnull);
char *collcollate = TextDatumGetCString(datum);
if (!isnull)
{
collcollate = TextDatumGetCString(datum);
}
else
{
collcollate = NULL;
}
datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collctype, &isnull);
Assert(!isnull);
char *collctype = TextDatumGetCString(datum);
if (!isnull)
{
collctype = TextDatumGetCString(datum);
}
else
{
collctype = NULL;
}
datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_colliculocale, &isnull);
if (!isnull)
{
colliculocale = TextDatumGetCString(datum);
}
else
{
colliculocale = NULL;
}
AssertArg((collcollate && collctype) || colliculocale);
#else
/*
@@ -79,8 +117,8 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati
* pstrdup() to match the interface of 15 so that we consistently free the
* result later.
*/
char *collcollate = pstrdup(NameStr(collationForm->collcollate));
char *collctype = pstrdup(NameStr(collationForm->collctype));
collcollate = pstrdup(NameStr(collationForm->collcollate));
collctype = pstrdup(NameStr(collationForm->collctype));
#endif
if (collowner != NULL)
@@ -106,6 +144,33 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati
"CREATE COLLATION %s (provider = '%s'",
*quotedCollationName, providerString);
#if PG_VERSION_NUM >= PG_VERSION_15
if (colliculocale)
{
appendStringInfo(&collationNameDef,
", locale = %s",
quote_literal_cstr(colliculocale));
pfree(colliculocale);
}
else
{
if (strcmp(collcollate, collctype) == 0)
{
appendStringInfo(&collationNameDef,
", locale = %s",
quote_literal_cstr(collcollate));
}
else
{
appendStringInfo(&collationNameDef,
", lc_collate = %s, lc_ctype = %s",
quote_literal_cstr(collcollate),
quote_literal_cstr(collctype));
}
pfree(collcollate);
pfree(collctype);
}
#else
if (strcmp(collcollate, collctype) == 0)
{
appendStringInfo(&collationNameDef,
@@ -122,6 +187,7 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati
pfree(collcollate);
pfree(collctype);
#endif
if (!collisdeterministic)
{
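As a hedged illustration of the two deparse branches above, these are the kinds of CREATE COLLATION statements they reproduce; the collation names are hypothetical, and the libc example assumes the named locale exists on the system:

-- ICU collation (PG15): a single colliculocale, deparsed as one locale option
CREATE COLLATION german_icu (provider = icu, locale = 'de-DE');
-- libc collation with differing collate/ctype, deparsed as two options
CREATE COLLATION german_libc (provider = libc, lc_collate = 'de_DE.utf8', lc_ctype = 'C');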


@@ -42,6 +42,7 @@
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
@@ -59,12 +60,16 @@
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/repair_shards.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shard_split.h"
#include "distributed/shared_library_init.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_transaction.h"
#include "distributed/utils/distribution_column_map.h"
#include "distributed/version_compat.h"
#include "executor/executor.h"
#include "executor/spi.h"
@@ -76,6 +81,7 @@
#include "parser/parse_node.h"
#include "parser/parse_relation.h"
#include "parser/parser.h"
#include "postmaster/postmaster.h"
#include "storage/lmgr.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
@@ -93,8 +99,18 @@
#define LOG_PER_TUPLE_AMOUNT 1000000
/* local function forward declarations */
static void CreateDistributedTableConcurrently(Oid relationId,
char *distributionColumnName,
char distributionMethod,
char *colocateWithTableName,
int shardCount,
bool shardCountIsStrict);
static char DecideReplicationModel(char distributionMethod, char *colocateWithTableName,
bool viaDeprecatedAPI);
static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
@@ -105,9 +121,6 @@ static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
char distributionMethod, uint32 colocationId,
char replicationModel, bool viaDeprecatedAPI);
static void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType,
Oid sourceRelationId);
static void EnsureLocalTableEmpty(Oid relationId);
static void EnsureRelationHasNoTriggers(Oid relationId);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
@@ -117,6 +130,7 @@ static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMe
static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod, bool
viaDeprecatedAPI);
static void EnsureCitusTableCanBeCreated(Oid relationOid);
static void PropagatePrerequisiteObjectsForDistributedTable(Oid relationId);
static void EnsureDistributedSequencesHaveOneType(Oid relationId,
List *seqInfoList);
static List * GetFKeyCreationCommandsRelationInvolvedWithTableType(Oid relationId,
@@ -134,9 +148,17 @@ static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
EState *estate);
static void ErrorIfTemporaryTable(Oid relationId);
static void ErrorIfForeignTable(Oid relationOid);
static void SendAddLocalTableToMetadataCommandOutsideTransaction(Oid relationId);
static void EnsureDistributableTable(Oid relationId);
static void EnsureForeignKeysForDistributedTableConcurrently(Oid relationId);
static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
char *distributionColumnName,
char *colocateWithTableName);
static void WarnIfTableHaveNoReplicaIdentity(Oid relationId);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
PG_FUNCTION_INFO_V1(create_distributed_table_concurrently);
PG_FUNCTION_INFO_V1(create_distributed_table);
PG_FUNCTION_INFO_V1(create_reference_table);
@@ -254,6 +276,589 @@ create_distributed_table(PG_FUNCTION_ARGS)
}
/*
* create_distributed_table_concurrently gets a table name, distribution
* column, distribution method and colocate_with option, then it creates a
* distributed table without blocking writes.
*/
Datum
create_distributed_table_concurrently(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
char *distributionColumnName = text_to_cstring(distributionColumnText);
Oid distributionMethodOid = PG_GETARG_OID(2);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
bool shardCountIsStrict = false;
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}
shardCount = PG_GETARG_INT32(4);
/*
* if the shard_count parameter is given, then we have to
* make sure the table has that many shards
*/
shardCountIsStrict = true;
}
CreateDistributedTableConcurrently(relationId, distributionColumnName,
distributionMethod,
colocateWithTableName,
shardCount,
shardCountIsStrict);
PG_RETURN_VOID();
}
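A minimal usage sketch of the UDF above; the table and column names are hypothetical, and the parameter names are assumed from the C argument order. Note that PreventInTransactionBlock (used just below in CreateDistributedTableConcurrently) means the call must run outside a transaction block:

-- distribute an existing table without blocking writes
SELECT create_distributed_table_concurrently('events', 'user_id');
-- an explicit shard_count requires colocate_with 'default' or 'none'
SELECT create_distributed_table_concurrently('events', 'user_id',
                                             colocate_with => 'none',
                                             shard_count => 32);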
/*
* CreateDistributedTableConcurrently distributes a table by first converting
* it to a Citus local table and then splitting the shard of the Citus local
* table.
*
* If anything goes wrong during the second phase, the table is left as a
* Citus local table.
*/
static void
CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
char distributionMethod,
char *colocateWithTableName,
int shardCount,
bool shardCountIsStrict)
{
/*
* We disallow create_distributed_table_concurrently in transaction blocks
* because we cannot handle preceding writes, and we block writes at the
* very end of the operation so the transaction should end immediately after.
*/
PreventInTransactionBlock(true, "create_distributed_table_concurrently");
/*
* Do not allow multiple create_distributed_table_concurrently calls in the
* same transaction. We do that check here because a concurrent local table
* conversion can cause issues.
*/
ErrorIfMultipleNonblockingMoveSplitInTheSameTransaction();
/* do not allow concurrent CreateDistributedTableConcurrently operations */
AcquireCreateDistributedTableConcurrentlyLock(relationId);
if (distributionMethod != DISTRIBUTE_BY_HASH)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("only hash-distributed tables can be distributed "
"without blocking writes")));
}
if (ShardReplicationFactor > 1)
{
ereport(ERROR, (errmsg("cannot distribute a table concurrently when "
"citus.shard_replication_factor > 1")));
}
EnsureCoordinatorIsInMetadata();
EnsureCitusTableCanBeCreated(relationId);
EnsureValidDistributionColumn(relationId, distributionColumnName);
/*
* Ensure the table's type is valid for distribution: it should be either a
* regular table or a citus local table.
*/
EnsureDistributableTable(relationId);
/*
* We rely on citus_add_local_table_to_metadata, which can generate messages
* that are irrelevant here, so we want to error out with a user-friendly
* message if foreign keys are not supported. We can miss foreign key
* violations because we are not holding locks; the relation can be modified
* until we acquire the lock on it, but we do as much as we can to report
* foreign key violations in a user-friendly way.
*/
EnsureForeignKeysForDistributedTableConcurrently(relationId);
bool viaDeprecatedAPI = false;
char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName,
viaDeprecatedAPI);
/*
* We fail the transaction before local table conversion if the table cannot
* be colocated with the given table. We repeat those checks after local
* table conversion, acquiring locks on the relation, because the
* distribution column can be modified in the meantime.
*/
if (!IsColocateWithDefault(colocateWithTableName) && !IsColocateWithNone(
colocateWithTableName))
{
EnsureColocateWithTableIsValid(relationId, distributionMethod,
distributionColumnName,
colocateWithTableName);
}
/*
* Get name of the table before possibly replacing it in
* citus_add_local_table_to_metadata.
*/
char *tableName = get_rel_name(relationId);
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1);
/* If table is a regular table, then we need to add it into metadata. */
if (!IsCitusTable(relationId))
{
/*
* Before taking locks, convert the table into a Citus local table and commit
* to allow shard split to see the shard.
*/
SendAddLocalTableToMetadataCommandOutsideTransaction(relationId);
}
/*
* Lock target relation with a shard update exclusive lock to
* block DDL, but not writes.
*
* If there was a concurrent drop/rename, error out by setting missingOK = false.
*/
bool missingOK = false;
relationId = RangeVarGetRelid(rangeVar, ShareUpdateExclusiveLock, missingOK);
if (PartitionedTableNoLock(relationId))
{
/* also lock partitions */
LockPartitionRelations(relationId, ShareUpdateExclusiveLock);
}
WarnIfTableHaveNoReplicaIdentity(relationId);
List *shardList = LoadShardIntervalList(relationId);
/*
* It's technically possible for the table to have been concurrently
* distributed just after citus_add_local_table_to_metadata and just
* before acquiring the lock, so double check.
*/
if (list_length(shardList) != 1 ||
!IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("table was concurrently modified")));
}
/*
* The table currently has one shard, we will split that shard to match the
* target distribution.
*/
ShardInterval *shardToSplit = (ShardInterval *) linitial(shardList);
PropagatePrerequisiteObjectsForDistributedTable(relationId);
/*
* We should re-evaluate the distribution column, since it may have changed:
* we did not lock the relation at the previous check, before the local
* table conversion.
*/
Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributionColumnName,
NoLock);
Oid distributionColumnType = distributionColumn->vartype;
Oid distributionColumnCollation = distributionColumn->varcollid;
/* get an advisory lock to serialize concurrent default group creations */
if (IsColocateWithDefault(colocateWithTableName))
{
AcquireColocationDefaultLock();
}
/*
* At this stage, we only want to check for an existing co-location group.
* We cannot create a new co-location group until after replication slot
* creation in NonBlockingShardSplit.
*/
uint32 colocationId = FindColocateWithColocationId(relationId,
replicationModel,
distributionColumnType,
distributionColumnCollation,
shardCount,
shardCountIsStrict,
colocateWithTableName);
if (IsColocateWithDefault(colocateWithTableName) && (colocationId !=
INVALID_COLOCATION_ID))
{
/*
* We can release the advisory lock if there is already a default entry for
* the given parameters; otherwise, we keep it to prevent concurrent
* operations from creating a different default colocation entry.
*/
ReleaseColocationDefaultLock();
}
EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod,
colocationId, replicationModel, viaDeprecatedAPI);
Oid colocatedTableId = InvalidOid;
if (colocationId != INVALID_COLOCATION_ID)
{
colocatedTableId = ColocatedTableId(colocationId);
}
List *workersForPlacementList;
List *shardSplitPointsList;
if (colocatedTableId != InvalidOid)
{
List *colocatedShardList = LoadShardIntervalList(colocatedTableId);
/*
* Match the shard ranges of an existing table.
*/
shardSplitPointsList = HashSplitPointsForShardList(colocatedShardList);
/*
* Find the node IDs of the shard placements.
*/
workersForPlacementList = WorkerNodesForShardList(colocatedShardList);
}
else
{
/*
* Generate a new set of #shardCount shards.
*/
shardSplitPointsList = HashSplitPointsForShardCount(shardCount);
/*
* Place shards in a round-robin fashion across all data nodes.
*/
List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);
}
/*
* Make sure that existing reference tables have been replicated to all the
* nodes so that foreign keys can be created and joins work immediately
* after creation. We do this after applying all essential checks to error
* out early in case of user error.
*/
EnsureReferenceTablesExistOnAllNodes();
/*
* At this point, the table is a Citus local table, which means it does
* not have a partition column in the metadata. However, we cannot update
* the metadata here because that would prevent us from creating a replication
* slot to copy ongoing changes. Instead, we pass a hash that maps relation
* IDs to partition column vars.
*/
DistributionColumnMap *distributionColumnOverrides = CreateDistributionColumnMap();
AddDistributionColumnForRelation(distributionColumnOverrides, relationId,
distributionColumnName);
/*
* There are no colocation entries yet for the local table, so we should
* check whether the table has any partitions and add them to the same
* colocation group.
*/
List *sourceColocatedShardIntervalList = ListShardsUnderParentRelation(relationId);
SplitMode splitMode = NON_BLOCKING_SPLIT;
SplitOperation splitOperation = CREATE_DISTRIBUTED_TABLE;
SplitShard(
splitMode,
splitOperation,
shardToSplit->shardId,
shardSplitPointsList,
workersForPlacementList,
distributionColumnOverrides,
sourceColocatedShardIntervalList,
colocationId
);
}
/*
* EnsureForeignKeysForDistributedTableConcurrently ensures that referenced and referencing foreign
* keys for the given table are supported.
*
* We allow distributed -> reference
* distributed -> citus local
*
* We disallow reference -> distributed
* citus local -> distributed
* regular -> distributed
*
* Normally regular -> distributed is allowed but it is not allowed when we create the
* distributed table concurrently, because we rely on the conversion of the
* regular table to a citus local table, which errors out with an unfriendly
* message.
*/
static void
EnsureForeignKeysForDistributedTableConcurrently(Oid relationId)
{
/*
* disallow citus local -> distributed fkeys.
* disallow reference -> distributed fkeys.
* disallow regular -> distributed fkeys.
*/
EnsureNoFKeyFromTableType(relationId, INCLUDE_CITUS_LOCAL_TABLES |
INCLUDE_REFERENCE_TABLES | INCLUDE_LOCAL_TABLES);
/*
* disallow distributed -> regular fkeys.
*/
EnsureNoFKeyToTableType(relationId, INCLUDE_LOCAL_TABLES);
}
/*
* EnsureColocateWithTableIsValid ensures the given relation can be colocated
* with the table of the given name.
*/
static void
EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
char *distributionColumnName, char *colocateWithTableName)
{
bool viaDeprecatedAPI = false;
char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName,
viaDeprecatedAPI);
/*
* We fail the transaction before local table conversion if the table cannot
* be colocated with the given table. We repeat those checks after local
* table conversion, acquiring locks on the relation, because the
* distribution column can be modified in the meantime.
*/
Oid distributionColumnType = ColumnTypeIdForRelationColumnName(relationId,
distributionColumnName);
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
Oid colocateWithTableId = ResolveRelationId(colocateWithTableNameText, false);
EnsureTableCanBeColocatedWith(relationId, replicationModel,
distributionColumnType, colocateWithTableId);
}
/*
* AcquireCreateDistributedTableConcurrentlyLock prevents concurrent
* create_distributed_table_concurrently operations.
*/
void
AcquireCreateDistributedTableConcurrentlyLock(Oid relationId)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = true;
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY);
LockAcquireResult lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock,
dontWait);
if (!lockAcquired)
{
ereport(ERROR, (errmsg("another create_distributed_table_concurrently "
"operation is in progress"),
errhint("Make sure that the concurrent operation has "
"finished and re-run the command")));
}
}
/*
* SendAddLocalTableToMetadataCommandOutsideTransaction executes the command
* that adds the local table to metadata over a separate local connection,
* outside the current transaction, to avoid deadlock.
*/
static void
SendAddLocalTableToMetadataCommandOutsideTransaction(Oid relationId)
{
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
/*
* we need to allow nested distributed execution, because we start a new distributed
* execution inside the pushed-down UDF citus_add_local_table_to_metadata. Normally
* citus does not allow that because it cannot guarantee correctness.
*/
StringInfo allowNestedDistributionCommand = makeStringInfo();
appendStringInfo(allowNestedDistributionCommand,
"SET LOCAL citus.allow_nested_distributed_execution to ON");
StringInfo addLocalTableToMetadataCommand = makeStringInfo();
appendStringInfo(addLocalTableToMetadataCommand,
"SELECT pg_catalog.citus_add_local_table_to_metadata(%s)",
quote_literal_cstr(qualifiedRelationName));
List *commands = list_make2(allowNestedDistributionCommand->data,
addLocalTableToMetadataCommand->data);
char *username = NULL;
SendCommandListToWorkerOutsideTransaction(LocalHostName, PostPortNumber, username,
commands);
}
/*
* WarnIfTableHaveNoReplicaIdentity notifies the user if the given table or
* any of its partitions lacks a replica identity, which logical replication
* requires to replicate UPDATE and DELETE commands during
* create_distributed_table_concurrently.
*/
void
WarnIfTableHaveNoReplicaIdentity(Oid relationId)
{
bool foundRelationWithNoReplicaIdentity = false;
/*
* Check the source relation's partitions, if any. We do not need to check
* the source relation itself because a partitioned table can be replicated
* even if it does not have a replica identity; the source table holds no
* data if it has partitions.
*/
if (PartitionedTable(relationId))
{
List *partitionList = PartitionList(relationId);
ListCell *partitionCell = NULL;
foreach(partitionCell, partitionList)
{
Oid partitionTableId = lfirst_oid(partitionCell);
if (!RelationCanPublishAllModifications(partitionTableId))
{
foundRelationWithNoReplicaIdentity = true;
break;
}
}
}
/* check for source relation if it is not partitioned */
else
{
if (!RelationCanPublishAllModifications(relationId))
{
foundRelationWithNoReplicaIdentity = true;
}
}
if (foundRelationWithNoReplicaIdentity)
{
char *relationName = get_rel_name(relationId);
ereport(NOTICE, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("relation %s does not have a REPLICA "
"IDENTITY or PRIMARY KEY", relationName),
errdetail("UPDATE and DELETE commands on the relation will "
"error out during create_distributed_table_concurrently unless "
"there is a REPLICA IDENTITY or PRIMARY KEY. "
"INSERT commands will still work.")));
}
}
/*
* HashSplitPointsForShardList returns a list of split points which match
* the shard ranges of the given list of shards.
*/
static List *
HashSplitPointsForShardList(List *shardList)
{
List *splitPointList = NIL;
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardList)
{
int32 shardMaxValue = DatumGetInt32(shardInterval->maxValue);
splitPointList = lappend_int(splitPointList, shardMaxValue);
}
/*
* Split point lists only include the upper boundaries.
*/
splitPointList = list_delete_last(splitPointList);
return splitPointList;
}
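A hedged SQL sketch of the same computation against the shard metadata; 'events' stands in for a hypothetical, already hash-distributed table:

-- Upper bound of each shard, in shard order, minus the last one: these are
-- exactly the split points needed to mirror the table's shard ranges.
SELECT shardmaxvalue::int AS split_point
FROM pg_dist_shard
WHERE logicalrelid = 'events'::regclass
ORDER BY shardminvalue::int
LIMIT (SELECT count(*) - 1 FROM pg_dist_shard
       WHERE logicalrelid = 'events'::regclass);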
/*
* HashSplitPointsForShardCount returns a list of split points for a given
* shard count with roughly equal hash ranges.
*/
static List *
HashSplitPointsForShardCount(int shardCount)
{
List *splitPointList = NIL;
/* calculate the split of the hash space */
uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
/*
* Split point lists only include the upper boundaries, so we only
* go up to shardCount - 1 and do not have to apply the correction
* for the last shardmaxvalue.
*/
for (int64 shardIndex = 0; shardIndex < shardCount - 1; shardIndex++)
{
/* initialize the hash token space for this shard */
int32 shardMinValue = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxValue = shardMinValue + (hashTokenIncrement - 1);
splitPointList = lappend_int(splitPointList, shardMaxValue);
}
return splitPointList;
}
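To make the arithmetic above concrete: for shardCount = 4, hashTokenIncrement is 2^32 / 4 = 1073741824, and the loop produces three upper bounds, with the last shard implicitly ending at PG_INT32_MAX (2147483647). A hedged SQL check of those values:

-- yields -1073741825, -1 and 1073741823 for shards 0..2
SELECT shard,
       (-2147483648 + (shard::bigint + 1) * 1073741824 - 1)::int AS split_point
FROM generate_series(0, 2) AS shard;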
/*
* WorkerNodesForShardList returns a list of node ids reflecting the locations of
* the given list of shards.
*/
static List *
WorkerNodesForShardList(List *shardList)
{
List *nodeIdList = NIL;
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardList)
{
WorkerNode *workerNode = ActiveShardPlacementWorkerNode(shardInterval->shardId);
nodeIdList = lappend_int(nodeIdList, workerNode->nodeId);
}
return nodeIdList;
}
/*
* RoundRobinWorkerNodeList round robins over the workers in the worker node list
* and adds node ids to a list of length listLength.
*/
static List *
RoundRobinWorkerNodeList(List *workerNodeList, int listLength)
{
List *nodeIdList = NIL;
for (int idx = 0; idx < listLength; idx++)
{
int nodeIdx = idx % list_length(workerNodeList);
WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, nodeIdx);
nodeIdList = lappend_int(nodeIdList, workerNode->nodeId);
}
return nodeIdList;
}
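A hedged illustration of the assignment pattern above, with three candidate workers and six shards; the numbers stand for positions in the worker node list, not real node ids:

SELECT idx AS shard_index, (idx % 3) + 1 AS worker_position
FROM generate_series(0, 5) AS idx;
-- worker_position cycles 1, 2, 3, 1, 2, 3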
/*
* create_reference_table creates a distributed table with the given relationId. The
* created table has one shard and replication factor is set to the active worker
@@ -394,7 +999,6 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
INCLUDE_ALL_TABLE_TYPES);
relationId = DropFKeysAndUndistributeTable(relationId);
}
/*
* To support foreign keys between reference tables and local tables,
* we drop & re-define foreign keys at the end of this function so
@@ -431,21 +1035,9 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
LockRelationOid(relationId, ExclusiveLock);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
EnsureRelationHasCompatibleSequenceTypes(relationId);
EnsureTableNotDistributed(relationId);
/*
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created
* via their own connection and committed immediately so they become visible to all
* sessions creating shards.
*/
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
PropagatePrerequisiteObjectsForDistributedTable(relationId);
char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName,
@@ -453,7 +1045,7 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributionColumnName,
ExclusiveLock);
NoLock);
/*
* ColocationIdForNewTable assumes caller acquires lock on relationId. In our case,
@@ -582,6 +1174,31 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
}
/*
* PropagatePrerequisiteObjectsForDistributedTable ensures we can create shards
* on all nodes by ensuring all dependent objects exist on all nodes.
*/
static void
PropagatePrerequisiteObjectsForDistributedTable(Oid relationId)
{
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
EnsureRelationHasCompatibleSequenceTypes(relationId);
/*
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created
* via their own connection and committed immediately so they become visible to all
* sessions creating shards.
*/
ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress));
}
/*
* EnsureSequenceTypeSupported ensures that the type of the column that uses
* a sequence on its DEFAULT is consistent with previous uses (if any) of the
@@ -956,80 +1573,56 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
*/
Assert(distributionMethod == DISTRIBUTE_BY_HASH);
Relation pgDistColocation = table_open(DistColocationRelationId(), ExclusiveLock);
Oid distributionColumnType = distributionColumn->vartype;
Oid distributionColumnCollation = get_typcollation(distributionColumnType);
bool createdColocationGroup = false;
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
/* get an advisory lock to serialize concurrent default group creations */
if (IsColocateWithDefault(colocateWithTableName))
{
/* check for default colocation group */
colocationId = ColocationId(shardCount, ShardReplicationFactor,
AcquireColocationDefaultLock();
}
colocationId = FindColocateWithColocationId(relationId,
replicationModel,
distributionColumnType,
distributionColumnCollation);
distributionColumnCollation,
shardCount,
shardCountIsStrict,
colocateWithTableName);
if (IsColocateWithDefault(colocateWithTableName) && (colocationId !=
INVALID_COLOCATION_ID))
{
/*
* if the shardCount is strict then we check if the shard count
* of the colocated table is actually shardCount
* We can release the advisory lock if there is already a default entry for
* the given parameters; otherwise, we keep it to prevent concurrent
* operations from creating a different default colocation entry.
*/
if (shardCountIsStrict && colocationId != INVALID_COLOCATION_ID)
{
Oid colocatedTableId = ColocatedTableId(colocationId);
if (colocatedTableId != InvalidOid)
{
CitusTableCacheEntry *cacheEntry =
GetCitusTableCacheEntry(colocatedTableId);
int colocatedTableShardCount = cacheEntry->shardIntervalArrayLength;
if (colocatedTableShardCount != shardCount)
{
colocationId = INVALID_COLOCATION_ID;
}
}
ReleaseColocationDefaultLock();
}
if (colocationId == INVALID_COLOCATION_ID)
{
if (IsColocateWithDefault(colocateWithTableName))
{
/*
* Generate a new colocation ID and insert a pg_dist_colocation
* record.
*/
colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor,
distributionColumnType,
distributionColumnCollation);
createdColocationGroup = true;
}
}
else if (IsColocateWithNone(colocateWithTableName))
{
colocationId = GetNextColocationId();
createdColocationGroup = true;
}
else
{
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false);
EnsureTableCanBeColocatedWith(relationId, replicationModel,
distributionColumnType, sourceRelationId);
colocationId = TableColocationId(sourceRelationId);
}
/*
* If we created a new colocation group then we need to keep the lock to
* prevent a concurrent create_distributed_table call from creating another
* colocation group with the same parameters. If we're using an existing
* colocation group then other transactions will use the same one.
* Generate a new colocation ID and insert a pg_dist_colocation
* record.
*/
if (createdColocationGroup)
{
/* keep the exclusive lock */
table_close(pgDistColocation, NoLock);
colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor,
distributionColumnType,
distributionColumnCollation);
}
else
{
/* release the exclusive lock */
table_close(pgDistColocation, ExclusiveLock);
}
}
@@ -1053,7 +1646,6 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
{
Oid parentRelationId = InvalidOid;
EnsureTableNotDistributed(relationId);
EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod, viaDeprecatedAPI);
/* user really wants triggers? */
@@ -1129,13 +1721,13 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
}
}
if (PartitionTable(relationId))
if (PartitionTableNoLock(relationId))
{
parentRelationId = PartitionParentOid(relationId);
}
/* partitions cannot be distributed if their parent is not distributed */
if (PartitionTable(relationId) && !IsCitusTable(parentRelationId))
if (PartitionTableNoLock(relationId) && !IsCitusTable(parentRelationId))
{
char *parentRelationName = get_rel_name(parentRelationId);
@@ -1153,7 +1745,7 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
* reach this point because, we call CreateDistributedTable for partitions if their
* parent table is distributed.
*/
if (PartitionedTable(relationId))
if (PartitionedTableNoLock(relationId))
{
/* we cannot distribute partitioned tables with master_create_distributed_table */
if (viaDeprecatedAPI)
@@ -1172,7 +1764,7 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
}
/* we don't support distributing tables with multi-level partitioning */
if (PartitionTable(relationId))
if (PartitionTableNoLock(relationId))
{
char *parentRelationName = get_rel_name(parentRelationId);
@@ -1225,55 +1817,6 @@ ErrorIfTableIsACatalogTable(Relation relation)
}
/*
* EnsureTableCanBeColocatedWith checks whether a given replication model and
* distribution column type are suitable to distribute a table to be
* colocated with the given source table.
*
* We only pass relationId to provide meaningful error messages.
*/
static void
EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType, Oid sourceRelationId)
{
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
char sourceReplicationModel = sourceTableEntry->replicationModel;
Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId);
if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
}
if (sourceReplicationModel != replicationModel)
{
char *relationName = get_rel_name(relationId);
char *sourceRelationName = get_rel_name(sourceRelationId);
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
sourceRelationName, relationName),
errdetail("Replication models don't match for %s and %s.",
sourceRelationName, relationName)));
}
Oid sourceDistributionColumnType = sourceDistributionColumn->vartype;
if (sourceDistributionColumnType != distributionColumnType)
{
char *relationName = get_rel_name(relationId);
char *sourceRelationName = get_rel_name(sourceRelationId);
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
sourceRelationName, relationName),
errdetail("Distribution column types don't match for "
"%s and %s.", sourceRelationName,
relationName)));
}
}
/*
* EnsureLocalTableEmptyIfNecessary errors out if the table should be empty
* according to ShouldLocalTableBeEmpty but it is not.
@@ -1348,6 +1891,27 @@ EnsureLocalTableEmpty(Oid relationId)
}
/*
* EnsureDistributableTable ensures the given table's type is appropriate for
* distribution: it should be a regular table or a citus local table.
*/
static void
EnsureDistributableTable(Oid relationId)
{
bool isLocalTable = IsCitusTableType(relationId, CITUS_LOCAL_TABLE);
bool isRegularTable = !IsCitusTableType(relationId, ANY_CITUS_TABLE_TYPE);
if (!isLocalTable && !isRegularTable)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("table \"%s\" is already distributed",
relationName)));
}
}
/*
* EnsureTableNotDistributed errors out if the table is distributed.
*/


@@ -12,6 +12,7 @@
#include "miscadmin.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/commands.h"
#include "distributed/metadata_utility.h"
@@ -70,6 +71,8 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS)
char *schemaName = text_to_cstring(schemaNameText);
char *tableName = text_to_cstring(tableNameText);
uint32 colocationId = ColocationIdViaCatalog(relationId);
/*
* The SQL_DROP trigger calls this function even for tables that are
* not distributed. In that case, silently ignore. This is not very
@@ -87,6 +90,8 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS)
DeletePartitionRow(relationId);
DeleteColocationGroupIfNoTablesBelong(colocationId);
PG_RETURN_VOID();
}


@@ -97,6 +97,66 @@ ConstraintIsAForeignKeyToReferenceTable(char *inputConstaintName, Oid relationId
}
/*
* EnsureNoFKeyFromTableType ensures that the given relation is not referenced
* by any table of the types specified by the table type flag.
*/
void
EnsureNoFKeyFromTableType(Oid relationId, int tableTypeFlag)
{
int flags = INCLUDE_REFERENCED_CONSTRAINTS | EXCLUDE_SELF_REFERENCES |
tableTypeFlag;
List *referencedFKeyOids = GetForeignKeyOids(relationId, flags);
if (list_length(referencedFKeyOids) > 0)
{
Oid referencingFKeyOid = linitial_oid(referencedFKeyOids);
Oid referencingTableId = GetReferencingTableId(referencingFKeyOid);
char *referencingRelName = get_rel_name(referencingTableId);
char *referencedRelName = get_rel_name(relationId);
char *referencingTableTypeName = GetTableTypeName(referencingTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("relation %s is referenced by a foreign key from %s",
referencedRelName, referencingRelName),
errdetail(
"foreign keys from a %s to a distributed table are not supported.",
referencingTableTypeName)));
}
}
/*
* EnsureNoFKeyToTableType ensures that the given relation does not reference
* any table of the types specified by the table type flag.
*/
void
EnsureNoFKeyToTableType(Oid relationId, int tableTypeFlag)
{
int flags = INCLUDE_REFERENCING_CONSTRAINTS | EXCLUDE_SELF_REFERENCES |
tableTypeFlag;
List *referencingFKeyOids = GetForeignKeyOids(relationId, flags);
if (list_length(referencingFKeyOids) > 0)
{
Oid referencedFKeyOid = linitial_oid(referencingFKeyOids);
Oid referencedTableId = GetReferencedTableId(referencedFKeyOid);
char *referencedRelName = get_rel_name(referencedTableId);
char *referencingRelName = get_rel_name(relationId);
char *referencedTableTypeName = GetTableTypeName(referencedTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("relation %s is referenced by a foreign key from %s",
referencedRelName, referencingRelName),
errdetail(
"foreign keys from a distributed table to a %s are not supported.",
referencedTableTypeName)));
}
}
/*
* ErrorIfUnsupportedForeignConstraintExists runs checks related to foreign
* constraints and errors out if it is not possible to create one of the
@@ -745,19 +805,6 @@ GetForeignKeysFromLocalTables(Oid relationId)
}
/*
* HasForeignKeyToCitusLocalTable returns true if any of the foreign key constraints
* on the relation with relationId references to a citus local table.
*/
bool
HasForeignKeyToCitusLocalTable(Oid relationId)
{
int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_CITUS_LOCAL_TABLES;
List *foreignKeyOidList = GetForeignKeyOids(relationId, flags);
return list_length(foreignKeyOidList) > 0;
}
/*
* HasForeignKeyToReferenceTable returns true if any of the foreign key
* constraints on the relation with relationId references to a reference


@@ -1202,6 +1202,15 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
"is currently unsupported")));
}
if (AllowUnsafeConstraints)
{
/*
* The user explicitly wants to allow the constraint without the
* distribution column.
*/
return;
}
Var *partitionKey = DistPartitionKeyOrError(relationId);
List *indexParameterList = createIndexStatement->indexParams;
IndexElem *indexElement = NULL;


@@ -217,15 +217,6 @@ struct CopyShardState
List *placementStateList;
};
/* ShardConnections represents a set of connections for each placement of a shard */
typedef struct ShardConnections
{
int64 shardId;
/* list of MultiConnection structs */
List *connectionList;
} ShardConnections;
/*
* Represents the state for allowing copy via local
@@ -1195,7 +1186,7 @@ ReportCopyError(MultiConnection *connection, PGresult *result)
bool haveDetail = remoteDetail != NULL;
ereport(ERROR, (errmsg("%s", remoteMessage),
haveDetail ? errdetail("%s", ApplyLogRedaction(remoteDetail)) :
haveDetail ? errdetail("%s", remoteDetail) :
0));
}
else
@@ -1206,7 +1197,7 @@ ReportCopyError(MultiConnection *connection, PGresult *result)
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to complete COPY on %s:%d", connection->hostname,
connection->port),
errdetail("%s", ApplyLogRedaction(remoteMessage))));
errdetail("%s", remoteMessage)));
}
}


@@ -10,6 +10,8 @@
#include "postgres.h"
#include "pg_version_compat.h"
#include "distributed/pg_version_constants.h"
#include "access/heapam.h"
@@ -59,6 +61,7 @@ static char * CreateCreateOrAlterRoleCommand(const char *roleName,
CreateRoleStmt *createRoleStmt,
AlterRoleStmt *alterRoleStmt);
static DefElem * makeDefElemInt(char *name, int value);
static DefElem * makeDefElemBool(char *name, bool value);
static List * GenerateRoleOptionsList(HeapTuple tuple);
static List * GenerateGrantRoleStmtsFromOptions(RoleSpec *roleSpec, List *options);
static List * GenerateGrantRoleStmtsOfRole(Oid roleid);
@@ -454,13 +457,13 @@ GenerateRoleOptionsList(HeapTuple tuple)
Form_pg_authid role = ((Form_pg_authid) GETSTRUCT(tuple));
List *options = NIL;
options = lappend(options, makeDefElemInt("superuser", role->rolsuper));
options = lappend(options, makeDefElemInt("createdb", role->rolcreatedb));
options = lappend(options, makeDefElemInt("createrole", role->rolcreaterole));
options = lappend(options, makeDefElemInt("inherit", role->rolinherit));
options = lappend(options, makeDefElemInt("canlogin", role->rolcanlogin));
options = lappend(options, makeDefElemInt("isreplication", role->rolreplication));
options = lappend(options, makeDefElemInt("bypassrls", role->rolbypassrls));
options = lappend(options, makeDefElemBool("superuser", role->rolsuper));
options = lappend(options, makeDefElemBool("createdb", role->rolcreatedb));
options = lappend(options, makeDefElemBool("createrole", role->rolcreaterole));
options = lappend(options, makeDefElemBool("inherit", role->rolinherit));
options = lappend(options, makeDefElemBool("canlogin", role->rolcanlogin));
options = lappend(options, makeDefElemBool("isreplication", role->rolreplication));
options = lappend(options, makeDefElemBool("bypassrls", role->rolbypassrls));
options = lappend(options, makeDefElemInt("connectionlimit", role->rolconnlimit));
/* load password from heap tuple, use NULL if not set */
@@ -616,6 +619,16 @@ makeDefElemInt(char *name, int value)
}
/*
* makeDefElemBool creates a DefElem with a boolean-typed value and -1 as its location.
*/
static DefElem *
makeDefElemBool(char *name, bool value)
{
return makeDefElem(name, (Node *) makeBoolean(value), -1);
}
/*
* GetDatabaseNameFromDbRoleSetting performs a lookup, and finds the database name
* associated with the given DbRoleSetting tuple


@@ -151,6 +151,11 @@ List *
PreprocessGrantOnSchemaStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
if (!ShouldPropagate())
{
return NIL;
}
GrantStmt *stmt = castNode(GrantStmt, node);
Assert(stmt->objtype == OBJECT_SCHEMA);


@@ -54,6 +54,12 @@
/* controlled via GUC, should be accessed via GetEnableLocalReferenceForeignKeys() */
bool EnableLocalReferenceForeignKeys = true;
/*
* GUC that controls whether to allow unique/exclude constraints without
* the distribution column.
*/
bool AllowUnsafeConstraints = false;
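A hedged usage sketch; the GUC name citus.allow_unsafe_constraints is assumed from the variable above, and the table is hypothetical:

SET citus.allow_unsafe_constraints TO on;
CREATE TABLE events (tenant_id int, event_id bigint, UNIQUE (event_id));
-- normally rejected because the unique key does not include the
-- distribution column; permitted while the GUC is on
SELECT create_distributed_table('events', 'tenant_id');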
/* Local functions forward declarations for unsupported command checks */
static void PostprocessCreateTableStmtForeignKeys(CreateStmt *createStatement);
static void PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement,
@@ -2573,6 +2579,16 @@ ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod,
errhint("Consider using hash partitioning.")));
}
if (AllowUnsafeConstraints)
{
/*
* The user explicitly wants to allow the constraint without the
* distribution column.
*/
index_close(indexDesc, NoLock);
continue;
}
int attributeCount = indexInfo->ii_NumIndexAttrs;
AttrNumber *attributeNumberArray = indexInfo->ii_IndexAttrNumbers;


@@ -182,8 +182,17 @@ GetExplicitTriggerIdList(Oid relationId)
* Note that we mark the truncate trigger that we create on citus tables as
* internal. Hence, below we discard citus_truncate_trigger as well as
* the implicit triggers created by postgres for foreign key validation.
*
* Pre-PG15, tgisinternal is true for a "child" trigger on a partition
* cloned from the trigger on the parent.
* In PG15, tgisinternal is false in that case. However, we don't want to
* create this trigger on the partition since it would create a conflict
* when we try to attach the partition to the parent table:
* ERROR: trigger "..." for relation "{partition_name}" already exists
* Hence we add an extra check on whether the parent id is invalid to
* make sure this is not a child trigger.
*/
if (!triggerForm->tgisinternal)
if (!triggerForm->tgisinternal && (triggerForm->tgparentid == InvalidOid))
{
triggerIdList = lappend_oid(triggerIdList, triggerForm->oid);
}
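A hedged illustration of the PG15 behavior the comment above describes; every object name here is hypothetical:

CREATE FUNCTION log_insert() RETURNS trigger
    LANGUAGE plpgsql AS $$ BEGIN RETURN NEW; END $$;
CREATE TABLE parent_table (a int) PARTITION BY RANGE (a);
CREATE TABLE partition_1 PARTITION OF parent_table FOR VALUES FROM (0) TO (100);
CREATE TRIGGER record_insert AFTER INSERT ON parent_table
    FOR EACH ROW EXECUTE FUNCTION log_insert();
-- On PG15 the clone on partition_1 has tgisinternal = false, but its
-- tgparentid still references the parent's trigger, which is what the
-- new check keys on.
SELECT tgname, tgisinternal, tgparentid <> 0 AS is_child_clone
FROM pg_trigger WHERE tgrelid = 'partition_1'::regclass;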


@@ -1415,53 +1415,6 @@ set_indexsafe_procflags(void)
#endif
/*
* CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements
* of shards of a distributed table. The command to be applied is generated by the
* TableDDLCommand structure passed in.
*/
DDLJob *
CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command)
{
List *taskList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
uint64 jobId = INVALID_JOB_ID;
Oid namespace = get_rel_namespace(relationId);
char *namespaceName = get_namespace_name(namespace);
int taskId = 1;
/* lock metadata before getting placement lists */
LockShardListMetadata(shardIntervalList, ShareLock);
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
uint64 shardId = shardInterval->shardId;
char *commandStr = GetShardedTableDDLCommand(command, shardId, namespaceName);
Task *task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
SetTaskQueryString(task, commandStr);
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = ActiveShardPlacementList(shardId);
taskList = lappend(taskList, task);
}
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = GetTableDDLCommand(command);
ddlJob->taskList = taskList;
return ddlJob;
}
/*
* SetSearchPathToCurrentSearchPathCommand generates a command which can
* set the search path to the exact same search path that the issuing node


@@ -258,10 +258,6 @@ ReportConnectionError(MultiConnection *connection, int elevel)
if (messageDetail)
{
/*
* We don't use ApplyLogRedaction(messageDetail) as we expect any error
* detail that requires log redaction should have done it locally.
*/
ereport(elevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection to the remote node %s:%d failed with the "
"following error: %s", nodeName, nodePort,
@@ -315,7 +311,7 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
messageDetail ?
errdetail("%s", ApplyLogRedaction(messageDetail)) : 0,
errdetail("%s", messageDetail) : 0,
messageHint ? errhint("%s", messageHint) : 0,
messageContext ? errcontext("%s", messageContext) : 0,
errcontext("while executing command on %s:%d",
@@ -349,7 +345,7 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
return;
}
ereport(NOTICE, (errmsg("issuing %s", ApplyLogRedaction(command)),
ereport(NOTICE, (errmsg("issuing %s", command),
errdetail("on server %s@%s:%d connectionId: %ld", connection->user,
connection->hostname,
connection->port, connection->connectionId)));

View File

@ -173,7 +173,7 @@ DefaultCitusNoticeReceiver(void *arg, const PGresult *result)
ereport(logLevel,
(errcode(sqlState),
errmsg("%s", ApplyLogRedaction(trimmedMessage)),
errmsg("%s", trimmedMessage),
errdetail("from %s:%d", nodeName, nodePort)));
}

View File

@ -196,7 +196,7 @@ AppendDefElem(StringInfo buf, DefElem *def)
static void
AppendDefElemStrict(StringInfo buf, DefElem *def)
{
if (intVal(def->arg) == 1)
if (boolVal(def->arg))
{
appendStringInfo(buf, " STRICT");
}
@ -223,7 +223,7 @@ AppendDefElemVolatility(StringInfo buf, DefElem *def)
static void
AppendDefElemLeakproof(StringInfo buf, DefElem *def)
{
if (intVal(def->arg) == 0)
if (!boolVal(def->arg))
{
appendStringInfo(buf, " NOT");
}
@ -237,7 +237,7 @@ AppendDefElemLeakproof(StringInfo buf, DefElem *def)
static void
AppendDefElemSecurity(StringInfo buf, DefElem *def)
{
if (intVal(def->arg) == 0)
if (!boolVal(def->arg))
{
appendStringInfo(buf, " SECURITY INVOKER");
}

View File

@ -13,6 +13,8 @@
#include "postgres.h"
#include "pg_version_compat.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "lib/stringinfo.h"
@ -98,59 +100,59 @@ AppendRoleOption(StringInfo buf, ListCell *optionCell)
{
DefElem *option = (DefElem *) lfirst(optionCell);
if (strcmp(option->defname, "superuser") == 0 && intVal(option->arg))
if (strcmp(option->defname, "superuser") == 0 && boolVal(option->arg))
{
appendStringInfo(buf, " SUPERUSER");
}
else if (strcmp(option->defname, "superuser") == 0 && !intVal(option->arg))
else if (strcmp(option->defname, "superuser") == 0 && !boolVal(option->arg))
{
appendStringInfo(buf, " NOSUPERUSER");
}
else if (strcmp(option->defname, "createdb") == 0 && intVal(option->arg))
else if (strcmp(option->defname, "createdb") == 0 && boolVal(option->arg))
{
appendStringInfo(buf, " CREATEDB");
}
else if (strcmp(option->defname, "createdb") == 0 && !intVal(option->arg))
else if (strcmp(option->defname, "createdb") == 0 && !boolVal(option->arg))
{
appendStringInfo(buf, " NOCREATEDB");
}
else if (strcmp(option->defname, "createrole") == 0 && intVal(option->arg))
else if (strcmp(option->defname, "createrole") == 0 && boolVal(option->arg))
{
appendStringInfo(buf, " CREATEROLE");
}
else if (strcmp(option->defname, "createrole") == 0 && !intVal(option->arg))
else if (strcmp(option->defname, "createrole") == 0 && !boolVal(option->arg))
{
appendStringInfo(buf, " NOCREATEROLE");
}
else if (strcmp(option->defname, "inherit") == 0 && intVal(option->arg))
else if (strcmp(option->defname, "inherit") == 0 && boolVal(option->arg))
{
appendStringInfo(buf, " INHERIT");
}
else if (strcmp(option->defname, "inherit") == 0 && !intVal(option->arg))
else if (strcmp(option->defname, "inherit") == 0 && !boolVal(option->arg))
{
appendStringInfo(buf, " NOINHERIT");
}
else if (strcmp(option->defname, "canlogin") == 0 && intVal(option->arg))
else if (strcmp(option->defname, "canlogin") == 0 && boolVal(option->arg))
{
appendStringInfo(buf, " LOGIN");
}
else if (strcmp(option->defname, "canlogin") == 0 && !intVal(option->arg))
else if (strcmp(option->defname, "canlogin") == 0 && !boolVal(option->arg))
{
appendStringInfo(buf, " NOLOGIN");
}
else if (strcmp(option->defname, "isreplication") == 0 && intVal(option->arg))
else if (strcmp(option->defname, "isreplication") == 0 && boolVal(option->arg))
{
appendStringInfo(buf, " REPLICATION");
}
else if (strcmp(option->defname, "isreplication") == 0 && !intVal(option->arg))
else if (strcmp(option->defname, "isreplication") == 0 && !boolVal(option->arg))
{
appendStringInfo(buf, " NOREPLICATION");
}
else if (strcmp(option->defname, "bypassrls") == 0 && intVal(option->arg))
else if (strcmp(option->defname, "bypassrls") == 0 && boolVal(option->arg))
{
appendStringInfo(buf, " BYPASSRLS");
}
else if (strcmp(option->defname, "bypassrls") == 0 && !intVal(option->arg))
else if (strcmp(option->defname, "bypassrls") == 0 && !boolVal(option->arg))
{
appendStringInfo(buf, " NOBYPASSRLS");
}
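Taken together, these branches deparse boolean role options back into their SQL keywords. A sketch of the kind of statement they reproduce, with a hypothetical role name:

-- superuser=false, canlogin=true, isreplication=false deparses roughly as:
CREATE ROLE app_user NOSUPERUSER LOGIN NOREPLICATION;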

View File

@ -18,8 +18,7 @@
#include "pg_config.h"
/* We should drop PG 15 support from this file; it is only for testing purposes until #6085 is merged. */
#if (PG_VERSION_NUM >= PG_VERSION_14) && (PG_VERSION_NUM <= PG_VERSION_15)
#if (PG_VERSION_NUM >= PG_VERSION_14) && (PG_VERSION_NUM < PG_VERSION_15)
#include "postgres.h"

File diff suppressed because it is too large

View File

@ -519,7 +519,7 @@ LogLocalCommand(Task *task)
}
ereport(NOTICE, (errmsg("executing the command locally: %s",
ApplyLogRedaction(command))));
command)));
}

View File

@ -155,9 +155,9 @@ MultiClientSendQuery(int32 connectionId, const char *query)
* we cannot send the queries that Citus itself produced.
*/
ereport(WARNING, (errmsg("could not send remote query \"%s\"",
ApplyLogRedaction(query)),
query),
errdetail("Client error: %s",
ApplyLogRedaction(errorMessage))));
errorMessage)));
success = false;
}

View File

@ -495,6 +495,49 @@ IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
}
/*
* GetTableTypeName returns a string representation of the table type.
*/
char *
GetTableTypeName(Oid tableId)
{
bool regularTable = false;
char partitionMethod = ' ';
char replicationModel = ' ';
if (IsCitusTable(tableId))
{
CitusTableCacheEntry *referencingCacheEntry = GetCitusTableCacheEntry(tableId);
partitionMethod = referencingCacheEntry->partitionMethod;
replicationModel = referencingCacheEntry->replicationModel;
}
else
{
regularTable = true;
}
if (regularTable)
{
return "regular table";
}
else if (partitionMethod == 'h')
{
return "distributed table";
}
else if (partitionMethod == 'n' && replicationModel == 't')
{
return "reference table";
}
else if (partitionMethod == 'n' && replicationModel != 't')
{
return "citus local table";
}
else
{
return "unknown table";
}
}
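As a rough illustration, these labels map onto pg_dist_partition metadata that can be inspected directly, assuming the partmethod/repmodel encoding used above:

SELECT logicalrelid, partmethod, repmodel FROM pg_dist_partition;
-- partmethod 'h'                     => "distributed table"
-- partmethod 'n' AND repmodel 't'    => "reference table"
-- partmethod 'n' AND repmodel <> 't' => "citus local table"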
/*
* IsCitusTable returns whether relationId is a distributed relation or
* not.
@ -617,6 +660,45 @@ PartitionColumnViaCatalog(Oid relationId)
}
/*
* ColocationIdViaCatalog gets a relationId and returns the colocation
* id column from pg_dist_partition by reading directly from the catalog.
*/
uint32
ColocationIdViaCatalog(Oid relationId)
{
HeapTuple partitionTuple = PgDistPartitionTupleViaCatalog(relationId);
if (!HeapTupleIsValid(partitionTuple))
{
return INVALID_COLOCATION_ID;
}
Datum datumArray[Natts_pg_dist_partition];
bool isNullArray[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
if (isNullArray[Anum_pg_dist_partition_colocationid - 1])
{
/* colocation id should never be NULL, but let's make sure */
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
return INVALID_COLOCATION_ID;
}
Datum colocationIdDatum = datumArray[Anum_pg_dist_partition_colocationid - 1];
uint32 colocationId = DatumGetUInt32(colocationIdDatum);
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
return colocationId;
}
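The function is roughly the C-level equivalent of this catalog query (table name hypothetical):

SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'public.events'::regclass;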
/*
* PgDistPartitionTupleViaCatalog is a helper function that searches
* pg_dist_partition for the given relationId. The caller is responsible
@ -2831,42 +2913,6 @@ TextOutFunctionId(void)
}
/*
* PgTableVisibleFuncId returns oid of the pg_table_is_visible function.
*/
Oid
PgTableVisibleFuncId(void)
{
if (MetadataCache.pgTableIsVisibleFuncId == InvalidOid)
{
const int argCount = 1;
MetadataCache.pgTableIsVisibleFuncId =
FunctionOid("pg_catalog", "pg_table_is_visible", argCount);
}
return MetadataCache.pgTableIsVisibleFuncId;
}
/*
* CitusTableVisibleFuncId returns oid of the citus_table_is_visible function.
*/
Oid
CitusTableVisibleFuncId(void)
{
if (MetadataCache.citusTableIsVisibleFuncId == InvalidOid)
{
const int argCount = 1;
MetadataCache.citusTableIsVisibleFuncId =
FunctionOid("pg_catalog", "citus_table_is_visible", argCount);
}
return MetadataCache.citusTableIsVisibleFuncId;
}
/*
* RelationIsAKnownShardFuncId returns oid of the relation_is_a_known_shard function.
*/
@ -4424,17 +4470,6 @@ CitusTableTypeIdList(CitusTableType citusTableType)
}
/*
* ClusterHasReferenceTable returns true if the cluster has
* any reference table.
*/
bool
ClusterHasReferenceTable(void)
{
return list_length(CitusTableTypeIdList(REFERENCE_TABLE)) > 0;
}
/*
* InvalidateNodeRelationCacheCallback destroys the WorkerNodeHash when
* any change happens on pg_dist_node table. It also set WorkerNodeHash to

View File

@ -159,6 +159,7 @@ PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
* or regular users as long as the regular user owns the input object.
*/
PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
@ -1211,6 +1212,24 @@ DistributionDeleteCommand(const char *schemaName, const char *tableName)
}
/*
* DistributionDeleteMetadataCommand returns a query to delete pg_dist_partition
* metadata from a worker node for a given table.
*/
char *
DistributionDeleteMetadataCommand(Oid relationId)
{
StringInfo deleteCommand = makeStringInfo();
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
appendStringInfo(deleteCommand,
"SELECT pg_catalog.citus_internal_delete_partition_metadata(%s)",
quote_literal_cstr(qualifiedRelationName));
return deleteCommand->data;
}
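For a hypothetical table public.events, the generated command looks like:

SELECT pg_catalog.citus_internal_delete_partition_metadata('public.events');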
/*
* TableOwnerResetCommand generates a command that can be executed
* to reset the table owner.
@ -3199,6 +3218,35 @@ EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, int coloc
}
/*
* citus_internal_delete_partition_metadata is an internal UDF to
* delete a row in pg_dist_partition.
*/
Datum
citus_internal_delete_partition_metadata(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
PG_ENSURE_ARGNOTNULL(0, "relation");
Oid relationId = PG_GETARG_OID(0);
/* only owner of the table (or superuser) is allowed to delete the Citus metadata */
EnsureTableOwner(relationId);
/* we want to serialize all the metadata changes to this table */
LockRelationOid(relationId, ShareUpdateExclusiveLock);
if (!ShouldSkipMetadataChecks())
{
EnsureCoordinatorInitiatedOperation();
}
DeletePartitionRow(relationId);
PG_RETURN_VOID();
}
/*
* citus_internal_add_shard_metadata is an internal UDF to
* add a row to pg_dist_shard.

View File

@ -39,6 +39,7 @@
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_partitioning_utils.h"
@ -81,6 +82,8 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
uint64 *tableSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList);
static char * GenerateSizeQueryForRelationNameList(List *quotedShardNames,
char *sizeFunction);
static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType);
static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType);
static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode,
@ -720,7 +723,8 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
{
StringInfo selectQuery = makeStringInfo();
appendStringInfo(selectQuery, "SELECT ");
List *partitionedShardNames = NIL;
List *nonPartitionedShardNames = NIL;
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
@ -746,30 +750,76 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);
/* for partitioned tables, we will call worker_partitioned_... size functions */
if (optimizePartitionCalculations && PartitionedTable(shardInterval->relationId))
{
appendStringInfo(selectQuery, GetWorkerPartitionedSizeUDFNameBySizeQueryType(
sizeQueryType), quotedShardName);
partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
}
/* for non-partitioned tables, we will use Postgres' size functions */
else
{
appendStringInfo(selectQuery, GetSizeQueryBySizeQueryType(sizeQueryType),
quotedShardName);
nonPartitionedShardNames = lappend(nonPartitionedShardNames, quotedShardName);
}
}
appendStringInfo(selectQuery, " + ");
}
/* SELECT SUM(worker_partitioned_...) FROM VALUES (...) */
char *subqueryForPartitionedShards =
GenerateSizeQueryForRelationNameList(partitionedShardNames,
GetWorkerPartitionedSizeUDFNameBySizeQueryType(
sizeQueryType));
/*
* Add 0 as the last size; it handles the empty-list case and makes the
* size checks that would otherwise be required unnecessary.
*/
appendStringInfo(selectQuery, "0;");
/* SELECT SUM(pg_..._size) FROM VALUES (...) */
char *subqueryForNonPartitionedShards =
GenerateSizeQueryForRelationNameList(nonPartitionedShardNames,
GetSizeQueryBySizeQueryType(sizeQueryType));
appendStringInfo(selectQuery, "SELECT (%s) + (%s);",
subqueryForPartitionedShards, subqueryForNonPartitionedShards);
elog(DEBUG4, "Size Query: %s", selectQuery->data);
return selectQuery;
}
/*
* GenerateSizeQueryForRelationNameList generates and returns a query with the template:
* SELECT SUM( <sizeFunction>(relid) ) FROM (VALUES (<shardName>), (<shardName>), ...) as q(relid)
*/
static char *
GenerateSizeQueryForRelationNameList(List *quotedShardNames, char *sizeFunction)
{
if (list_length(quotedShardNames) == 0)
{
return "SELECT 0";
}
StringInfo selectQuery = makeStringInfo();
appendStringInfo(selectQuery, "SELECT SUM(");
appendStringInfo(selectQuery, sizeFunction, "relid");
appendStringInfo(selectQuery, ") FROM (VALUES ");
bool addComma = false;
char *quotedShardName = NULL;
foreach_ptr(quotedShardName, quotedShardNames)
{
if (addComma)
{
appendStringInfoString(selectQuery, ", ");
}
addComma = true;
appendStringInfo(selectQuery, "(%s)", quotedShardName);
}
appendStringInfoString(selectQuery, ") as q(relid)");
return selectQuery->data;
}
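Putting the two subqueries together, the final statement produced above has the following shape; the shard names are hypothetical and the size functions stand in for whatever the two lookup helpers return:

SELECT (SELECT SUM(worker_partitioned_relation_total_size(relid))
        FROM (VALUES ('public.events_102008')) as q(relid))
     + (SELECT SUM(pg_total_relation_size(relid))
        FROM (VALUES ('public.users_102012'), ('public.users_102013')) as q(relid));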
/*
* GetWorkerPartitionedSizeUDFNameBySizeQueryType returns the corresponding worker
* partitioned size query for given query type.
@ -2104,6 +2154,96 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
}
/*
* UpdateDistributionColumnGlobally sets the distribution column and colocation ID
* for a table in pg_dist_partition on all nodes
*/
void
UpdateDistributionColumnGlobally(Oid relationId, char distributionMethod,
Var *distributionColumn, int colocationId)
{
UpdateDistributionColumn(relationId, distributionMethod, distributionColumn,
colocationId);
if (ShouldSyncTableMetadata(relationId))
{
/* we use delete+insert because syncing uses specialized RPCs */
char *deleteMetadataCommand = DistributionDeleteMetadataCommand(relationId);
SendCommandToWorkersWithMetadata(deleteMetadataCommand);
/* pick up the new metadata (updated above) */
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
char *insertMetadataCommand = DistributionCreateCommand(cacheEntry);
SendCommandToWorkersWithMetadata(insertMetadataCommand);
}
}
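When metadata syncing is enabled, the delete+insert pair sent to the workers looks roughly like this; the table name, colocation id, and replication model are hypothetical, and the serialized partition key argument is abbreviated:

SELECT pg_catalog.citus_internal_delete_partition_metadata('public.events');
SELECT citus_internal_add_partition_metadata('public.events'::regclass, 'h',
       '<serialized partition key>', 1234, 's');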
/*
* UpdateDistributionColumn sets the distribution column and colocation ID for a table
* in pg_dist_partition.
*/
void
UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distributionColumn,
int colocationId)
{
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
Datum values[Natts_pg_dist_partition];
bool isnull[Natts_pg_dist_partition];
bool replace[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionLogicalRelidIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for citus table with oid: %u",
relationId)));
}
memset(replace, 0, sizeof(replace));
replace[Anum_pg_dist_partition_partmethod - 1] = true;
values[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod);
isnull[Anum_pg_dist_partition_partmethod - 1] = false;
replace[Anum_pg_dist_partition_colocationid - 1] = true;
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(false);
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
char *distributionColumnString = nodeToString((Node *) distributionColumn);
replace[Anum_pg_dist_partition_partkey - 1] = true;
values[Anum_pg_dist_partition_partkey - 1] =
CStringGetTextDatum(distributionColumnString);
isnull[Anum_pg_dist_partition_partkey - 1] = false;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistPartition, &heapTuple->t_self, heapTuple);
CitusInvalidateRelcacheByRelid(relationId);
CommandCounterIncrement();
systable_endscan(scanDescriptor);
table_close(pgDistPartition, NoLock);
}
/*
* Check that the current user has `mode` permissions on relationId, error out
* if not. Superusers always have such permissions.
@ -2135,21 +2275,6 @@ EnsureTableOwner(Oid relationId)
}
/*
* Check that the current user has owner rights to the schema, error out if
* not. Superusers are regarded as owners.
*/
void
EnsureSchemaOwner(Oid schemaId)
{
if (!pg_namespace_ownercheck(schemaId, GetUserId()))
{
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
get_namespace_name(schemaId));
}
}
/*
* Check that the current user has owner rights to functionId, error out if
* not. Superusers are regarded as owners. Functions and procedures are

View File

@ -2546,6 +2546,24 @@ EnsureCoordinator(void)
}
/*
* EnsureCoordinatorIsInMetadata checks whether the coordinator is added to the
* metadata, which is required for many operations.
*/
void
EnsureCoordinatorIsInMetadata(void)
{
bool isCoordinatorInMetadata = false;
PrimaryNodeForGroup(COORDINATOR_GROUP_ID, &isCoordinatorInMetadata);
if (!isCoordinatorInMetadata)
{
ereport(ERROR, (errmsg("coordinator is not added to the metadata"),
errhint("Use SELECT citus_set_coordinator_host('<hostname>') "
"to configure the coordinator hostname")));
}
}
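Following the hint, the coordinator can be registered with a call like this (hostname hypothetical):

SELECT citus_set_coordinator_host('coord.example.com');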
/*
* InsertCoordinatorIfClusterEmpty can be used to ensure Citus tables can be
* created even on a node that has just performed CREATE EXTENSION citus;

View File

@ -23,6 +23,7 @@
#include "distributed/connection_management.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_split.h"
#include "distributed/utils/distribution_column_map.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_split_shard_by_split_points);
@ -52,12 +53,17 @@ citus_split_shard_by_split_points(PG_FUNCTION_ARGS)
Oid shardTransferModeOid = PG_GETARG_OID(3);
SplitMode shardSplitMode = LookupSplitMode(shardTransferModeOid);
DistributionColumnMap *distributionColumnOverrides = NULL;
List *sourceColocatedShardIntervalList = NIL;
SplitShard(
shardSplitMode,
SHARD_SPLIT_API,
shardIdToSplit,
shardSplitPointsList,
nodeIdsForPlacementList);
nodeIdsForPlacementList,
distributionColumnOverrides,
sourceColocatedShardIntervalList,
INVALID_COLOCATION_ID);
PG_RETURN_VOID();
}
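A hypothetical invocation, splitting one shard at hash value 0 into two placements on nodes 1 and 2:

SELECT citus_split_shard_by_split_points(
    102008,
    ARRAY['0'],
    ARRAY[1, 2],
    'block_writes');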

View File

@ -33,6 +33,7 @@
#include "distributed/worker_transaction.h"
#include "distributed/version_compat.h"
#include "distributed/shard_split.h"
#include "distributed/utils/distribution_column_map.h"
#include "nodes/pg_list.h"
#include "storage/lock.h"
#include "utils/builtins.h"
@ -163,12 +164,17 @@ isolate_tenant_to_new_shard(PG_FUNCTION_ARGS)
nodeIdsForPlacementList = lappend_int(nodeIdsForPlacementList, sourceNodeId);
}
DistributionColumnMap *distributionColumnOverrides = NULL;
List *sourceColocatedShardIntervalList = NIL;
SplitMode splitMode = LookupSplitMode(shardTransferModeOid);
SplitShard(splitMode,
ISOLATE_TENANT_TO_NEW_SHARD,
sourceShard->shardId,
shardSplitPointsList,
nodeIdsForPlacementList);
nodeIdsForPlacementList,
distributionColumnOverrides,
sourceColocatedShardIntervalList,
INVALID_COLOCATION_ID);
cacheEntry = GetCitusTableCacheEntry(relationId);
ShardInterval *newShard = FindShardInterval(tenantIdDatum, cacheEntry);
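A hypothetical call through this updated code path, with an explicit transfer mode:

SELECT isolate_tenant_to_new_shard('orders', 'tenant-42', 'CASCADE',
                                   shard_transfer_mode => 'force_logical');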

View File

@ -78,7 +78,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
if (!IsA(queryTreeNode, DeleteStmt) && !IsA(queryTreeNode, UpdateStmt))
{
ereport(ERROR, (errmsg("query \"%s\" is not a delete or update "
"statement", ApplyLogRedaction(queryString))));
"statement", queryString)));
}
ereport(WARNING, (errmsg("master_modify_multiple_shards is deprecated and will be "

View File

@ -71,7 +71,6 @@ typedef struct ShardCommandList
} ShardCommandList;
/* local function forward declarations */
static bool RelationCanPublishAllModifications(Oid relationId);
static bool CanUseLogicalReplication(Oid relationId, char shardReplicationMode);
static void ErrorIfTableCannotBeReplicated(Oid relationId);
static void ErrorIfTargetNodeIsNotSafeToCopyTo(const char *targetNodeName,
@ -635,7 +634,7 @@ VerifyTablesHaveReplicaIdentity(List *colocatedTableList)
* RelationCanPublishAllModifications returns true if the relation is safe to publish
* all modification while being replicated via logical replication.
*/
static bool
bool
RelationCanPublishAllModifications(Oid relationId)
{
Relation relation = RelationIdGetRelation(relationId);

File diff suppressed because it is too large

View File

@ -9,15 +9,16 @@
#include "postgres.h"
#include "pg_version_compat.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/distribution_column.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/multi_executor.h"
#include "distributed/utils/array_type.h"
#include "distributed/worker_shard_copy.h"
#include "utils/lsyscache.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "distributed/utils/array_type.h"
#include "distributed/listutils.h"
#include "distributed/multi_executor.h"
#include "distributed/worker_shard_copy.h"
#include "distributed/intermediate_results.h"
#include "distributed/citus_ruleutils.h"
PG_FUNCTION_INFO_V1(worker_split_copy);
@ -38,6 +39,7 @@ static DestReceiver ** CreateShardCopyDestReceivers(EState *estate,
static DestReceiver * CreatePartitionedSplitCopyDestReceiver(EState *executor,
ShardInterval *
shardIntervalToSplitCopy,
char *partitionColumnName,
List *splitCopyInfoList);
static void BuildMinMaxRangeArrays(List *splitCopyInfoList, ArrayType **minValueArray,
ArrayType **maxValueArray);
@ -54,7 +56,10 @@ worker_split_copy(PG_FUNCTION_ARGS)
uint64 shardIdToSplitCopy = DatumGetUInt64(PG_GETARG_DATUM(0));
ShardInterval *shardIntervalToSplitCopy = LoadShardInterval(shardIdToSplitCopy);
ArrayType *splitCopyInfoArrayObject = PG_GETARG_ARRAYTYPE_P(1);
text *partitionColumnText = PG_GETARG_TEXT_P(1);
char *partitionColumnName = text_to_cstring(partitionColumnText);
ArrayType *splitCopyInfoArrayObject = PG_GETARG_ARRAYTYPE_P(2);
bool arrayHasNull = ARR_HASNULL(splitCopyInfoArrayObject);
if (arrayHasNull)
{
@ -82,6 +87,7 @@ worker_split_copy(PG_FUNCTION_ARGS)
EState *executor = CreateExecutorState();
DestReceiver *splitCopyDestReceiver = CreatePartitionedSplitCopyDestReceiver(executor,
shardIntervalToSplitCopy,
partitionColumnName,
splitCopyInfoList);
Oid sourceShardToCopySchemaOId = get_rel_namespace(
@ -228,6 +234,7 @@ CreateShardCopyDestReceivers(EState *estate, ShardInterval *shardIntervalToSplit
static DestReceiver *
CreatePartitionedSplitCopyDestReceiver(EState *estate,
ShardInterval *shardIntervalToSplitCopy,
char *partitionColumnName,
List *splitCopyInfoList)
{
/* Create underlying ShardCopyDestReceivers */
@ -240,10 +247,17 @@ CreatePartitionedSplitCopyDestReceiver(EState *estate,
ArrayType *minValuesArray = NULL;
ArrayType *maxValuesArray = NULL;
BuildMinMaxRangeArrays(splitCopyInfoList, &minValuesArray, &maxValuesArray);
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(
shardIntervalToSplitCopy->relationId);
char partitionMethod = cacheEntry->partitionMethod;
Var *partitionColumn = cacheEntry->partitionColumn;
/* we currently only support hash-distribution */
char partitionMethod = DISTRIBUTE_BY_HASH;
/* synthetically build the partition column by looking at shard columns */
uint64 shardId = shardIntervalToSplitCopy->shardId;
bool missingOK = false;
Oid shardRelationId = LookupShardRelationFromCatalog(shardId, missingOK);
Var *partitionColumn = BuildDistributionKeyFromColumnName(shardRelationId,
partitionColumnName,
AccessShareLock);
CitusTableCacheEntry *shardSearchInfo =
QueryTupleShardSearchInfo(minValuesArray, maxValuesArray,

View File

@ -0,0 +1,24 @@
/*-------------------------------------------------------------------------
*
* worker_split_shard_release_dsm.c
* This file contains functions to release the dynamic shared memory
* segment allocated during the split workflow.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shardsplit_shared_memory.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(worker_split_shard_release_dsm);
Datum
worker_split_shard_release_dsm(PG_FUNCTION_ARGS)
{
ReleaseSharedMemoryOfShardSplitInfo();
PG_RETURN_VOID();
}

View File

@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* worker_split_shard_replication_setup.c
* worker_split_shard_replication_setup_udf.c
* This file contains functions to set up information about the list of
* shards that are being split.
*

View File

@ -137,7 +137,7 @@ RebuildQueryStrings(Job *workerJob)
ereport(DEBUG4, (errmsg("query before rebuilding: %s",
!isQueryObjectOrText
? "(null)"
: ApplyLogRedaction(TaskQueryString(task)))));
: TaskQueryString(task))));
UpdateTaskQueryString(query, task);
@ -148,7 +148,7 @@ RebuildQueryStrings(Job *workerJob)
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
ApplyLogRedaction(TaskQueryString(task)))));
TaskQueryString(task))));
}
}

View File

@ -74,6 +74,8 @@ static uint64 NextPlanId = 1;
/* keep track of planner call stack levels */
int PlannerLevel = 0;
static void ErrorIfQueryHasMergeCommand(Query *queryTree);
static bool ContainsMergeCommandWalker(Node *node);
static bool ListContainsDistributedTableRTE(List *rangeTableList,
bool *maybeHasForeignDistributedTable);
static bool IsUpdateOrDelete(Query *query);
@ -148,6 +150,12 @@ distributed_planner(Query *parse,
/* this cursor flag could only be set when Citus has been loaded */
Assert(CitusHasBeenLoaded());
/*
* We cannot have a MERGE command in this path either, because there
* cannot be a recursively planned MERGE command.
*/
Assert(!ContainsMergeCommandWalker((Node *) parse));
needsDistributedPlanning = true;
}
else if (CitusHasBeenLoaded())
@ -187,13 +195,20 @@ distributed_planner(Query *parse,
planContext.originalQuery = copyObject(parse);
if (!fastPathRouterQuery)
{
/*
* Fast path queries cannot contain a MERGE command; we
* block the remaining cases here.
*/
ErrorIfQueryHasMergeCommand(parse);
/*
* When there are partitioned tables (not applicable to fast path),
* pretend that they are regular tables to avoid unnecessary work
* in standard_planner.
*/
if (!fastPathRouterQuery)
{
bool setPartitionedTablesInherited = false;
AdjustPartitioningForDistributedPlanning(rangeTableList,
setPartitionedTablesInherited);
@ -287,6 +302,59 @@ distributed_planner(Query *parse,
}
/*
* ErrorIfQueryHasMergeCommand walks over the query tree and throws an error
* if there is any MERGE command (i.e., CMD_MERGE) in the query tree.
*/
static void
ErrorIfQueryHasMergeCommand(Query *queryTree)
{
/*
* Postgres currently doesn't support MERGE queries inside subqueries and
* CTEs, but let's be defensive and do the query tree walk anyway.
*
* We do not call this path for fast-path queries to avoid this additional
* overhead.
*/
if (ContainsMergeCommandWalker((Node *) queryTree))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MERGE command is not supported on Citus tables yet")));
}
}
/*
* ContainsMergeCommandWalker walks over the node and checks whether there
* is any MERGE command (i.e., CMD_MERGE) in the tree.
*/
static bool
ContainsMergeCommandWalker(Node *node)
{
#if PG_VERSION_NUM >= PG_VERSION_15
if (node == NULL)
{
return false;
}
if (IsA(node, Query))
{
Query *query = (Query *) node;
if (query->commandType == CMD_MERGE)
{
return true;
}
return query_tree_walker((Query *) node, ContainsMergeCommandWalker, NULL, 0);
}
return expression_tree_walker(node, ContainsMergeCommandWalker, NULL);
#endif
return false;
}
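On PG15, a statement like the following against distributed tables (table names hypothetical) now fails with the error above:

MERGE INTO events t
USING events_staging s ON (t.event_id = s.event_id)
WHEN MATCHED THEN UPDATE SET payload = s.payload
WHEN NOT MATCHED THEN INSERT (event_id, payload)
     VALUES (s.event_id, s.payload);
-- ERROR:  MERGE command is not supported on Citus tables yet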
/*
* ExtractRangeTableEntryList is a wrapper around ExtractRangeTableEntryWalker.
* The function traverses the input query and returns all the range table

View File

@ -162,6 +162,17 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
return false;
}
#if PG_VERSION_NUM >= PG_VERSION_15
if (query->commandType == CMD_MERGE)
{
/*
* Citus doesn't support the MERGE command; let's return
* early and explicitly for fast-path queries.
*/
return false;
}
#endif
/*
* We want to deal with only very simple queries. Some of the
* checks might be too restrictive, still we prefer this way.

View File

@ -927,7 +927,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery,
deparse_shard_query(copiedQuery, distributedTableId, shardInterval->shardId,
queryString);
ereport(DEBUG2, (errmsg("distributed statement: %s",
ApplyLogRedaction(queryString->data))));
queryString->data)));
Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
queryString->data);

View File

@ -627,7 +627,7 @@ PrintJoinOrderList(List *joinOrder)
}
ereport(LOG, (errmsg("join order: %s",
ApplyLogRedaction(printBuffer->data))));
printBuffer->data)));
}

View File

@ -1154,7 +1154,8 @@ HasComplexRangeTableType(Query *queryTree)
if (rangeTableEntry->rtekind != RTE_RELATION &&
rangeTableEntry->rtekind != RTE_SUBQUERY &&
rangeTableEntry->rtekind != RTE_FUNCTION &&
rangeTableEntry->rtekind != RTE_VALUES)
rangeTableEntry->rtekind != RTE_VALUES &&
!IsJsonTableRTE(rangeTableEntry))
{
hasComplexRangeTableType = true;
}

View File

@ -490,6 +490,10 @@ RangePartitionJoinBaseRelationId(MultiJoin *joinNode)
{
partitionNode = (MultiPartition *) rightChildNode;
}
else
{
Assert(false);
}
Index baseTableId = partitionNode->splitPointTableId;
MultiTable *baseTable = FindTableNode((MultiNode *) joinNode, baseTableId);
@ -575,12 +579,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList)
Job *job = (Job *) linitial(dependentJobList);
if (CitusIsA(job, MapMergeJob))
{
MapMergeJob *mapMergeJob = (MapMergeJob *) job;
isRepartitionJoin = true;
if (mapMergeJob->reduceQuery)
{
updateColumnAttributes = false;
}
}
}
@ -2566,7 +2565,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
{
pg_get_query_def(taskQuery, queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s",
ApplyLogRedaction(queryString->data))));
queryString->data)));
SetTaskQueryString(subqueryTask, queryString->data);
}
@ -2721,7 +2720,7 @@ SqlTaskList(Job *job)
/* log the query string we generated */
ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
errdetail("query string: \"%s\"",
ApplyLogRedaction(sqlQueryString->data))));
sqlQueryString->data)));
sqlTask->anchorShardId = INVALID_SHARD_ID;
if (anchorRangeTableBasedAssignment)
@ -3236,45 +3235,6 @@ BinaryOpExpression(Expr *clause, Node **leftOperand, Node **rightOperand)
}
/*
* SimpleOpExpression checks that given expression is a simple operator
* expression. A simple operator expression is a binary operator expression with
* operands of a var and a non-null constant.
*/
bool
SimpleOpExpression(Expr *clause)
{
Const *constantClause = NULL;
Node *leftOperand;
Node *rightOperand;
if (!BinaryOpExpression(clause, &leftOperand, &rightOperand))
{
return false;
}
if (IsA(rightOperand, Const) && IsA(leftOperand, Var))
{
constantClause = (Const *) rightOperand;
}
else if (IsA(leftOperand, Const) && IsA(rightOperand, Var))
{
constantClause = (Const *) leftOperand;
}
else
{
return false;
}
if (constantClause->constisnull)
{
return false;
}
return true;
}
/*
* MakeInt4Column creates a column of int4 type with invalid table id and max
* attribute number.
@ -4710,18 +4670,13 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex)
for (uint32 partitionId = initialPartitionId; partitionId < partitionCount;
partitionId++)
{
Task *mergeTask = NULL;
List *mapOutputFetchTaskList = NIL;
ListCell *mapTaskCell = NULL;
uint32 mergeTaskId = taskIdIndex;
Query *reduceQuery = mapMergeJob->reduceQuery;
if (reduceQuery == NULL)
{
/* create logical merge task (not executed, but useful for bookkeeping) */
mergeTask = CreateBasicTask(jobId, mergeTaskId, MERGE_TASK,
Task *mergeTask = CreateBasicTask(jobId, mergeTaskId, MERGE_TASK,
"<merge>");
}
mergeTask->partitionId = partitionId;
taskIdIndex++;

View File

@ -60,7 +60,8 @@ typedef enum RecurringTuplesType
RECURRING_TUPLES_FUNCTION,
RECURRING_TUPLES_EMPTY_JOIN_TREE,
RECURRING_TUPLES_RESULT_FUNCTION,
RECURRING_TUPLES_VALUES
RECURRING_TUPLES_VALUES,
RECURRING_TUPLES_JSON_TABLE
} RecurringTuplesType;
/*
@ -345,7 +346,8 @@ IsFunctionOrValuesRTE(Node *node)
RangeTblEntry *rangeTblEntry = (RangeTblEntry *) node;
if (rangeTblEntry->rtekind == RTE_FUNCTION ||
rangeTblEntry->rtekind == RTE_VALUES)
rangeTblEntry->rtekind == RTE_VALUES ||
IsJsonTableRTE(rangeTblEntry))
{
return true;
}
@ -718,6 +720,13 @@ DeferErrorIfFromClauseRecurs(Query *queryTree)
"the FROM clause contains VALUES", NULL,
NULL);
}
else if (recurType == RECURRING_TUPLES_JSON_TABLE)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"correlated subqueries are not supported when "
"the FROM clause contains JSON_TABLE", NULL,
NULL);
}
/*
@ -945,6 +954,13 @@ DeferredErrorIfUnsupportedRecurringTuplesJoin(
"There exist a VALUES clause in the outer "
"part of the outer join", NULL);
}
else if (recurType == RECURRING_TUPLES_JSON_TABLE)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot pushdown the subquery",
"There exist a JSON_TABLE clause in the outer "
"part of the outer join", NULL);
}
return NULL;
}
@ -1235,7 +1251,8 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree)
*/
if (rangeTableEntry->rtekind == RTE_RELATION ||
rangeTableEntry->rtekind == RTE_SUBQUERY ||
rangeTableEntry->rtekind == RTE_RESULT)
rangeTableEntry->rtekind == RTE_RESULT ||
IsJsonTableRTE(rangeTableEntry)) /* TODO: can we have volatile???*/
{
/* accepted */
}
@ -1403,6 +1420,13 @@ DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree)
"VALUES is not supported within a "
"UNION", NULL);
}
else if (recurType == RECURRING_TUPLES_JSON_TABLE)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot push down this subquery",
"JSON_TABLE is not supported within a "
"UNION", NULL);
}
return NULL;
}
@ -1502,6 +1526,11 @@ RecurringTypeDescription(RecurringTuplesType recurType)
return "a VALUES clause";
}
case RECURRING_TUPLES_JSON_TABLE:
{
return "a JSON_TABLE";
}
case RECURRING_TUPLES_INVALID:
{
/*
@ -1698,7 +1727,8 @@ DeferredErrorIfUnsupportedLateralSubquery(PlannerInfo *plannerInfo,
* strings anyway.
*/
if (recurType != RECURRING_TUPLES_VALUES &&
recurType != RECURRING_TUPLES_RESULT_FUNCTION)
recurType != RECURRING_TUPLES_RESULT_FUNCTION &&
recurType != RECURRING_TUPLES_JSON_TABLE)
{
recurTypeDescription = psprintf("%s (%s)", recurTypeDescription,
recurringRangeTableEntry->eref->
@ -1775,6 +1805,26 @@ ContainsRecurringRangeTable(List *rangeTable, RecurringTuplesType *recurType)
}
/*
* IsJsonTableRTE checks whether the RTE refers to a JSON_TABLE
* table function, which was introduced in PostgreSQL 15.
*/
bool
IsJsonTableRTE(RangeTblEntry *rte)
{
#if PG_VERSION_NUM >= PG_VERSION_15
if (rte == NULL)
{
return false;
}
return (rte->rtekind == RTE_TABLEFUNC &&
rte->tablefunc->functype == TFT_JSON_TABLE);
#endif
return false;
}
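The RTE shape this detects comes from queries such as the following, using the JSON_TABLE syntax as it existed in the PG15 betas:

SELECT jt.id
FROM JSON_TABLE('[{"id": 1}, {"id": 2}]'::jsonb, '$[*]'
                COLUMNS (id int PATH '$.id')) AS jt;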
/*
* HasRecurringTuples returns whether any part of the expression will generate
* the same set of tuples in every query on shards when executing a distributed
@ -1836,6 +1886,11 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
*recurType = RECURRING_TUPLES_VALUES;
return true;
}
else if (IsJsonTableRTE(rangeTableEntry))
{
*recurType = RECURRING_TUPLES_JSON_TABLE;
return true;
}
return false;
}

View File

@ -242,7 +242,7 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
ereport(DEBUG1, (errmsg(
"Plan " UINT64_FORMAT
" query after replacing subqueries and CTEs: %s", planId,
ApplyLogRedaction(subPlanString->data))));
subPlanString->data)));
}
recursivePlanningDepth--;
@ -763,7 +763,7 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT
"_%u for CTE %s: %s", planId, subPlanId,
cteName,
ApplyLogRedaction(subPlanString->data))));
subPlanString->data)));
}
/* build a sub plan for the CTE */
@ -1181,7 +1181,7 @@ RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningConte
ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT
"_%u for subquery %s", planId, subPlanId,
ApplyLogRedaction(subqueryString->data))));
subqueryString->data)));
}
/* finally update the input subquery to point the result query */

View File

@ -1807,12 +1807,22 @@ CreateSubscriptions(MultiConnection *sourceConnection,
appendStringInfo(createSubscriptionCommand,
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
"WITH (citus_use_authinfo=true, create_slot=false, "
" copy_data=false, enabled=false, slot_name=%s)",
"copy_data=false, enabled=false, slot_name=%s",
quote_identifier(target->subscriptionName),
quote_literal_cstr(conninfo->data),
quote_identifier(target->publication->name),
quote_identifier(target->replicationSlot->name));
if (EnableBinaryProtocol && PG_VERSION_NUM >= PG_VERSION_14)
{
appendStringInfoString(createSubscriptionCommand, ", binary=true)");
}
else
{
appendStringInfoString(createSubscriptionCommand, ")");
}
ExecuteCriticalRemoteCommand(target->superuserConnection,
createSubscriptionCommand->data);
pfree(createSubscriptionCommand->data);
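With the binary protocol available, the assembled command comes out roughly like this; all identifiers and the connection string are hypothetical:

CREATE SUBSCRIPTION citus_shard_move_subscription_10
    CONNECTION 'host=10.0.0.2 port=5432 user=postgres dbname=postgres'
    PUBLICATION citus_shard_move_publication_10
    WITH (citus_use_authinfo=true, create_slot=false,
          copy_data=false, enabled=false,
          slot_name=citus_shard_move_slot_10, binary=true);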

View File

@ -14,6 +14,7 @@
#include "replication/logical.h"
#include "utils/typcache.h"
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
static LogicalDecodeChangeCB pgoutputChangeCB;
@ -216,7 +217,8 @@ GetHashValueForIncomingTuple(Relation sourceShardRelation,
TYPECACHE_HASH_PROC_FINFO);
/* get hashed value of the distribution value */
Datum hashedValueDatum = FunctionCall1(&(typeEntry->hash_proc_finfo),
Datum hashedValueDatum = FunctionCall1Coll(&(typeEntry->hash_proc_finfo),
typeEntry->typcollation,
partitionColumnValue);
return DatumGetInt32(hashedValueDatum);

View File

@ -173,6 +173,11 @@ ReleaseSharedMemoryOfShardSplitInfo()
/* Get handle of dynamic shared memory segment*/
dsm_handle dsmHandle = GetShardSplitSharedMemoryHandle();
if (dsmHandle == DSM_HANDLE_INVALID)
{
return;
}
/*
* Unpin the dynamic shared memory segment. 'dsm_pin_segment' was
* called previously by 'AllocateSharedMemoryForShardSplitInfo'.
@ -266,8 +271,10 @@ StoreShardSplitSharedMemoryHandle(dsm_handle dsmHandle)
* before the current function is called.
* If this handle is still valid, it means cleanup of previous split shard
* workflow failed. Log a warning and continue the current shard split operation.
* Skip the warning if the new handle to be stored is invalid; we store an invalid
* handle when shared memory is released by calling worker_split_shard_release_dsm.
*/
if (smData->dsmHandle != DSM_HANDLE_INVALID)
if (smData->dsmHandle != DSM_HANDLE_INVALID && dsmHandle != DSM_HANDLE_INVALID)
{
ereport(WARNING,
errmsg(

View File

@ -146,6 +146,7 @@ DEFINE_COLUMNAR_PASSTHROUGH_FUNC(test_columnar_storage_write_new_page)
#define DUMMY_REAL_TIME_EXECUTOR_ENUM_VALUE 9999999
static char *CitusVersion = CITUS_VERSION;
static char *DeprecatedEmptyString = "";
static char *MitmfifoEmptyString = "";
/* deprecated GUC value that should not be used anywhere outside this file */
static int ReplicationModel = REPLICATION_MODEL_STREAMING;
@ -653,9 +654,17 @@ void
StartupCitusBackend(void)
{
InitializeMaintenanceDaemonBackend();
InitializeBackendData();
RegisterConnectionCleanup();
/*
* For queries this will be a no-op. But for background daemons we might
* still need to initialize the backend data. For those background daemons
* it doesn't really matter that we temporarily assign
* INVALID_CITUS_INTERNAL_BACKEND_GPID, since we override it again two
* lines below.
*/
InitializeBackendData(INVALID_CITUS_INTERNAL_BACKEND_GPID);
AssignGlobalPID();
RegisterConnectionCleanup();
}
@ -727,8 +736,8 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
DeallocateReservedConnections();
/* we don't want any monitoring view/udf to show already exited backends */
UnSetGlobalPID();
SetActiveMyBackend(false);
UnSetGlobalPID();
}
@ -820,6 +829,26 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.allow_unsafe_constraints",
gettext_noop("Enables unique constraints and exclusion constraints "
"that do not include a distribution column."),
gettext_noop("To enforce global uniqueness, Citus normally requires "
"that unique constraints and exclusion constraints contain "
"the distribution column. If the tuple does not include the "
"distribution column, Citus cannot ensure that the same value "
"is not present in another shard. However, in some cases the "
"index creator knows that uniqueness within the shard implies "
"global uniqueness (e.g. when indexing an expression derived "
"from the distribution column) and adding the distribution column "
"separately may not be desirable. This setting can then be used "
"to disable the check."),
&AllowUnsafeConstraints,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
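A sketch of the scenario this GUC unlocks, with hypothetical table and constraint names:

SET citus.allow_unsafe_constraints TO on;
-- unique constraint that omits the distribution column; the creator
-- asserts that per-shard uniqueness implies global uniqueness here
ALTER TABLE events ADD CONSTRAINT events_event_id_unique UNIQUE (event_id);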
DefineCustomBoolVariable(
"citus.allow_unsafe_locks_from_workers",
gettext_noop("Enables acquiring a distributed lock from a worker "
@ -1786,6 +1815,24 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_MS | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
/*
* Previously we were setting this configuration parameter
* on the fly for the failure tests schedule.
* However, PG15 doesn't allow that anymore: reserved prefixes
* like "citus" cannot be used to set non-existing GUCs.
* Relevant PG commit: 88103567cb8fa5be46dc9fac3e3b8774951a2be7
*/
DefineCustomStringVariable(
"citus.mitmfifo",
gettext_noop("Sets the citus mitm fifo path for failure tests"),
gettext_noop("This GUC is only used for testing."),
&MitmfifoEmptyString,
"",
PGC_SUSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.multi_shard_modify_mode",
gettext_noop("Sets the connection type for multi shard modify queries"),
@ -2679,36 +2726,28 @@ CitusAuthHook(Port *port, int status)
"regular client connections",
MaxClientConnections)));
}
}
/*
* Right after this, before we assign global pid, this backend
* might get blocked by a DDL as that happens during parsing.
* Right after this, but before we assign global pid, this backend might
* get blocked by a DDL as that happens during parsing.
*
* That's why, lets mark the backend as an external backend
* which is likely to execute a distributed command.
* That's why, we now initialize its backend data, with the gpid.
*
* We do this so that this backend gets the chance to show
* up in citus_lock_waits.
* We do this so that this backend gets the chance to show up in
* citus_lock_waits.
*
* We cannot assign a new global PID yet here, because that
* would require reading from catalogs, but that's not allowed
* this early in the connection startup (because no database
* has been assigned yet).
* We cannot assign a new global PID yet here, because that would require
* reading from catalogs, but that's not allowed this early in the
* connection startup (because no database has been assigned yet).
*
* A second reason is for backends that never call StartupCitusBackend. For
* those we already set the global PID in the backend data here to be able
* to do blocked process detection on connections that are opened over a
* replication connection. A replication connection backend will never call
* StartupCitusBackend, which normally sets up the global PID.
*/
InitializeBackendData();
SetBackendDataDistributedCommandOriginator(true);
}
else
{
/*
* We set the global PID in the backend data here already to be able to
* do blocked process detection on connections that are opened over a
* replication connection. A replication connection backend will never
* call StartupCitusBackend, which normally sets up the global PID.
*/
InitializeBackendData();
SetBackendDataGlobalPID(gpid);
}
InitializeBackendData(gpid);
/* let other authentication hooks to kick in first */
if (original_client_auth_hook)

View File

@ -1,4 +1,6 @@
#include "udfs/citus_locks/11.1-1.sql"
#include "udfs/create_distributed_table_concurrently/11.1-1.sql"
#include "udfs/citus_internal_delete_partition_metadata/11.1-1.sql"
DROP FUNCTION pg_catalog.worker_create_schema(bigint,text);
DROP FUNCTION pg_catalog.worker_cleanup_job_schema_cache();
@ -73,6 +75,7 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
#include "udfs/worker_split_shard_replication_setup/11.1-1.sql"
#include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
#include "udfs/replicate_reference_tables/11.1-1.sql"
#include "udfs/worker_split_shard_release_dsm/11.1-1.sql"
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text);
#include "udfs/isolate_tenant_to_new_shard/11.1-1.sql"

View File

@ -70,6 +70,7 @@ DROP FUNCTION pg_catalog.citus_split_shard_by_split_points(
shard_transfer_mode citus.shard_transfer_mode);
DROP FUNCTION pg_catalog.worker_split_copy(
source_shard_id bigint,
distribution_column text,
splitCopyInfos pg_catalog.split_copy_info[]);
DROP TYPE pg_catalog.split_copy_info;
@ -82,6 +83,8 @@ DROP FUNCTION pg_catalog.worker_split_shard_replication_setup(
DROP TYPE pg_catalog.split_shard_info;
DROP TYPE pg_catalog.replication_slot_info;
DROP FUNCTION pg_catalog.worker_split_shard_release_dsm();
DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4,
OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz,
OUT global_pid int8);
@ -95,6 +98,8 @@ DROP FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode);
DROP FUNCTION pg_catalog.isolate_tenant_to_new_shard(table_name regclass, tenant_id "any", cascade_option text, shard_transfer_mode citus.shard_transfer_mode);
#include "../udfs/isolate_tenant_to_new_shard/8.0-1.sql"
DROP FUNCTION pg_catalog.create_distributed_table_concurrently;
DROP FUNCTION pg_catalog.citus_internal_delete_partition_metadata(regclass);
DROP TABLE pg_catalog.pg_dist_cleanup;
DROP SEQUENCE pg_catalog.pg_dist_operationid_seq;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_partition_metadata(table_name regclass)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_partition_metadata(regclass) IS
'Deletes a row from pg_dist_partition with table ownership checks';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_partition_metadata(table_name regclass)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_partition_metadata(regclass) IS
'Deletes a row from pg_dist_partition with table ownership checks';

View File

@ -0,0 +1,14 @@
CREATE FUNCTION pg_catalog.create_distributed_table_concurrently(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type DEFAULT 'hash',
colocate_with text DEFAULT 'default',
shard_count int DEFAULT NULL)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME', $$create_distributed_table_concurrently$$;
COMMENT ON FUNCTION pg_catalog.create_distributed_table_concurrently(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type,
colocate_with text,
shard_count int)
IS 'creates a distributed table and avoids blocking writes';
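A hypothetical invocation of the new UDF:

SELECT create_distributed_table_concurrently('events', 'tenant_id',
                                             colocate_with => 'none');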

View File

@ -0,0 +1,14 @@
CREATE FUNCTION pg_catalog.create_distributed_table_concurrently(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type DEFAULT 'hash',
colocate_with text DEFAULT 'default',
shard_count int DEFAULT NULL)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME', $$create_distributed_table_concurrently$$;
COMMENT ON FUNCTION pg_catalog.create_distributed_table_concurrently(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type,
colocate_with text,
shard_count int)
IS 'creates a distributed table and avoids blocking writes';

View File

@ -14,9 +14,10 @@ ALTER TYPE citus.split_copy_info SET SCHEMA pg_catalog;
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy(
source_shard_id bigint,
distribution_column text,
splitCopyInfos pg_catalog.split_copy_info[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos pg_catalog.split_copy_info[])
COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, distribution_column text, splitCopyInfos pg_catalog.split_copy_info[])
IS 'Perform split copy for shard';
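A sketch of a call, assuming split_copy_info is composed as (destination_shard_id, destination_shard_min_value, destination_shard_max_value, destination_node_id); the shard ids, hash ranges, and node ids are hypothetical:

SELECT pg_catalog.worker_split_copy(
    102008,
    'tenant_id',
    ARRAY[ROW(102009, '-2147483648', '-1', 2)::pg_catalog.split_copy_info,
          ROW(102010, '0', '2147483647', 3)::pg_catalog.split_copy_info]);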

View File

@ -14,9 +14,10 @@ ALTER TYPE citus.split_copy_info SET SCHEMA pg_catalog;
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_copy(
source_shard_id bigint,
distribution_column text,
splitCopyInfos pg_catalog.split_copy_info[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_copy$$;
COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, splitCopyInfos pg_catalog.split_copy_info[])
COMMENT ON FUNCTION pg_catalog.worker_split_copy(source_shard_id bigint, distribution_column text, splitCopyInfos pg_catalog.split_copy_info[])
IS 'Perform split copy for shard';

View File

@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_release_dsm()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_release_dsm$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_release_dsm()
IS 'Releases shared memory segment allocated by non-blocking split workflow';
REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_release_dsm() FROM PUBLIC;

View File

@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_release_dsm()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_split_shard_release_dsm$$;
COMMENT ON FUNCTION pg_catalog.worker_split_shard_release_dsm()
IS 'Releases shared memory segment allocated by non-blocking split workflow';
REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_release_dsm() FROM PUBLIC;

View File

@ -671,13 +671,17 @@ TotalProcCount(void)
* the Citus extension is present, and after any subsequent invalidation of
* pg_dist_partition (see InvalidateMetadataSystemCache()).
*
* We only need to initialise MyBackendData once. The only goal here is to make
* We only need to initialise MyBackendData once. The main goal here is to make
* sure that we don't use the backend data from a previous backend with the same
* pgprocno. Resetting the backend data after a distributed transaction happens
* on COMMIT/ABORT through transaction callbacks.
*
* We also initialize the distributedCommandOriginator and globalPID fields
* based on the given global PID. This is to make sure that once the backend
* data is initialized, this backend can be correctly shown in citus_lock_waits.
*/
void
InitializeBackendData(void)
InitializeBackendData(uint64 globalPID)
{
if (MyBackendData != NULL)
{
@ -695,10 +699,27 @@ InitializeBackendData(void)
LockBackendSharedMemory(LW_EXCLUSIVE);
/* zero out the backend data */
/* zero out the backend's transaction id */
UnSetDistributedTransactionId();
UnSetGlobalPID();
SpinLockAcquire(&MyBackendData->mutex);
/*
* Use the given globalPID to initialize the backend data
*/
if (globalPID == INVALID_CITUS_INTERNAL_BACKEND_GPID)
{
MyBackendData->distributedCommandOriginator =
true;
}
else
{
MyBackendData->globalPID = globalPID;
MyBackendData->distributedCommandOriginator = false;
}
SpinLockRelease(&MyBackendData->mutex);
/*
* Signal that this backend is active and should show up
* on activity monitors.
@ -893,23 +914,6 @@ AssignGlobalPID(void)
}
/*
* SetBackendDataGlobalPID sets the global PID. This specifically does not read
* from catalog tables, because it should be safe to run from our
* authentication hook.
*/
void
SetBackendDataGlobalPID(uint64 globalPID)
{
SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->globalPID = globalPID;
MyBackendData->distributedCommandOriginator = false;
SpinLockRelease(&MyBackendData->mutex);
}
/*
* SetBackendDataDistributedCommandOriginator is used to set the distributedCommandOriginator
* field on MyBackendData.

View File

@ -657,7 +657,7 @@ LogDistributedDeadlockDebugMessage(const char *errorMessage)
}
ereport(LOG, (errmsg("[%s] %s", timestamptz_to_str(GetCurrentTimestamp()),
ApplyLogRedaction(errorMessage))));
errorMessage)));
}

View File

@ -44,13 +44,7 @@ static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
const Oid *parameterTypes,
const char *const *parameterValues);
static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList);
static List * OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet,
const char *user);
static void GetConnectionsResults(List *connectionList, bool failOnError);
static void SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet,
const char *command, const char *user,
bool
failOnError);
/*
* SendCommandToWorker sends a command to a particular worker as part of the
@ -238,129 +232,6 @@ SendCommandToMetadataWorkersParams(const char *command,
}
/*
* SendCommandToWorkersOptionalInParallel sends the given command to workers in parallel.
* It doesn't error out if there is a problem while sending the query, nor
* if there is a problem while executing it on any of the workers.
*/
void
SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const
char *command,
const char *user)
{
bool failOnError = false;
SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user,
failOnError);
}
/*
* SendCommandToWorkersInParallel sends the given command to workers in parallel.
* It errors out if there is a problem while sending the query, and also if
* there was any problem when sending/receiving the results.
*/
void
SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet, const
char *command,
const char *user)
{
bool failOnError = true;
SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user,
failOnError);
}
/*
* SendCommandToWorkersOutsideTransaction sends the given command to workers in parallel.
*/
static void
SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet, const
char *command, const char *user, bool
failOnError)
{
List *connectionList = OpenConnectionsToWorkersInParallel(targetWorkerSet, user);
/* finish opening connections */
FinishConnectionListEstablishment(connectionList);
/* send commands in parallel */
MultiConnection *connection = NULL;
foreach_ptr(connection, connectionList)
{
int querySent = SendRemoteCommand(connection, command);
if (failOnError && querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
GetConnectionsResults(connectionList, failOnError);
}
/*
* OpenConnectionsToWorkersInParallel opens connections to the given target worker set in parallel,
* as the given user.
*/
static List *
OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const char *user)
{
List *connectionList = NIL;
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, RowShareLock);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
const char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
int32 connectionFlags = OUTSIDE_TRANSACTION;
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
user, NULL);
/*
* connection can only be NULL for optional connections, which we don't
* support in this codepath.
*/
Assert((connectionFlags & OPTIONAL_CONNECTION) == 0);
Assert(connection != NULL);
connectionList = lappend(connectionList, connection);
}
return connectionList;
}
/*
* GetConnectionsResults gets remote command results
* for the given connections. It raises an error on failure if failOnError is true.
*/
static void
GetConnectionsResults(List *connectionList, bool failOnError)
{
MultiConnection *connection = NULL;
foreach_ptr(connection, connectionList)
{
bool raiseInterrupt = false;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupt);
bool isResponseOK = result != NULL && IsResponseOK(result);
if (failOnError && !isResponseOK)
{
ReportResultError(connection, result, ERROR);
}
PQclear(result);
if (isResponseOK)
{
ForgetResults(connection);
}
}
}
/*
* SendCommandToWorkersParamsInternal sends a command to all workers in parallel.
* Commands are committed on the workers when the local transaction commits. The

View File

@ -114,7 +114,8 @@ IntegerArrayTypeToList(ArrayType *arrayObject)
for (int index = 0; index < arrayObjectCount; index++)
{
list = lappend_int(list, datumObjectArray[index]);
int32 intObject = DatumGetInt32(datumObjectArray[index]);
list = lappend_int(list, intObject);
}
return list;

View File

@ -198,7 +198,6 @@ CopyNodeMapMergeJob(COPYFUNC_ARGS)
copyJobInfo(&newnode->job, &from->job);
COPY_NODE_FIELD(reduceQuery);
COPY_SCALAR_FIELD(partitionType);
COPY_NODE_FIELD(partitionColumn);
COPY_SCALAR_FIELD(partitionCount);

View File

@ -401,7 +401,6 @@ OutMapMergeJob(OUTFUNC_ARGS)
WRITE_NODE_TYPE("MAPMERGEJOB");
OutJobFields(str, (Job *) node);
WRITE_NODE_FIELD(reduceQuery);
WRITE_ENUM_FIELD(partitionType, PartitionType);
WRITE_NODE_FIELD(partitionColumn);
WRITE_UINT_FIELD(partitionCount);

View File

@ -53,6 +53,7 @@ static void DeleteColocationGroup(uint32 colocationId);
static uint32 CreateColocationGroupForRelation(Oid sourceRelationId);
static void BreakColocation(Oid sourceRelationId);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(mark_tables_colocated);
PG_FUNCTION_INFO_V1(get_colocated_shard_array);
@ -142,6 +143,17 @@ IsColocateWithNone(char *colocateWithTableName)
}
/*
* IsColocateWithDefault returns true if the given colocate_with table name
* is the special keyword "default".
*/
bool
IsColocateWithDefault(char *colocateWithTableName)
{
return pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0;
}
/*
* BreakColocation breaks the colocations of the given relation id.
* If t1, t2 and t3 are colocated and we call this function with t2,
@ -564,6 +576,39 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType,
}
/*
* AcquireColocationDefaultLock serializes concurrent creation of a colocation entry
* for the default group.
*/
void
AcquireColocationDefaultLock(void)
{
LOCKTAG tag;
const bool sessionLock = false;
const bool dontWait = false;
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_CREATE_COLOCATION_DEFAULT);
(void) LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait);
}
/*
* ReleaseColocationDefaultLock releases the lock for concurrent creation of a colocation entry
* for the default group.
*/
void
ReleaseColocationDefaultLock(void)
{
LOCKTAG tag;
const bool sessionLock = false;
SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_CREATE_COLOCATION_DEFAULT);
LockRelease(&tag, ExclusiveLock, sessionLock);
}
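Taken together, these two functions suggest a double-checked creation pattern. A minimal sketch of a caller follows, assuming the existing Citus helpers CreateColocationGroup and ShardReplicationFactor; the wrapper itself and its name are illustrative, not part of this diff:
/*
 * GetOrCreateDefaultColocationId (hypothetical): look up the default
 * colocation group, and only take the exclusive lock when the lookup
 * misses, re-checking under the lock so that two concurrent backends
 * cannot both insert a default group.
 */
static uint32
GetOrCreateDefaultColocationId(int shardCount, Oid columnType, Oid columnCollation)
{
	uint32 colocationId = ColocationId(shardCount, ShardReplicationFactor,
	                                   columnType, columnCollation);
	if (colocationId != INVALID_COLOCATION_ID)
	{
		return colocationId;
	}

	/* serialize with other backends creating the default group */
	AcquireColocationDefaultLock();

	/* re-check: another backend may have created the group while we waited */
	colocationId = ColocationId(shardCount, ShardReplicationFactor,
	                            columnType, columnCollation);
	if (colocationId == INVALID_COLOCATION_ID)
	{
		colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor,
		                                     columnType, columnCollation);
	}

	ReleaseColocationDefaultLock();

	return colocationId;
}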
/*
* CreateColocationGroup creates a new colocation id and writes it into
* pg_dist_colocation with the given configuration. It also returns the created
@ -619,7 +664,7 @@ InsertColocationGroupLocally(uint32 colocationId, int shardCount, int replicatio
/* increment the counter so that the next command can see the row */
CommandCounterIncrement();
table_close(pgDistColocation, RowExclusiveLock);
table_close(pgDistColocation, NoLock);
}
@ -1271,5 +1316,110 @@ DeleteColocationGroupLocally(uint32 colocationId)
}
systable_endscan(scanDescriptor);
table_close(pgDistColocation, RowExclusiveLock);
table_close(pgDistColocation, NoLock);
}
/*
* FindColocateWithColocationId tries to find a colocation ID for a given
* colocate_with clause passed to create_distributed_table.
*/
uint32
FindColocateWithColocationId(Oid relationId, char replicationModel,
Oid distributionColumnType,
Oid distributionColumnCollation,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName)
{
uint32 colocationId = INVALID_COLOCATION_ID;
if (IsColocateWithDefault(colocateWithTableName))
{
/* check for default colocation group */
colocationId = ColocationId(shardCount, ShardReplicationFactor,
distributionColumnType,
distributionColumnCollation);
/*
* if the shardCount is strict then we check if the shard count
* of the colocated table is actually shardCount
*/
if (shardCountIsStrict && colocationId != INVALID_COLOCATION_ID)
{
Oid colocatedTableId = ColocatedTableId(colocationId);
if (colocatedTableId != InvalidOid)
{
CitusTableCacheEntry *cacheEntry =
GetCitusTableCacheEntry(colocatedTableId);
int colocatedTableShardCount = cacheEntry->shardIntervalArrayLength;
if (colocatedTableShardCount != shardCount)
{
colocationId = INVALID_COLOCATION_ID;
}
}
}
}
else if (!IsColocateWithNone(colocateWithTableName))
{
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false);
EnsureTableCanBeColocatedWith(relationId, replicationModel,
distributionColumnType, sourceRelationId);
colocationId = TableColocationId(sourceRelationId);
}
return colocationId;
}
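For orientation, a hedged sketch of a call site; the surrounding variables are assumptions, and only FindColocateWithColocationId and INVALID_COLOCATION_ID come from this diff:
/* resolve the colocate_with option into an existing colocation group, if any */
uint32 colocationId = FindColocateWithColocationId(relationId, replicationModel,
                                                   distributionColumn->vartype,
                                                   distributionColumn->varcollid,
                                                   shardCount, shardCountIsStrict,
                                                   colocateWithTableName);
if (colocationId == INVALID_COLOCATION_ID)
{
	/* colocate_with was "none", or no matching default group exists */
}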
/*
* EnsureTableCanBeColocatedWith checks whether the given replication model and
* distribution column type are suitable for distributing a table that is to be
* colocated with the given source table.
*
* We only pass relationId to provide meaningful error messages.
*/
void
EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType, Oid sourceRelationId)
{
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
char sourceReplicationModel = sourceTableEntry->replicationModel;
Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId);
if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
}
if (sourceReplicationModel != replicationModel)
{
char *relationName = get_rel_name(relationId);
char *sourceRelationName = get_rel_name(sourceRelationId);
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
sourceRelationName, relationName),
errdetail("Replication models don't match for %s and %s.",
sourceRelationName, relationName)));
}
Oid sourceDistributionColumnType = sourceDistributionColumn->vartype;
if (sourceDistributionColumnType != distributionColumnType)
{
char *relationName = get_rel_name(relationId);
char *sourceRelationName = get_rel_name(sourceRelationId);
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
sourceRelationName, relationName),
errdetail("Distribution column types don't match for "
"%s and %s.", sourceRelationName,
relationName)));
}
}

View File

@ -22,35 +22,6 @@
static bool FileIsLink(const char *filename, struct stat filestat);
/*
* CacheDirectoryElement takes a filename and checks whether it lives in the
* directory path that is used for job, task, table, etc. files.
*/
bool
CacheDirectoryElement(const char *filename)
{
bool directoryElement = false;
StringInfo directoryPath = makeStringInfo();
appendStringInfo(directoryPath, "base/%s/", PG_JOB_CACHE_DIR);
char *directoryPathFound = strstr(filename, directoryPath->data);
/*
* If directoryPath occurs at the beginning of the filename, then the
* pointers should now be equal.
*/
if (directoryPathFound == filename)
{
directoryElement = true;
}
pfree(directoryPath);
return directoryElement;
}
/*
* CitusCreateDirectory creates a new directory with the given directory name.
*/

View File

@ -24,6 +24,7 @@
#include "nodes/nodes.h"
#include "nodes/primnodes.h"
#include "parser/scansup.h"
#include "parser/parse_relation.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
@ -123,7 +124,7 @@ column_to_column_name(PG_FUNCTION_ARGS)
Var *
BuildDistributionKeyFromColumnName(Oid relationId, char *columnName, LOCKMODE lockMode)
{
Relation relation = try_relation_open(relationId, ExclusiveLock);
Relation relation = try_relation_open(relationId, lockMode);
if (relation == NULL)
{
@ -172,6 +173,76 @@ BuildDistributionKeyFromColumnName(Oid relationId, char *columnName, LOCKMODE lo
}
/*
* EnsureValidDistributionColumn errors out if the specified column does not
* exist or is not suitable to be used as a distribution column. It does not
* retain any locks.
*/
void
EnsureValidDistributionColumn(Oid relationId, char *columnName)
{
Relation relation = try_relation_open(relationId, AccessShareLock);
if (relation == NULL)
{
ereport(ERROR, (errmsg("relation does not exist")));
}
char *tableName = get_rel_name(relationId);
/* it'd probably be better to downcase identifiers consistently with SQL case folding */
truncate_identifier(columnName, strlen(columnName), true);
/* lookup column definition */
HeapTuple columnTuple = SearchSysCacheAttName(relationId, columnName);
if (!HeapTupleIsValid(columnTuple))
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
columnName, tableName)));
}
Form_pg_attribute columnForm = (Form_pg_attribute) GETSTRUCT(columnTuple);
/* check if the column may be referenced in the distribution key */
if (columnForm->attnum <= 0)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot reference system column \"%s\" in relation \"%s\"",
columnName, tableName)));
}
ReleaseSysCache(columnTuple);
relation_close(relation, AccessShareLock);
}
/*
* ColumnTypeIdForRelationColumnName returns the type id of the named column in the given relation.
*/
Oid
ColumnTypeIdForRelationColumnName(Oid relationId, char *columnName)
{
Assert(columnName != NULL);
AttrNumber attNum = get_attnum(relationId, columnName);
if (attNum == InvalidAttrNumber)
{
ereport(ERROR, (errmsg("invalid attr %s", columnName)));
}
Relation relation = relation_open(relationId, AccessShareLock);
Oid typeId = attnumTypeId(relation, attNum);
relation_close(relation, AccessShareLock);
return typeId;
}
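The two helpers above compose naturally when vetting a user-supplied distribution column; a minimal sketch, with relationId and columnName as placeholders:
/* validate the column first, then resolve its type for later colocation checks */
EnsureValidDistributionColumn(relationId, columnName);
Oid distributionColumnType = ColumnTypeIdForRelationColumnName(relationId, columnName);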
/*
* ColumnToColumnName returns the human-readable name of a column given a
* relation identifier and the column's internal (Var) representation.

View File

@ -0,0 +1,139 @@
/*-------------------------------------------------------------------------
*
* distribution_column_map.c
* Implementation of a relation OID to distribution column map.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "common/hashfn.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/utils/distribution_column_map.h"
#include "nodes/primnodes.h"
/*
* RelationIdDistributionColumnMapEntry is used to map relation IDs to
* distribution column Vars.
*/
typedef struct RelationIdDistributionColumnMapEntry
{
/* OID of the relation */
Oid relationId;
/* a Var describing the distribution column */
Var *distributionColumn;
} RelationIdDistributionColumnMapEntry;
/*
* CreateDistributionColumnMap creates an empty (OID -> distribution column Var) map.
*/
DistributionColumnMap *
CreateDistributionColumnMap(void)
{
HASHCTL info = { 0 };
info.keysize = sizeof(Oid);
info.entrysize = sizeof(RelationIdDistributionColumnMapEntry);
info.hash = oid_hash;
info.hcxt = CurrentMemoryContext;
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
HTAB *distributionColumnMap = hash_create("Distribution Column Map", 32,
&info, hashFlags);
return distributionColumnMap;
}
/*
* AddDistributionColumnForRelation adds the given relation's OID and its
* distribution column to the map, and recursively does the same for any
* child partitions.
*/
void
AddDistributionColumnForRelation(DistributionColumnMap *distributionColumnMap,
Oid relationId,
char *distributionColumnName)
{
bool entryFound = false;
RelationIdDistributionColumnMapEntry *entry =
hash_search(distributionColumnMap, &relationId, HASH_ENTER, &entryFound);
Assert(!entryFound);
entry->distributionColumn =
BuildDistributionKeyFromColumnName(relationId, distributionColumnName, NoLock);
if (PartitionedTable(relationId))
{
/*
* Recursively add partitions as well.
*/
List *partitionList = PartitionList(relationId);
Oid partitionRelationId = InvalidOid;
foreach_oid(partitionRelationId, partitionList)
{
AddDistributionColumnForRelation(distributionColumnMap, partitionRelationId,
distributionColumnName);
}
}
}
/*
* GetDistributionColumnFromMap returns the distribution column for a given
* relation ID from the distribution column map.
*/
Var *
GetDistributionColumnFromMap(DistributionColumnMap *distributionColumnMap,
Oid relationId)
{
bool entryFound = false;
RelationIdDistributionColumnMapEntry *entry =
hash_search(distributionColumnMap, &relationId, HASH_FIND, &entryFound);
if (entryFound)
{
return entry->distributionColumn;
}
else
{
return NULL;
}
}
/*
* GetDistributionColumnWithOverrides returns the distribution column for a given
* relation from the distribution column overrides map, or the metadata if no
* override is specified.
*/
Var *
GetDistributionColumnWithOverrides(Oid relationId,
DistributionColumnMap *distributionColumnOverrides)
{
Var *distributionColumn = NULL;
if (distributionColumnOverrides != NULL)
{
distributionColumn = GetDistributionColumnFromMap(distributionColumnOverrides,
relationId);
if (distributionColumn != NULL)
{
return distributionColumn;
}
}
/* no override defined, use distribution column from metadata */
return DistPartitionKey(relationId);
}
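Putting the map API together, a short usage sketch; the relation and the "tenant_id" column name are hypothetical:
/* build an override map, then consult it with a fallback to catalog metadata */
DistributionColumnMap *overrides = CreateDistributionColumnMap();
AddDistributionColumnForRelation(overrides, relationId, "tenant_id");

Var *distributionColumn = GetDistributionColumnWithOverrides(relationId, overrides);
if (distributionColumn == NULL)
{
	/* no override and no metadata entry, e.g. a reference table */
}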

View File

@ -8,6 +8,19 @@
*-------------------------------------------------------------------------
*/
/*
* Make sure that functions marked as deprecated in OpenSSL 3.0 don't trigger
* deprecation warnings by indicating that we're using the OpenSSL 1.0.1
* compatible API. Postgres already does this in PG14, so we should not do it
* there; otherwise we get warnings about redefining this value.
*/
#if PG_VERSION_NUM < PG_VERSION_14
#ifndef OPENSSL_API_COMPAT
#define OPENSSL_API_COMPAT 0x1000100L
#endif
#endif
#include "postgres.h"
#include "distributed/connection_management.h"

View File

@ -39,14 +39,3 @@ IsLoggableLevel(int logLevel)
{
return log_min_messages <= logLevel || client_min_messages <= logLevel;
}
/*
* HashLogMessage is deprecated and doesn't do anything anymore. Its indirect
* usage will be removed later.
*/
char *
HashLogMessage(const char *logText)
{
return (char *) logText;
}

View File

@ -996,11 +996,19 @@ IsParentTable(Oid relationId)
systable_endscan(scan);
table_close(pgInherits, AccessShareLock);
if (tableInherited && PartitionedTable(relationId))
Relation relation = try_relation_open(relationId, AccessShareLock);
if (relation == NULL)
{
ereport(ERROR, (errmsg("relation with OID %u does not exist", relationId)));
}
if (tableInherited && PartitionedTableNoLock(relationId))
{
tableInherited = false;
}
relation_close(relation, AccessShareLock);
return tableInherited;
}
@ -1291,3 +1299,29 @@ PartitionBound(Oid partitionId)
return partitionBoundString;
}
/*
* ListShardsUnderParentRelation returns a list of ShardIntervals for every
* shard under the given relation, including the shards of child tables in a
* partitioning hierarchy.
*/
List *
ListShardsUnderParentRelation(Oid relationId)
{
List *shardList = LoadShardIntervalList(relationId);
if (PartitionedTable(relationId))
{
List *partitionList = PartitionList(relationId);
Oid partitionRelationId = InvalidOid;
foreach_oid(partitionRelationId, partitionList)
{
List *childShardList = ListShardsUnderParentRelation(partitionRelationId);
shardList = list_concat(shardList, childShardList);
}
}
return shardList;
}
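A brief sketch of consuming the combined list; it assumes only the existing ShardInterval fields shardId and relationId:
/* walk every shard under a (possibly partitioned) relation, children included */
List *shardList = ListShardsUnderParentRelation(relationId);
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardList)
{
	elog(DEBUG1, "shard " UINT64_FORMAT " belongs to relation %u",
	     shardInterval->shardId, shardInterval->relationId);
}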

View File

@ -108,7 +108,13 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
uint64 shardId = INVALID_SHARD_ID;
List *newWorkersList = NIL;
const char *referenceTableName = NULL;
int colocationId = CreateReferenceTableColocationId();
int colocationId = GetReferenceTableColocationId();
if (colocationId == INVALID_COLOCATION_ID)
{
/* we have no reference table yet. */
return;
}
/*
* Most of the time this function should result in a conclusion where we do not need

View File

@ -431,7 +431,7 @@ ParseTreeRawStmt(const char *ddlCommand)
/* log immediately if dictated by log statement */
if (check_log_statement(parseTreeList))
{
ereport(LOG, (errmsg("statement: %s", ApplyLogRedaction(ddlCommand)),
ereport(LOG, (errmsg("statement: %s", ddlCommand),
errhidestmt(true)));
}

View File

@ -50,7 +50,7 @@ extern void BackendManagementShmemInit(void);
extern size_t BackendManagementShmemSize(void);
extern void InitializeBackendManagement(void);
extern int TotalProcCount(void);
extern void InitializeBackendData(void);
extern void InitializeBackendData(uint64 globalPID);
extern void LockBackendSharedMemory(LWLockMode lockMode);
extern void UnlockBackendSharedMemory(void);
extern void UnSetDistributedTransactionId(void);

View File

@ -36,6 +36,7 @@ extern void InsertColocationGroupLocally(uint32 colocationId, int shardCount,
Oid distributionColumnType,
Oid distributionColumnCollation);
extern bool IsColocateWithNone(char *colocateWithTableName);
extern bool IsColocateWithDefault(char *colocateWithTableName);
extern uint32 GetNextColocationId(void);
extern void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId);
extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId);
@ -48,5 +49,15 @@ extern void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colo
extern void DeleteColocationGroupIfNoTablesBelong(uint32 colocationId);
extern List * ColocationGroupTableList(uint32 colocationId, uint32 count);
extern void DeleteColocationGroupLocally(uint32 colocationId);
extern uint32 FindColocateWithColocationId(Oid relationId, char replicationModel,
Oid distributionColumnType,
Oid distributionColumnCollation,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName);
extern void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType,
Oid sourceRelationId);
extern void AcquireColocationDefaultLock(void);
extern void ReleaseColocationDefaultLock(void);
#endif /* COLOCATION_UTILS_H_ */

View File

@ -27,6 +27,12 @@ extern bool AddAllLocalTablesToMetadata;
/* controlled via GUC, should be accessed via EnableLocalReferenceForeignKeys() */
extern bool EnableLocalReferenceForeignKeys;
/*
* GUC that controls whether to allow unique/exclude constraints without
* the distribution column.
*/
extern bool AllowUnsafeConstraints;
extern bool EnableUnsafeTriggers;
extern int MaxMatViewSizeToAutoRecreate;
@ -252,6 +258,8 @@ extern void ErrorIfUnsupportedForeignConstraintExists(Relation relation,
char distributionMethod,
Var *distributionColumn,
uint32 colocationId);
extern void EnsureNoFKeyFromTableType(Oid relationId, int tableTypeFlag);
extern void EnsureNoFKeyToTableType(Oid relationId, int tableTypeFlag);
extern void ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(Oid localTableId);
extern bool ColumnReferencedByAnyForeignKey(char *columnName, Oid relationId);
extern bool ColumnAppearsInForeignKey(char *columnName, Oid relationId);
@ -264,7 +272,6 @@ extern List * GetForeignConstraintFromDistributedTablesCommands(Oid relationId);
extern List * GetForeignConstraintCommandsInternal(Oid relationId, int flags);
extern bool AnyForeignKeyDependsOnIndex(Oid indexId);
extern bool HasForeignKeyWithLocalTable(Oid relationId);
extern bool HasForeignKeyToCitusLocalTable(Oid relationId);
extern bool HasForeignKeyToReferenceTable(Oid relationOid);
extern List * GetForeignKeysFromLocalTables(Oid relationId);
extern bool TableReferenced(Oid relationOid);

View File

@ -104,7 +104,4 @@ extern void ResetConstraintDropped(void);
extern void ExecuteDistributedDDLJob(DDLJob *ddlJob);
extern void ColumnarTableSetOptionsHook(Oid relationId, ColumnarOptions options);
/* forward declarations for sending custom commands to a distributed table */
extern DDLJob * CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command);
#endif /* MULTI_UTILITY_H */

View File

@ -23,5 +23,7 @@ extern Var * BuildDistributionKeyFromColumnName(Oid relationId,
char *columnName,
LOCKMODE lockMode);
extern char * ColumnToColumnName(Oid relationId, Node *columnNode);
extern Oid ColumnTypeIdForRelationColumnName(Oid relationId, char *columnName);
extern void EnsureValidDistributionColumn(Oid relationId, char *columnName);
#endif /* DISTRIBUTION_COLUMN_H */

View File

@ -19,10 +19,6 @@
extern bool EnableUnsupportedFeatureMessages;
extern bool IsLoggableLevel(int logLevel);
extern char * HashLogMessage(const char *text);
#define ApplyLogRedaction(text) \
(log_min_messages <= ereport_loglevel ? HashLogMessage(text) : text)
#undef ereport

View File

@ -143,6 +143,7 @@ extern List * AllCitusTableIds(void);
extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry,
CitusTableType tableType);
extern char * GetTableTypeName(Oid tableId);
extern void SetCreateCitusTransactionLevel(int val);
extern int GetCitusCreationLevel(void);
@ -152,6 +153,7 @@ extern char PgDistPartitionViaCatalog(Oid relationId);
extern List * LookupDistShardTuples(Oid relationId);
extern char PartitionMethodViaCatalog(Oid relationId);
extern Var * PartitionColumnViaCatalog(Oid relationId);
extern uint32 ColocationIdViaCatalog(Oid relationId);
extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel);
extern List * CitusTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
@ -180,7 +182,6 @@ extern void FlushDistTableCache(void);
extern void InvalidateMetadataSystemCache(void);
extern List * CitusTableTypeIdList(CitusTableType citusTableType);
extern Datum DistNodeMetadata(void);
extern bool ClusterHasReferenceTable(void);
extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
extern bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
@ -260,8 +261,6 @@ extern Oid CitusExtraDataContainerFuncId(void);
extern Oid CitusAnyValueFunctionId(void);
extern Oid CitusTextSendAsJsonbFunctionId(void);
extern Oid TextOutFunctionId(void);
extern Oid PgTableVisibleFuncId(void);
extern Oid CitusTableVisibleFuncId(void);
extern Oid RelationIsAKnownShardFuncId(void);
extern Oid JsonbExtractPathFuncId(void);
extern Oid JsonbExtractPathTextFuncId(void);

View File

@ -69,6 +69,7 @@ extern char * MarkObjectsDistributedCreateCommand(List *addresses,
extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry);
extern char * DistributionDeleteCommand(const char *schemaName,
const char *tableName);
extern char * DistributionDeleteMetadataCommand(Oid relationId);
extern char * TableOwnerResetCommand(Oid distributedRelationId);
extern char * NodeListInsertCommand(List *workerNodeList);
extern List * ShardListInsertCommand(List *shardIntervalList);

View File

@ -248,6 +248,10 @@ extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId,
char replicationModel, bool autoConverted);
extern void UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted);
extern void UpdateDistributionColumnGlobally(Oid relationId, char distributionMethod,
Var *distributionColumn, int colocationId);
extern void UpdateDistributionColumn(Oid relationId, char distributionMethod,
Var *distributionColumn, int colocationId);
extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardRow(uint64 shardId);
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
@ -276,7 +280,6 @@ extern Oid TableOwnerOid(Oid relationId);
extern char * TableOwner(Oid relationId);
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
extern void EnsureTableOwner(Oid relationId);
extern void EnsureSchemaOwner(Oid schemaId);
extern void EnsureHashDistributedTable(Oid relationId);
extern void EnsureFunctionOwner(Oid functionId);
extern void EnsureSuperUser(void);

View File

@ -30,5 +30,6 @@ extern char * GeneratePartitioningInformation(Oid tableId);
extern void FixPartitionConstraintsOnWorkers(Oid relationId);
extern void FixLocalPartitionConstraints(Oid relationId, int64 shardId);
extern void FixPartitionShardIndexNames(Oid relationId, Oid parentIndexOid);
extern List * ListShardsUnderParentRelation(Oid relationId);
#endif /* MULTI_PARTITIONING_UTILS_H_ */

View File

@ -160,7 +160,6 @@ typedef struct Job
typedef struct MapMergeJob
{
Job job;
Query *reduceQuery;
PartitionType partitionType;
Var *partitionColumn;
uint32 partitionCount;
@ -551,7 +550,6 @@ extern CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType);
extern Node * BuildBaseConstraint(Var *column);
extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval);
extern bool BinaryOpExpression(Expr *clause, Node **leftOperand, Node **rightOperand);
extern bool SimpleOpExpression(Expr *clause);
/* helper functions */
extern Var * MakeInt4Column(void);

View File

@ -14,5 +14,6 @@
#define PG_VERSION_13 130000
#define PG_VERSION_14 140000
#define PG_VERSION_15 150000
#define PG_VERSION_16 160000
#endif /* PG_VERSION_CONSTANTS */

View File

@ -46,6 +46,6 @@ extern DeferredErrorMessage * DeferErrorIfCannotPushdownSubquery(Query *subquery
bool
outerMostQueryHasLimit);
extern DeferredErrorMessage * DeferErrorIfUnsupportedUnionQuery(Query *queryTree);
extern bool IsJsonTableRTE(RangeTblEntry *rte);
#endif /* QUERY_PUSHDOWN_PLANNING_H */

View File

@ -17,3 +17,4 @@ extern void ErrorIfMoveUnsupportedTableType(Oid relationId);
extern void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode,
List *shardIntervalList, char *snapshotName);
extern void VerifyTablesHaveReplicaIdentity(List *colocatedTableList);
extern bool RelationCanPublishAllModifications(Oid relationId);

View File

@ -49,8 +49,10 @@ typedef enum AdvisoryLocktagClass
typedef enum CitusOperations
{
CITUS_TRANSACTION_RECOVERY = 0,
CITUS_SHARD_MOVE = 1
CITUS_NONBLOCKING_SPLIT = 1,
CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY = 2,
CITUS_CREATE_COLOCATION_DEFAULT = 3,
CITUS_SHARD_MOVE = 4
} CitusOperations;
/* reuse advisory lock, but with different, unused field 4 (4)*/
@ -179,6 +181,8 @@ extern void SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lock
extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode);
extern List * GetSortedReferenceShardIntervals(List *relationList);
void AcquireCreateDistributedTableConcurrentlyLock(Oid relationId);
/* Lock parent table's colocated shard resource */
extern void LockParentShardResourceIfPartition(List *shardIntervalList,
LOCKMODE lockMode);

View File

@ -12,6 +12,8 @@
#ifndef SHARDSPLIT_H_
#define SHARDSPLIT_H_
#include "distributed/utils/distribution_column_map.h"
/* Split Modes supported by Shard Split API */
typedef enum SplitMode
{
@ -28,10 +30,10 @@ typedef enum SplitMode
typedef enum SplitOperation
{
SHARD_SPLIT_API = 0,
ISOLATE_TENANT_TO_NEW_SHARD
ISOLATE_TENANT_TO_NEW_SHARD,
CREATE_DISTRIBUTED_TABLE
} SplitOperation;
/*
* SplitShard API to split a given shard (or shard group) across a set of
* destination nodes, using the given split mode and split points.
@ -40,10 +42,15 @@ extern void SplitShard(SplitMode splitMode,
SplitOperation splitOperation,
uint64 shardIdToSplit,
List *shardSplitPointsList,
List *nodeIdsForPlacementList);
List *nodeIdsForPlacementList,
DistributionColumnMap *distributionColumnOverrides,
List *colocatedShardIntervalList,
uint32 targetColocationId);
extern void DropShardList(List *shardIntervalList);
extern SplitMode LookupSplitMode(Oid shardTransferModeOid);
extern void ErrorIfMultipleNonblockingMoveSplitInTheSameTransaction(void);
#endif /* SHARDSPLIT_H_ */
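A hedged sketch of driving the widened SplitShard signature, for example from the create_distributed_table_concurrently path; apart from the functions and enum values declared in this diff, every variable here is an assumption:
/* thread distribution column overrides and the colocated shard list into a split */
DistributionColumnMap *overrides = CreateDistributionColumnMap();
AddDistributionColumnForRelation(overrides, relationId, distributionColumnName);

SplitShard(LookupSplitMode(shardTransferModeOid),
           CREATE_DISTRIBUTED_TABLE,
           shardIdToSplit,
           shardSplitPointsList,
           nodeIdsForPlacementList,
           overrides,
           ListShardsUnderParentRelation(relationId),
           targetColocationId);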

View File

@ -47,4 +47,5 @@ extern void DropShardSplitPublications(MultiConnection *sourceConnection,
extern void DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList);
extern void DropShardSplitReplicationSlots(MultiConnection *sourceConnection,
List *replicationSlotInfoList);
#endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */

View File

@ -18,7 +18,6 @@
#define PG_JOB_CACHE_DIR "pgsql_job_cache"
extern bool CacheDirectoryElement(const char *filename);
extern void CleanupJobCacheDirectory(void);
extern void CitusCreateDirectory(StringInfo directoryName);
extern void CitusRemoveDirectory(const char *filename);

View File

@ -0,0 +1,32 @@
/*-------------------------------------------------------------------------
*
* distribution_column_map.h
* Declarations for a relation OID to distribution column hash.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef DISTRIBUTION_COLUMN_HASH_H
#define DISTRIBUTION_COLUMN_HASH_H
#include "postgres.h"
#include "nodes/primnodes.h"
#include "utils/hsearch.h"
typedef HTAB DistributionColumnMap;
extern DistributionColumnMap * CreateDistributionColumnMap(void);
extern void AddDistributionColumnForRelation(DistributionColumnMap *distributionColumns,
Oid relationId,
char *distributionColumnName);
extern Var * GetDistributionColumnFromMap(DistributionColumnMap *distributionColumnMap,
Oid relationId);
extern Var * GetDistributionColumnWithOverrides(Oid relationId,
DistributionColumnMap *overrides);
#endif /* DISTRIBUTION_COLUMN_HASH_H */

View File

@ -90,6 +90,7 @@ extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePor
extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk);
extern List * ReadDistNode(bool includeNodesFromOtherClusters);
extern void EnsureCoordinator(void);
extern void EnsureCoordinatorIsInMetadata(void);
extern void InsertCoordinatorIfClusterEmpty(void);
extern uint32 GroupForNode(char *nodeName, int32 nodePort);
extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes);

View File

@ -79,11 +79,6 @@ extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
const char *
nodeUser,
List *commandList);
extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet,
const char *command,
const char *user);
void SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet,
const char *command, const char *user);
extern void RemoveWorkerTransaction(const char *nodeName, int32 nodePort);
/* helper functions for worker transactions */

View File

@ -39,6 +39,8 @@ typedef Value String;
#define pgstat_init_relation(r) pgstat_initstats(r)
#define pg_analyze_and_rewrite_fixedparams(a, b, c, d, e) pg_analyze_and_rewrite(a, b, c, \
d, e)
#define boolVal(v) intVal(v)
#define makeBoolean(val) makeInteger(val)
static inline int64
pg_strtoint64(char *s)

View File

@ -10,6 +10,25 @@ subdir = src/test/columnar_freezing
top_builddir = ../../..
include $(top_builddir)/Makefile.global
# In PG15, the Perl test modules have been moved to a new namespace, and the
# new() and get_new_node() methods have been unified into a single method: new().
# Relevant PG commits: 201a76183e2056c2217129e12d68c25ec9c559c8
# and b3b4d8e68ae83f432f43f035c7eb481ef93e1583.
pg_version = $(shell $(PG_CONFIG) --version 2>/dev/null)
pg_whole_version = $(shell echo "$(pg_version)"| sed -e 's/^PostgreSQL \([0-9]*\)\(\.[0-9]*\)\{0,1\}\(.*\)/\1\2/')
pg_major_version = $(shell echo "$(pg_whole_version)"| sed -e 's/^\([0-9]\{2\}\)\(.*\)/\1/')
# For now, we only have a single test file. Because of the namespace change
# described above, we ended up separating the test paths for different
# versions. If you need to add new test files, be careful to add them for
# both versions.
ifeq ($(pg_major_version),13)
test_path = t_pg13_pg14/*.pl
else ifeq ($(pg_major_version),14)
test_path = t_pg13_pg14/*.pl
else
test_path = t/*.pl
endif
# copied from pgxs/Makefile.global to use postgres' abs build dir for pg_regress
ifeq ($(enable_tap_tests),yes)
@ -23,7 +42,7 @@ PGPORT='6$(DEF_PGPORT)' \
top_builddir='$(CURDIR)/$(top_builddir)' \
PG_REGRESS='$(pgxsdir)/src/test/regress/pg_regress' \
TEMP_CONFIG='$(CURDIR)'/postgresql.conf \
$(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),t/*.pl)
$(PROVE) $(PG_PROVE_FLAGS) $(PROVE_FLAGS) $(if $(PROVE_TESTS),$(PROVE_TESTS),$(test_path))
endef
else

View File

@ -1,12 +1,12 @@
# Minimal test testing streaming replication
use strict;
use warnings;
use PostgresNode;
use TestLib;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More tests => 2;
# Initialize single node
my $node_one = get_new_node('node_one');
my $node_one = PostgreSQL::Test::Cluster->new('node_one');
$node_one->init();
$node_one->start;

Some files were not shown because too many files have changed in this diff.