Merge pull request #6137 from citusdata/marcocitus/tenant-isolation

Marco Slot 2022-08-08 13:56:02 +02:00 committed by GitHub
commit cc2afb4b63
11 changed files with 583 additions and 1073 deletions


@ -47,23 +47,6 @@ PG_FUNCTION_INFO_V1(isolate_tenant_to_new_shard);
PG_FUNCTION_INFO_V1(worker_hash);
/* local function forward declarations */
static uint64 SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum);
static void CreateSplitOffShards(ShardInterval *sourceShard, int hashedValue,
List **splitOffShardList, int *isolatedShardId);
static List * ShardTemplateList(ShardInterval *sourceShard, int hashedValue,
int *isolatedShardIndex);
static ShardInterval * CreateSplitOffShardFromTemplate(ShardInterval *shardTemplate,
Oid relationId);
static List * SplitOffCommandList(ShardInterval *sourceShard,
ShardInterval *splitOffShard);
static void ExecuteCommandListOnPlacements(List *commandList, List *placementList);
static void InsertSplitOffShardMetadata(List *splitOffShardList,
List *sourcePlacementList);
static void CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList);
static void ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList);
/*
* isolate_tenant_to_new_shard isolates a tenant to its own shard by splitting
* the current matching shard.
@ -77,7 +60,6 @@ isolate_tenant_to_new_shard(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
Datum inputDatum = PG_GETARG_DATUM(1);
text *cascadeOptionText = PG_GETARG_TEXT_P(2);
ListCell *colocatedTableCell = NULL;
EnsureTableOwner(relationId);
@ -91,44 +73,9 @@ isolate_tenant_to_new_shard(PG_FUNCTION_ARGS)
"is only support for hash distributed tables")));
}
if (PartitionedTable(relationId))
{
char *sourceRelationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot isolate shard placement of '%s', because it "
"is a partitioned table", sourceRelationName),
errdetail("Citus does not support isolating placements of "
"partitioned tables.")));
}
List *colocatedTableList = ColocatedTableList(relationId);
int colocatedTableCount = list_length(colocatedTableList);
foreach(colocatedTableCell, colocatedTableList)
{
Oid colocatedTableId = lfirst_oid(colocatedTableCell);
/*
* At the moment, Citus does not support copying a shard if that shard's
* relation is in a colocation group with a partitioned table or partition.
*/
if (colocatedTableId != relationId &&
PartitionedTable(colocatedTableId))
{
char *sourceRelationName = get_rel_name(relationId);
char *colocatedRelationName = get_rel_name(colocatedTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot isolate shard placement of '%s', because it "
"is a partitioned table", colocatedRelationName),
errdetail("In colocation group of '%s', a partitioned "
"relation exists: '%s'. Citus does not support "
"isolating placements of partitioned tables.",
sourceRelationName, colocatedRelationName)));
}
}
Oid inputDataType = get_fn_expr_argtype(fcinfo->flinfo, 1);
char *tenantIdString = DatumToString(inputDatum, inputDataType);
@ -162,9 +109,70 @@ isolate_tenant_to_new_shard(PG_FUNCTION_ARGS)
ereport(ERROR, (errmsg("tenant does not have a shard")));
}
uint64 isolatedShardId = SplitShardByValue(sourceShard, tenantIdDatum);
int shardMinValue = DatumGetInt32(sourceShard->minValue);
int shardMaxValue = DatumGetInt32(sourceShard->maxValue);
if (shardMinValue == shardMaxValue)
{
char *tableName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
(errmsg("table %s has already been isolated for the given value",
quote_identifier(tableName)))));
}
PG_RETURN_INT64(isolatedShardId);
List *sourcePlacementList = ActiveShardPlacementList(sourceShard->shardId);
if (list_length(sourcePlacementList) > 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot isolate tenants when using shard replication")));
}
ShardPlacement *sourceShardPlacement = linitial(sourcePlacementList);
/* get the hash function */
FmgrInfo *hashFunction = cacheEntry->hashFunction;
/* get hashed value of the distribution value */
Datum hashedValueDatum = FunctionCall1(hashFunction, tenantIdDatum);
int hashedValue = DatumGetInt32(hashedValueDatum);
List *shardSplitPointsList = NIL;
/*
* If the hash value lies at one of the boundaries, we only have a single
* split point.
*/
if (hashedValue == shardMinValue)
{
shardSplitPointsList = list_make1_int(hashedValue);
}
else if (hashedValue == shardMaxValue)
{
shardSplitPointsList = list_make1_int(hashedValue - 1);
}
else
{
shardSplitPointsList = list_make2_int(hashedValue - 1, hashedValue);
}
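/*
 * Illustrative example (hypothetical values): for a shard covering the
 * full hash range [-2147483648, 2147483647] and a tenant value that
 * hashes to 1021, the split points are {1020, 1021}, yielding the shards
 * [-2147483648, 1020], [1021, 1021] and [1022, 2147483647]. When the
 * hashed value equals the shard's min or max, a single split point
 * suffices and the split produces two shards instead of three.
 */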
/* we currently place the isolated hash value on the same node */
int sourceNodeId = sourceShardPlacement->nodeId;
List *nodeIdsForPlacementList = list_make2_int(sourceNodeId, sourceNodeId);
if (list_length(shardSplitPointsList) > 1)
{
nodeIdsForPlacementList = lappend_int(nodeIdsForPlacementList, sourceNodeId);
}
SplitShard(BLOCKING_SPLIT,
ISOLATE_TENANT_TO_NEW_SHARD,
sourceShard->shardId,
shardSplitPointsList,
nodeIdsForPlacementList);
cacheEntry = GetCitusTableCacheEntry(relationId);
ShardInterval *newShard = FindShardInterval(tenantIdDatum, cacheEntry);
PG_RETURN_INT64(newShard->shardId);
}
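For reference, a minimal usage sketch of the rewritten function; the table and tenant value below are hypothetical, and the call returns the id of the new shard that covers only the given tenant's hash value:

CREATE TABLE events (tenant_id int, payload text);
SELECT create_distributed_table('events', 'tenant_id');
-- split the shard containing tenant 5 so that the tenant gets its own shard;
-- 'CASCADE' splits the shards of colocated tables along the same points
SELECT isolate_tenant_to_new_shard('events', 5, 'CASCADE');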
@ -199,498 +207,3 @@ worker_hash(PG_FUNCTION_ARGS)
PG_RETURN_INT32(hashedValueDatum);
}
/*
* SplitShardByValue gets a shard and a value which is in the range of
* distribution column of this shard. Then, it splits this shard and all its
* colocated shards into three; the lower range, the given value itself, and
* the upper range. Finally, it returns the id of the shard which is created
* for the given value.
*/
static uint64
SplitShardByValue(ShardInterval *sourceShard, Datum distributionValueDatum)
{
Oid relationId = sourceShard->relationId;
int isolatedShardId = 0;
List *splitOffShardList = NIL;
if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot isolate a tenant after other modifications "
"in the same transaction")));
}
/* sort the tables to avoid deadlocks */
List *colocatedTableList = ColocatedTableList(relationId);
colocatedTableList = SortList(colocatedTableList, CompareOids);
Oid colocatedTableId = InvalidOid;
foreach_oid(colocatedTableId, colocatedTableList)
{
/*
* Block concurrent DDL / TRUNCATE commands on the relation. Similarly,
* block concurrent citus_move_shard_placement()/isolate_tenant_to_new_shard()
* on any shard of the same relation. This is OK for now since
* we're executing shard moves/splits sequentially anyway.
*/
LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
}
/* get colocated shard list */
List *colocatedShardList = ColocatedShardIntervalList(sourceShard);
/* get locks */
BlockWritesToShardList(colocatedShardList);
ErrorIfCannotSplitShard(ISOLATE_TENANT_TO_NEW_SHARD, sourceShard);
/* get the hash function */
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
FmgrInfo *hashFunction = cacheEntry->hashFunction;
/* get hashed value of the distribution value */
Datum hashedValueDatum = FunctionCall1(hashFunction, distributionValueDatum);
int hashedValue = DatumGetInt32(hashedValueDatum);
/* create a list of nodes with source shard placements */
List *sourcePlacementList = ActiveShardPlacementList(sourceShard->shardId);
/* create new shards in a separate transaction and commit them */
CreateSplitOffShards(sourceShard, hashedValue, &splitOffShardList, &isolatedShardId);
/*
* Drop old shards and delete related metadata. We have to do that before
* creating the new shard metadata, because there are cross-checks
* preventing inconsistent metadata (like overlapping shards).
*/
DropShardList(colocatedShardList);
/* insert new metadata */
InsertSplitOffShardMetadata(splitOffShardList, sourcePlacementList);
/*
* Create foreign keys, if any exist, after the metadata changes happening in
* DropShardList() and InsertSplitOffShardMetadata(), because foreign
* key creation depends on the new metadata.
*/
CreateForeignConstraints(splitOffShardList, sourcePlacementList);
CitusInvalidateRelcacheByRelid(DistShardRelationId());
return isolatedShardId;
}
/*
* CreateForeignConstraints creates the foreign constraints on the shards
* newly created by tenant isolation.
*
* The function treats foreign keys to reference tables and foreign keys to
* co-located distributed tables differently. The former needs to be
* executed over a single connection to prevent self-deadlocks. The latter
* can be executed in parallel if there are multiple replicas.
*/
static void
CreateForeignConstraints(List *splitOffShardList, List *sourcePlacementList)
{
ListCell *splitOffShardCell = NULL;
List *colocatedShardForeignConstraintCommandList = NIL;
List *referenceTableForeignConstraintList = NIL;
foreach(splitOffShardCell, splitOffShardList)
{
ShardInterval *splitOffShard = (ShardInterval *) lfirst(splitOffShardCell);
List *currentColocatedForeignKeyList = NIL;
List *currentReferenceForeignKeyList = NIL;
CopyShardForeignConstraintCommandListGrouped(splitOffShard,
&currentColocatedForeignKeyList,
&currentReferenceForeignKeyList);
colocatedShardForeignConstraintCommandList =
list_concat(colocatedShardForeignConstraintCommandList,
currentColocatedForeignKeyList);
referenceTableForeignConstraintList =
list_concat(referenceTableForeignConstraintList,
currentReferenceForeignKeyList);
}
/*
* We can use parallel connections while creating co-located foreign keys
* if there are multiple source placements.
* However, foreign keys to reference tables need to be created using a single
* connection per worker to prevent self-deadlocks.
*/
if (colocatedShardForeignConstraintCommandList != NIL)
{
ExecuteCommandListOnPlacements(colocatedShardForeignConstraintCommandList,
sourcePlacementList);
}
if (referenceTableForeignConstraintList != NIL)
{
ListCell *shardPlacementCell = NULL;
foreach(shardPlacementCell, sourcePlacementList)
{
ShardPlacement *shardPlacement =
(ShardPlacement *) lfirst(shardPlacementCell);
char *nodeName = shardPlacement->nodeName;
int32 nodePort = shardPlacement->nodePort;
/*
* We're using the connections that we've used for dropping the
* source placements within the same coordinated transaction.
*/
ExecuteCommandListOnWorker(nodeName, nodePort,
referenceTableForeignConstraintList);
}
}
}
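As an illustration of the two command groups (all shard and constraint names below are hypothetical), the first kind of command can be sent over parallel per-placement connections, while the second must reuse the worker's single coordinated connection:

-- foreign key to a co-located distributed table: shard-to-shard, parallelizable
ALTER TABLE tenant_schema.orders_102011 ADD CONSTRAINT orders_account_fk_102011
FOREIGN KEY (tenant_id) REFERENCES tenant_schema.accounts_102005 (tenant_id);
-- foreign key to a reference table: one connection per worker to avoid self-deadlock
ALTER TABLE tenant_schema.orders_102011 ADD CONSTRAINT orders_country_fk_102011
FOREIGN KEY (country_id) REFERENCES tenant_schema.countries_102001 (country_id);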
/*
* ExecuteCommandListOnWorker executes the given command list on the given node within
* the coordinated 2PC.
*/
static void
ExecuteCommandListOnWorker(char *nodeName, int nodePort, List *commandList)
{
ListCell *commandCell = NULL;
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorker(nodeName, nodePort, command);
}
}
/*
* CreateSplitOffShards gets a shard and a hashed value to pick the split point.
* First, it creates templates for the new shards. Then, for every colocated
* shard, it creates the new split shard metadata and physically creates the
* shards on the worker nodes. The function returns the newly created split-off
* shards and the shard id matching the hashed value via its output parameters.
*/
static void
CreateSplitOffShards(ShardInterval *sourceShard, int hashedValue,
List **splitOffShardList, int *isolatedShardId)
{
List *nodeCommandList = NIL;
ListCell *sourceColocatedShardCell = NULL;
int isolatedShardIndex = 0;
List *sourceColocatedShardList = ColocatedShardIntervalList(sourceShard);
List *shardTemplateList = ShardTemplateList(sourceShard, hashedValue,
&isolatedShardIndex);
foreach(sourceColocatedShardCell, sourceColocatedShardList)
{
ShardInterval *sourceColocatedShard =
(ShardInterval *) lfirst(sourceColocatedShardCell);
Oid relationId = sourceColocatedShard->relationId;
ListCell *templateShardCell = NULL;
int currentShardIndex = 0;
foreach(templateShardCell, shardTemplateList)
{
ShardInterval *templateShard = (ShardInterval *) lfirst(templateShardCell);
ShardInterval *splitOffShard = CreateSplitOffShardFromTemplate(templateShard,
relationId);
List *splitOffCommandList = SplitOffCommandList(sourceColocatedShard,
splitOffShard);
nodeCommandList = list_concat(nodeCommandList, splitOffCommandList);
/* check if this is the isolated shard for the given table */
if (splitOffShard->relationId == sourceShard->relationId &&
currentShardIndex == isolatedShardIndex)
{
(*isolatedShardId) = splitOffShard->shardId;
}
/* add newly created split off shards to list */
(*splitOffShardList) = lappend(*splitOffShardList, splitOffShard);
currentShardIndex++;
}
}
List *sourcePlacementList = ActiveShardPlacementList(sourceShard->shardId);
ExecuteCommandListOnPlacements(nodeCommandList, sourcePlacementList);
}
/*
* ShardTemplateList creates shard templates with new min and max values from
* the given shard and the split point which is the given hashed value.
* It returns the list of shard templates, and passes the isolated shard index
* via isolatedShardIndex parameter.
*/
static List *
ShardTemplateList(ShardInterval *sourceShard, int hashedValue, int *isolatedShardIndex)
{
List *shardTemplateList = NIL;
/* get min and max values of the source shard */
int32 shardMinValue = DatumGetInt32(sourceShard->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShard->maxValue);
(*isolatedShardIndex) = 0;
/* add a shard template for lower range if exists */
if (shardMinValue < hashedValue)
{
ShardInterval *lowerRangeShard = CopyShardInterval(sourceShard);
lowerRangeShard->minValue = Int32GetDatum(shardMinValue);
lowerRangeShard->maxValue = Int32GetDatum(hashedValue - 1);
shardTemplateList = lappend(shardTemplateList, lowerRangeShard);
(*isolatedShardIndex) = 1;
}
/* add shard template for the isolated value */
ShardInterval *isolatedShard = CopyShardInterval(sourceShard);
isolatedShard->minValue = Int32GetDatum(hashedValue);
isolatedShard->maxValue = Int32GetDatum(hashedValue);
shardTemplateList = lappend(shardTemplateList, isolatedShard);
/* add a shard template for upper range if exists */
if (shardMaxValue > hashedValue)
{
ShardInterval *upperRangeShard = CopyShardInterval(sourceShard);
upperRangeShard->minValue = Int32GetDatum(hashedValue + 1);
upperRangeShard->maxValue = Int32GetDatum(shardMaxValue);
shardTemplateList = lappend(shardTemplateList, upperRangeShard);
}
if (list_length(shardTemplateList) == 1)
{
char *tableName = get_rel_name(sourceShard->relationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("table \"%s\" has already been isolated for the "
"given value", tableName)));
}
return shardTemplateList;
}
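As a concrete sketch (hypothetical ranges): isolating hashed value 42 from a shard covering [0, 100] produces the templates [0, 41], [42, 42] and [43, 100]; after the split, a query along these lines would show the new intervals:

-- hypothetical pg_dist_shard contents after the split:
--   shardminvalue | shardmaxvalue
--   0             | 41
--   42            | 42
--   43            | 100
SELECT shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'events'::regclass
ORDER BY shardminvalue::bigint;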
/*
* CreateSplitOffShardFromTemplate creates a new split off shard from the given
* shard template by creating a new shard id and setting the relation id.
*/
static ShardInterval *
CreateSplitOffShardFromTemplate(ShardInterval *shardTemplate, Oid relationId)
{
ShardInterval *splitOffShard = CopyShardInterval(shardTemplate);
/* set new shard id and the relation id */
splitOffShard->shardId = GetNextShardId();
splitOffShard->relationId = relationId;
return splitOffShard;
}
/*
* SplitOffCommandList creates a command list to run on worker nodes to create
* new split off shard from the source shard.
*/
static List *
SplitOffCommandList(ShardInterval *sourceShard, ShardInterval *splitOffShard)
{
List *splitOffCommandList = NIL;
bool includeSequenceDefaults = false;
Oid relationId = sourceShard->relationId;
Var *partitionKey = DistPartitionKey(relationId);
Assert(partitionKey != NULL);
const char *partitionColumnName = get_attname(relationId,
partitionKey->varattno, false);
const char *quotedPartitionColumnName = quote_identifier(partitionColumnName);
char *splitOffShardName = ConstructQualifiedShardName(splitOffShard);
char *sourceShardName = ConstructQualifiedShardName(sourceShard);
int32 shardMinValue = DatumGetInt32(splitOffShard->minValue);
int32 shardMaxValue = DatumGetInt32(splitOffShard->maxValue);
List *tableCreationCommandList =
GetPreLoadTableCreationCommands(relationId, includeSequenceDefaults, NULL);
tableCreationCommandList = WorkerApplyShardDDLCommandList(tableCreationCommandList,
splitOffShard->shardId);
splitOffCommandList = list_concat(splitOffCommandList, tableCreationCommandList);
StringInfo splitOffShardCommand = makeStringInfo();
appendStringInfo(splitOffShardCommand,
"INSERT INTO %s SELECT * FROM %s WHERE "
"worker_hash(%s) >= %d AND worker_hash(%s) <= %d",
splitOffShardName, sourceShardName, quotedPartitionColumnName,
shardMinValue, quotedPartitionColumnName, shardMaxValue);
splitOffCommandList = lappend(splitOffCommandList, splitOffShardCommand->data);
List *indexCommandList = GetPostLoadTableCreationCommands(relationId, true, true);
indexCommandList = WorkerApplyShardDDLCommandList(indexCommandList,
splitOffShard->shardId);
splitOffCommandList = list_concat(splitOffCommandList, indexCommandList);
return splitOffCommandList;
}
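For a single split-off shard, the generated command list looks roughly as follows (shard names and range values are hypothetical); note how the INSERT ... SELECT re-hashes the partition column with worker_hash() to pick out the rows belonging to the new shard:

CREATE TABLE tenant_schema.events_102011 (tenant_id int, payload text);
INSERT INTO tenant_schema.events_102011
SELECT * FROM tenant_schema.events_102008
WHERE worker_hash(tenant_id) >= 42 AND worker_hash(tenant_id) <= 42;
CREATE INDEX events_idx_102011 ON tenant_schema.events_102011 (tenant_id);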
/*
* ExecuteCommandListOnPlacements runs the given command list on the nodes of
* the given shard placement list. First, it creates connections. Then it sends
* commands one by one. For every command, it first sends the command to all
* connections and then checks the results. This helps run long-running
* commands in parallel. Finally, it sends commit messages to all connections
* and closes them.
*/
static void
ExecuteCommandListOnPlacements(List *commandList, List *placementList)
{
List *workerConnectionList = NIL;
ListCell *workerConnectionCell = NULL;
ListCell *shardPlacementCell = NULL;
ListCell *commandCell = NULL;
/* create connections and start transactions */
foreach(shardPlacementCell, placementList)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
char *nodeName = shardPlacement->nodeName;
int32 nodePort = shardPlacement->nodePort;
int connectionFlags = FORCE_NEW_CONNECTION;
char *currentUser = CurrentUserName();
/* create a new connection */
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName,
nodePort,
currentUser,
NULL);
/* mark connection as critical and start the transaction */
MarkRemoteTransactionCritical(workerConnection);
RemoteTransactionBegin(workerConnection);
/* add connection to the list */
workerConnectionList = lappend(workerConnectionList, workerConnection);
}
/* send and check results for every command one by one */
foreach(commandCell, commandList)
{
char *command = lfirst(commandCell);
/* first only send the command */
foreach(workerConnectionCell, workerConnectionList)
{
MultiConnection *workerConnection =
(MultiConnection *) lfirst(workerConnectionCell);
int querySent = SendRemoteCommand(workerConnection, command);
if (querySent == 0)
{
ReportConnectionError(workerConnection, ERROR);
}
}
/* then check the result separately to run long running commands in parallel */
foreach(workerConnectionCell, workerConnectionList)
{
MultiConnection *workerConnection =
(MultiConnection *) lfirst(workerConnectionCell);
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(workerConnection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(workerConnection, result, ERROR);
}
PQclear(result);
ForgetResults(workerConnection);
}
}
/* finally commit each transaction and close connections */
foreach(workerConnectionCell, workerConnectionList)
{
MultiConnection *workerConnection =
(MultiConnection *) lfirst(workerConnectionCell);
RemoteTransactionCommit(workerConnection);
CloseConnection(workerConnection);
}
}
/*
* InsertSplitOffShardMetadata inserts new shard and shard placement data into
* catalog tables on both the coordinator and MX nodes.
*/
static void
InsertSplitOffShardMetadata(List *splitOffShardList, List *sourcePlacementList)
{
List *syncedShardList = NIL;
ListCell *shardCell = NULL;
ListCell *commandCell = NULL;
/* add new metadata */
foreach(shardCell, splitOffShardList)
{
ShardInterval *splitOffShard = (ShardInterval *) lfirst(shardCell);
Oid relationId = splitOffShard->relationId;
uint64 shardId = splitOffShard->shardId;
char storageType = splitOffShard->storageType;
ListCell *shardPlacementCell = NULL;
int32 shardMinValue = DatumGetInt32(splitOffShard->minValue);
int32 shardMaxValue = DatumGetInt32(splitOffShard->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue);
InsertShardRow(relationId, shardId, storageType, shardMinValueText,
shardMaxValueText);
/* split off shard placement metadata */
foreach(shardPlacementCell, sourcePlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
uint64 shardSize = 0;
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, SHARD_STATE_ACTIVE,
shardSize, placement->groupId);
}
if (ShouldSyncTableMetadata(relationId))
{
syncedShardList = lappend(syncedShardList, splitOffShard);
}
}
/* send commands to synced nodes one by one */
List *splitOffShardMetadataCommandList = ShardListInsertCommand(syncedShardList);
foreach(commandCell, splitOffShardMetadataCommandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkersWithMetadata(command);
}
}


@ -575,6 +575,9 @@ BlockingShardSplit(SplitOperation splitOperation,
}
PG_CATCH();
{
/* end ongoing transactions to enable us to clean up */
ShutdownAllConnections();
/* Do a best effort cleanup of shards created on workers in the above block */
TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
@ -1447,6 +1450,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
}
PG_CATCH();
{
/* end ongoing transactions to enable us to clean up */
ShutdownAllConnections();
/* Do a best effort cleanup of shards created on workers in the above block */
TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);


@ -10,6 +10,7 @@ SET SEARCH_PATH = tenant_isolation;
SET citus.shard_count TO 2;
SET citus.next_shard_id TO 300;
SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
SELECT pg_backend_pid() as pid \gset
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
@ -59,7 +60,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cancellation on colocated table creation
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_2").after(1).cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_2").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -68,17 +69,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
ERROR: canceling statement due to user request
-- failure on colocated table population
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO tenant_isolation.table_2").after(2).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(302").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- cancellation on colocated table population
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO tenant_isolation.table_2").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(302").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -94,8 +94,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_2
(1 row)
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- cancellation on colocated table constraints
SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_2 ADD CONSTRAINT").after(2).cancel(' || :pid || ')');
mitmproxy
@ -116,7 +115,7 @@ SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
-- cancellation on table creation
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_1").after(1).cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_1").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -125,17 +124,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
ERROR: canceling statement due to user request
-- failure on table population
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO tenant_isolation.table_1").after(2).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(300").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- cancellation on table population
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO tenant_isolation.table_1").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(300").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -151,8 +149,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_1
(1 row)
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- cancellation on table constraints
SELECT citus.mitmproxy('conn.onQuery(query="ALTER TABLE tenant_isolation.table_1 ADD CONSTRAINT").after(2).cancel(' || :pid || ')');
mitmproxy
@ -323,8 +320,8 @@ DO LANGUAGE plpgsql
$$
BEGIN
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
EXCEPTION WHEN OTHERS THEN
RAISE 'Command failed to execute';
END;
$$;
ERROR: Command failed to execute


@ -7,10 +7,10 @@ create_distributed_table
(1 row)
step s1-load-cache:
TRUNCATE isolation_table;
step s1-insert:
INSERT INTO isolation_table VALUES (5, 10);
step s1-begin:
BEGIN;
@ -19,7 +19,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-select:
SELECT count(*) FROM isolation_table WHERE id = 5;
count
---------------------------------------------------------------------
@ -27,10 +27,10 @@ count
(1 row)
step s2-begin:
BEGIN;
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -38,26 +38,26 @@ isolate_tenant_to_new_shard
(1 row)
step s1-update:
UPDATE isolation_table SET value = 5 WHERE id = 5;
<waiting ...>
step s2-commit:
COMMIT;
step s1-update: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
@ -80,10 +80,10 @@ create_distributed_table
(1 row)
step s1-load-cache:
TRUNCATE isolation_table;
step s1-insert:
INSERT INTO isolation_table VALUES (5, 10);
step s1-begin:
BEGIN;
@ -92,7 +92,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-select:
SELECT count(*) FROM isolation_table WHERE id = 5;
count
---------------------------------------------------------------------
@ -100,10 +100,10 @@ count
(1 row)
step s2-begin:
BEGIN;
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -111,26 +111,26 @@ isolate_tenant_to_new_shard
(1 row)
step s1-delete:
DELETE FROM isolation_table WHERE id = 5;
<waiting ...>
step s2-commit:
COMMIT;
step s1-delete: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
@ -153,7 +153,7 @@ create_distributed_table
(1 row)
step s1-load-cache:
TRUNCATE isolation_table;
step s1-begin:
BEGIN;
@ -162,7 +162,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-select:
SELECT count(*) FROM isolation_table WHERE id = 5;
count
---------------------------------------------------------------------
@ -170,10 +170,10 @@ count
(1 row)
step s2-begin:
BEGIN;
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -181,26 +181,26 @@ isolate_tenant_to_new_shard
(1 row)
step s1-insert:
INSERT INTO isolation_table VALUES (5, 10);
<waiting ...>
step s2-commit:
COMMIT;
step s1-insert: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
@ -222,7 +222,7 @@ create_distributed_table
(1 row)
step s1-load-cache:
TRUNCATE isolation_table;
step s1-begin:
BEGIN;
@ -231,7 +231,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-select:
SELECT count(*) FROM isolation_table WHERE id = 5;
count
---------------------------------------------------------------------
@ -239,10 +239,10 @@ count
(1 row)
step s2-begin:
BEGIN;
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -250,26 +250,26 @@ isolate_tenant_to_new_shard
(1 row)
step s1-copy:
COPY isolation_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
<waiting ...>
step s2-commit:
COMMIT;
step s1-copy: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
@ -291,7 +291,7 @@ create_distributed_table
(1 row)
step s1-load-cache:
TRUNCATE isolation_table;
step s1-begin:
BEGIN;
@ -300,7 +300,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-select:
SELECT count(*) FROM isolation_table WHERE id = 5;
count
---------------------------------------------------------------------
@ -308,10 +308,10 @@ count
(1 row)
step s2-begin:
BEGIN;
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -319,25 +319,25 @@ isolate_tenant_to_new_shard
(1 row)
step s1-ddl:
CREATE INDEX test_table_index ON isolation_table(id);
<waiting ...>
step s2-commit:
COMMIT;
step s1-ddl: <... completed>
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
@ -352,12 +352,12 @@ id|value
(0 rows)
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport|success|result
---------------------------------------------------------------------
@ -375,7 +375,7 @@ create_distributed_table
(1 row)
step s1-insert:
INSERT INTO isolation_table VALUES (5, 10);
step s1-begin:
BEGIN;
@ -384,7 +384,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-select:
SELECT count(*) FROM isolation_table WHERE id = 5;
count
---------------------------------------------------------------------
@ -392,10 +392,10 @@ count
(1 row)
step s2-begin:
BEGIN;
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -403,26 +403,26 @@ isolate_tenant_to_new_shard
(1 row)
step s1-update:
UPDATE isolation_table SET value = 5 WHERE id = 5;
<waiting ...>
step s2-commit:
COMMIT;
step s1-update: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
@ -445,7 +445,7 @@ create_distributed_table
(1 row)
step s1-insert:
INSERT INTO isolation_table VALUES (5, 10);
step s1-begin:
BEGIN;
@ -454,7 +454,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-select:
SELECT count(*) FROM isolation_table WHERE id = 5;
count
---------------------------------------------------------------------
@ -462,10 +462,10 @@ count
(1 row)
step s2-begin:
BEGIN;
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -473,26 +473,26 @@ isolate_tenant_to_new_shard
(1 row)
step s1-delete:
DELETE FROM isolation_table WHERE id = 5;
<waiting ...>
step s2-commit:
COMMIT;
step s1-delete: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
@ -521,7 +521,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-select:
SELECT count(*) FROM isolation_table WHERE id = 5;
count
---------------------------------------------------------------------
@ -529,10 +529,10 @@ count
(1 row)
step s2-begin:
BEGIN;
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -540,26 +540,26 @@ isolate_tenant_to_new_shard
(1 row)
step s1-insert:
INSERT INTO isolation_table VALUES (5, 10);
<waiting ...>
step s2-commit:
COMMIT;
step s1-insert: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
@ -587,7 +587,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-select:
SELECT count(*) FROM isolation_table WHERE id = 5;
count
---------------------------------------------------------------------
@ -595,10 +595,10 @@ count
(1 row)
step s2-begin:
BEGIN;
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -606,26 +606,26 @@ isolate_tenant_to_new_shard
(1 row)
step s1-copy:
COPY isolation_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
<waiting ...>
step s2-commit:
COMMIT;
step s1-copy: <... completed>
ERROR: could not find valid entry for shard xxxxx
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
@ -653,7 +653,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-select:
SELECT count(*) FROM isolation_table WHERE id = 5;
count
---------------------------------------------------------------------
@ -661,10 +661,10 @@ count
(1 row)
step s2-begin:
BEGIN;
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -672,25 +672,25 @@ isolate_tenant_to_new_shard
(1 row)
step s1-ddl:
CREATE INDEX test_table_index ON isolation_table(id);
<waiting ...>
step s2-commit:
COMMIT;
step s1-ddl: <... completed>
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
@ -705,12 +705,12 @@ id|value
(0 rows)
step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from pg_indexes WHERE tablename = ''%s''')
ORDER BY
nodeport;
nodeport|success|result
---------------------------------------------------------------------
@ -728,10 +728,10 @@ create_distributed_table
(1 row)
step s1-load-cache:
TRUNCATE isolation_table;
step s1-insert:
INSERT INTO isolation_table VALUES (5, 10);
step s1-begin:
BEGIN;
@ -740,7 +740,7 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 2);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
@ -748,37 +748,30 @@ isolate_tenant_to_new_shard
(1 row)
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
<waiting ...>
step s1-commit:
COMMIT;
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1500067
(1 row)
ERROR: could not acquire the lock required to split public.isolation_table
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500066|t | 0
57637|1500067|t | 1
57637|1500068|t | 0
57637|1500061|t | 1
57638|1500063|t | 0
57638|1500064|t | 0
57638|1500065|t | 0
(6 rows)
(4 rows)
id|value
---------------------------------------------------------------------
@ -793,7 +786,7 @@ create_distributed_table
(1 row)
step s1-insert:
INSERT INTO isolation_table VALUES (5, 10);
step s1-begin:
BEGIN;
@ -802,45 +795,38 @@ step s1-begin:
SET citus.select_opens_transaction_block TO false;
step s1-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 2);
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1500073
1500070
(1 row)
step s2-isolate-tenant:
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
<waiting ...>
step s1-commit:
COMMIT;
SELECT isolate_tenant_to_new_shard('isolation_table', 5);
step s2-isolate-tenant: <... completed>
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1500076
(1 row)
ERROR: could not acquire the lock required to split public.isolation_table
step s1-commit:
COMMIT;
step s2-print-cluster:
-- row count per shard
SELECT
nodeport, shardid, success, result
FROM
run_command_on_placements('isolation_table', 'select count(*) from %s')
ORDER BY
nodeport, shardid;
-- rows
SELECT id, value FROM isolation_table ORDER BY id, value;
nodeport|shardid|success|result
---------------------------------------------------------------------
57637|1500075|t | 0
57637|1500076|t | 1
57637|1500077|t | 0
57638|1500072|t | 0
57638|1500073|t | 0
57638|1500074|t | 0
(6 rows)
57637|1500067|t | 1
57638|1500069|t | 0
57638|1500070|t | 0
57638|1500071|t | 0
(4 rows)
id|value
---------------------------------------------------------------------


@ -392,6 +392,7 @@ SELECT count(*) FROM pg_dist_colocation WHERE shardcount = 9;
(1 row)
SET citus.shard_count TO 9;
SET citus.shard_replication_factor TO 1;
CREATE TABLE shard_split_table (a int, b int);
SELECT create_distributed_table ('shard_split_table', 'a');
create_distributed_table


@ -309,6 +309,7 @@ DROP TABLE stage_postgres;
CREATE SCHEMA multiuser_schema;
CREATE TABLE multiuser_schema.hash_table(a int, b int);
CREATE TABLE multiuser_schema.reference_table(a int, b int);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('multiuser_schema.hash_table', 'a', colocate_with => 'none');
create_distributed_table
---------------------------------------------------------------------
@ -323,14 +324,10 @@ ORDER BY nodename, nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
localhost | 57637 | 109090 | t | f
localhost | 57637 | 109091 | t | f
localhost | 57637 | 109092 | t | f
localhost | 57637 | 109093 | t | f
localhost | 57638 | 109090 | t | f
localhost | 57638 | 109091 | t | f
localhost | 57638 | 109092 | t | f
localhost | 57638 | 109093 | t | f
(8 rows)
(4 rows)
-- grant select
GRANT SELECT ON ALL TABLES IN SCHEMA multiuser_schema TO read_access;
@ -340,14 +337,10 @@ ORDER BY nodename, nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
localhost | 57637 | 109090 | t | t
localhost | 57637 | 109091 | t | t
localhost | 57637 | 109092 | t | t
localhost | 57637 | 109093 | t | t
localhost | 57638 | 109090 | t | t
localhost | 57638 | 109091 | t | t
localhost | 57638 | 109092 | t | t
localhost | 57638 | 109093 | t | t
(8 rows)
(4 rows)
-- distribute the second table
SELECT create_reference_table('multiuser_schema.reference_table');
@ -378,14 +371,10 @@ ORDER BY nodename, nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
localhost | 57637 | 109095 | t | f
localhost | 57637 | 109096 | t | f
localhost | 57637 | 109097 | t | f
localhost | 57637 | 109098 | t | f
localhost | 57638 | 109095 | t | f
localhost | 57638 | 109096 | t | f
localhost | 57638 | 109097 | t | f
localhost | 57638 | 109098 | t | f
(8 rows)
(4 rows)
-- grant select again, verify it is granted
GRANT SELECT ON ALL TABLES IN SCHEMA multiuser_schema TO read_access;
@ -394,14 +383,10 @@ ORDER BY nodename, nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
localhost | 57637 | 109095 | t | t
localhost | 57637 | 109096 | t | t
localhost | 57637 | 109097 | t | t
localhost | 57637 | 109098 | t | t
localhost | 57638 | 109095 | t | t
localhost | 57638 | 109096 | t | t
localhost | 57638 | 109097 | t | t
localhost | 57638 | 109098 | t | t
(8 rows)
(4 rows)
-- verify isolate tenant carries grants
SELECT isolate_tenant_to_new_shard('multiuser_schema.hash_table', 5);
@ -414,19 +399,13 @@ SELECT * FROM run_command_on_placements('multiuser_schema.hash_table', $$ select
ORDER BY nodename, nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
localhost | 57637 | 109091 | t | t
localhost | 57637 | 109092 | t | t
localhost | 57637 | 109093 | t | t
localhost | 57637 | 109099 | t | t
localhost | 57637 | 109100 | t | t
localhost | 57637 | 109101 | t | t
localhost | 57638 | 109091 | t | t
localhost | 57638 | 109092 | t | t
localhost | 57638 | 109093 | t | t
localhost | 57638 | 109099 | t | t
localhost | 57638 | 109100 | t | t
localhost | 57638 | 109101 | t | t
(12 rows)
(6 rows)
-- revoke select
REVOKE SELECT ON ALL TABLES IN SCHEMA multiuser_schema FROM read_access;
@ -434,19 +413,13 @@ SELECT * FROM run_command_on_placements('multiuser_schema.hash_table', $$ select
ORDER BY nodename, nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
localhost | 57637 | 109091 | t | f
localhost | 57637 | 109092 | t | f
localhost | 57637 | 109093 | t | f
localhost | 57637 | 109099 | t | f
localhost | 57637 | 109100 | t | f
localhost | 57637 | 109101 | t | f
localhost | 57638 | 109091 | t | f
localhost | 57638 | 109092 | t | f
localhost | 57638 | 109093 | t | f
localhost | 57638 | 109099 | t | f
localhost | 57638 | 109100 | t | f
localhost | 57638 | 109101 | t | f
(12 rows)
(6 rows)
-- test multi-schema grants
CREATE SCHEMA multiuser_second_schema;
@ -462,33 +435,23 @@ SELECT * FROM run_command_on_placements('multiuser_schema.hash_table', $$ select
ORDER BY nodename, nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
localhost | 57637 | 109091 | t | t
localhost | 57637 | 109092 | t | t
localhost | 57637 | 109093 | t | t
localhost | 57637 | 109099 | t | t
localhost | 57637 | 109100 | t | t
localhost | 57637 | 109101 | t | t
localhost | 57638 | 109091 | t | t
localhost | 57638 | 109092 | t | t
localhost | 57638 | 109093 | t | t
localhost | 57638 | 109099 | t | t
localhost | 57638 | 109100 | t | t
localhost | 57638 | 109101 | t | t
(12 rows)
(6 rows)
SELECT * FROM run_command_on_placements('multiuser_second_schema.hash_table', $$ select has_table_privilege('read_access', '%s', 'select') $$)
ORDER BY nodename, nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
localhost | 57637 | 109102 | t | t
localhost | 57637 | 109103 | t | t
localhost | 57637 | 109104 | t | t
localhost | 57637 | 109105 | t | t
localhost | 57638 | 109102 | t | t
localhost | 57638 | 109103 | t | t
localhost | 57638 | 109104 | t | t
localhost | 57638 | 109105 | t | t
(8 rows)
(4 rows)
-- revoke from multiple schemas, verify result
REVOKE SELECT ON ALL TABLES IN SCHEMA multiuser_schema, multiuser_second_schema FROM read_access;
@ -496,33 +459,23 @@ SELECT * FROM run_command_on_placements('multiuser_schema.hash_table', $$ select
ORDER BY nodename, nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
localhost | 57637 | 109091 | t | f
localhost | 57637 | 109092 | t | f
localhost | 57637 | 109093 | t | f
localhost | 57637 | 109099 | t | f
localhost | 57637 | 109100 | t | f
localhost | 57637 | 109101 | t | f
localhost | 57638 | 109091 | t | f
localhost | 57638 | 109092 | t | f
localhost | 57638 | 109093 | t | f
localhost | 57638 | 109099 | t | f
localhost | 57638 | 109100 | t | f
localhost | 57638 | 109101 | t | f
(12 rows)
(6 rows)
SELECT * FROM run_command_on_placements('multiuser_second_schema.hash_table', $$ select has_table_privilege('read_access', '%s', 'select') $$)
ORDER BY nodename, nodeport, shardid;
nodename | nodeport | shardid | success | result
---------------------------------------------------------------------
localhost | 57637 | 109102 | t | f
localhost | 57637 | 109103 | t | f
localhost | 57637 | 109104 | t | f
localhost | 57637 | 109105 | t | f
localhost | 57638 | 109102 | t | f
localhost | 57638 | 109103 | t | f
localhost | 57638 | 109104 | t | f
localhost | 57638 | 109105 | t | f
(8 rows)
(4 rows)
DROP SCHEMA multiuser_schema CASCADE;
NOTICE: drop cascades to 3 other objects


@ -228,7 +228,7 @@ SELECT isolate_tenant_to_new_shard('orders_streaming', 102, 'CASCADE');
(1 row)
SELECT isolate_tenant_to_new_shard('lineitem_streaming', 102, 'CASCADE');
ERROR: table "lineitem_streaming" has already been isolated for the given value
ERROR: table lineitem_streaming has already been isolated for the given value
COMMIT;
-- test a rollback transaction block
BEGIN;
@ -239,42 +239,38 @@ SELECT isolate_tenant_to_new_shard('orders_streaming', 102, 'CASCADE');
(1 row)
SELECT isolate_tenant_to_new_shard('orders_streaming', 103, 'CASCADE');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230032
(1 row)
ERROR: cannot isolate tenant after other modifications in the same transaction.
ROLLBACK;
-- test a successful transaction block
BEGIN;
SELECT isolate_tenant_to_new_shard('orders_streaming', 102, 'CASCADE');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230038
(1 row)
SELECT isolate_tenant_to_new_shard('orders_streaming', 103, 'CASCADE');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230044
1230032
(1 row)
COMMIT;
SELECT isolate_tenant_to_new_shard('orders_streaming', 103, 'CASCADE');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230038
(1 row)
SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE');
ERROR: table "lineitem_streaming" has already been isolated for the given value
ERROR: table lineitem_streaming has already been isolated for the given value
SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE');
ERROR: table "orders_streaming" has already been isolated for the given value
ERROR: table orders_streaming has already been isolated for the given value
-- test corner cases: hash(-1995148554) = -2147483648 and hash(-1686493264) = 2147483647
SELECT isolate_tenant_to_new_shard('lineitem_streaming', -1995148554, 'CASCADE');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230046
1230040
(1 row)
SELECT isolate_tenant_to_new_shard('orders_streaming', -1686493264, 'CASCADE');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230053
1230047
(1 row)
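The two corner-case constants can be sanity-checked with the worker_hash() UDF that Citus exposes, since it applies the same hash function used for shard pruning (a quick check, not part of the test file):
SELECT worker_hash(-1995148554);  -- -2147483648, the minimum hash value
SELECT worker_hash(-1686493264);  --  2147483647, the maximum hash value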
SELECT count(*) FROM orders_streaming WHERE o_orderkey = -1995148554;
@ -393,14 +389,14 @@ SELECT * FROM pg_dist_shard
ORDER BY shardminvalue::BIGINT, logicalrelid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
lineitem_streaming | 1230046 | t | -2147483648 | -2147483648
orders_streaming | 1230048 | t | -2147483648 | -2147483648
lineitem_streaming | 1230047 | t | -2147483647 | -136164586
orders_streaming | 1230049 | t | -2147483647 | -136164586
lineitem_streaming | 1230041 | t | -136164585 | -136164585
orders_streaming | 1230044 | t | -136164585 | -136164585
lineitem_streaming | 1230042 | t | -136164584 | -85071815
orders_streaming | 1230045 | t | -136164584 | -85071815
lineitem_streaming | 1230040 | t | -2147483648 | -2147483648
orders_streaming | 1230042 | t | -2147483648 | -2147483648
lineitem_streaming | 1230041 | t | -2147483647 | -136164586
orders_streaming | 1230043 | t | -2147483647 | -136164586
lineitem_streaming | 1230035 | t | -136164585 | -136164585
orders_streaming | 1230038 | t | -136164585 | -136164585
lineitem_streaming | 1230036 | t | -136164584 | -85071815
orders_streaming | 1230039 | t | -136164584 | -85071815
lineitem_streaming | 1230011 | t | -85071814 | -85071814
orders_streaming | 1230014 | t | -85071814 | -85071814
lineitem_streaming | 1230012 | t | -85071813 | -1
@ -409,14 +405,14 @@ SELECT * FROM pg_dist_shard
orders_streaming | 1230007 | t | 0 | 108199380
lineitem_streaming | 1230005 | t | 108199381 | 108199381
orders_streaming | 1230008 | t | 108199381 | 108199381
lineitem_streaming | 1230034 | t | 108199382 | 412880111
orders_streaming | 1230037 | t | 108199382 | 412880111
lineitem_streaming | 1230035 | t | 412880112 | 412880112
orders_streaming | 1230038 | t | 412880112 | 412880112
lineitem_streaming | 1230050 | t | 412880113 | 2147483646
orders_streaming | 1230052 | t | 412880113 | 2147483646
lineitem_streaming | 1230051 | t | 2147483647 | 2147483647
orders_streaming | 1230053 | t | 2147483647 | 2147483647
lineitem_streaming | 1230028 | t | 108199382 | 412880111
orders_streaming | 1230031 | t | 108199382 | 412880111
lineitem_streaming | 1230029 | t | 412880112 | 412880112
orders_streaming | 1230032 | t | 412880112 | 412880112
lineitem_streaming | 1230044 | t | 412880113 | 2147483646
orders_streaming | 1230046 | t | 412880113 | 2147483646
lineitem_streaming | 1230045 | t | 2147483647 | 2147483647
orders_streaming | 1230047 | t | 2147483647 | 2147483647
(24 rows)
SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport, shardid;
@ -426,32 +422,32 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport
1230012 | 1 | 0 | localhost | 57637 | 100012
1230014 | 1 | 0 | localhost | 57637 | 100014
1230015 | 1 | 0 | localhost | 57637 | 100015
1230035 | 1 | 0 | localhost | 57637 | 100035
1230036 | 1 | 0 | localhost | 57637 | 100036
1230038 | 1 | 0 | localhost | 57637 | 100038
1230039 | 1 | 0 | localhost | 57637 | 100039
1230040 | 1 | 0 | localhost | 57637 | 100040
1230041 | 1 | 0 | localhost | 57637 | 100041
1230042 | 1 | 0 | localhost | 57637 | 100042
1230044 | 1 | 0 | localhost | 57637 | 100044
1230045 | 1 | 0 | localhost | 57637 | 100045
1230046 | 1 | 0 | localhost | 57637 | 100046
1230047 | 1 | 0 | localhost | 57637 | 100047
1230048 | 1 | 0 | localhost | 57637 | 100048
1230049 | 1 | 0 | localhost | 57637 | 100049
1230043 | 1 | 0 | localhost | 57637 | 100043
1230004 | 1 | 0 | localhost | 57638 | 100004
1230005 | 1 | 0 | localhost | 57638 | 100005
1230007 | 1 | 0 | localhost | 57638 | 100007
1230008 | 1 | 0 | localhost | 57638 | 100008
1230034 | 1 | 0 | localhost | 57638 | 100034
1230035 | 1 | 0 | localhost | 57638 | 100035
1230037 | 1 | 0 | localhost | 57638 | 100037
1230038 | 1 | 0 | localhost | 57638 | 100038
1230050 | 1 | 0 | localhost | 57638 | 100050
1230051 | 1 | 0 | localhost | 57638 | 100051
1230052 | 1 | 0 | localhost | 57638 | 100052
1230053 | 1 | 0 | localhost | 57638 | 100053
1230028 | 1 | 0 | localhost | 57638 | 100028
1230029 | 1 | 0 | localhost | 57638 | 100029
1230031 | 1 | 0 | localhost | 57638 | 100031
1230032 | 1 | 0 | localhost | 57638 | 100032
1230044 | 1 | 0 | localhost | 57638 | 100044
1230045 | 1 | 0 | localhost | 57638 | 100045
1230046 | 1 | 0 | localhost | 57638 | 100046
1230047 | 1 | 0 | localhost | 57638 | 100047
(24 rows)
-- test failing foreign constraints after multiple tenant isolation
\COPY lineitem_streaming FROM STDIN WITH DELIMITER '|'
ERROR: insert or update on table "lineitem_streaming_1230050" violates foreign key constraint "test_constraint_1230050"
DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230052".
ERROR: insert or update on table "lineitem_streaming_1230044" violates foreign key constraint "test_constraint_1230044"
DETAIL: Key (l_orderkey)=(128) is not present in table "orders_streaming_1230046".
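The renumbered constraint names show that the split-off shards keep their foreign keys, which is exactly what rejects the orphan row here. The failure mode itself is ordinary referential integrity; a self-contained sketch with plain local tables (names illustrative):
CREATE TABLE orders_sketch (o_orderkey bigint PRIMARY KEY);
CREATE TABLE lineitem_sketch (
    l_orderkey bigint REFERENCES orders_sketch (o_orderkey)
);
INSERT INTO lineitem_sketch VALUES (128);  -- no matching order, so the FK errors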
-- connect to the worker node with metadata
\c - mx_isolation_role_ent - :worker_1_port
SET search_path to "Tenant Isolation";
@ -503,28 +499,22 @@ SET citus.override_table_visibility TO false;
Tenant Isolation | lineitem_streaming | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230011 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230012 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230028 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230029 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230030 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230035 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230036 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230040 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230041 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230042 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230046 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230047 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230014 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230015 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230031 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230032 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230033 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230044 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230045 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230048 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230049 | table | mx_isolation_role_ent
(20 rows)
Tenant Isolation | orders_streaming_1230038 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230039 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230042 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230043 | table | mx_isolation_role_ent
(14 rows)
\c - postgres - :worker_1_port
SET search_path to "Tenant Isolation";
SELECT "Column", "Type", "Modifiers" FROM public.table_desc WHERE relid='orders_streaming_1230045'::regclass;
SELECT "Column", "Type", "Modifiers" FROM public.table_desc WHERE relid='orders_streaming_1230039'::regclass;
Column | Type | Modifiers
---------------------------------------------------------------------
o_orderkey | bigint | not null
@ -546,14 +536,14 @@ SELECT * FROM pg_dist_shard
ORDER BY shardminvalue::BIGINT, logicalrelid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
lineitem_streaming | 1230046 | t | -2147483648 | -2147483648
orders_streaming | 1230048 | t | -2147483648 | -2147483648
lineitem_streaming | 1230047 | t | -2147483647 | -136164586
orders_streaming | 1230049 | t | -2147483647 | -136164586
lineitem_streaming | 1230041 | t | -136164585 | -136164585
orders_streaming | 1230044 | t | -136164585 | -136164585
lineitem_streaming | 1230042 | t | -136164584 | -85071815
orders_streaming | 1230045 | t | -136164584 | -85071815
lineitem_streaming | 1230040 | t | -2147483648 | -2147483648
orders_streaming | 1230042 | t | -2147483648 | -2147483648
lineitem_streaming | 1230041 | t | -2147483647 | -136164586
orders_streaming | 1230043 | t | -2147483647 | -136164586
lineitem_streaming | 1230035 | t | -136164585 | -136164585
orders_streaming | 1230038 | t | -136164585 | -136164585
lineitem_streaming | 1230036 | t | -136164584 | -85071815
orders_streaming | 1230039 | t | -136164584 | -85071815
lineitem_streaming | 1230011 | t | -85071814 | -85071814
orders_streaming | 1230014 | t | -85071814 | -85071814
lineitem_streaming | 1230012 | t | -85071813 | -1
@ -562,14 +552,14 @@ SELECT * FROM pg_dist_shard
orders_streaming | 1230007 | t | 0 | 108199380
lineitem_streaming | 1230005 | t | 108199381 | 108199381
orders_streaming | 1230008 | t | 108199381 | 108199381
lineitem_streaming | 1230034 | t | 108199382 | 412880111
orders_streaming | 1230037 | t | 108199382 | 412880111
lineitem_streaming | 1230035 | t | 412880112 | 412880112
orders_streaming | 1230038 | t | 412880112 | 412880112
lineitem_streaming | 1230050 | t | 412880113 | 2147483646
orders_streaming | 1230052 | t | 412880113 | 2147483646
lineitem_streaming | 1230051 | t | 2147483647 | 2147483647
orders_streaming | 1230053 | t | 2147483647 | 2147483647
lineitem_streaming | 1230028 | t | 108199382 | 412880111
orders_streaming | 1230031 | t | 108199382 | 412880111
lineitem_streaming | 1230029 | t | 412880112 | 412880112
orders_streaming | 1230032 | t | 412880112 | 412880112
lineitem_streaming | 1230044 | t | 412880113 | 2147483646
orders_streaming | 1230046 | t | 412880113 | 2147483646
lineitem_streaming | 1230045 | t | 2147483647 | 2147483647
orders_streaming | 1230047 | t | 2147483647 | 2147483647
(24 rows)
-- return to master node
@ -633,23 +623,11 @@ SELECT count(*) FROM lineitem_date WHERE l_shipdate = '1997-08-08';
(1 row)
SELECT isolate_tenant_to_new_shard('lineitem_date', '1998-05-26');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230057
(1 row)
ERROR: cannot isolate tenants when using shard replication
SELECT isolate_tenant_to_new_shard('lineitem_date', '1997-07-30');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230060
(1 row)
ERROR: cannot isolate tenants when using shard replication
SELECT isolate_tenant_to_new_shard('lineitem_date', '1998-01-15');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230063
(1 row)
ERROR: cannot isolate tenants when using shard replication
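These three calls used to perform the split on every placement; tenant isolation now refuses to run when shard replication is in use. A sketch of the supported path, recreating a similar table with replication factor 1 (table and column names illustrative):
SET citus.shard_replication_factor TO 1;
CREATE TABLE lineitem_date_rf1 (l_orderkey bigint, l_shipdate date);
SELECT create_distributed_table('lineitem_date_rf1', 'l_shipdate');
SELECT isolate_tenant_to_new_shard('lineitem_date_rf1', '1998-05-26');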
SELECT count(*) FROM lineitem_date;
count
---------------------------------------------------------------------
@ -685,7 +663,7 @@ SELECT count(*) FROM lineitem_date WHERE l_shipdate = '1997-08-08';
SET search_path to "Tenant Isolation";
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port;
SELECT isolate_tenant_to_new_shard('lineitem_date', '1997-08-08');
ERROR: cannot isolate tenant because relation "lineitem_date" has an inactive shard placement for the shard xxxxx
ERROR: cannot split shard because relation "lineitem_date" has an inactive shard placement for the shard xxxxx
HINT: Use master_copy_shard_placement UDF to repair the inactive shard placement.
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE nodeport = :worker_1_port;
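Before the shardstate is reset above, the hinted repair would look roughly like this; the shard id is illustrative and in practice comes from the error message, copying from the healthy placement onto the inactive one:
SELECT master_copy_shard_placement(1230024,
                                   'localhost', :worker_2_port,
                                   'localhost', :worker_1_port);
SELECT isolate_tenant_to_new_shard('lineitem_date', '1997-08-08');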
\c - mx_isolation_role_ent - :master_port
@ -709,14 +687,14 @@ SELECT * FROM pg_dist_shard
ORDER BY shardminvalue::BIGINT, logicalrelid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
lineitem_streaming | 1230046 | t | -2147483648 | -2147483648
orders_streaming | 1230048 | t | -2147483648 | -2147483648
lineitem_streaming | 1230047 | t | -2147483647 | -136164586
orders_streaming | 1230049 | t | -2147483647 | -136164586
lineitem_streaming | 1230041 | t | -136164585 | -136164585
orders_streaming | 1230044 | t | -136164585 | -136164585
lineitem_streaming | 1230042 | t | -136164584 | -85071815
orders_streaming | 1230045 | t | -136164584 | -85071815
lineitem_streaming | 1230040 | t | -2147483648 | -2147483648
orders_streaming | 1230042 | t | -2147483648 | -2147483648
lineitem_streaming | 1230041 | t | -2147483647 | -136164586
orders_streaming | 1230043 | t | -2147483647 | -136164586
lineitem_streaming | 1230035 | t | -136164585 | -136164585
orders_streaming | 1230038 | t | -136164585 | -136164585
lineitem_streaming | 1230036 | t | -136164584 | -85071815
orders_streaming | 1230039 | t | -136164584 | -85071815
lineitem_streaming | 1230011 | t | -85071814 | -85071814
orders_streaming | 1230014 | t | -85071814 | -85071814
lineitem_streaming | 1230012 | t | -85071813 | -1
@ -725,14 +703,14 @@ SELECT * FROM pg_dist_shard
orders_streaming | 1230007 | t | 0 | 108199380
lineitem_streaming | 1230005 | t | 108199381 | 108199381
orders_streaming | 1230008 | t | 108199381 | 108199381
lineitem_streaming | 1230034 | t | 108199382 | 412880111
orders_streaming | 1230037 | t | 108199382 | 412880111
lineitem_streaming | 1230035 | t | 412880112 | 412880112
orders_streaming | 1230038 | t | 412880112 | 412880112
lineitem_streaming | 1230050 | t | 412880113 | 2147483646
orders_streaming | 1230052 | t | 412880113 | 2147483646
lineitem_streaming | 1230051 | t | 2147483647 | 2147483647
orders_streaming | 1230053 | t | 2147483647 | 2147483647
lineitem_streaming | 1230028 | t | 108199382 | 412880111
orders_streaming | 1230031 | t | 108199382 | 412880111
lineitem_streaming | 1230029 | t | 412880112 | 412880112
orders_streaming | 1230032 | t | 412880112 | 412880112
lineitem_streaming | 1230044 | t | 412880113 | 2147483646
orders_streaming | 1230046 | t | 412880113 | 2147483646
lineitem_streaming | 1230045 | t | 2147483647 | 2147483647
orders_streaming | 1230047 | t | 2147483647 | 2147483647
(24 rows)
-- test failure scenarios with triggers on workers
@ -758,24 +736,18 @@ SET citus.override_table_visibility TO false;
Tenant Isolation | lineitem_streaming | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230011 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230012 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230028 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230029 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230030 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230035 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230036 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230040 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230041 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230042 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230046 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230047 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230014 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230015 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230031 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230032 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230033 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230044 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230045 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230048 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230049 | table | mx_isolation_role_ent
(20 rows)
Tenant Isolation | orders_streaming_1230038 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230039 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230042 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230043 | table | mx_isolation_role_ent
(14 rows)
\c - mx_isolation_role_ent - :master_port
SET search_path to "Tenant Isolation";
@ -793,24 +765,18 @@ SET citus.override_table_visibility TO false;
Tenant Isolation | lineitem_streaming | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230011 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230012 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230028 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230029 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230030 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230035 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230036 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230040 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230041 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230042 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230046 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230047 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230014 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230015 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230031 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230032 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230033 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230044 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230045 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230048 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230049 | table | mx_isolation_role_ent
(20 rows)
Tenant Isolation | orders_streaming_1230038 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230039 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230042 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230043 | table | mx_isolation_role_ent
(14 rows)
DROP EVENT TRIGGER abort_ddl;
-- create a trigger for drops
@ -830,6 +796,12 @@ CREATE EVENT TRIGGER abort_drop ON sql_drop
SET search_path to "Tenant Isolation";
\set VERBOSITY terse
SELECT isolate_tenant_to_new_shard('orders_streaming', 104, 'CASCADE');
WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
WARNING: command DROP TABLE is disabled
ERROR: command DROP TABLE is disabled
\set VERBOSITY default
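The repeated warnings come from an event trigger installed just before this block to make every DROP fail. Its shape is roughly the following sketch; the function body is illustrative, while the trigger name matches the test:
CREATE FUNCTION abort_drop_command() RETURNS event_trigger
LANGUAGE plpgsql AS $$
BEGIN
    RAISE EXCEPTION 'command % is disabled', tg_tag;
END;
$$;
CREATE EVENT TRIGGER abort_drop ON sql_drop
    EXECUTE PROCEDURE abort_drop_command();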
-- check if metadata is changed
@ -838,14 +810,14 @@ SELECT * FROM pg_dist_shard
ORDER BY shardminvalue::BIGINT, logicalrelid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
lineitem_streaming | 1230046 | t | -2147483648 | -2147483648
orders_streaming | 1230048 | t | -2147483648 | -2147483648
lineitem_streaming | 1230047 | t | -2147483647 | -136164586
orders_streaming | 1230049 | t | -2147483647 | -136164586
lineitem_streaming | 1230041 | t | -136164585 | -136164585
orders_streaming | 1230044 | t | -136164585 | -136164585
lineitem_streaming | 1230042 | t | -136164584 | -85071815
orders_streaming | 1230045 | t | -136164584 | -85071815
lineitem_streaming | 1230040 | t | -2147483648 | -2147483648
orders_streaming | 1230042 | t | -2147483648 | -2147483648
lineitem_streaming | 1230041 | t | -2147483647 | -136164586
orders_streaming | 1230043 | t | -2147483647 | -136164586
lineitem_streaming | 1230035 | t | -136164585 | -136164585
orders_streaming | 1230038 | t | -136164585 | -136164585
lineitem_streaming | 1230036 | t | -136164584 | -85071815
orders_streaming | 1230039 | t | -136164584 | -85071815
lineitem_streaming | 1230011 | t | -85071814 | -85071814
orders_streaming | 1230014 | t | -85071814 | -85071814
lineitem_streaming | 1230012 | t | -85071813 | -1
@ -854,14 +826,14 @@ SELECT * FROM pg_dist_shard
orders_streaming | 1230007 | t | 0 | 108199380
lineitem_streaming | 1230005 | t | 108199381 | 108199381
orders_streaming | 1230008 | t | 108199381 | 108199381
lineitem_streaming | 1230034 | t | 108199382 | 412880111
orders_streaming | 1230037 | t | 108199382 | 412880111
lineitem_streaming | 1230035 | t | 412880112 | 412880112
orders_streaming | 1230038 | t | 412880112 | 412880112
lineitem_streaming | 1230050 | t | 412880113 | 2147483646
orders_streaming | 1230052 | t | 412880113 | 2147483646
lineitem_streaming | 1230051 | t | 2147483647 | 2147483647
orders_streaming | 1230053 | t | 2147483647 | 2147483647
lineitem_streaming | 1230028 | t | 108199382 | 412880111
orders_streaming | 1230031 | t | 108199382 | 412880111
lineitem_streaming | 1230029 | t | 412880112 | 412880112
orders_streaming | 1230032 | t | 412880112 | 412880112
lineitem_streaming | 1230044 | t | 412880113 | 2147483646
orders_streaming | 1230046 | t | 412880113 | 2147483646
lineitem_streaming | 1230045 | t | 2147483647 | 2147483647
orders_streaming | 1230047 | t | 2147483647 | 2147483647
(24 rows)
\c - - - :worker_1_port
@ -875,30 +847,24 @@ SET citus.override_table_visibility TO false;
Tenant Isolation | lineitem_streaming | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230011 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230012 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230028 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230029 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230030 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230035 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230036 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230040 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230041 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230042 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230046 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230047 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230071 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230072 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230073 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230056 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230057 | table | mx_isolation_role_ent
Tenant Isolation | lineitem_streaming_1230058 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230014 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230015 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230031 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230032 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230033 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230044 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230045 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230048 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230049 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230074 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230075 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230076 | table | mx_isolation_role_ent
(26 rows)
Tenant Isolation | orders_streaming_1230038 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230039 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230042 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230043 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230059 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230060 | table | mx_isolation_role_ent
Tenant Isolation | orders_streaming_1230061 | table | mx_isolation_role_ent
(20 rows)
\c - postgres - :worker_1_port
DROP EVENT TRIGGER abort_drop;
@ -1049,11 +1015,7 @@ INSERT INTO composite_table VALUES ('(1, 2)'::test_composite_type);
INSERT INTO composite_table VALUES ('(1, 3)'::test_composite_type);
INSERT INTO composite_table VALUES ('(1, 4)'::test_composite_type);
SELECT isolate_tenant_to_new_shard('composite_table', '(1, 3)');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230082
(1 row)
ERROR: cannot isolate tenants when using shard replication
SELECT count(*) FROM composite_table WHERE composite_key = '(1, 2)'::test_composite_type;
count
---------------------------------------------------------------------
@ -1113,7 +1075,7 @@ INSERT INTO test_colocated_table_3 SELECT i, i FROM generate_series (0, 100) i;
SELECT isolate_tenant_to_new_shard('test_colocated_table_2', 1, 'CASCADE');
isolate_tenant_to_new_shard
---------------------------------------------------------------------
1230113
1230095
(1 row)
SELECT count(*) FROM test_colocated_table_2;
@ -1132,47 +1094,47 @@ ORDER BY 1, 2;
relname | Constraint | Definition
---------------------------------------------------------------------
test_colocated_table_1 | test_colocated_table_1_id_fkey | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1(id)
test_colocated_table_1_1230087 | test_colocated_table_1_id_fkey_1230087 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230087(id)
test_colocated_table_1_1230089 | test_colocated_table_1_id_fkey_1230089 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230089(id)
test_colocated_table_1_1230069 | test_colocated_table_1_id_fkey_1230069 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230069(id)
test_colocated_table_1_1230071 | test_colocated_table_1_id_fkey_1230071 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230071(id)
test_colocated_table_1_1230073 | test_colocated_table_1_id_fkey_1230073 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230073(id)
test_colocated_table_1_1230091 | test_colocated_table_1_id_fkey_1230091 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230091(id)
test_colocated_table_1_1230109 | test_colocated_table_1_id_fkey_1230109 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230109(id)
test_colocated_table_1_1230110 | test_colocated_table_1_id_fkey_1230110 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230110(id)
test_colocated_table_1_1230111 | test_colocated_table_1_id_fkey_1230111 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230111(id)
test_colocated_table_1_1230092 | test_colocated_table_1_id_fkey_1230092 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230092(id)
test_colocated_table_1_1230093 | test_colocated_table_1_id_fkey_1230093 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230093(id)
test_colocated_table_2 | test_colocated_table_2_id_fkey | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1(id)
test_colocated_table_2 | test_colocated_table_2_value_1_fkey | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey(id)
test_colocated_table_2_1230095 | test_colocated_table_2_id_fkey_1230095 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230087(id)
test_colocated_table_2_1230095 | test_colocated_table_2_value_1_fkey_1230095 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_2_1230097 | test_colocated_table_2_id_fkey_1230097 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230089(id)
test_colocated_table_2_1230097 | test_colocated_table_2_value_1_fkey_1230097 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_2_1230099 | test_colocated_table_2_id_fkey_1230099 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230091(id)
test_colocated_table_2_1230099 | test_colocated_table_2_value_1_fkey_1230099 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_2_1230112 | test_colocated_table_2_id_fkey_1230112 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230109(id)
test_colocated_table_2_1230112 | test_colocated_table_2_value_1_fkey_1230112 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_2_1230113 | test_colocated_table_2_id_fkey_1230113 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230110(id)
test_colocated_table_2_1230113 | test_colocated_table_2_value_1_fkey_1230113 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_2_1230114 | test_colocated_table_2_id_fkey_1230114 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230111(id)
test_colocated_table_2_1230114 | test_colocated_table_2_value_1_fkey_1230114 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_2_1230077 | test_colocated_table_2_id_fkey_1230077 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230069(id)
test_colocated_table_2_1230077 | test_colocated_table_2_value_1_fkey_1230077 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_2_1230079 | test_colocated_table_2_id_fkey_1230079 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230071(id)
test_colocated_table_2_1230079 | test_colocated_table_2_value_1_fkey_1230079 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_2_1230081 | test_colocated_table_2_id_fkey_1230081 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230073(id)
test_colocated_table_2_1230081 | test_colocated_table_2_value_1_fkey_1230081 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_2_1230094 | test_colocated_table_2_id_fkey_1230094 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230091(id)
test_colocated_table_2_1230094 | test_colocated_table_2_value_1_fkey_1230094 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_2_1230095 | test_colocated_table_2_id_fkey_1230095 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230092(id)
test_colocated_table_2_1230095 | test_colocated_table_2_value_1_fkey_1230095 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_2_1230096 | test_colocated_table_2_id_fkey_1230096 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230093(id)
test_colocated_table_2_1230096 | test_colocated_table_2_value_1_fkey_1230096 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_3 | test_colocated_table_3_id_fkey | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1(id)
test_colocated_table_3 | test_colocated_table_3_id_fkey1 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2(id)
test_colocated_table_3 | test_colocated_table_3_value_1_fkey | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey(id)
test_colocated_table_3_1230103 | test_colocated_table_3_id_fkey1_1230103 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230095(id)
test_colocated_table_3_1230103 | test_colocated_table_3_id_fkey_1230103 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230087(id)
test_colocated_table_3_1230103 | test_colocated_table_3_value_1_fkey_1230103 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_3_1230105 | test_colocated_table_3_id_fkey1_1230105 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230097(id)
test_colocated_table_3_1230105 | test_colocated_table_3_id_fkey_1230105 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230089(id)
test_colocated_table_3_1230105 | test_colocated_table_3_value_1_fkey_1230105 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_3_1230107 | test_colocated_table_3_id_fkey1_1230107 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230099(id)
test_colocated_table_3_1230107 | test_colocated_table_3_id_fkey_1230107 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230091(id)
test_colocated_table_3_1230107 | test_colocated_table_3_value_1_fkey_1230107 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_3_1230115 | test_colocated_table_3_id_fkey1_1230115 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230112(id)
test_colocated_table_3_1230115 | test_colocated_table_3_id_fkey_1230115 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230109(id)
test_colocated_table_3_1230115 | test_colocated_table_3_value_1_fkey_1230115 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_3_1230116 | test_colocated_table_3_id_fkey1_1230116 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230113(id)
test_colocated_table_3_1230116 | test_colocated_table_3_id_fkey_1230116 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230110(id)
test_colocated_table_3_1230116 | test_colocated_table_3_value_1_fkey_1230116 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_3_1230117 | test_colocated_table_3_id_fkey1_1230117 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230114(id)
test_colocated_table_3_1230117 | test_colocated_table_3_id_fkey_1230117 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230111(id)
test_colocated_table_3_1230117 | test_colocated_table_3_value_1_fkey_1230117 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230084(id)
test_colocated_table_3_1230085 | test_colocated_table_3_id_fkey1_1230085 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230077(id)
test_colocated_table_3_1230085 | test_colocated_table_3_id_fkey_1230085 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230069(id)
test_colocated_table_3_1230085 | test_colocated_table_3_value_1_fkey_1230085 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_3_1230087 | test_colocated_table_3_id_fkey1_1230087 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230079(id)
test_colocated_table_3_1230087 | test_colocated_table_3_id_fkey_1230087 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230071(id)
test_colocated_table_3_1230087 | test_colocated_table_3_value_1_fkey_1230087 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_3_1230089 | test_colocated_table_3_id_fkey1_1230089 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230081(id)
test_colocated_table_3_1230089 | test_colocated_table_3_id_fkey_1230089 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230073(id)
test_colocated_table_3_1230089 | test_colocated_table_3_value_1_fkey_1230089 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_3_1230097 | test_colocated_table_3_id_fkey1_1230097 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230094(id)
test_colocated_table_3_1230097 | test_colocated_table_3_id_fkey_1230097 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230091(id)
test_colocated_table_3_1230097 | test_colocated_table_3_value_1_fkey_1230097 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_3_1230098 | test_colocated_table_3_id_fkey1_1230098 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230095(id)
test_colocated_table_3_1230098 | test_colocated_table_3_id_fkey_1230098 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230092(id)
test_colocated_table_3_1230098 | test_colocated_table_3_value_1_fkey_1230098 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
test_colocated_table_3_1230099 | test_colocated_table_3_id_fkey1_1230099 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_2_1230096(id)
test_colocated_table_3_1230099 | test_colocated_table_3_id_fkey_1230099 | FOREIGN KEY (id) REFERENCES "Tenant Isolation".test_colocated_table_1_1230093(id)
test_colocated_table_3_1230099 | test_colocated_table_3_value_1_fkey_1230099 | FOREIGN KEY (value_1) REFERENCES "Tenant Isolation".test_reference_table_fkey_1230066(id)
(42 rows)
\c - mx_isolation_role_ent - :master_port
@ -1190,6 +1152,65 @@ SELECT create_reference_table('ref_table');
\c - postgres - :master_port
SET search_path to "Tenant Isolation";
-- partitioning tests
-- create partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
-- create a regular partition
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
-- create a columnar partition
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01') USING columnar;
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
-- distribute partitioned table
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('partitioning_test', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$"Tenant Isolation".partitioning_test_2009$$)
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$"Tenant Isolation".partitioning_test_2010$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'partitioning_test'::regclass;
count
---------------------------------------------------------------------
4
(1 row)
SELECT count(*) FROM partitioning_test;
count
---------------------------------------------------------------------
4
(1 row)
-- isolate a value into its own shard
SELECT 1 FROM isolate_tenant_to_new_shard('partitioning_test', 2, 'CASCADE');
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'partitioning_test'::regclass;
count
---------------------------------------------------------------------
6
(1 row)
SELECT count(*) FROM partitioning_test;
count
---------------------------------------------------------------------
4
(1 row)
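Tenant isolation now also handles partitioned tables, columnar partitions included: with CASCADE the matching shard of each partition is split alongside the parent, which is why the parent's shard count rises from 4 to 6 while the row count stays at 4. The resulting intervals can be inspected directly (a sketch; shard ids vary between runs):
SELECT shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'partitioning_test'::regclass
ORDER BY shardminvalue::bigint;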
SET citus.replicate_reference_tables_on_activate TO off;
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupId=>0);
@ -12,6 +12,7 @@ SET SEARCH_PATH = tenant_isolation;
SET citus.shard_count TO 2;
SET citus.next_shard_id TO 300;
SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
SELECT pg_backend_pid() as pid \gset
SELECT citus.mitmproxy('conn.allow()');
@ -41,15 +42,15 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
-- cancellation on colocated table creation
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_2").after(1).cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_2").cancel(' || :pid || ')');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
-- failure on colocated table population
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO tenant_isolation.table_2").after(2).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(302").kill()');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
-- cancellation on colocated table population
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO tenant_isolation.table_2").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(302").cancel(' || :pid || ')');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
-- failure on colocated table constraints
@ -66,15 +67,15 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
-- cancellation on table creation
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_1").after(1).cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE tenant_isolation.table_1").cancel(' || :pid || ')');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
-- failure on table population
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO tenant_isolation.table_1").after(2).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(300").kill()');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
-- cancellation on table population
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO tenant_isolation.table_1").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(300").cancel(' || :pid || ')');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');
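Data movement during a split now goes through worker_split_copy() rather than plain INSERTs, so the failure and cancellation points match on that call; the 300 and 302 in the patterns are source shard ids pinned by citus.next_shard_id above (table_1's first shard being 300 and, presumably, the colocated table_2's being 302). The recurring pattern in this file is roughly:
SELECT citus.mitmproxy('conn.onQuery(query="worker_split_copy\(300").kill()');
SELECT isolate_tenant_to_new_shard('table_1', 5, 'CASCADE');  -- fails mid-copy
SELECT citus.mitmproxy('conn.allow()');                       -- reset for the next scenario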
-- failure on table constraints
@ -260,6 +260,7 @@ DROP TABLE shard_count_table, shard_count_table_2;
-- ensure there is no colocation group with 9 shards
SELECT count(*) FROM pg_dist_colocation WHERE shardcount = 9;
SET citus.shard_count TO 9;
SET citus.shard_replication_factor TO 1;
CREATE TABLE shard_split_table (a int, b int);
SELECT create_distributed_table ('shard_split_table', 'a');
@ -126,6 +126,7 @@ CREATE SCHEMA multiuser_schema;
CREATE TABLE multiuser_schema.hash_table(a int, b int);
CREATE TABLE multiuser_schema.reference_table(a int, b int);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('multiuser_schema.hash_table', 'a', colocate_with => 'none');
@ -169,9 +169,9 @@ ROLLBACK;
-- test a successful transaction block
BEGIN;
SELECT isolate_tenant_to_new_shard('orders_streaming', 102, 'CASCADE');
SELECT isolate_tenant_to_new_shard('orders_streaming', 103, 'CASCADE');
COMMIT;
SELECT isolate_tenant_to_new_shard('orders_streaming', 103, 'CASCADE');
SELECT isolate_tenant_to_new_shard('lineitem_streaming', 100, 'CASCADE');
SELECT isolate_tenant_to_new_shard('orders_streaming', 101, 'CASCADE');
@ -254,7 +254,7 @@ SET citus.override_table_visibility TO false;
\c - postgres - :worker_1_port
SET search_path to "Tenant Isolation";
SELECT "Column", "Type", "Modifiers" FROM public.table_desc WHERE relid='orders_streaming_1230045'::regclass;
SELECT "Column", "Type", "Modifiers" FROM public.table_desc WHERE relid='orders_streaming_1230039'::regclass;
\c - mx_isolation_role_ent - :worker_1_port
SET search_path to "Tenant Isolation";
@ -561,6 +561,36 @@ SELECT create_reference_table('ref_table');
\c - postgres - :master_port
SET search_path to "Tenant Isolation";
-- partitioning tests
-- create partitioned table
CREATE TABLE partitioning_test(id int, time date) PARTITION BY RANGE (time);
-- create a regular partition
CREATE TABLE partitioning_test_2009 PARTITION OF partitioning_test FOR VALUES FROM ('2009-01-01') TO ('2010-01-01');
-- create a columnar partition
CREATE TABLE partitioning_test_2010 PARTITION OF partitioning_test FOR VALUES FROM ('2010-01-01') TO ('2011-01-01') USING columnar;
-- load some data and distribute tables
INSERT INTO partitioning_test VALUES (1, '2009-06-06');
INSERT INTO partitioning_test VALUES (2, '2010-07-07');
INSERT INTO partitioning_test_2009 VALUES (3, '2009-09-09');
INSERT INTO partitioning_test_2010 VALUES (4, '2010-03-03');
-- distribute partitioned table
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('partitioning_test', 'id');
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'partitioning_test'::regclass;
SELECT count(*) FROM partitioning_test;
-- isolate a value into its own shard
SELECT 1 FROM isolate_tenant_to_new_shard('partitioning_test', 2, 'CASCADE');
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'partitioning_test'::regclass;
SELECT count(*) FROM partitioning_test;
SET citus.replicate_reference_tables_on_activate TO off;
SET client_min_messages TO WARNING;