mirror of https://github.com/citusdata/citus.git
Merge pull request #3591 from citusdata/copy_shard_placement
Allow master_copy_shard_placement to replicate to new nodespull/3642/head
commit
b166105f16
|
@ -50,15 +50,26 @@
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static char LookupShardTransferMode(Oid shardReplicationModeOid);
|
static char LookupShardTransferMode(Oid shardReplicationModeOid);
|
||||||
|
static void ErrorIfTableCannotBeReplicated(Oid relationId);
|
||||||
static void RepairShardPlacement(int64 shardId, const char *sourceNodeName,
|
static void RepairShardPlacement(int64 shardId, const char *sourceNodeName,
|
||||||
int32 sourceNodePort, const char *targetNodeName,
|
int32 sourceNodePort, const char *targetNodeName,
|
||||||
int32 targetNodePort);
|
int32 targetNodePort);
|
||||||
|
static void ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
|
||||||
|
int32 sourceNodePort, char *targetNodeName,
|
||||||
|
int32 targetNodePort,
|
||||||
|
char shardReplicationMode);
|
||||||
|
static void CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName,
|
||||||
|
int32 sourceNodePort, char *targetNodeName,
|
||||||
|
int32 targetNodePort);
|
||||||
static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
|
static List * CopyPartitionShardsCommandList(ShardInterval *shardInterval,
|
||||||
const char *sourceNodeName,
|
const char *sourceNodeName,
|
||||||
int32 sourceNodePort);
|
int32 sourceNodePort);
|
||||||
static void EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName,
|
static void EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName,
|
||||||
int32 sourceNodePort, const char *targetNodeName,
|
int32 sourceNodePort, const char *targetNodeName,
|
||||||
int32 targetNodePort);
|
int32 targetNodePort);
|
||||||
|
static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName,
|
||||||
|
int32 sourceNodePort, const char *targetNodeName,
|
||||||
|
int32 targetNodePort);
|
||||||
static List * RecreateTableDDLCommandList(Oid relationId);
|
static List * RecreateTableDDLCommandList(Oid relationId);
|
||||||
static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId);
|
static List * WorkerApplyShardDDLCommandList(List *ddlCommandList, int64 shardId);
|
||||||
|
|
||||||
|
@ -87,31 +98,36 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
|
||||||
int32 targetNodePort = PG_GETARG_INT32(4);
|
int32 targetNodePort = PG_GETARG_INT32(4);
|
||||||
bool doRepair = PG_GETARG_BOOL(5);
|
bool doRepair = PG_GETARG_BOOL(5);
|
||||||
Oid shardReplicationModeOid = PG_GETARG_OID(6);
|
Oid shardReplicationModeOid = PG_GETARG_OID(6);
|
||||||
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
|
|
||||||
|
|
||||||
char *sourceNodeName = text_to_cstring(sourceNodeNameText);
|
char *sourceNodeName = text_to_cstring(sourceNodeNameText);
|
||||||
char *targetNodeName = text_to_cstring(targetNodeNameText);
|
char *targetNodeName = text_to_cstring(targetNodeNameText);
|
||||||
|
|
||||||
if (!doRepair)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("master_copy_shard_placement() "
|
|
||||||
"with do not repair functionality "
|
|
||||||
"is only supported on Citus Enterprise")));
|
|
||||||
}
|
|
||||||
else if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("using logical replication with repair functionality "
|
|
||||||
"is currently not supported")));
|
|
||||||
}
|
|
||||||
|
|
||||||
EnsureCoordinator();
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
EnsureCoordinator();
|
||||||
|
|
||||||
/* RepairShardPlacement function repairs only given shard */
|
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
|
||||||
|
if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("using logical replication in "
|
||||||
|
"master_copy_shard_placement() requires Citus "
|
||||||
|
"Enterprise")));
|
||||||
|
}
|
||||||
|
|
||||||
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
|
ErrorIfTableCannotBeReplicated(shardInterval->relationId);
|
||||||
|
|
||||||
|
if (doRepair)
|
||||||
|
{
|
||||||
RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
|
RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
|
||||||
targetNodePort);
|
targetNodePort);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ReplicateColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
|
||||||
|
targetNodeName, targetNodePort,
|
||||||
|
shardReplicationMode);
|
||||||
|
}
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -171,6 +187,44 @@ BlockWritesToShardList(List *shardList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ErrorIfTableCannotBeReplicated function errors out if the given table is not suitable
|
||||||
|
* for its shard being replicated. Shard replications is not allowed only for MX tables,
|
||||||
|
* since RF=1 is a must MX tables.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ErrorIfTableCannotBeReplicated(Oid relationId)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Note that ShouldSyncTableMetadata() returns true for both MX tables
|
||||||
|
* and reference tables.
|
||||||
|
*/
|
||||||
|
bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId);
|
||||||
|
if (!shouldSyncMetadata)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
char *relationName = get_rel_name(relationId);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ShouldSyncTableMetadata() returns true also for reference table,
|
||||||
|
* we don't want to error in that case since reference tables aren't
|
||||||
|
* automatically replicated to active nodes with no shards, and
|
||||||
|
* master_copy_shard_placement() can be used to create placements in
|
||||||
|
* such nodes.
|
||||||
|
*/
|
||||||
|
if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
(errmsg("Table %s is streaming replicated. Shards "
|
||||||
|
"of streaming replicated tables cannot "
|
||||||
|
"be copied", quote_literal_cstr(relationName)))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
|
* LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
|
||||||
* values to a char.
|
* values to a char.
|
||||||
|
@ -217,6 +271,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
|
||||||
|
|
||||||
char relationKind = get_rel_relkind(distributedTableId);
|
char relationKind = get_rel_relkind(distributedTableId);
|
||||||
char *tableOwner = TableOwner(shardInterval->relationId);
|
char *tableOwner = TableOwner(shardInterval->relationId);
|
||||||
|
bool missingOk = false;
|
||||||
|
|
||||||
|
|
||||||
/* prevent table from being dropped */
|
/* prevent table from being dropped */
|
||||||
|
@ -314,13 +369,189 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
|
||||||
|
|
||||||
/* after successful repair, we update shard state as healthy*/
|
/* after successful repair, we update shard state as healthy*/
|
||||||
List *placementList = ShardPlacementList(shardId);
|
List *placementList = ShardPlacementList(shardId);
|
||||||
ShardPlacement *placement = ForceSearchShardPlacementInList(placementList,
|
ShardPlacement *placement = SearchShardPlacementInList(placementList, targetNodeName,
|
||||||
targetNodeName,
|
targetNodePort,
|
||||||
targetNodePort);
|
missingOk);
|
||||||
UpdateShardPlacementState(placement->placementId, SHARD_STATE_ACTIVE);
|
UpdateShardPlacementState(placement->placementId, SHARD_STATE_ACTIVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ReplicateColocatedShardPlacement replicates the given shard and its
|
||||||
|
* colocated shards from a source node to target node.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
|
||||||
|
int32 sourceNodePort, char *targetNodeName,
|
||||||
|
int32 targetNodePort, char shardReplicationMode)
|
||||||
|
{
|
||||||
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
|
Oid distributedTableId = shardInterval->relationId;
|
||||||
|
|
||||||
|
List *colocatedTableList = ColocatedTableList(distributedTableId);
|
||||||
|
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
|
||||||
|
Oid colocatedTableId = InvalidOid;
|
||||||
|
ListCell *colocatedShardCell = NULL;
|
||||||
|
|
||||||
|
foreach_oid(colocatedTableId, colocatedTableList)
|
||||||
|
{
|
||||||
|
char relationKind = '\0';
|
||||||
|
|
||||||
|
/* check that user has owner rights in all co-located tables */
|
||||||
|
EnsureTableOwner(colocatedTableId);
|
||||||
|
|
||||||
|
relationKind = get_rel_relkind(colocatedTableId);
|
||||||
|
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||||
|
{
|
||||||
|
char *relationName = get_rel_name(colocatedTableId);
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot replicate shard"),
|
||||||
|
errdetail("Table %s is a foreign table. Replicating "
|
||||||
|
"shards backed by foreign tables is "
|
||||||
|
"not supported.", relationName)));
|
||||||
|
}
|
||||||
|
|
||||||
|
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(
|
||||||
|
colocatedTableId);
|
||||||
|
|
||||||
|
if (foreignConstraintCommandList != NIL &&
|
||||||
|
PartitionMethod(colocatedTableId) != DISTRIBUTE_BY_NONE)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("cannot create foreign key constraint"),
|
||||||
|
errdetail("This shard has foreign constraints on it. "
|
||||||
|
"Citus currently supports "
|
||||||
|
"foreign key constraints only for "
|
||||||
|
"\"citus.shard_replication_factor = 1\"."),
|
||||||
|
errhint("Please change \"citus.shard_replication_factor to "
|
||||||
|
"1\". To learn more about using foreign keys with "
|
||||||
|
"other replication factors, please contact us at "
|
||||||
|
"https://citusdata.com/about/contact_us.")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We sort colocatedShardList so that lock operations will not cause any
|
||||||
|
* deadlocks.
|
||||||
|
*/
|
||||||
|
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
|
||||||
|
|
||||||
|
BlockWritesToShardList(colocatedShardList);
|
||||||
|
|
||||||
|
foreach(colocatedShardCell, colocatedShardList)
|
||||||
|
{
|
||||||
|
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
|
||||||
|
uint64 colocatedShardId = colocatedShard->shardId;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* For shard copy, there should be healthy placement in source node and no
|
||||||
|
* placement in the target node.
|
||||||
|
*/
|
||||||
|
EnsureShardCanBeCopied(colocatedShardId, sourceNodeName, sourceNodePort,
|
||||||
|
targetNodeName, targetNodePort);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CopyColocatedShardPlacement function copies given shard with its co-located
|
||||||
|
* shards.
|
||||||
|
*/
|
||||||
|
CopyColocatedShardPlacement(shardId, sourceNodeName, sourceNodePort,
|
||||||
|
targetNodeName, targetNodePort);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CopyColocatedShardPlacement copies a shard along with its co-located shards from a
|
||||||
|
* source node to target node. CopyShardPlacement does not make any checks about state
|
||||||
|
* of the shards. It is caller's responsibility to make those checks if they are
|
||||||
|
* necessary.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
CopyColocatedShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
|
||||||
|
char *targetNodeName, int32 targetNodePort)
|
||||||
|
{
|
||||||
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
|
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
|
||||||
|
ListCell *colocatedShardCell = NULL;
|
||||||
|
|
||||||
|
/* iterate through the colocated shards and copy each */
|
||||||
|
foreach(colocatedShardCell, colocatedShardList)
|
||||||
|
{
|
||||||
|
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
|
||||||
|
bool includeDataCopy = true;
|
||||||
|
|
||||||
|
if (PartitionedTable(colocatedShard->relationId))
|
||||||
|
{
|
||||||
|
/* partitioned tables contain no data */
|
||||||
|
includeDataCopy = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName,
|
||||||
|
sourceNodePort, includeDataCopy);
|
||||||
|
char *tableOwner = TableOwner(colocatedShard->relationId);
|
||||||
|
|
||||||
|
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
|
||||||
|
tableOwner, ddlCommandList);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Once all shards are created, we can recreate relationships between shards.
|
||||||
|
*
|
||||||
|
* Iterate through the colocated shards and create the foreign constraints and
|
||||||
|
* attach child tables to their parents in a partitioning hierarchy.
|
||||||
|
*
|
||||||
|
* Note: After implementing foreign constraints from distributed to reference
|
||||||
|
* tables, we have decided to not create foreign constraints from hash
|
||||||
|
* distributed to reference tables at this stage for nonblocking rebalancer.
|
||||||
|
* We just create the co-located ones here. We add the foreign constraints
|
||||||
|
* from hash distributed to reference tables after being completely done with
|
||||||
|
* the copy procedure inside LogicallyReplicateShards. The reason is that,
|
||||||
|
* the reference tables have placements in both source and target workers and
|
||||||
|
* the copied shard would get updated twice because of a cascading DML coming
|
||||||
|
* from both of the placements.
|
||||||
|
*/
|
||||||
|
foreach(colocatedShardCell, colocatedShardList)
|
||||||
|
{
|
||||||
|
List *colocatedShardForeignConstraintCommandList = NIL;
|
||||||
|
List *referenceTableForeignConstraintList = NIL;
|
||||||
|
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
|
||||||
|
char *tableOwner = TableOwner(colocatedShard->relationId);
|
||||||
|
|
||||||
|
CopyShardForeignConstraintCommandListGrouped(colocatedShard,
|
||||||
|
&
|
||||||
|
colocatedShardForeignConstraintCommandList,
|
||||||
|
&referenceTableForeignConstraintList);
|
||||||
|
|
||||||
|
List *commandList = list_concat(colocatedShardForeignConstraintCommandList,
|
||||||
|
referenceTableForeignConstraintList);
|
||||||
|
|
||||||
|
if (PartitionTable(colocatedShard->relationId))
|
||||||
|
{
|
||||||
|
char *attachPartitionCommand =
|
||||||
|
GenerateAttachShardPartitionCommand(colocatedShard);
|
||||||
|
|
||||||
|
commandList = lappend(commandList, attachPartitionCommand);
|
||||||
|
}
|
||||||
|
|
||||||
|
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
|
||||||
|
tableOwner, commandList);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* finally insert the placements to pg_dist_placement */
|
||||||
|
foreach(colocatedShardCell, colocatedShardList)
|
||||||
|
{
|
||||||
|
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
|
||||||
|
uint64 colocatedShardId = colocatedShard->shardId;
|
||||||
|
uint32 groupId = GroupForNode(targetNodeName, targetNodePort);
|
||||||
|
|
||||||
|
InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID,
|
||||||
|
SHARD_STATE_ACTIVE, ShardLength(colocatedShardId),
|
||||||
|
groupId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CopyPartitionShardsCommandList gets a shardInterval which is a shard that
|
* CopyPartitionShardsCommandList gets a shardInterval which is a shard that
|
||||||
* belongs to partitioned table (this is asserted).
|
* belongs to partitioned table (this is asserted).
|
||||||
|
@ -369,19 +600,23 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source
|
||||||
const char *targetNodeName, int32 targetNodePort)
|
const char *targetNodeName, int32 targetNodePort)
|
||||||
{
|
{
|
||||||
List *shardPlacementList = ShardPlacementList(shardId);
|
List *shardPlacementList = ShardPlacementList(shardId);
|
||||||
|
bool missingSourceOk = false;
|
||||||
|
bool missingTargetOk = false;
|
||||||
|
|
||||||
ShardPlacement *sourcePlacement = ForceSearchShardPlacementInList(shardPlacementList,
|
ShardPlacement *sourcePlacement = SearchShardPlacementInList(shardPlacementList,
|
||||||
sourceNodeName,
|
sourceNodeName,
|
||||||
sourceNodePort);
|
sourceNodePort,
|
||||||
|
missingSourceOk);
|
||||||
if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
|
if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
errmsg("source placement must be in active state")));
|
errmsg("source placement must be in active state")));
|
||||||
}
|
}
|
||||||
|
|
||||||
ShardPlacement *targetPlacement = ForceSearchShardPlacementInList(shardPlacementList,
|
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
|
||||||
targetNodeName,
|
targetNodeName,
|
||||||
targetNodePort);
|
targetNodePort,
|
||||||
|
missingTargetOk);
|
||||||
if (targetPlacement->shardState != SHARD_STATE_INACTIVE)
|
if (targetPlacement->shardState != SHARD_STATE_INACTIVE)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
|
@ -391,46 +626,78 @@ EnsureShardCanBeRepaired(int64 shardId, const char *sourceNodeName, int32 source
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SearchShardPlacementInList searches a provided list for a shard placement with the
|
* EnsureShardCanBeCopied checks if the given shard has a healthy placement in the source
|
||||||
* specified node name and port. This function returns NULL if no such
|
* node and no placements in the target node.
|
||||||
* placement exists in the provided list.
|
|
||||||
*/
|
*/
|
||||||
ShardPlacement *
|
static void
|
||||||
SearchShardPlacementInList(List *shardPlacementList, const char *nodeName,
|
EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNodePort,
|
||||||
uint32 nodePort)
|
const char *targetNodeName, int32 targetNodePort)
|
||||||
{
|
{
|
||||||
ShardPlacement *shardPlacement = NULL;
|
List *shardPlacementList = ShardPlacementList(shardId);
|
||||||
foreach_ptr(shardPlacement, shardPlacementList)
|
bool missingSourceOk = false;
|
||||||
|
bool missingTargetOk = true;
|
||||||
|
|
||||||
|
ShardPlacement *sourcePlacement = SearchShardPlacementInList(shardPlacementList,
|
||||||
|
sourceNodeName,
|
||||||
|
sourceNodePort,
|
||||||
|
missingSourceOk);
|
||||||
|
if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
|
||||||
{
|
{
|
||||||
if (strncmp(nodeName, shardPlacement->nodeName, MAX_NODE_LENGTH) == 0 &&
|
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
nodePort == shardPlacement->nodePort)
|
errmsg("source placement must be in active state")));
|
||||||
|
}
|
||||||
|
|
||||||
|
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
|
||||||
|
targetNodeName,
|
||||||
|
targetNodePort,
|
||||||
|
missingTargetOk);
|
||||||
|
if (targetPlacement != NULL)
|
||||||
{
|
{
|
||||||
return shardPlacement;
|
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
|
errmsg("shard " INT64_FORMAT " already exists in the target node",
|
||||||
|
shardId)));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ForceSearchShardPlacementInList searches a provided list for a shard
|
* SearchShardPlacementInList searches a provided list for a shard placement with the
|
||||||
* placement with the specified node name and port. This function throws an
|
* specified node name and port. If missingOk is set to true, this function returns NULL
|
||||||
* error if no such placement exists in the provided list.
|
* if no such placement exists in the provided list, otherwise it throws an error.
|
||||||
*/
|
*/
|
||||||
ShardPlacement *
|
ShardPlacement *
|
||||||
ForceSearchShardPlacementInList(List *shardPlacementList, const char *nodeName,
|
SearchShardPlacementInList(List *shardPlacementList, const char *nodeName,
|
||||||
uint32 nodePort)
|
uint32 nodePort, bool missingOk)
|
||||||
{
|
{
|
||||||
ShardPlacement *placement = SearchShardPlacementInList(shardPlacementList, nodeName,
|
ListCell *shardPlacementCell = NULL;
|
||||||
nodePort);
|
ShardPlacement *matchingPlacement = NULL;
|
||||||
if (placement == NULL)
|
|
||||||
|
foreach(shardPlacementCell, shardPlacementList)
|
||||||
{
|
{
|
||||||
|
ShardPlacement *shardPlacement = lfirst(shardPlacementCell);
|
||||||
|
|
||||||
|
if (strncmp(nodeName, shardPlacement->nodeName, MAX_NODE_LENGTH) == 0 &&
|
||||||
|
nodePort == shardPlacement->nodePort)
|
||||||
|
{
|
||||||
|
matchingPlacement = shardPlacement;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (matchingPlacement == NULL)
|
||||||
|
{
|
||||||
|
if (missingOk)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
|
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
|
||||||
errmsg("could not find placement matching \"%s:%d\"",
|
errmsg("could not find placement matching \"%s:%d\"",
|
||||||
nodeName, nodePort),
|
nodeName, nodePort),
|
||||||
errhint("Confirm the placement still exists and try again.")));
|
errhint("Confirm the placement still exists and try again.")));
|
||||||
}
|
}
|
||||||
return placement;
|
|
||||||
|
return matchingPlacement;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -456,10 +723,6 @@ CopyShardCommandList(ShardInterval *shardInterval, const char *sourceNodeName,
|
||||||
copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
|
copyShardToNodeCommandsList = list_concat(copyShardToNodeCommandsList,
|
||||||
tableRecreationCommandList);
|
tableRecreationCommandList);
|
||||||
|
|
||||||
/*
|
|
||||||
* The caller doesn't want to include the COPY command, perhaps using
|
|
||||||
* logical replication to copy the data.
|
|
||||||
*/
|
|
||||||
if (includeDataCopy)
|
if (includeDataCopy)
|
||||||
{
|
{
|
||||||
appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
|
appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
|
||||||
|
|
|
@ -270,8 +270,10 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
|
||||||
CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData);
|
CopyShardCommandList(shardInterval, srcNodeName, srcNodePort, includeData);
|
||||||
|
|
||||||
List *shardPlacementList = ShardPlacementList(shardId);
|
List *shardPlacementList = ShardPlacementList(shardId);
|
||||||
|
bool missingWorkerOk = true;
|
||||||
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
|
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
|
||||||
nodeName, nodePort);
|
nodeName, nodePort,
|
||||||
|
missingWorkerOk);
|
||||||
char *tableOwner = TableOwner(shardInterval->relationId);
|
char *tableOwner = TableOwner(shardInterval->relationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -165,9 +165,7 @@ extern void CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInt
|
||||||
List **
|
List **
|
||||||
referenceTableForeignConstraintList);
|
referenceTableForeignConstraintList);
|
||||||
extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList,
|
extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList,
|
||||||
const char *nodeName, uint32 nodePort);
|
const char *nodeName, uint32 nodePort,
|
||||||
extern ShardPlacement * ForceSearchShardPlacementInList(List *shardPlacementList,
|
bool missingOk);
|
||||||
const char *nodeName,
|
|
||||||
uint32 nodePort);
|
|
||||||
|
|
||||||
#endif /* MASTER_PROTOCOL_H */
|
#endif /* MASTER_PROTOCOL_H */
|
||||||
|
|
|
@ -1,18 +1,21 @@
|
||||||
Parsed test spec with 2 sessions
|
Parsed test spec with 2 sessions
|
||||||
|
|
||||||
starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content
|
starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-load-cache:
|
step s1-load-cache:
|
||||||
TRUNCATE test_copy_placement_vs_modification;
|
TRUNCATE test_repair_placement_vs_modification;
|
||||||
|
|
||||||
step s1-insert:
|
step s1-insert:
|
||||||
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
|
INSERT INTO test_repair_placement_vs_modification VALUES (5, 10);
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
step s1-select:
|
step s1-select:
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
count
|
count
|
||||||
|
|
||||||
|
@ -30,7 +33,7 @@ master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
step s1-update:
|
step s1-update:
|
||||||
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
|
UPDATE test_repair_placement_vs_modification SET y = 5 WHERE x = 5;
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -43,7 +46,7 @@ step s2-print-content:
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5')
|
||||||
WHERE
|
WHERE
|
||||||
shardid IN (SELECT * FROM selected_shard)
|
shardid IN (SELECT * FROM selected_shard)
|
||||||
ORDER BY
|
ORDER BY
|
||||||
|
@ -55,18 +58,21 @@ nodeport success result
|
||||||
57638 t 5
|
57638 t 5
|
||||||
|
|
||||||
starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content
|
starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-load-cache:
|
step s1-load-cache:
|
||||||
TRUNCATE test_copy_placement_vs_modification;
|
TRUNCATE test_repair_placement_vs_modification;
|
||||||
|
|
||||||
step s1-insert:
|
step s1-insert:
|
||||||
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
|
INSERT INTO test_repair_placement_vs_modification VALUES (5, 10);
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
step s1-select:
|
step s1-select:
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
count
|
count
|
||||||
|
|
||||||
|
@ -84,7 +90,7 @@ master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
step s1-delete:
|
step s1-delete:
|
||||||
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
|
DELETE FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -97,7 +103,7 @@ step s2-print-content:
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5')
|
||||||
WHERE
|
WHERE
|
||||||
shardid IN (SELECT * FROM selected_shard)
|
shardid IN (SELECT * FROM selected_shard)
|
||||||
ORDER BY
|
ORDER BY
|
||||||
|
@ -109,15 +115,18 @@ nodeport success result
|
||||||
57638 t
|
57638 t
|
||||||
|
|
||||||
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content
|
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-load-cache:
|
step s1-load-cache:
|
||||||
TRUNCATE test_copy_placement_vs_modification;
|
TRUNCATE test_repair_placement_vs_modification;
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
step s1-select:
|
step s1-select:
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
count
|
count
|
||||||
|
|
||||||
|
@ -135,7 +144,7 @@ master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
step s1-insert:
|
step s1-insert:
|
||||||
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
|
INSERT INTO test_repair_placement_vs_modification VALUES (5, 10);
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -148,7 +157,7 @@ step s2-print-content:
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5')
|
||||||
WHERE
|
WHERE
|
||||||
shardid IN (SELECT * FROM selected_shard)
|
shardid IN (SELECT * FROM selected_shard)
|
||||||
ORDER BY
|
ORDER BY
|
||||||
|
@ -160,15 +169,18 @@ nodeport success result
|
||||||
57638 t 10
|
57638 t 10
|
||||||
|
|
||||||
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content
|
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-load-cache:
|
step s1-load-cache:
|
||||||
TRUNCATE test_copy_placement_vs_modification;
|
TRUNCATE test_repair_placement_vs_modification;
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
step s1-select:
|
step s1-select:
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
count
|
count
|
||||||
|
|
||||||
|
@ -186,7 +198,7 @@ master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
step s1-copy:
|
step s1-copy:
|
||||||
COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
COPY test_repair_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -199,7 +211,7 @@ step s2-print-content:
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5')
|
||||||
WHERE
|
WHERE
|
||||||
shardid IN (SELECT * FROM selected_shard)
|
shardid IN (SELECT * FROM selected_shard)
|
||||||
ORDER BY
|
ORDER BY
|
||||||
|
@ -211,15 +223,18 @@ nodeport success result
|
||||||
57638 t 5
|
57638 t 5
|
||||||
|
|
||||||
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count
|
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-load-cache:
|
step s1-load-cache:
|
||||||
TRUNCATE test_copy_placement_vs_modification;
|
TRUNCATE test_repair_placement_vs_modification;
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
step s1-select:
|
step s1-select:
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
count
|
count
|
||||||
|
|
||||||
|
@ -237,7 +252,7 @@ master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
step s1-ddl:
|
step s1-ddl:
|
||||||
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
|
CREATE INDEX test_repair_placement_vs_modification_index ON test_repair_placement_vs_modification(x);
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -250,7 +265,7 @@ step s2-print-index-count:
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
|
||||||
ORDER BY
|
ORDER BY
|
||||||
nodeport;
|
nodeport;
|
||||||
|
|
||||||
|
@ -262,15 +277,18 @@ nodeport success result
|
||||||
57638 t 1
|
57638 t 1
|
||||||
|
|
||||||
starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content
|
starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-insert:
|
step s1-insert:
|
||||||
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
|
INSERT INTO test_repair_placement_vs_modification VALUES (5, 10);
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
step s1-select:
|
step s1-select:
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
count
|
count
|
||||||
|
|
||||||
|
@ -288,7 +306,7 @@ master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
step s1-update:
|
step s1-update:
|
||||||
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
|
UPDATE test_repair_placement_vs_modification SET y = 5 WHERE x = 5;
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -301,7 +319,7 @@ step s2-print-content:
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5')
|
||||||
WHERE
|
WHERE
|
||||||
shardid IN (SELECT * FROM selected_shard)
|
shardid IN (SELECT * FROM selected_shard)
|
||||||
ORDER BY
|
ORDER BY
|
||||||
|
@ -313,15 +331,18 @@ nodeport success result
|
||||||
57638 t 5
|
57638 t 5
|
||||||
|
|
||||||
starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content
|
starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-insert:
|
step s1-insert:
|
||||||
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
|
INSERT INTO test_repair_placement_vs_modification VALUES (5, 10);
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
step s1-select:
|
step s1-select:
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
count
|
count
|
||||||
|
|
||||||
|
@ -339,7 +360,7 @@ master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
step s1-delete:
|
step s1-delete:
|
||||||
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
|
DELETE FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -352,7 +373,7 @@ step s2-print-content:
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5')
|
||||||
WHERE
|
WHERE
|
||||||
shardid IN (SELECT * FROM selected_shard)
|
shardid IN (SELECT * FROM selected_shard)
|
||||||
ORDER BY
|
ORDER BY
|
||||||
|
@ -364,12 +385,15 @@ nodeport success result
|
||||||
57638 t
|
57638 t
|
||||||
|
|
||||||
starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content
|
starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
step s1-select:
|
step s1-select:
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
count
|
count
|
||||||
|
|
||||||
|
@ -387,7 +411,7 @@ master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
step s1-insert:
|
step s1-insert:
|
||||||
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
|
INSERT INTO test_repair_placement_vs_modification VALUES (5, 10);
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -400,7 +424,7 @@ step s2-print-content:
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5')
|
||||||
WHERE
|
WHERE
|
||||||
shardid IN (SELECT * FROM selected_shard)
|
shardid IN (SELECT * FROM selected_shard)
|
||||||
ORDER BY
|
ORDER BY
|
||||||
|
@ -412,12 +436,15 @@ nodeport success result
|
||||||
57638 t 10
|
57638 t 10
|
||||||
|
|
||||||
starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content
|
starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
step s1-select:
|
step s1-select:
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
count
|
count
|
||||||
|
|
||||||
|
@ -435,7 +462,7 @@ master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
step s1-copy:
|
step s1-copy:
|
||||||
COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
COPY test_repair_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -448,7 +475,7 @@ step s2-print-content:
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5')
|
||||||
WHERE
|
WHERE
|
||||||
shardid IN (SELECT * FROM selected_shard)
|
shardid IN (SELECT * FROM selected_shard)
|
||||||
ORDER BY
|
ORDER BY
|
||||||
|
@ -460,12 +487,15 @@ nodeport success result
|
||||||
57638 t 5
|
57638 t 5
|
||||||
|
|
||||||
starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count
|
starting permutation: s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
step s1-select:
|
step s1-select:
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
count
|
count
|
||||||
|
|
||||||
|
@ -483,7 +513,7 @@ master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
step s1-ddl:
|
step s1-ddl:
|
||||||
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
|
CREATE INDEX test_repair_placement_vs_modification_index ON test_repair_placement_vs_modification(x);
|
||||||
<waiting ...>
|
<waiting ...>
|
||||||
step s2-commit:
|
step s2-commit:
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -496,7 +526,7 @@ step s2-print-index-count:
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
|
||||||
ORDER BY
|
ORDER BY
|
||||||
nodeport;
|
nodeport;
|
||||||
|
|
||||||
|
@ -506,3 +536,367 @@ nodeport success result
|
||||||
57637 t 1
|
57637 t 1
|
||||||
57638 t 1
|
57638 t 1
|
||||||
57638 t 1
|
57638 t 1
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s2-copy-placement s1-update-copy-table s2-commit s1-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s1-update-copy-table:
|
||||||
|
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
|
||||||
|
<waiting ...>
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-update-copy-table: <... completed>
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s2-copy-placement s1-delete-copy-table s2-commit s1-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s1-delete-copy-table:
|
||||||
|
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
|
||||||
|
<waiting ...>
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-delete-copy-table: <... completed>
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s2-copy-placement s1-insert-copy-table s2-commit s1-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s1-insert-copy-table:
|
||||||
|
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
|
||||||
|
<waiting ...>
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-insert-copy-table: <... completed>
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s2-copy-placement s1-copy-copy-table s2-commit s1-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s1-copy-copy-table:
|
||||||
|
COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
||||||
|
<waiting ...>
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-copy-copy-table: <... completed>
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s2-copy-placement s1-ddl-copy-table s2-commit s1-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s1-ddl-copy-table:
|
||||||
|
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
|
||||||
|
<waiting ...>
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-ddl-copy-table: <... completed>
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s2-copy-placement s1-select-copy-table s2-commit s1-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s1-select-copy-table:
|
||||||
|
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
|
count
|
||||||
|
|
||||||
|
0
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s1-update-copy-table s2-copy-placement s1-commit s2-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-update-copy-table:
|
||||||
|
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-copy-placement: <... completed>
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s1-delete-copy-table s2-copy-placement s1-commit s2-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-delete-copy-table:
|
||||||
|
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-copy-placement: <... completed>
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s1-insert-copy-table s2-copy-placement s1-commit s2-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-insert-copy-table:
|
||||||
|
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-copy-placement: <... completed>
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s1-copy-copy-table s2-copy-placement s1-commit s2-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-copy-copy-table:
|
||||||
|
COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-copy-placement: <... completed>
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s1-ddl-copy-table s2-copy-placement s1-commit s2-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-ddl-copy-table:
|
||||||
|
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
|
||||||
|
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
<waiting ...>
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-copy-placement: <... completed>
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s2-begin s1-select-copy-table s2-copy-placement s1-commit s2-commit
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-select-copy-table:
|
||||||
|
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
||||||
|
|
||||||
|
count
|
||||||
|
|
||||||
|
0
|
||||||
|
step s2-copy-placement:
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
|
||||||
|
master_copy_shard_placement
|
||||||
|
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,123 @@
|
||||||
|
-- Tests for master_copy_shard_placement, which can be used for adding replicas in statement-based replication
|
||||||
|
CREATE SCHEMA mcsp;
|
||||||
|
SET search_path TO mcsp;
|
||||||
|
SET citus.next_shard_id TO 8139000;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.replication_model TO 'statement';
|
||||||
|
CREATE TABLE ref_table(a int);
|
||||||
|
SELECT create_reference_table('ref_table');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE data (
|
||||||
|
key text primary key,
|
||||||
|
value text not null,
|
||||||
|
check (value <> '')
|
||||||
|
);
|
||||||
|
CREATE INDEX ON data (value);
|
||||||
|
SELECT create_distributed_table('data','key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE history (
|
||||||
|
key text not null,
|
||||||
|
t timestamptz not null,
|
||||||
|
value text not null
|
||||||
|
) PARTITION BY RANGE (t);
|
||||||
|
CREATE TABLE history_p1 PARTITION OF history FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');
|
||||||
|
CREATE TABLE history_p2 PARTITION OF history FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
|
||||||
|
SELECT create_distributed_table('history','key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO data VALUES ('key-1', 'value-1');
|
||||||
|
INSERT INTO data VALUES ('key-2', 'value-2');
|
||||||
|
INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
|
||||||
|
INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
|
||||||
|
-- verify we error out if no healthy placement exists at source
|
||||||
|
SELECT master_copy_shard_placement(
|
||||||
|
get_shard_id_for_distribution_column('data', 'key-1'),
|
||||||
|
'localhost', :worker_1_port,
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
do_repair := false);
|
||||||
|
ERROR: could not find placement matching "localhost:xxxxx"
|
||||||
|
HINT: Confirm the placement still exists and try again.
|
||||||
|
-- verify we error out if source and destination are the same
|
||||||
|
SELECT master_copy_shard_placement(
|
||||||
|
get_shard_id_for_distribution_column('data', 'key-1'),
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
do_repair := false);
|
||||||
|
ERROR: shard xxxxx already exists in the target node
|
||||||
|
-- verify we error out if target already contains a healthy placement
|
||||||
|
SELECT master_copy_shard_placement(
|
||||||
|
(SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
|
||||||
|
'localhost', :worker_1_port,
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
do_repair := false);
|
||||||
|
ERROR: shard xxxxx already exists in the target node
|
||||||
|
-- replicate shard that contains key-1
|
||||||
|
SELECT master_copy_shard_placement(
|
||||||
|
get_shard_id_for_distribution_column('data', 'key-1'),
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
'localhost', :worker_1_port,
|
||||||
|
do_repair := false);
|
||||||
|
master_copy_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- forcefully mark the old replica as inactive
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = 3
|
||||||
|
WHERE shardid = get_shard_id_for_distribution_column('data', 'key-1') AND nodeport = :worker_2_port;
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = 3
|
||||||
|
WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nodeport = :worker_2_port;
|
||||||
|
-- should still have all data available thanks to new replica
|
||||||
|
SELECT count(*) FROM data;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM history;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test we can not replicate MX tables
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
start_metadata_sync_to_node
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE mx_table(a int);
|
||||||
|
SELECT create_distributed_table('mx_table', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT master_copy_shard_placement(
|
||||||
|
get_shard_id_for_distribution_column('mx_table', '1'),
|
||||||
|
'localhost', :worker_1_port,
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
do_repair := false);
|
||||||
|
ERROR: Table 'mx_table' is streaming replicated. Shards of streaming replicated tables cannot be copied
|
||||||
|
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
stop_metadata_sync_to_node
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA mcsp CASCADE;
|
|
@ -200,7 +200,7 @@ test: multi_complex_count_distinct multi_select_distinct
|
||||||
test: multi_modifications
|
test: multi_modifications
|
||||||
test: multi_distribution_metadata
|
test: multi_distribution_metadata
|
||||||
test: multi_generate_ddl_commands multi_create_shards multi_prune_shard_list multi_repair_shards
|
test: multi_generate_ddl_commands multi_create_shards multi_prune_shard_list multi_repair_shards
|
||||||
test: multi_upsert multi_simple_queries multi_data_types
|
test: multi_upsert multi_simple_queries multi_data_types master_copy_shard_placement
|
||||||
# multi_utilities cannot be run in parallel with other tests because it checks
|
# multi_utilities cannot be run in parallel with other tests because it checks
|
||||||
# global locks
|
# global locks
|
||||||
test: multi_utilities
|
test: multi_utilities
|
||||||
|
|
|
@ -4,16 +4,21 @@ setup
|
||||||
{
|
{
|
||||||
SET citus.shard_count TO 2;
|
SET citus.shard_count TO 2;
|
||||||
SET citus.shard_replication_factor TO 2;
|
SET citus.shard_replication_factor TO 2;
|
||||||
|
CREATE TABLE test_repair_placement_vs_modification (x int, y int);
|
||||||
|
SELECT create_distributed_table('test_repair_placement_vs_modification', 'x');
|
||||||
|
|
||||||
|
SELECT get_shard_id_for_distribution_column('test_repair_placement_vs_modification', 5) INTO selected_shard;
|
||||||
|
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
CREATE TABLE test_copy_placement_vs_modification (x int, y int);
|
CREATE TABLE test_copy_placement_vs_modification (x int, y int);
|
||||||
SELECT create_distributed_table('test_copy_placement_vs_modification', 'x');
|
SELECT create_distributed_table('test_copy_placement_vs_modification', 'x');
|
||||||
|
|
||||||
SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5) INTO selected_shard;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
teardown
|
teardown
|
||||||
{
|
{
|
||||||
DROP TABLE test_copy_placement_vs_modification;
|
DROP TABLE test_repair_placement_vs_modification;
|
||||||
DROP TABLE selected_shard;
|
DROP TABLE selected_shard;
|
||||||
|
DROP TABLE test_copy_placement_vs_modification;
|
||||||
}
|
}
|
||||||
|
|
||||||
session "s1"
|
session "s1"
|
||||||
|
@ -24,39 +29,69 @@ step "s1-begin"
|
||||||
SET LOCAL citus.select_opens_transaction_block TO off;
|
SET LOCAL citus.select_opens_transaction_block TO off;
|
||||||
}
|
}
|
||||||
|
|
||||||
// since test_copy_placement_vs_modification has rep > 1 simple select query doesn't hit all placements
|
// since test_repair_placement_vs_modification has rep > 1 simple select query doesn't hit all placements
|
||||||
// hence not all placements are cached
|
// hence not all placements are cached
|
||||||
step "s1-load-cache"
|
step "s1-load-cache"
|
||||||
{
|
{
|
||||||
TRUNCATE test_copy_placement_vs_modification;
|
TRUNCATE test_repair_placement_vs_modification;
|
||||||
}
|
}
|
||||||
|
|
||||||
step "s1-insert"
|
step "s1-insert"
|
||||||
{
|
{
|
||||||
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
|
INSERT INTO test_repair_placement_vs_modification VALUES (5, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
step "s1-update"
|
step "s1-update"
|
||||||
{
|
{
|
||||||
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
|
UPDATE test_repair_placement_vs_modification SET y = 5 WHERE x = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
step "s1-delete"
|
step "s1-delete"
|
||||||
{
|
{
|
||||||
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
|
DELETE FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
step "s1-select"
|
step "s1-select"
|
||||||
{
|
{
|
||||||
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
SELECT count(*) FROM test_repair_placement_vs_modification WHERE x = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
step "s1-ddl"
|
step "s1-ddl"
|
||||||
{
|
{
|
||||||
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
|
CREATE INDEX test_repair_placement_vs_modification_index ON test_repair_placement_vs_modification(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
step "s1-copy"
|
step "s1-copy"
|
||||||
|
{
|
||||||
|
COPY test_repair_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-insert-copy-table"
|
||||||
|
{
|
||||||
|
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-update-copy-table"
|
||||||
|
{
|
||||||
|
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-delete-copy-table"
|
||||||
|
{
|
||||||
|
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-select-copy-table"
|
||||||
|
{
|
||||||
|
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-ddl-copy-table"
|
||||||
|
{
|
||||||
|
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-copy-copy-table"
|
||||||
{
|
{
|
||||||
COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
COPY test_copy_placement_vs_modification FROM PROGRAM 'echo 1,1 && echo 2,2 && echo 3,3 && echo 4,4 && echo 5,5' WITH CSV;
|
||||||
}
|
}
|
||||||
|
@ -83,6 +118,13 @@ step "s2-repair-placement"
|
||||||
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
SELECT master_copy_shard_placement((SELECT * FROM selected_shard), 'localhost', 57637, 'localhost', 57638);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s2-copy-placement"
|
||||||
|
{
|
||||||
|
SELECT master_copy_shard_placement((SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5)),
|
||||||
|
'localhost', 57637, 'localhost', 57638,
|
||||||
|
do_repair := false, transfer_mode := 'block_writes');
|
||||||
|
}
|
||||||
|
|
||||||
step "s2-commit"
|
step "s2-commit"
|
||||||
{
|
{
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -93,7 +135,7 @@ step "s2-print-content"
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select y from %s WHERE x = 5')
|
||||||
WHERE
|
WHERE
|
||||||
shardid IN (SELECT * FROM selected_shard)
|
shardid IN (SELECT * FROM selected_shard)
|
||||||
ORDER BY
|
ORDER BY
|
||||||
|
@ -105,7 +147,7 @@ step "s2-print-index-count"
|
||||||
SELECT
|
SELECT
|
||||||
nodeport, success, result
|
nodeport, success, result
|
||||||
FROM
|
FROM
|
||||||
run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
|
run_command_on_placements('test_repair_placement_vs_modification', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
|
||||||
ORDER BY
|
ORDER BY
|
||||||
nodeport;
|
nodeport;
|
||||||
}
|
}
|
||||||
|
@ -126,3 +168,19 @@ permutation "s1-insert" "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-b
|
||||||
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-insert" "s2-commit" "s1-commit" "s2-print-content"
|
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-insert" "s2-commit" "s1-commit" "s2-print-content"
|
||||||
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-copy" "s2-commit" "s1-commit" "s2-print-content"
|
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-copy" "s2-commit" "s1-commit" "s2-print-content"
|
||||||
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-ddl" "s2-commit" "s1-commit" "s2-print-index-count"
|
permutation "s1-begin" "s1-select" "s2-set-placement-inactive" "s2-begin" "s2-repair-placement" "s1-ddl" "s2-commit" "s1-commit" "s2-print-index-count"
|
||||||
|
|
||||||
|
// verify that copy placement (do_repair := false) blocks other operations, except SELECT
|
||||||
|
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-update-copy-table" "s2-commit" "s1-commit"
|
||||||
|
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-delete-copy-table" "s2-commit" "s1-commit"
|
||||||
|
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-insert-copy-table" "s2-commit" "s1-commit"
|
||||||
|
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-copy-copy-table" "s2-commit" "s1-commit"
|
||||||
|
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-ddl-copy-table" "s2-commit" "s1-commit"
|
||||||
|
permutation "s1-begin" "s2-begin" "s2-copy-placement" "s1-select-copy-table" "s2-commit" "s1-commit"
|
||||||
|
|
||||||
|
// verify that copy placement (do_repair := false) is blocked by other operations, except SELECT
|
||||||
|
permutation "s1-begin" "s2-begin" "s1-update-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
|
||||||
|
permutation "s1-begin" "s2-begin" "s1-delete-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
|
||||||
|
permutation "s1-begin" "s2-begin" "s1-insert-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
|
||||||
|
permutation "s1-begin" "s2-begin" "s1-copy-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
|
||||||
|
permutation "s1-begin" "s2-begin" "s1-ddl-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
|
||||||
|
permutation "s1-begin" "s2-begin" "s1-select-copy-table" "s2-copy-placement" "s1-commit" "s2-commit"
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
-- Tests for master_copy_shard_placement, which can be used for adding replicas in statement-based replication
|
||||||
|
CREATE SCHEMA mcsp;
|
||||||
|
SET search_path TO mcsp;
|
||||||
|
SET citus.next_shard_id TO 8139000;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.replication_model TO 'statement';
|
||||||
|
|
||||||
|
CREATE TABLE ref_table(a int);
|
||||||
|
SELECT create_reference_table('ref_table');
|
||||||
|
|
||||||
|
CREATE TABLE data (
|
||||||
|
key text primary key,
|
||||||
|
value text not null,
|
||||||
|
check (value <> '')
|
||||||
|
);
|
||||||
|
CREATE INDEX ON data (value);
|
||||||
|
SELECT create_distributed_table('data','key');
|
||||||
|
|
||||||
|
CREATE TABLE history (
|
||||||
|
key text not null,
|
||||||
|
t timestamptz not null,
|
||||||
|
value text not null
|
||||||
|
) PARTITION BY RANGE (t);
|
||||||
|
CREATE TABLE history_p1 PARTITION OF history FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');
|
||||||
|
CREATE TABLE history_p2 PARTITION OF history FOR VALUES FROM ('2020-01-01') TO ('2021-01-01');
|
||||||
|
SELECT create_distributed_table('history','key');
|
||||||
|
|
||||||
|
INSERT INTO data VALUES ('key-1', 'value-1');
|
||||||
|
INSERT INTO data VALUES ('key-2', 'value-2');
|
||||||
|
|
||||||
|
INSERT INTO history VALUES ('key-1', '2020-02-01', 'old');
|
||||||
|
INSERT INTO history VALUES ('key-1', '2019-10-01', 'older');
|
||||||
|
|
||||||
|
-- verify we error out if no healthy placement exists at source
|
||||||
|
SELECT master_copy_shard_placement(
|
||||||
|
get_shard_id_for_distribution_column('data', 'key-1'),
|
||||||
|
'localhost', :worker_1_port,
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
do_repair := false);
|
||||||
|
|
||||||
|
-- verify we error out if source and destination are the same
|
||||||
|
SELECT master_copy_shard_placement(
|
||||||
|
get_shard_id_for_distribution_column('data', 'key-1'),
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
do_repair := false);
|
||||||
|
|
||||||
|
-- verify we error out if target already contains a healthy placement
|
||||||
|
SELECT master_copy_shard_placement(
|
||||||
|
(SELECT shardid FROM pg_dist_shard WHERE logicalrelid='ref_table'::regclass::oid),
|
||||||
|
'localhost', :worker_1_port,
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
do_repair := false);
|
||||||
|
|
||||||
|
-- replicate shard that contains key-1
|
||||||
|
SELECT master_copy_shard_placement(
|
||||||
|
get_shard_id_for_distribution_column('data', 'key-1'),
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
'localhost', :worker_1_port,
|
||||||
|
do_repair := false);
|
||||||
|
|
||||||
|
-- forcefully mark the old replica as inactive
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = 3
|
||||||
|
WHERE shardid = get_shard_id_for_distribution_column('data', 'key-1') AND nodeport = :worker_2_port;
|
||||||
|
|
||||||
|
UPDATE pg_dist_shard_placement SET shardstate = 3
|
||||||
|
WHERE shardid = get_shard_id_for_distribution_column('history', 'key-1') AND nodeport = :worker_2_port;
|
||||||
|
|
||||||
|
-- should still have all data available thanks to new replica
|
||||||
|
SELECT count(*) FROM data;
|
||||||
|
SELECT count(*) FROM history;
|
||||||
|
|
||||||
|
-- test we can not replicate MX tables
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.replication_model TO 'streaming';
|
||||||
|
|
||||||
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
|
||||||
|
CREATE TABLE mx_table(a int);
|
||||||
|
SELECT create_distributed_table('mx_table', 'a');
|
||||||
|
|
||||||
|
SELECT master_copy_shard_placement(
|
||||||
|
get_shard_id_for_distribution_column('mx_table', '1'),
|
||||||
|
'localhost', :worker_1_port,
|
||||||
|
'localhost', :worker_2_port,
|
||||||
|
do_repair := false);
|
||||||
|
|
||||||
|
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA mcsp CASCADE;
|
Loading…
Reference in New Issue