Enable replication factor > 1 in metadata syncing (#5392)

- [x] Add some more regression test coverage
- [x] Make sure returning works fine in case of
     local execution + remote execution
     (task->partiallyLocalOrRemote works as expected, already added tests)
- [x] Implement locking properly (and add isolation tests)
     - [x] We do #shardcount round-trips on `SerializeNonCommutativeWrites`.
           We made it a single round-trip.
- [x] Acquire locks for subselects on the workers & add isolation tests
- [x] Add a GUC to prevent modification from the workers, hence increase the
      coordinator-only throughput
       - The performance slightly drops (~%15), unless
         `citus.allow_modifications_from_workers_to_replicated_tables`
         is set to false
pull/5465/head
Önder Kalacı 2021-11-15 13:10:18 +01:00 committed by GitHub
parent bbcf287f7e
commit 8c0bc94b51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 7956 additions and 333 deletions

View File

@ -1721,21 +1721,31 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
}
/* now, iterate on the tasks and acquire the executor locks on the shards */
List *anchorShardIntervalList = NIL;
List *relationRowLockList = NIL;
List *requiresConsistentSnapshotRelationShardList = NIL;
Task *task = NULL;
foreach_ptr(task, taskList)
{
/*
* If we are dealing with a partition we are also taking locks on parent table
* to prevent deadlocks on concurrent operations on a partition and its parent.
*/
LockParentShardResourceIfPartition(task->anchorShardId, lockMode);
ShardInterval *anchorShardInterval = LoadShardInterval(task->anchorShardId);
SerializeNonCommutativeWrites(list_make1(anchorShardInterval), lockMode);
anchorShardIntervalList = lappend(anchorShardIntervalList, anchorShardInterval);
/* Acquire additional locks for SELECT .. FOR UPDATE on reference tables */
AcquireExecutorShardLocksForRelationRowLockList(task->relationRowLockList);
/*
* Due to PG commit 5ee190f8ec37c1bbfb3061e18304e155d600bc8e we copy the
* second parameter in pre-13.
*/
relationRowLockList =
list_concat(relationRowLockList,
#if (PG_VERSION_NUM >= PG_VERSION_12) && (PG_VERSION_NUM < PG_VERSION_13)
list_copy(task->relationRowLockList));
#else
task->relationRowLockList);
#endif
/*
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
@ -1749,9 +1759,63 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
* concurrently.
*/
LockRelationShardResources(task->relationShardList, ExclusiveLock);
/*
* Due to PG commit 5ee190f8ec37c1bbfb3061e18304e155d600bc8e we copy the
* second parameter in pre-13.
*/
requiresConsistentSnapshotRelationShardList =
list_concat(requiresConsistentSnapshotRelationShardList,
#if (PG_VERSION_NUM >= PG_VERSION_12) && (PG_VERSION_NUM < PG_VERSION_13)
list_copy(task->relationShardList));
#else
task->relationShardList);
#endif
}
}
/*
* Acquire the locks in a sorted way to avoid deadlocks due to lock
* ordering across concurrent sessions.
*/
anchorShardIntervalList =
SortList(anchorShardIntervalList, CompareShardIntervalsById);
/*
* If we are dealing with a partition we are also taking locks on parent table
* to prevent deadlocks on concurrent operations on a partition and its parent.
*
* Note that this function currently does not acquire any remote locks as that
* is necessary to control the concurrency across multiple nodes for replicated
* tables. That is because Citus currently does not allow modifications to
* partitions from any node other than the coordinator.
*/
LockParentShardResourceIfPartition(anchorShardIntervalList, lockMode);
/* Acquire distribution execution locks on the affected shards */
SerializeNonCommutativeWrites(anchorShardIntervalList, lockMode);
if (relationRowLockList != NIL)
{
/* Acquire additional locks for SELECT .. FOR UPDATE on reference tables */
AcquireExecutorShardLocksForRelationRowLockList(relationRowLockList);
}
if (requiresConsistentSnapshotRelationShardList != NIL)
{
/*
* If the task has a subselect, then we may need to lock the shards from which
* the query selects as well to prevent the subselects from seeing different
* results on different replicas.
*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
LockRelationShardResources(requiresConsistentSnapshotRelationShardList,
ExclusiveLock);
}
}

View File

@ -56,6 +56,7 @@
*/
int MultiShardConnectionType = PARALLEL_CONNECTION;
bool WritableStandbyCoordinator = false;
bool AllowModificationsFromWorkersToReplicatedTables = true;
/*
* Pointer to bound parameters of the current ongoing call to ExecutorRun.

View File

@ -34,6 +34,7 @@
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/multi_executor.h"
#include "distributed/function_utils.h"
#include "distributed/foreign_key_relationship.h"
#include "distributed/listutils.h"
@ -309,6 +310,32 @@ EnsureModificationsCanRunOnRelation(Oid relationId)
{
EnsureModificationsCanRun();
if (!OidIsValid(relationId) || !IsCitusTable(relationId))
{
/* we are not interested in PG tables */
return;
}
bool modifiedTableReplicated =
IsCitusTableType(relationId, REFERENCE_TABLE) ||
!SingleReplicatedTable(relationId);
if (!IsCoordinator() && !AllowModificationsFromWorkersToReplicatedTables &&
modifiedTableReplicated)
{
ereport(ERROR, (errmsg("modifications via the worker nodes are not "
"allowed for replicated tables such as reference "
"tables or hash distributed tables with replication "
"factor greater than 1."),
errhint("All modifications to replicated tables should "
"happen via the coordinator unless "
"citus.allow_modifications_from_workers_to_replicated_tables "
" = true."),
errdetail("Allowing modifications from the worker nodes "
"requires extra locking which might decrease "
"the throughput.")));
}
/*
* Even if user allows writes from standby, we should not allow for
* replicated tables as they require 2PC. And, 2PC needs to write a log
@ -319,21 +346,15 @@ EnsureModificationsCanRunOnRelation(Oid relationId)
return;
}
if (!OidIsValid(relationId) || !IsCitusTable(relationId))
{
/* we are not interested in PG tables */
return;
}
if (IsCitusTableType(relationId, REFERENCE_TABLE) ||
!SingleReplicatedTable(relationId))
if (modifiedTableReplicated)
{
ereport(ERROR, (errmsg("writing to worker nodes is not currently "
"allowed for replicated tables such as reference "
"tables or hash distributed tables with replication "
"factor greater than 1."),
errhint("All modifications to replicated tables happen via 2PC, "
"and 2PC requires the database to be in a writable state."),
errhint("All modifications to replicated tables "
"happen via 2PC, and 2PC requires the "
"database to be in a writable state."),
errdetail("the database is read-only")));
}
}

View File

@ -367,9 +367,8 @@ ClusterHasKnownMetadataWorkers()
/*
* ShouldSyncTableMetadata checks if the metadata of a distributed table should be
* propagated to metadata workers, i.e. the table is an MX table or reference table.
* Tables with streaming replication model (which means RF=1) and hash distribution are
* considered as MX tables while tables with none distribution are reference tables.
* propagated to metadata workers, i.e. the table is a hash distributed table or
* reference/citus local table.
*/
bool
ShouldSyncTableMetadata(Oid relationId)
@ -381,12 +380,8 @@ ShouldSyncTableMetadata(Oid relationId)
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
bool streamingReplicated =
(tableEntry->replicationModel == REPLICATION_MODEL_STREAMING);
bool mxTable = (streamingReplicated && IsCitusTableTypeCacheEntry(tableEntry,
HASH_DISTRIBUTED));
if (mxTable || IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
if (IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
{
return true;
}
@ -2182,15 +2177,6 @@ EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, int coloc
"known replication models.")));
}
if (distributionMethod == DISTRIBUTE_BY_HASH &&
replicationModel != REPLICATION_MODEL_STREAMING)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Hash distributed tables can only have '%c' "
"as the replication model.",
REPLICATION_MODEL_STREAMING)));
}
if (distributionMethod == DISTRIBUTE_BY_NONE &&
!(replicationModel == REPLICATION_MODEL_STREAMING ||
replicationModel == REPLICATION_MODEL_2PC))

View File

@ -562,6 +562,23 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.allow_modifications_from_workers_to_replicated_tables",
gettext_noop("Enables modifications from workers to replicated "
"tables such as reference tables or hash "
"distributed tables with replication factor "
"greater than 1."),
gettext_noop("Allowing modifications from the worker nodes "
"requires extra locking which might decrease "
"the throughput. Disabling this GUC skips the "
"extra locking and prevents modifications from "
"worker nodes."),
&AllowModificationsFromWorkersToReplicatedTables,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.binary_worker_copy_format",
gettext_noop("Use the binary worker copy format."),

View File

@ -26,6 +26,7 @@
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/distributed_planner.h"
@ -71,6 +72,8 @@ static const int lock_mode_to_string_map_count = sizeof(lockmode_to_string_map)
/* local function forward declarations */
static LOCKMODE IntToLockMode(int mode);
static void LockReferencedReferenceShardResources(uint64 shardId, LOCKMODE lockMode);
static bool AnyTableReplicated(List *shardIntervalList,
List **replicatedShardIntervalList);
static void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode);
static void LockShardListResourcesOnFirstWorker(LOCKMODE lockmode,
List *shardIntervalList);
@ -244,12 +247,27 @@ lock_shard_resources(PG_FUNCTION_ARGS)
static void
LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList)
{
if (!AllowModificationsFromWorkersToReplicatedTables)
{
/*
* Allowing modifications from worker nodes for replicated tables requires
* to serialize modifications, see AcquireExecutorShardLocksForExecution()
* for the details.
*
* If the user opted for disabling modifications from the workers, we do not
* need to acquire these remote locks. Returning early saves us from an additional
* network round-trip.
*/
Assert(AnyTableReplicated(shardIntervalList, NULL));
return;
}
StringInfo lockCommand = makeStringInfo();
int processedShardIntervalCount = 0;
int totalShardIntervalCount = list_length(shardIntervalList);
WorkerNode *firstWorkerNode = GetFirstPrimaryWorkerNode();
int connectionFlags = 0;
const char *superuser = CurrentUserName();
const char *currentUser = CurrentUserName();
appendStringInfo(lockCommand, "SELECT lock_shard_resources(%d, ARRAY[", lockmode);
@ -282,7 +300,7 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList)
->workerName,
firstWorkerNode
->workerPort,
superuser,
currentUser,
NULL);
/* the SELECT .. FOR UPDATE breaks if we lose the connection */
@ -717,28 +735,81 @@ LockShardsInPlacementListMetadata(List *shardPlacementList, LOCKMODE lockMode)
void
SerializeNonCommutativeWrites(List *shardIntervalList, LOCKMODE lockMode)
{
ShardInterval *firstShardInterval = (ShardInterval *) linitial(shardIntervalList);
int64 firstShardId = firstShardInterval->shardId;
if (shardIntervalList == NIL)
{
return;
}
if (ReferenceTableShardId(firstShardId))
List *replicatedShardList = NIL;
if (AnyTableReplicated(shardIntervalList, &replicatedShardList))
{
if (ClusterHasKnownMetadataWorkers() && !IsFirstWorkerNode())
{
LockShardListResourcesOnFirstWorker(lockMode, shardIntervalList);
LockShardListResourcesOnFirstWorker(lockMode, replicatedShardList);
}
/*
* Referenced tables can cascade their changes to this table, and we
* want to serialize changes to keep different replicas consistent.
*/
LockReferencedReferenceShardResources(firstShardId, lockMode);
ShardInterval *firstShardInterval =
(ShardInterval *) linitial(replicatedShardList);
if (ReferenceTableShardId(firstShardInterval->shardId))
{
/*
* Referenced tables can cascade their changes to this table, and we
* want to serialize changes to keep different replicas consistent.
*
* We currently only support foreign keys to reference tables, which are
* single shard. So, getting the first shard should be sufficient here.
*/
LockReferencedReferenceShardResources(firstShardInterval->shardId, lockMode);
}
}
LockShardListResources(shardIntervalList, lockMode);
}
/*
* AnyTableReplicated iterates on the shard list and returns true
* if any of the shard is a replicated table. We qualify replicated
* tables as any reference table or any distributed table with
* replication factor > 1.
*
* If the optional replicatedShardIntervalList is passed, the function
* fills it with the replicated shard intervals.
*/
static bool
AnyTableReplicated(List *shardIntervalList, List **replicatedShardIntervalList)
{
if (replicatedShardIntervalList == NULL)
{
/* the caller is not interested in the replicatedShardIntervalList */
List *localList = NIL;
replicatedShardIntervalList = &localList;
}
*replicatedShardIntervalList = NIL;
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
int64 shardId = shardInterval->shardId;
Oid relationId = RelationIdForShard(shardId);
if (ReferenceTableShardId(shardId))
{
*replicatedShardIntervalList =
lappend(*replicatedShardIntervalList, LoadShardInterval(shardId));
}
else if (!SingleReplicatedTable(relationId))
{
*replicatedShardIntervalList =
lappend(*replicatedShardIntervalList, LoadShardInterval(shardId));
}
}
return list_length(*replicatedShardIntervalList) > 0;
}
/*
* LockShardListResources takes locks on all shards in shardIntervalList to
* prevent concurrent DML statements on those shards.
@ -766,19 +837,25 @@ LockShardListResources(List *shardIntervalList, LOCKMODE lockMode)
void
LockRelationShardResources(List *relationShardList, LOCKMODE lockMode)
{
/* lock shards in a consistent order to prevent deadlock */
relationShardList = SortList(relationShardList, CompareRelationShards);
if (relationShardList == NIL)
{
return;
}
List *shardIntervalList = NIL;
RelationShard *relationShard = NULL;
foreach_ptr(relationShard, relationShardList)
{
uint64 shardId = relationShard->shardId;
if (shardId != INVALID_SHARD_ID)
{
LockShardResource(shardId, lockMode);
}
ShardInterval *shardInterval = LoadShardInterval(shardId);
shardIntervalList = lappend(shardIntervalList, shardInterval);
}
/* lock shards in a consistent order to prevent deadlock */
shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById);
SerializeNonCommutativeWrites(shardIntervalList, lockMode);
}
@ -788,19 +865,29 @@ LockRelationShardResources(List *relationShardList, LOCKMODE lockMode)
* shard resource lock on the colocated shard of the parent table.
*/
void
LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode)
LockParentShardResourceIfPartition(List *shardIntervalList, LOCKMODE lockMode)
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid relationId = shardInterval->relationId;
List *parentShardIntervalList = NIL;
if (PartitionTable(relationId))
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
int shardIndex = ShardIndex(shardInterval);
Oid parentRelationId = PartitionParentOid(relationId);
uint64 parentShardId = ColocatedShardIdInRelation(parentRelationId, shardIndex);
Oid relationId = shardInterval->relationId;
LockShardResource(parentShardId, lockMode);
if (PartitionTable(relationId))
{
int shardIndex = ShardIndex(shardInterval);
Oid parentRelationId = PartitionParentOid(relationId);
uint64 parentShardId = ColocatedShardIdInRelation(parentRelationId,
shardIndex);
ShardInterval *parentShardInterval = LoadShardInterval(parentShardId);
parentShardIntervalList = lappend(parentShardIntervalList,
parentShardInterval);
}
}
LockShardListResources(parentShardIntervalList, lockMode);
}

View File

@ -63,6 +63,7 @@ typedef struct TransactionProperties
extern int MultiShardConnectionType;
extern bool WritableStandbyCoordinator;
extern bool AllowModificationsFromWorkersToReplicatedTables;
extern bool ForceMaxQueryParallelization;
extern int MaxAdaptiveExecutorPoolSize;
extern int ExecutorSlowStartInterval;

View File

@ -145,7 +145,8 @@ extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMod
extern List * GetSortedReferenceShardIntervals(List *relationList);
/* Lock parent table's colocated shard resource */
extern void LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode);
extern void LockParentShardResourceIfPartition(List *shardIntervalList,
LOCKMODE lockMode);
/* Lock mode translation between text and enum */
extern LOCKMODE LockModeTextToLockMode(const char *lockModeName);

View File

@ -316,7 +316,7 @@ class CitusSingleNodeSingleShardClusterConfig(CitusDefaultClusterConfig):
common.coordinator_should_haveshards(self.bindir, self.coordinator_port())
class CitusShardReplicationFactorClusterConfig(CitusDefaultClusterConfig):
class CitusShardReplicationFactorClusterConfig(CitusMXBaseClusterConfig):
def __init__(self, arguments):
super().__init__(arguments)
self.new_settings = {"citus.shard_replication_factor": 2}

View File

@ -19,6 +19,11 @@ step s1-commit:
COMMIT;
step s2-update: <... completed>
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-insert s2-update
master_create_worker_shards
@ -32,6 +37,11 @@ step s1-insert:
step s2-update:
UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-multi-insert s2-update s1-commit
master_create_worker_shards
@ -52,6 +62,11 @@ step s1-commit:
COMMIT;
step s2-update: <... completed>
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-multi-insert s2-multi-insert-overlap s1-commit
master_create_worker_shards
@ -71,6 +86,11 @@ step s2-multi-insert-overlap:
step s1-commit:
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s2-begin s1-multi-insert s2-multi-insert s1-commit s2-commit
master_create_worker_shards
@ -96,3 +116,8 @@ step s1-commit:
step s2-commit:
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)

View File

@ -44,6 +44,11 @@ step s2-update: <... completed>
step s2-abort:
ABORT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s2-begin s3-begin s1-update s2-update s3-update detector-dump-wait-edges s1-abort s2-abort s3-abort
step s1-begin:
@ -102,3 +107,8 @@ step s3-update: <... completed>
step s3-abort:
ABORT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)

View File

@ -17,6 +17,11 @@ count
15
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-router-select s1-commit s1-select-count
create_distributed_table
@ -40,6 +45,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-real-time-select s1-commit s1-select-count
create_distributed_table
@ -67,6 +77,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-adaptive-select s1-commit s1-select-count
create_distributed_table
@ -78,8 +93,8 @@ step s1-initialize: COPY hash_copy FROM PROGRAM 'echo 0, a, 0 && echo 1, b, 1 &&
step s1-begin: BEGIN;
step s1-copy: COPY hash_copy FROM PROGRAM 'echo 5, f, 5 && echo 6, g, 6 && echo 7, h, 7 && echo 8, i, 8 && echo 9, j, 9' WITH CSV;
step s2-adaptive-select:
SET citus.enable_repartition_joins TO ON;
SELECT * FROM hash_copy AS t1 JOIN hash_copy AS t2 ON t1.id = t2.int_data ORDER BY 1, 2, 3, 4;
SET citus.enable_repartition_joins TO ON;
SELECT * FROM hash_copy AS t1 JOIN hash_copy AS t2 ON t1.id = t2.int_data ORDER BY 1, 2, 3, 4;
id|data|int_data|id|data|int_data
---------------------------------------------------------------------
@ -97,6 +112,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-insert s1-commit s1-select-count
create_distributed_table
@ -115,6 +135,11 @@ count
11
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-insert-select s1-commit s1-select-count
create_distributed_table
@ -133,6 +158,11 @@ count
15
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-update s1-commit s1-select-count
create_distributed_table
@ -151,6 +181,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-delete s1-commit s1-select-count
create_distributed_table
@ -169,6 +204,11 @@ count
9
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-truncate s1-commit s1-select-count
create_distributed_table
@ -188,6 +228,11 @@ count
0
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-drop s1-commit s1-select-count
create_distributed_table
@ -203,6 +248,11 @@ step s1-commit: COMMIT;
step s2-drop: <... completed>
step s1-select-count: SELECT COUNT(*) FROM hash_copy;
ERROR: relation "hash_copy" does not exist
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-ddl-create-index s1-commit s1-select-count s1-show-indexes
create_distributed_table
@ -229,6 +279,11 @@ run_command_on_workers
(localhost,57638,t,2)
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-ddl-create-index s1-begin s1-copy s2-ddl-drop-index s1-commit s1-select-count s1-show-indexes
create_distributed_table
@ -256,6 +311,11 @@ run_command_on_workers
(localhost,57638,t,0)
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-ddl-create-index-concurrently s1-commit s1-select-count s1-show-indexes
create_distributed_table
@ -282,6 +342,11 @@ run_command_on_workers
(localhost,57638,t,2)
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-ddl-add-column s1-commit s1-select-count s1-show-columns
create_distributed_table
@ -308,6 +373,11 @@ run_command_on_workers
(localhost,57638,t,new_column)
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-ddl-add-column s1-begin s1-copy-additional-column s2-ddl-drop-column s1-commit s1-select-count s1-show-columns
create_distributed_table
@ -335,6 +405,11 @@ run_command_on_workers
(localhost,57638,t,"")
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-ddl-rename-column s1-commit s1-select-count s1-show-columns
create_distributed_table
@ -361,6 +436,11 @@ run_command_on_workers
(localhost,57638,t,new_column)
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-table-size s1-commit s1-select-count
create_distributed_table
@ -384,6 +464,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-master-modify-multiple-shards s1-commit s1-select-count
create_distributed_table
@ -402,6 +487,11 @@ count
5
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-copy s2-master-drop-all-shards s1-commit s1-select-count
create_distributed_table
@ -426,6 +516,11 @@ count
0
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-drop s1-create-non-distributed-table s1-initialize s1-begin s1-copy s2-distribute-table s1-commit s1-select-count
create_distributed_table
@ -452,6 +547,11 @@ count
15
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-recreate-with-replication-2 s1-initialize s1-begin s1-copy s2-update s1-commit s1-select-count
create_distributed_table
@ -460,10 +560,10 @@ create_distributed_table
(1 row)
step s1-recreate-with-replication-2:
DROP TABLE hash_copy;
SET citus.shard_replication_factor TO 2;
CREATE TABLE hash_copy(id integer, data text, int_data int);
SELECT create_distributed_table('hash_copy', 'id');
DROP TABLE hash_copy;
SET citus.shard_replication_factor TO 2;
CREATE TABLE hash_copy(id integer, data text, int_data int);
SELECT create_distributed_table('hash_copy', 'id');
create_distributed_table
---------------------------------------------------------------------
@ -482,6 +582,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-recreate-with-replication-2 s1-initialize s1-begin s1-copy s2-delete s1-commit s1-select-count
create_distributed_table
@ -490,10 +595,10 @@ create_distributed_table
(1 row)
step s1-recreate-with-replication-2:
DROP TABLE hash_copy;
SET citus.shard_replication_factor TO 2;
CREATE TABLE hash_copy(id integer, data text, int_data int);
SELECT create_distributed_table('hash_copy', 'id');
DROP TABLE hash_copy;
SET citus.shard_replication_factor TO 2;
CREATE TABLE hash_copy(id integer, data text, int_data int);
SELECT create_distributed_table('hash_copy', 'id');
create_distributed_table
---------------------------------------------------------------------
@ -512,6 +617,11 @@ count
9
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-recreate-with-replication-2 s1-initialize s1-begin s1-copy s2-insert-select s1-commit s1-select-count
create_distributed_table
@ -520,10 +630,10 @@ create_distributed_table
(1 row)
step s1-recreate-with-replication-2:
DROP TABLE hash_copy;
SET citus.shard_replication_factor TO 2;
CREATE TABLE hash_copy(id integer, data text, int_data int);
SELECT create_distributed_table('hash_copy', 'id');
DROP TABLE hash_copy;
SET citus.shard_replication_factor TO 2;
CREATE TABLE hash_copy(id integer, data text, int_data int);
SELECT create_distributed_table('hash_copy', 'id');
create_distributed_table
---------------------------------------------------------------------
@ -542,6 +652,11 @@ count
20
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-recreate-with-replication-2 s1-initialize s1-begin s1-copy s2-master-modify-multiple-shards s1-commit s1-select-count
create_distributed_table
@ -550,10 +665,10 @@ create_distributed_table
(1 row)
step s1-recreate-with-replication-2:
DROP TABLE hash_copy;
SET citus.shard_replication_factor TO 2;
CREATE TABLE hash_copy(id integer, data text, int_data int);
SELECT create_distributed_table('hash_copy', 'id');
DROP TABLE hash_copy;
SET citus.shard_replication_factor TO 2;
CREATE TABLE hash_copy(id integer, data text, int_data int);
SELECT create_distributed_table('hash_copy', 'id');
create_distributed_table
---------------------------------------------------------------------
@ -572,6 +687,11 @@ count
0
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-router-select s2-copy s1-commit s1-select-count
create_distributed_table
@ -595,6 +715,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-real-time-select s2-copy s1-commit s1-select-count
create_distributed_table
@ -622,6 +747,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-adaptive-select s2-copy s1-commit s1-select-count
create_distributed_table
@ -632,8 +762,8 @@ create_distributed_table
step s1-initialize: COPY hash_copy FROM PROGRAM 'echo 0, a, 0 && echo 1, b, 1 && echo 2, c, 2 && echo 3, d, 3 && echo 4, e, 4' WITH CSV;
step s1-begin: BEGIN;
step s1-adaptive-select:
SET citus.enable_repartition_joins TO ON;
SELECT * FROM hash_copy AS t1 JOIN hash_copy AS t2 ON t1.id = t2.int_data ORDER BY 1, 2, 3, 4;
SET citus.enable_repartition_joins TO ON;
SELECT * FROM hash_copy AS t1 JOIN hash_copy AS t2 ON t1.id = t2.int_data ORDER BY 1, 2, 3, 4;
id|data|int_data|id|data|int_data
---------------------------------------------------------------------
@ -652,6 +782,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-insert s2-copy s1-commit s1-select-count
create_distributed_table
@ -670,6 +805,11 @@ count
11
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-insert-select s2-copy s1-commit s1-select-count
create_distributed_table
@ -688,6 +828,11 @@ count
15
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-update s2-copy s1-commit s1-select-count
create_distributed_table
@ -706,6 +851,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-delete s2-copy s1-commit s1-select-count
create_distributed_table
@ -724,6 +874,11 @@ count
9
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-truncate s2-copy s1-commit s1-select-count
create_distributed_table
@ -743,6 +898,11 @@ count
5
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-drop s2-copy s1-commit s1-select-count
create_distributed_table
@ -759,6 +919,11 @@ step s2-copy: <... completed>
ERROR: relation "hash_copy" does not exist
step s1-select-count: SELECT COUNT(*) FROM hash_copy;
ERROR: relation "hash_copy" does not exist
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-ddl-create-index s2-copy s1-commit s1-select-count s1-show-indexes
create_distributed_table
@ -785,6 +950,11 @@ run_command_on_workers
(localhost,57638,t,2)
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-ddl-create-index s1-begin s1-ddl-drop-index s2-copy s1-commit s1-select-count s1-show-indexes
create_distributed_table
@ -812,6 +982,11 @@ run_command_on_workers
(localhost,57638,t,0)
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-ddl-add-column s2-copy s1-commit s1-select-count s1-show-columns
create_distributed_table
@ -839,6 +1014,11 @@ run_command_on_workers
(localhost,57638,t,new_column)
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-ddl-add-column s1-begin s1-ddl-drop-column s2-copy s1-commit s1-select-count s1-show-columns
create_distributed_table
@ -866,6 +1046,11 @@ run_command_on_workers
(localhost,57638,t,"")
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-ddl-rename-column s2-copy s1-commit s1-select-count s1-show-columns
create_distributed_table
@ -892,6 +1077,11 @@ run_command_on_workers
(localhost,57638,t,new_column)
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-table-size s2-copy s1-commit s1-select-count
create_distributed_table
@ -915,6 +1105,11 @@ count
10
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-master-modify-multiple-shards s2-copy s1-commit s1-select-count
create_distributed_table
@ -933,6 +1128,11 @@ count
5
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-initialize s1-begin s1-master-drop-all-shards s2-copy s1-commit s1-select-count
create_distributed_table
@ -958,6 +1158,11 @@ count
0
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-drop s1-create-non-distributed-table s1-initialize s1-begin s1-distribute-table s2-copy s1-commit s1-select-count
create_distributed_table
@ -984,3 +1189,8 @@ count
15
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)

View File

@ -7,21 +7,21 @@ create_distributed_table
(1 row)
step s1-begin:
SET citus.shard_replication_factor to 1;
BEGIN;
SET citus.shard_replication_factor to 1;
BEGIN;
step s1-insert-into-select-conflict-update:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
col_1|col_2
---------------------------------------------------------------------
@ -33,17 +33,22 @@ col_1|col_2
(5 rows)
step s2-begin:
BEGIN;
BEGIN;
step s2-update:
UPDATE target_table SET col_2 = 5;
UPDATE target_table SET col_2 = 5;
<waiting ...>
step s1-commit:
COMMIT;
COMMIT;
step s2-update: <... completed>
step s2-commit:
COMMIT;
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-insert-into-select-conflict-do-nothing s2-begin s2-delete s1-commit s2-commit
@ -53,34 +58,39 @@ create_distributed_table
(1 row)
step s1-begin:
SET citus.shard_replication_factor to 1;
BEGIN;
SET citus.shard_replication_factor to 1;
BEGIN;
step s1-insert-into-select-conflict-do-nothing:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
step s2-begin:
BEGIN;
BEGIN;
step s2-delete:
DELETE FROM target_table;
DELETE FROM target_table;
<waiting ...>
step s1-commit:
COMMIT;
COMMIT;
step s2-delete: <... completed>
step s2-commit:
COMMIT;
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-insert-into-select-conflict-do-nothing s2-begin s2-insert-into-select-conflict-update s1-commit s2-commit
@ -90,40 +100,40 @@ create_distributed_table
(1 row)
step s1-begin:
SET citus.shard_replication_factor to 1;
BEGIN;
SET citus.shard_replication_factor to 1;
BEGIN;
step s1-insert-into-select-conflict-do-nothing:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
step s2-begin:
BEGIN;
BEGIN;
step s2-insert-into-select-conflict-update:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
<waiting ...>
step s1-commit:
COMMIT;
COMMIT;
step s2-insert-into-select-conflict-update: <... completed>
col_1|col_2
@ -136,7 +146,12 @@ col_1|col_2
(5 rows)
step s2-commit:
COMMIT;
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-insert-into-select-conflict-update s2-begin s2-insert-into-select-conflict-update s1-commit s2-commit
@ -146,21 +161,21 @@ create_distributed_table
(1 row)
step s1-begin:
SET citus.shard_replication_factor to 1;
BEGIN;
SET citus.shard_replication_factor to 1;
BEGIN;
step s1-insert-into-select-conflict-update:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
col_1|col_2
---------------------------------------------------------------------
@ -172,23 +187,23 @@ col_1|col_2
(5 rows)
step s2-begin:
BEGIN;
BEGIN;
step s2-insert-into-select-conflict-update:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
<waiting ...>
step s1-commit:
COMMIT;
COMMIT;
step s2-insert-into-select-conflict-update: <... completed>
col_1|col_2
@ -201,7 +216,12 @@ col_1|col_2
(5 rows)
step s2-commit:
COMMIT;
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-insert-into-select-conflict-update s2-begin s2-insert-into-select-conflict-do-nothing s1-commit s2-commit
@ -211,21 +231,21 @@ create_distributed_table
(1 row)
step s1-begin:
SET citus.shard_replication_factor to 1;
BEGIN;
SET citus.shard_replication_factor to 1;
BEGIN;
step s1-insert-into-select-conflict-update:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
col_1|col_2
---------------------------------------------------------------------
@ -237,27 +257,32 @@ col_1|col_2
(5 rows)
step s2-begin:
BEGIN;
BEGIN;
step s2-insert-into-select-conflict-do-nothing:
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
INSERT INTO target_table
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT DO NOTHING;
<waiting ...>
step s1-commit:
COMMIT;
COMMIT;
step s2-insert-into-select-conflict-do-nothing: <... completed>
step s2-commit:
COMMIT;
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin-replication-factor-2 s1-insert-into-select-conflict-update-replication-factor-2 s2-begin-replication-factor-2 s2-insert-into-select-conflict-update-replication-factor-2 s1-commit s2-commit
@ -267,21 +292,21 @@ create_distributed_table
(1 row)
step s1-begin-replication-factor-2:
SET citus.shard_replication_factor to 2;
BEGIN;
SET citus.shard_replication_factor to 2;
BEGIN;
step s1-insert-into-select-conflict-update-replication-factor-2:
INSERT INTO target_table_2
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
INSERT INTO target_table_2
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
col_1|col_2|col_3
---------------------------------------------------------------------
@ -293,24 +318,24 @@ col_1|col_2|col_3
(5 rows)
step s2-begin-replication-factor-2:
SET citus.shard_replication_factor to 2;
BEGIN;
SET citus.shard_replication_factor to 2;
BEGIN;
step s2-insert-into-select-conflict-update-replication-factor-2:
INSERT INTO target_table_2
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
INSERT INTO target_table_2
SELECT
col_1, col_2
FROM (
SELECT
col_1, col_2, col_3
FROM
source_table
LIMIT 5
) as foo
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 RETURNING *;
<waiting ...>
step s1-commit:
COMMIT;
COMMIT;
step s2-insert-into-select-conflict-update-replication-factor-2: <... completed>
col_1|col_2|col_3
@ -323,5 +348,10 @@ col_1|col_2|col_3
(5 rows)
step s2-commit:
COMMIT;
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)

View File

@ -5,7 +5,7 @@ step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
BEGIN;
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
@ -14,61 +14,76 @@ step s1-insert_to_events_test_table:
INSERT INTO events_test_table VALUES(4,6,8,10);
<waiting ...>
step s2-commit:
COMMIT;
COMMIT;
step s1-insert_to_events_test_table: <... completed>
step s1-commit:
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s2-begin s2-modify_with_subquery_v1 s1-update_events_test_table s2-commit s1-commit
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
BEGIN;
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
step s1-update_events_test_table:
UPDATE users_test_table SET value_1 = 3;
UPDATE users_test_table SET value_1 = 3;
<waiting ...>
step s2-commit:
COMMIT;
COMMIT;
step s1-update_events_test_table: <... completed>
step s1-commit:
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s2-begin s2-modify_with_subquery_v1 s1-delete_events_test_table s2-commit s1-commit
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
BEGIN;
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
step s1-delete_events_test_table:
DELETE FROM events_test_table WHERE user_id = 1 or user_id = 3;
DELETE FROM events_test_table WHERE user_id = 1 or user_id = 3;
<waiting ...>
step s2-commit:
COMMIT;
COMMIT;
step s1-delete_events_test_table: <... completed>
step s1-commit:
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s2-begin s1-insert_to_events_test_table s2-modify_with_subquery_v1 s1-commit s2-commit
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
BEGIN;
step s1-insert_to_events_test_table:
INSERT INTO events_test_table VALUES(4,6,8,10);
@ -81,7 +96,12 @@ step s1-commit:
step s2-modify_with_subquery_v1: <... completed>
step s2-commit:
COMMIT;
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s2-begin s1-update_events_test_table s2-modify_with_subquery_v1 s1-commit s2-commit
@ -89,10 +109,10 @@ step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
BEGIN;
step s1-update_events_test_table:
UPDATE users_test_table SET value_1 = 3;
UPDATE users_test_table SET value_1 = 3;
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
@ -102,7 +122,12 @@ step s1-commit:
step s2-modify_with_subquery_v1: <... completed>
step s2-commit:
COMMIT;
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s2-begin s1-delete_events_test_table s2-modify_with_subquery_v1 s1-commit s2-commit
@ -110,10 +135,10 @@ step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
BEGIN;
step s1-delete_events_test_table:
DELETE FROM events_test_table WHERE user_id = 1 or user_id = 3;
DELETE FROM events_test_table WHERE user_id = 1 or user_id = 3;
step s2-modify_with_subquery_v1:
UPDATE users_test_table SET value_2 = 5 FROM events_test_table WHERE users_test_table.user_id = events_test_table.user_id;
@ -123,5 +148,10 @@ step s1-commit:
step s2-modify_with_subquery_v1: <... completed>
step s2-commit:
COMMIT;
COMMIT;
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)

View File

@ -75,7 +75,7 @@ step s1-update-ref-table:
update ref_table set a = a + 1;
step s2-sleep:
SELECT pg_sleep(0.5);
SELECT pg_sleep(0.5);
pg_sleep
---------------------------------------------------------------------
@ -83,7 +83,7 @@ pg_sleep
(1 row)
step s2-view-dist:
SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' ORDER BY query DESC;
SELECT query, query_hostname, query_hostport, distributed_query_host_name, distributed_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_dist_stat_activity WHERE query NOT ILIKE '%pg_prepared_xacts%' AND query NOT ILIKE '%COMMIT%' ORDER BY query DESC;
query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname
---------------------------------------------------------------------
@ -96,7 +96,7 @@ query |query_hostname |query_hostport|distribute
(2 rows)
step s2-view-worker:
SELECT query, query_hostname, query_hostport, distributed_query_host_name,
SELECT query, query_hostname, query_hostport, distributed_query_host_name,
distributed_query_host_port, state, wait_event_type, wait_event, usename, datname
FROM citus_worker_stat_activity
WHERE query NOT ILIKE '%pg_prepared_xacts%' AND
@ -106,10 +106,10 @@ step s2-view-worker:
query |query_hostname |query_hostport|distributed_query_host_name|distributed_query_host_port|state |wait_event_type|wait_event|usename |datname
---------------------------------------------------------------------
UPDATE public.ref_table_1500379 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500379 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500379 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|coordinator_host| 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500379 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500775 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57638|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500775 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57637|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500775 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|coordinator_host| 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500775 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|localhost | 57636|coordinator_host | 57636|idle in transaction|Client |ClientRead|postgres|regression
(4 rows)
step s2-end:
@ -140,9 +140,9 @@ step s1-update-ref-table:
update ref_table set a = a + 1;
step s2-active-transactions:
-- Admin should be able to see all transactions
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
-- Admin should be able to see all transactions
SELECT count(*) FROM get_all_active_transactions();
SELECT count(*) FROM get_global_active_transactions();
count
---------------------------------------------------------------------

File diff suppressed because it is too large Load Diff

View File

@ -20,6 +20,11 @@ step s1-commit:
step s2-undistribute: <... completed>
ERROR: cannot complete operation because no such table exists
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-undistribute s2-select s1-commit
step s1-begin:
@ -47,6 +52,11 @@ a|b
5|6
(3 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-undistribute s2-insert s1-commit s2-select
step s1-begin:
@ -79,6 +89,11 @@ a| b
9|10
(5 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-undistribute s2-insert-select s1-commit s2-select
step s1-begin:
@ -112,6 +127,11 @@ a|b
5|6
(6 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-undistribute s2-delete s1-commit s2-select
step s1-begin:
@ -141,6 +161,11 @@ a|b
5|6
(2 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-undistribute s2-copy s1-commit s2-select
step s1-begin:
@ -173,6 +198,11 @@ step s2-select:
13|14
(5 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-undistribute s2-drop s1-commit s2-select
step s1-begin:
@ -197,6 +227,11 @@ step s2-select:
SELECT * FROM dist_table ORDER BY 1, 2;
ERROR: relation "dist_table" does not exist
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-undistribute s2-truncate s1-commit s2-select
step s1-begin:
@ -224,6 +259,11 @@ a|b
---------------------------------------------------------------------
(0 rows)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-undistribute s2-select-for-update s1-commit
step s1-begin:
@ -249,6 +289,11 @@ a|b
5|6
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)
starting permutation: s1-begin s1-undistribute s2-create-index-concurrently s1-commit
step s1-begin:
@ -269,3 +314,8 @@ step s1-commit:
COMMIT;
step s2-create-index-concurrently: <... completed>
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)

File diff suppressed because it is too large Load Diff

View File

@ -107,15 +107,9 @@ SELECT count(*) FROM history;
2
(1 row)
-- test we can not replicate MX tables
-- test we can replicate MX tables
SET citus.shard_replication_factor TO 1;
-- metadata sync will fail as we have a statement replicated table
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ERROR: relation "mcsp.history" does not exist
CONTEXT: while executing command on localhost:xxxxx
-- use streaming replication to enable metadata syncing
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid IN
('history'::regclass);
-- metadata sync will succeed even if we have rep > 1 tables
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------

View File

@ -357,18 +357,6 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'c');
ERROR: Local or references tables can only have 's' or 't' as the replication model.
ROLLBACK;
-- not-matching replication model for hash table
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
assign_distributed_transaction_id
---------------------------------------------------------------------
(1 row)
SET application_name to 'citus';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 't');
ERROR: Hash distributed tables can only have 's' as the replication model.
ROLLBACK;
-- add entry for super user table
\c - postgres - :worker_1_port
SET search_path TO metadata_sync_helpers;

View File

@ -428,6 +428,7 @@ SELECT create_distributed_table('table_range', 'id', 'range');
-- test foreign table creation
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
SELECT create_distributed_table('table3_groupD', 'id');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
---------------------------------------------------------------------
@ -1356,3 +1357,4 @@ DROP TABLE range_table;
DROP TABLE none;
DROP TABLE ref;
DROP TABLE local_table;
DROP FOREIGN TABLE table3_groupD CASCADE;

View File

@ -1,6 +1,8 @@
-- ===================================================================
-- create test functions
-- ===================================================================
CREATE SCHEMA metadata_test;
SET search_path TO metadata_test;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 540000;
CREATE FUNCTION load_shard_id_array(regclass)
RETURNS bigint[]
@ -286,6 +288,7 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3);
-- verify result of the get_shard_id_for_distribution_column
\c - - - :worker_1_port
SET search_path TO metadata_test;
SELECT * FROM get_shardid_test_table1_540006;
column1 | column2
---------------------------------------------------------------------
@ -305,6 +308,7 @@ SELECT * FROM get_shardid_test_table1_540007;
(1 row)
\c - - - :master_port
SET search_path TO metadata_test;
-- test non-existing value
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
get_shard_id_for_distribution_column
@ -336,6 +340,7 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f
-- verify result of the get_shard_id_for_distribution_column
\c - - - :worker_1_port
SET search_path TO metadata_test;
SELECT * FROM get_shardid_test_table2_540013;
column1 | column2
---------------------------------------------------------------------
@ -349,6 +354,7 @@ SELECT * FROM get_shardid_test_table2_540011;
(1 row)
\c - - - :master_port
SET search_path TO metadata_test;
-- test mismatching data type
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a');
ERROR: malformed array literal: "a"
@ -578,4 +584,5 @@ ORDER BY
(1 row)
-- clear unnecessary tables;
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5, events_table_count;
SET client_min_messages TO ERROR;
DROP SCHEMA metadata_test CASCADE;

View File

@ -172,6 +172,7 @@ CREATE FOREIGN TABLE foreign_table (
full_name text not null default ''
) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true');
SELECT create_distributed_table('foreign_table', 'id');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
---------------------------------------------------------------------

View File

@ -21,6 +21,16 @@ SELECT create_distributed_table('on_update_fkey_table', 'id');
ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERENCES "refer'ence_table"(id) ON UPDATE CASCADE;
INSERT INTO "refer'ence_table" SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- also have one replicated table
SET citus.shard_replication_factor TO 2;
CREATE TABLE replicated_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('replicated_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- first, make sure that truncate from the coordinator workers as expected
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
@ -29,8 +39,24 @@ SELECT count(*) FROM on_update_fkey_table;
0
(1 row)
-- fill the table again
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
0
(1 row)
SET citus.task_assignment_policy TO "round-robin";
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
0
(1 row)
RESET citus.task_assignment_policy;
-- fill the tables again
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works expected from the coordinator
TRUNCATE "refer'ence_table" CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
@ -59,6 +85,16 @@ BEGIN;
0
(1 row)
ROLLBACK;
BEGIN;
ALTER TABLE replicated_table ADD COLUMN x INT;
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
0
(1 row)
ROLLBACK;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
@ -70,8 +106,17 @@ SELECT count(*) FROM on_update_fkey_table;
0
(1 row)
-- make sure that TRUNCATE workes expected from the worker node
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
0
(1 row)
-- load some data
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works expected from the worker
TRUNCATE "refer'ence_table" CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
@ -95,6 +140,10 @@ BEGIN;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE replicated_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE "refer'ence_table" CASCADE;
NOTICE: truncate cascades to table "on_update_fkey_table"
@ -112,6 +161,11 @@ NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
NOTICE: truncate cascades to table "on_update_fkey_table_xxxxxxx"
ROLLBACK;
-- test with sequential mode and CASCADE
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
TRUNCATE replicated_table CASCADE;
ROLLBACK;
-- fill some data for the next test
\c - - - :master_port
SET search_path TO 'truncate_from_workers';
@ -136,6 +190,26 @@ BEGIN;
0
(1 row)
ROLLBACK;
-- make sure that DMLs-SELECTs works along with TRUNCATE worker fine
TRUNCATE replicated_table;
BEGIN;
-- we can enable local execution when truncate can be executed locally.
SET citus.enable_local_execution = 'off';
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
1001
(1 row)
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
count
---------------------------------------------------------------------
0
(1 row)
ROLLBACK;
RESET client_min_messages;
\c - - - :master_port
@ -162,6 +236,16 @@ BEGIN;
t
(1 row)
ROLLBACK;
BEGIN;
-- should work since the schema is in the search path
SET search_path TO 'truncate_from_workers';
SELECT lock_relation_if_exists('replicated_table', 'ACCESS SHARE');
lock_relation_if_exists
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
BEGIN;
-- should return false since there is no such table
@ -249,7 +333,8 @@ BEGIN;
COMMIT;
DROP SCHEMA truncate_from_workers CASCADE;
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table truncate_from_workers."refer'ence_table"
drop cascades to table truncate_from_workers.on_update_fkey_table
drop cascades to table truncate_from_workers.replicated_table
SET search_path TO public;

View File

@ -103,6 +103,7 @@ CREATE FOREIGN TABLE remote_engagements (
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('remote_engagements', 'id', 'hash');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
---------------------------------------------------------------------
@ -118,3 +119,5 @@ UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AN
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: cannot repair shard
DETAIL: Table remote_engagements is a foreign table. Repairing shards backed by foreign tables is not supported.
-- clean-up
DROP FOREIGN TABLE remote_engagements CASCADE;

View File

@ -33,6 +33,23 @@ SELECT create_distributed_table('table_2', 'key', colocate_with := 'none');
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
SET citus.shard_replication_factor to 2;
CREATE TABLE table_1_rep (key int, value text);
SELECT create_distributed_table('table_1_rep', 'key', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE table_2_rep (key int, value text);
SELECT create_distributed_table('table_2_rep', 'key', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO table_1_rep VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2_rep VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
set citus.log_intermediate_results TO ON;
set client_min_messages to debug1;
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
@ -96,6 +113,67 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
0
(1 row)
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
count | key
---------------------------------------------------------------------
1 | 1
(1 row)
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
INSERT INTO table_1_rep SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
WITH stats AS (
SELECT count(key) m FROM table_1_rep
),
inserts AS (
INSERT INTO table_2_rep
SELECT key, count(*)
FROM table_1_rep
WHERE key >= (SELECT m FROM stats)
GROUP BY key
HAVING count(*) <= (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1_rep
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2_rep (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1_rep WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2_rep.key, table_2_rep.value
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Collecting INSERT ... SELECT results on coordinator
count
---------------------------------------------------------------------
0
(1 row)
\c - - - :worker_1_port
SET search_path TO mx_coordinator_shouldhaveshards;
set citus.log_intermediate_results TO ON;
@ -161,6 +239,67 @@ DEBUG: Collecting INSERT ... SELECT results on coordinator
0
(1 row)
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
count | key
---------------------------------------------------------------------
1 | 1
(1 row)
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
INSERT INTO table_1_rep SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1_rep ORDER BY key, value DESC LIMIT 1
DEBUG: push down of limit count: 1
DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2_rep USING (key)) GROUP BY a.key HAVING (max(table_2_rep.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
WITH stats AS (
SELECT count(key) m FROM table_1_rep
),
inserts AS (
INSERT INTO table_2_rep
SELECT key, count(*)
FROM table_1_rep
WHERE key >= (SELECT m FROM stats)
GROUP BY key
HAVING count(*) <= (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1_rep
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2_rep (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1_rep WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2_rep.key, table_2_rep.value
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: push down of limit count: 1
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Collecting INSERT ... SELECT results on coordinator
count
---------------------------------------------------------------------
0
(1 row)
\c - - - :master_port
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);
?column?

View File

@ -91,6 +91,42 @@ SELECT alter_table_set_access_method('events_2021_jan', 'columnar');
(1 row)
VACUUM (FREEZE, ANALYZE) events_2021_jan;
-- add some replicated tables
SET citus.shard_replication_factor TO 2;
-- test for hybrid partitioned table (columnar+heap)
CREATE TABLE events_replicated(ts timestamptz, i int, n numeric, s text)
PARTITION BY RANGE (ts);
CREATE TABLE events_replicated_2021_jan PARTITION OF events_replicated
FOR VALUES FROM ('2021-01-01') TO ('2021-02-01');
CREATE TABLE events_replicated_2021_feb PARTITION OF events_replicated
FOR VALUES FROM ('2021-02-01') TO ('2021-03-01');
INSERT INTO events_replicated SELECT
'2021-01-01'::timestamptz + '0.45 seconds'::interval * g,
g,
g*pi(),
'number: ' || g::text
FROM generate_series(1,1000) g;
VACUUM (FREEZE, ANALYZE) events_2021_feb;
SELECT create_distributed_table('events_replicated', 'ts');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT alter_table_set_access_method('events_replicated_2021_jan', 'columnar');
alter_table_set_access_method
---------------------------------------------------------------------
(1 row)
CREATE TABLE distributed_table_replicated_1(col int unique, b tt2);
SELECT create_distributed_table('distributed_table_replicated_1', 'col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE INDEX indrep1 ON distributed_table_replicated_1(b);
-- sync metadata
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
@ -120,12 +156,15 @@ SELECT * FROM test_matview;
(1 row)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'events%' ORDER BY logicalrelid::text;
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
logicalrelid | partmethod | partkey | colocationid | repmodel | autoconverted
---------------------------------------------------------------------
events | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f
events_2021_feb | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f
events_2021_jan | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f
(3 rows)
events | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f
events_2021_feb | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f
events_2021_jan | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s | f
events_replicated | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390013 | c | f
events_replicated_2021_feb | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390013 | c | f
events_replicated_2021_jan | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390013 | c | f
(6 rows)
SELECT count(*) > 0 FROM pg_dist_node;
?column?
@ -542,6 +581,24 @@ BEGIN;
(1 row)
ROLLBACK;
-- this is safe because start_metadata_sync_to_node already switches to
-- sequential execution
BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SET LOCAL citus.shard_replication_factor TO 2;
CREATE TABLE test_table_rep(a int);
SELECT create_distributed_table('test_table_rep', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- multi-shard commands are allowed with start_metadata_sync
-- as long as the start_metadata_sync_to_node executed
@ -584,6 +641,49 @@ BEGIN;
(1 row)
ROLLBACK;
-- multi-shard commands are allowed with start_metadata_sync
-- as long as the start_metadata_sync_to_node executed
-- when it is OK to switch to sequential execution
BEGIN;
-- sync at the start of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SET citus.multi_shard_modify_mode TO sequential;
SET LOCAL citus.shard_replication_factor TO 2;
CREATE TABLE test_table(a int);
SELECT create_distributed_table('test_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE test_table ADD COLUMN B INT;
INSERT INTO test_table SELECT i,i From generate_series(0,100)i;
SELECT count(*) FROM test_table;
count
---------------------------------------------------------------------
101
(1 row)
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
SELECT count(*) FROM distributed_table_3;
count
---------------------------------------------------------------------
1
(1 row)
-- sync at the end of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- cleanup
\c - - - :master_port

View File

@ -10,6 +10,11 @@ SET search_path TO citus_mx_test_schema;
\COPY citus_mx_test_schema_join_1.nation_hash_2 FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
\COPY citus_mx_test_schema_join_2.nation_hash FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
SET citus.shard_replication_factor TO 2;
CREATE TABLE citus_mx_test_schema.nation_hash_replicated AS SELECT * FROM citus_mx_test_schema.nation_hash;
SELECT create_distributed_table('citus_mx_test_schema.nation_hash_replicated', 'n_nationkey');
\COPY nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
-- now try loading data from worker node
\c - - - :worker_1_port
SET search_path TO public;
@ -17,10 +22,13 @@ SET search_path TO public;
\COPY lineitem_mx FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\COPY lineitem_mx FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
\c - - - :worker_2_port
-- and use second worker as well
\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
-- get ready for the next test
TRUNCATE orders_mx;
@ -35,6 +43,8 @@ show citus.local_shared_pool_size;
\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
-- set it back
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();

View File

@ -89,6 +89,7 @@ test: isolation_ref_select_for_update_vs_all_on_mx
test: isolation_ref_update_delete_upsert_vs_all_on_mx
test: isolation_dis2ref_foreign_keys_on_mx
test: isolation_metadata_sync_deadlock
test: isolation_replicated_dist_on_mx
# MXless tests
test: isolation_turn_mx_off

View File

@ -45,7 +45,7 @@ test: coordinator_evaluation_modify
test: coordinator_evaluation_select
test: multi_mx_call
test: multi_mx_function_call_delegation
test: multi_mx_modifications local_shard_execution
test: multi_mx_modifications local_shard_execution local_shard_execution_replicated
test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: local_shard_copy
test: undistribute_table_cascade_mx

View File

@ -7,15 +7,30 @@ SET search_path TO citus_mx_test_schema;
\COPY citus_mx_test_schema_join_1.nation_hash FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
\COPY citus_mx_test_schema_join_1.nation_hash_2 FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
\COPY citus_mx_test_schema_join_2.nation_hash FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
SET citus.shard_replication_factor TO 2;
CREATE TABLE citus_mx_test_schema.nation_hash_replicated AS SELECT * FROM citus_mx_test_schema.nation_hash;
SELECT create_distributed_table('citus_mx_test_schema.nation_hash_replicated', 'n_nationkey');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$citus_mx_test_schema.nation_hash_replicated$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
\COPY nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
-- now try loading data from worker node
\c - - - :worker_1_port
SET search_path TO public;
\COPY lineitem_mx FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\COPY lineitem_mx FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
\c - - - :worker_2_port
-- and use second worker as well
\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
-- get ready for the next test
TRUNCATE orders_mx;
\c - - - :worker_2_port
@ -23,74 +38,83 @@ SET citus.log_local_commands TO ON;
-- simulate the case where there is no connection slots available
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
SELECT pg_reload_conf();
pg_reload_conf
----------------
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
----------
pg_sleep
---------------------------------------------------------------------
(1 row)
show citus.local_shared_pool_size;
citus.local_shared_pool_size
------------------------------
citus.local_shared_pool_size
---------------------------------------------------------------------
-1
(1 row)
\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
NOTICE: executing the copy locally for shard 1220075
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 3: "3|1234|F|205654.30|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular id..."
NOTICE: executing the copy locally for shard 1220071
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 5: "5|445|F|105367.67|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages us..."
NOTICE: executing the copy locally for shard 1220069
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 9: "33|670|F|146567.24|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. furiously final request"
NOTICE: executing the copy locally for shard 1220079
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 15: "39|818|O|326565.37|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir"
NOTICE: executing the copy locally for shard 1220083
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 19: "67|568|O|182481.16|1996-12-19|4-NOT SPECIFIED|Clerk#000000547|0|symptotes haggle slyly around the fu..."
NOTICE: executing the copy locally for shard 1220073
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 24: "96|1078|F|64364.30|1994-04-17|2-HIGH|Clerk#000000395|0|oost furiously. pinto"
NOTICE: executing the copy locally for shard 1220077
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 25: "97|211|F|100572.55|1993-01-29|3-MEDIUM|Clerk#000000547|0|hang blithely along the regular accounts. f..."
NOTICE: executing the copy locally for shard 1220081
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 38: "134|62|F|208201.46|1992-05-01|4-NOT SPECIFIED|Clerk#000000711|0|lar theodolites boos"
\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
NOTICE: executing the copy locally for shard 1220079
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 2: "8998|80|F|147264.16|1993-01-04|5-LOW|Clerk#000000733|0| fluffily pending sauternes cajo"
NOTICE: executing the copy locally for shard 1220077
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 4: "9024|1469|F|298241.36|1992-06-03|3-MEDIUM|Clerk#000000901|0|ar the theodolites. fluffily stealthy re..."
NOTICE: executing the copy locally for shard 1220073
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 6: "9026|677|O|63256.87|1996-07-24|5-LOW|Clerk#000000320|0|ironic escapades would wake carefully "
NOTICE: executing the copy locally for shard 1220071
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 9: "9029|1213|F|78703.86|1992-11-20|3-MEDIUM|Clerk#000000965|0| excuses nag quickly carefully unusual ex..."
NOTICE: executing the copy locally for shard 1220083
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 14: "9058|403|F|63464.13|1993-06-29|2-HIGH|Clerk#000000376|0|ealthily special deposits. quickly regular r..."
NOTICE: executing the copy locally for shard 1220081
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 16: "9060|463|O|45295.71|1996-06-09|1-URGENT|Clerk#000000438|0|iously. slyly regular dol"
NOTICE: executing the copy locally for shard 1220075
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 43: "9159|1135|O|99594.61|1995-07-26|1-URGENT|Clerk#000000892|0|xcuses. quickly ironic deposits wake alon..."
NOTICE: executing the copy locally for shard 1220069
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY orders_mx, line 69: "9281|904|F|173278.28|1992-02-24|1-URGENT|Clerk#000000530|0|eep furiously according to the requests; ..."
\COPY citus_mx_test_schema.nation_hash_replicated FROM '@abs_srcdir@/data/nation.data' with delimiter '|';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY nation_hash_replicated, line 1: "0|ALGERIA|0| haggle. carefully final deposits detect slyly agai"
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY nation_hash_replicated, line 2: "1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon"
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY nation_hash_replicated, line 3: "2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forg..."
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY nation_hash_replicated, line 7: "6|FRANCE|3|refully final requests. regular, ironi"
-- set it back
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();
pg_reload_conf
----------------
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
----------
pg_sleep
---------------------------------------------------------------------
(1 row)
show citus.local_shared_pool_size;
citus.local_shared_pool_size
------------------------------
citus.local_shared_pool_size
---------------------------------------------------------------------
50
(1 row)

View File

@ -1,5 +1,7 @@
setup
{
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
CREATE TABLE test_concurrent_dml (test_id integer NOT NULL, data text);
SELECT master_create_distributed_table('test_concurrent_dml', 'test_id', 'hash');
SELECT master_create_worker_shards('test_concurrent_dml', 4, 2);
@ -8,6 +10,7 @@ setup
teardown
{
DROP TABLE IF EXISTS test_concurrent_dml CASCADE;
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"

View File

@ -1,5 +1,7 @@
setup
{
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
CREATE TABLE distributed_table (x int primary key, y int);
SELECT create_distributed_table('distributed_table', 'x');
INSERT INTO distributed_table VALUES (1,0);
@ -15,6 +17,7 @@ setup
teardown
{
DROP TABLE distributed_table;
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"

View File

@ -5,6 +5,8 @@
// create append distributed table to test behavior of COPY in concurrent operations
setup
{
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor TO 1;
CREATE TABLE hash_copy(id integer, data text, int_data int);
SELECT create_distributed_table('hash_copy', 'id');
@ -14,6 +16,7 @@ setup
teardown
{
DROP TABLE IF EXISTS hash_copy CASCADE;
SELECT citus_internal.restore_isolation_tester_func();
}
// session 1

View File

@ -1,5 +1,7 @@
setup
{
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
CREATE TABLE target_table(col_1 int primary key, col_2 int);
SELECT create_distributed_table('target_table','col_1');
INSERT INTO target_table VALUES(1,2),(2,3),(3,4),(4,5),(5,6);
@ -16,6 +18,7 @@ setup
teardown
{
DROP TABLE target_table, target_table_2, source_table;
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"

View File

@ -1,7 +1,8 @@
setup
{
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor to 2;
CREATE TABLE users_test_table(user_id int, value_1 int, value_2 int, value_3 int);
SELECT create_distributed_table('users_test_table', 'user_id');
INSERT INTO users_test_table VALUES
@ -30,6 +31,7 @@ teardown
DROP TABLE users_test_table;
DROP TABLE events_test_table;
SET citus.shard_replication_factor to 1;
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"

View File

@ -0,0 +1,220 @@
#include "isolation_mx_common.include.spec"
setup
{
CREATE TABLE replicated_table(user_id int, value_1 int);
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('replicated_table', 'user_id', shard_count:=4);
INSERT INTO replicated_table VALUES (1, 11), (2, 21), (3, 31), (4, 41), (5, 51), (6, 61), (7, 71);
CREATE TABLE replicated_table_2(user_id int, value_1 int);
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('replicated_table_2', 'user_id', shard_count:=4);
INSERT INTO replicated_table_2 VALUES (1, 11), (2, 21), (3, 31), (4, 41), (5, 51), (6, 61), (7, 71);
CREATE TABLE single_replicated_table(user_id int, value_1 int);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('single_replicated_table', 'user_id', shard_count:=4);
INSERT INTO single_replicated_table VALUES (1, 11), (2, 21), (3, 31), (4, 41), (5, 51), (6, 61), (7, 71);
}
// Create and use UDF to close the connection opened in the setup step. Also return the cluster
// back to the initial state.
teardown
{
DROP TABLE replicated_table, replicated_table_2, single_replicated_table;
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"
step "s1-begin"
{
BEGIN;
}
// We do not need to begin a transaction on coordinator, since it will be open on workers.
step "s1-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57637);
}
step "s1-begin-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "s1-update-1-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE replicated_table SET value_1 = 12 WHERE user_id = 1');
}
step "s1-update-all-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE replicated_table SET value_1 = 12');
}
step "s1-delete-1-from-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('DELETE FROM replicated_table WHERE user_id = 1');
}
step "s1-delete-all-from-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('DELETE FROM replicated_table');
}
step "s1-insert-into-1-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO replicated_table VALUES(1,81)');
}
step "s1-insert-into-all-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO replicated_table VALUES(8,81),(9,91),(10,91),(11,91),(12,91), (13,91), (14,91), (15,91), (16,91), (17,91), (18,91), (19,91), (20,91)');
}
step "s1-insert-into-select"
{
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO replicated_table SELECT * FROM replicated_table_2');
}
step "s1-insert-into-select-from-single-rep"
{
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO replicated_table SELECT * FROM single_replicated_table LIMIT 10');
}
step "s1-copy-all-to-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('COPY replicated_table FROM PROGRAM ''echo 10, 101 && echo 11, 111 && echo 11, 111 && echo 12, 111 && echo 13, 111 && echo 14, 111 && echo 15, 111 && echo 16, 111 && echo 17, 111 && echo 18, 111 && echo 19, 111 && echo 20, 111 && echo 21, 111 && echo 22, 111 && echo 23, 111'' WITH CSV');
}
step "s1-copy-1-to-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('COPY replicated_table FROM PROGRAM ''echo 1, 101 && echo 1, 111 && echo 1,1111'' WITH CSV');
}
step "s1-commit-worker"
{
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
}
step "s1-alter-table"
{
ALTER TABLE replicated_table ADD COLUMN x INT;
}
step "s1-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57638);
}
step "s2-begin-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "s2-update-1-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE replicated_table SET value_1 = 12 WHERE user_id = 1');
}
step "s2-update-all-single-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE single_replicated_table SET value_1 = 12');
}
step "s2-update-all-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE replicated_table SET value_1 = 12');
}
step "s2-select-from-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT count(*) FROM replicated_table');
}
step "s2-insert-into-1-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO replicated_table VALUES(1,81)');
}
step "s2-insert-into-all-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO replicated_table VALUES(8,81),(9,91),(10,91),(11,91),(12,91), (13,91), (14,91), (15,91), (16,91), (17,91), (18,91), (19,91), (20,91)');
}
step "s2-copy-all-to-rep-table"
{
SELECT run_commands_on_session_level_connection_to_node('COPY replicated_table FROM PROGRAM ''echo 10, 101 && echo 11, 111 && echo 11, 111 && echo 12, 111 && echo 13, 111 && echo 14, 111 && echo 15, 111 && echo 16, 111 && echo 17, 111 && echo 18, 111 && echo 19, 111 && echo 20, 111 && echo 21, 111 && echo 22, 111 && echo 23, 111'' WITH CSV');
}
step "s2-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
step "s2-commit-worker"
{
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
}
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-1-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-1-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-all-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-all-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-delete-1-from-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-delete-all-from-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-delete-1-from-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-delete-all-from-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-1-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-1-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-all-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-all-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-1-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-1-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-all-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-all-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-1-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-1-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-all-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-all-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-1-to-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-1-to-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-all-to-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-all-to-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-all-to-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-1-to-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-all-to-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-copy-all-to-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-all-to-rep-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-from-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-1-rep-table" "s1-begin" "s1-alter-table" "s2-commit-worker" "s1-commit" "s2-stop-connection"
permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-from-rep-table" "s1-begin" "s1-alter-table" "s2-commit-worker" "s1-commit" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-select" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-select" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-1-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-select-from-single-rep" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-all-single-rep-table" "s2-update-all-rep-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"

View File

@ -1,5 +1,8 @@
setup
{
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
CREATE TABLE dist_table(a INT, b INT);
SELECT create_distributed_table('dist_table', 'a');
INSERT INTO dist_table VALUES (1, 2), (3, 4), (5, 6);
@ -8,6 +11,7 @@ setup
teardown
{
DROP TABLE IF EXISTS dist_table;
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"

File diff suppressed because it is too large Load Diff

View File

@ -89,15 +89,10 @@ WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nod
SELECT count(*) FROM data;
SELECT count(*) FROM history;
-- test we can not replicate MX tables
-- test we can replicate MX tables
SET citus.shard_replication_factor TO 1;
-- metadata sync will fail as we have a statement replicated table
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- use streaming replication to enable metadata syncing
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid IN
('history'::regclass);
-- metadata sync will succeed even if we have rep > 1 tables
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
CREATE TABLE mx_table(a int);

View File

@ -225,14 +225,6 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT citus_internal_add_partition_metadata ('test_ref'::regclass, 'n', NULL, 0, 'c');
ROLLBACK;
-- not-matching replication model for hash table
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
SELECT citus_internal_add_partition_metadata ('test_2'::regclass, 'h', 'col_1', 0, 't');
ROLLBACK;
-- add entry for super user table
\c - postgres - :worker_1_port
SET search_path TO metadata_sync_helpers;

View File

@ -553,5 +553,4 @@ DROP TABLE range_table;
DROP TABLE none;
DROP TABLE ref;
DROP TABLE local_table;
DROP FOREIGN TABLE table3_groupD CASCADE;

View File

@ -1,7 +1,8 @@
-- ===================================================================
-- create test functions
-- ===================================================================
CREATE SCHEMA metadata_test;
SET search_path TO metadata_test;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 540000;
@ -196,10 +197,13 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 3);
-- verify result of the get_shard_id_for_distribution_column
\c - - - :worker_1_port
SET search_path TO metadata_test;
SELECT * FROM get_shardid_test_table1_540006;
SELECT * FROM get_shardid_test_table1_540009;
SELECT * FROM get_shardid_test_table1_540007;
\c - - - :master_port
SET search_path TO metadata_test;
-- test non-existing value
SELECT get_shard_id_for_distribution_column('get_shardid_test_table1', 4);
@ -217,9 +221,12 @@ SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', '{d, e, f
-- verify result of the get_shard_id_for_distribution_column
\c - - - :worker_1_port
SET search_path TO metadata_test;
SELECT * FROM get_shardid_test_table2_540013;
SELECT * FROM get_shardid_test_table2_540011;
\c - - - :master_port
SET search_path TO metadata_test;
-- test mismatching data type
SELECT get_shard_id_for_distribution_column('get_shardid_test_table2', 'a');
@ -355,4 +362,5 @@ ORDER BY
types;$$);
-- clear unnecessary tables;
DROP TABLE get_shardid_test_table1, get_shardid_test_table2, get_shardid_test_table3, get_shardid_test_table4, get_shardid_test_table5, events_table_count;
SET client_min_messages TO ERROR;
DROP SCHEMA metadata_test CASCADE;

View File

@ -18,12 +18,25 @@ ALTER TABLE on_update_fkey_table ADD CONSTRAINT fkey FOREIGN KEY(value_1) REFERE
INSERT INTO "refer'ence_table" SELECT i FROM generate_series(0, 100) i;
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- also have one replicated table
SET citus.shard_replication_factor TO 2;
CREATE TABLE replicated_table(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('replicated_table', 'id');
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- first, make sure that truncate from the coordinator workers as expected
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
-- fill the table again
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
SET citus.task_assignment_policy TO "round-robin";
SELECT count(*) FROM replicated_table;
RESET citus.task_assignment_policy;
-- fill the tables again
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works expected from the coordinator
TRUNCATE "refer'ence_table" CASCADE;
@ -41,6 +54,12 @@ BEGIN;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
BEGIN;
ALTER TABLE replicated_table ADD COLUMN x INT;
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
ROLLBACK;
\c - - - :worker_1_port
SET search_path TO 'truncate_from_workers';
@ -49,8 +68,13 @@ SET search_path TO 'truncate_from_workers';
TRUNCATE on_update_fkey_table;
SELECT count(*) FROM on_update_fkey_table;
-- make sure that TRUNCATE workes expected from the worker node
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
-- load some data
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
-- now, show that TRUNCATE CASCADE works expected from the worker
TRUNCATE "refer'ence_table" CASCADE;
@ -62,6 +86,11 @@ BEGIN;
TRUNCATE on_update_fkey_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE replicated_table;
ROLLBACK;
-- test within transaction blocks
BEGIN;
TRUNCATE "refer'ence_table" CASCADE;
@ -74,6 +103,13 @@ BEGIN;
TRUNCATE "refer'ence_table" CASCADE;
ROLLBACK;
-- test with sequential mode and CASCADE
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO sequential;
TRUNCATE replicated_table CASCADE;
ROLLBACK;
-- fill some data for the next test
\c - - - :master_port
SET search_path TO 'truncate_from_workers';
@ -92,6 +128,17 @@ BEGIN;
SELECT count(*) FROM on_update_fkey_table;
ROLLBACK;
-- make sure that DMLs-SELECTs works along with TRUNCATE worker fine
TRUNCATE replicated_table;
BEGIN;
-- we can enable local execution when truncate can be executed locally.
SET citus.enable_local_execution = 'off';
INSERT INTO replicated_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM replicated_table;
TRUNCATE replicated_table;
SELECT count(*) FROM replicated_table;
ROLLBACK;
RESET client_min_messages;
\c - - - :master_port
@ -113,6 +160,13 @@ BEGIN;
SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
ROLLBACK;
BEGIN;
-- should work since the schema is in the search path
SET search_path TO 'truncate_from_workers';
SELECT lock_relation_if_exists('replicated_table', 'ACCESS SHARE');
ROLLBACK;
BEGIN;
-- should return false since there is no such table
SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_tableXXX', 'ACCESS SHARE');

View File

@ -103,3 +103,6 @@ UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AN
-- oops! we don't support repairing shards backed by foreign tables
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
-- clean-up
DROP FOREIGN TABLE remote_engagements CASCADE;

View File

@ -19,6 +19,18 @@ SELECT create_distributed_table('table_2', 'key', colocate_with := 'none');
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
SET citus.shard_replication_factor to 2;
CREATE TABLE table_1_rep (key int, value text);
SELECT create_distributed_table('table_1_rep', 'key', colocate_with := 'none');
CREATE TABLE table_2_rep (key int, value text);
SELECT create_distributed_table('table_2_rep', 'key', colocate_with := 'none');
INSERT INTO table_1_rep VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2_rep VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
set citus.log_intermediate_results TO ON;
set client_min_messages to debug1;
@ -50,6 +62,34 @@ inserts AS (
RETURNING *
) SELECT count(*) FROM inserts;
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
INSERT INTO table_1_rep SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
WITH stats AS (
SELECT count(key) m FROM table_1_rep
),
inserts AS (
INSERT INTO table_2_rep
SELECT key, count(*)
FROM table_1_rep
WHERE key >= (SELECT m FROM stats)
GROUP BY key
HAVING count(*) <= (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
\c - - - :worker_1_port
SET search_path TO mx_coordinator_shouldhaveshards;
@ -84,6 +124,34 @@ inserts AS (
RETURNING *
) SELECT count(*) FROM inserts;
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
WITH a AS (SELECT * FROM table_1_rep ORDER BY 1,2 DESC LIMIT 1)
INSERT INTO table_1_rep SELECT count(*),
key
FROM a JOIN table_2_rep USING (key)
GROUP BY key
HAVING (max(table_2_rep.value) >= (SELECT value FROM a));
WITH stats AS (
SELECT count(key) m FROM table_1_rep
),
inserts AS (
INSERT INTO table_2_rep
SELECT key, count(*)
FROM table_1_rep
WHERE key >= (SELECT m FROM stats)
GROUP BY key
HAVING count(*) <= (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
\c - - - :master_port
SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false);

View File

@ -68,6 +68,35 @@ SELECT alter_table_set_access_method('events_2021_jan', 'columnar');
VACUUM (FREEZE, ANALYZE) events_2021_jan;
-- add some replicated tables
SET citus.shard_replication_factor TO 2;
-- test for hybrid partitioned table (columnar+heap)
CREATE TABLE events_replicated(ts timestamptz, i int, n numeric, s text)
PARTITION BY RANGE (ts);
CREATE TABLE events_replicated_2021_jan PARTITION OF events_replicated
FOR VALUES FROM ('2021-01-01') TO ('2021-02-01');
CREATE TABLE events_replicated_2021_feb PARTITION OF events_replicated
FOR VALUES FROM ('2021-02-01') TO ('2021-03-01');
INSERT INTO events_replicated SELECT
'2021-01-01'::timestamptz + '0.45 seconds'::interval * g,
g,
g*pi(),
'number: ' || g::text
FROM generate_series(1,1000) g;
VACUUM (FREEZE, ANALYZE) events_2021_feb;
SELECT create_distributed_table('events_replicated', 'ts');
SELECT alter_table_set_access_method('events_replicated_2021_jan', 'columnar');
CREATE TABLE distributed_table_replicated_1(col int unique, b tt2);
SELECT create_distributed_table('distributed_table_replicated_1', 'col');
CREATE INDEX indrep1 ON distributed_table_replicated_1(b);
-- sync metadata
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
@ -230,6 +259,15 @@ BEGIN;
SELECT create_distributed_table('test_table', 'a');
ROLLBACK;
-- this is safe because start_metadata_sync_to_node already switches to
-- sequential execution
BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SET LOCAL citus.shard_replication_factor TO 2;
CREATE TABLE test_table_rep(a int);
SELECT create_distributed_table('test_table_rep', 'a');
ROLLBACK;
-- multi-shard commands are allowed with start_metadata_sync
-- as long as the start_metadata_sync_to_node executed
-- when it is OK to switch to sequential execution
@ -248,6 +286,26 @@ BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
-- multi-shard commands are allowed with start_metadata_sync
-- as long as the start_metadata_sync_to_node executed
-- when it is OK to switch to sequential execution
BEGIN;
-- sync at the start of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SET citus.multi_shard_modify_mode TO sequential;
SET LOCAL citus.shard_replication_factor TO 2;
CREATE TABLE test_table(a int);
SELECT create_distributed_table('test_table', 'a');
ALTER TABLE test_table ADD COLUMN B INT;
INSERT INTO test_table SELECT i,i From generate_series(0,100)i;
SELECT count(*) FROM test_table;
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
SELECT count(*) FROM distributed_table_3;
-- sync at the end of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
-- cleanup
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";