Remove shardstate leftovers (#6627)

Remove ShardState enum and associated logic.

Co-authored-by: Marco Slot <marco.slot@gmail.com>
Co-authored-by: Ahmet Gedemenli <afgedemenli@gmail.com>
pull/6630/head
Marco Slot 2023-01-19 11:43:58 +03:00 committed by GitHub
parent 44c387b978
commit 64e3fee89b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 34 additions and 341 deletions

View File

@ -1001,10 +1001,6 @@ ActiveShardPlacementOnGroup(int32 groupId, uint64 shardId)
{
return NULL;
}
if (placement->shardState != SHARD_STATE_ACTIVE)
{
return NULL;
}
return placement;
}

View File

@ -1328,25 +1328,18 @@ ShardLength(uint64 shardId)
* NodeGroupHasShardPlacements returns whether any active shards are placed on the group
*/
bool
NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements)
NodeGroupHasShardPlacements(int32 groupId)
{
const int scanKeyCount = (onlyConsiderActivePlacements ? 2 : 1);
const int scanKeyCount = 1;
const bool indexOK = false;
ScanKeyData scanKey[2];
ScanKeyData scanKey[1];
Relation pgPlacement = table_open(DistPlacementRelationId(),
AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_groupid,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
if (onlyConsiderActivePlacements)
{
ScanKeyInit(&scanKey[1], Anum_pg_dist_placement_shardstate,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(SHARD_STATE_ACTIVE));
}
SysScanDesc scanDescriptor = systable_beginscan(pgPlacement,
DistPlacementGroupidIndexId(),
@ -1381,8 +1374,7 @@ IsActiveShardPlacement(ShardPlacement *shardPlacement)
shardPlacement->nodePort)));
}
return shardPlacement->shardState == SHARD_STATE_ACTIVE &&
workerNode->isActive;
return workerNode->isActive;
}
@ -1671,8 +1663,6 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
datumArray[Anum_pg_dist_placement_shardid - 1]);
shardPlacement->shardLength = DatumGetInt64(
datumArray[Anum_pg_dist_placement_shardlength - 1]);
shardPlacement->shardState = DatumGetUInt32(
datumArray[Anum_pg_dist_placement_shardstate - 1]);
shardPlacement->groupId = DatumGetInt32(
datumArray[Anum_pg_dist_placement_groupid - 1]);
@ -1754,7 +1744,7 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
}
values[Anum_pg_dist_placement_placementid - 1] = Int64GetDatum(placementId);
values[Anum_pg_dist_placement_shardid - 1] = Int64GetDatum(shardId);
values[Anum_pg_dist_placement_shardstate - 1] = CharGetDatum(SHARD_STATE_ACTIVE);
values[Anum_pg_dist_placement_shardstate - 1] = Int32GetDatum(1);
values[Anum_pg_dist_placement_shardlength - 1] = Int64GetDatum(shardLength);
values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId);

View File

@ -2066,7 +2066,7 @@ AddNodeMetadata(char *nodeName, int32 nodePort,
*/
if (nodeMetadata->groupId != COORDINATOR_GROUP_ID && CoordinatorAddedAsWorkerNode() &&
ActivePrimaryNonCoordinatorNodeCount() == 0 &&
NodeGroupHasShardPlacements(COORDINATOR_GROUP_ID, true))
NodeGroupHasShardPlacements(COORDINATOR_GROUP_ID))
{
WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError();

View File

@ -423,7 +423,6 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
ShardPlacement *placement = CitusMakeNode(ShardPlacement);
placement->shardId = groupPlacement->shardId;
placement->shardLength = groupPlacement->shardLength;
placement->shardState = groupPlacement->shardState;
placement->nodeId = worker->nodeId;
placement->nodeName = pstrdup(worker->workerName);
placement->nodePort = worker->workerPort;

View File

@ -188,7 +188,6 @@ ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShar
{
Oid relationId = sourceShard->relationId;
ListCell *colocatedTableCell = NULL;
ListCell *colocatedShardCell = NULL;
/* checks for table ownership and foreign tables */
List *colocatedTableList = ColocatedTableList(relationId);
@ -213,32 +212,6 @@ ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShar
"is not supported.")));
}
}
/* check shards with inactive placements */
List *colocatedShardList = ColocatedShardIntervalList(sourceShard);
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(colocatedShardCell);
uint64 shardId = shardInterval->shardId;
ListCell *shardPlacementCell = NULL;
List *shardPlacementList = ShardPlacementListSortedByWorker(shardId);
foreach(shardPlacementCell, shardPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
if (placement->shardState != SHARD_STATE_ACTIVE)
{
char *relationName = get_rel_name(shardInterval->relationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot %s %s because relation "
"\"%s\" has an inactive shard placement "
"for the shard %lu",
SplitOperationName[splitOperation],
SplitTargetName[splitOperation],
relationName, shardId)));
}
}
}
}

View File

@ -1663,15 +1663,8 @@ EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, int32 sourceNo
{
List *shardPlacementList = ShardPlacementList(shardId);
ShardPlacement *sourcePlacement = SearchShardPlacementInListOrError(
shardPlacementList,
sourceNodeName,
sourceNodePort);
if (sourcePlacement->shardState != SHARD_STATE_ACTIVE)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("source placement must be in active state")));
}
/* error if the source shard placement does not exist */
SearchShardPlacementInListOrError(shardPlacementList, sourceNodeName, sourceNodePort);
ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
targetNodeName,

View File

@ -85,18 +85,6 @@ run_try_drop_marked_resources(PG_FUNCTION_ARGS)
}
/*
* IsActiveTestShardPlacement checks if the dummy shard placement created in tests
* are labelled as active. Note that this function does not check if the worker is also
* active, because the dummy test workers are not registered as actual workers.
*/
static inline bool
IsActiveTestShardPlacement(ShardPlacement *shardPlacement)
{
return shardPlacement->shardState == SHARD_STATE_ACTIVE;
}
/*
* shard_placement_rebalance_array returns a list of operations which can make a
* cluster consisting of given shard placements and worker nodes balanced with
@ -150,9 +138,7 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS)
if (shardPlacementTestInfo->nextColocationGroup)
{
shardPlacementList = SortList(shardPlacementList, CompareShardPlacements);
shardPlacementListList = lappend(shardPlacementListList,
FilterShardPlacementList(shardPlacementList,
IsActiveTestShardPlacement));
shardPlacementListList = lappend(shardPlacementListList, shardPlacementList);
shardPlacementList = NIL;
}
shardPlacementList = lappend(shardPlacementList,
@ -304,8 +290,7 @@ shard_placement_replication_array(PG_FUNCTION_ARGS)
shardPlacementTestInfo->placement);
}
List *activeShardPlacementList = FilterShardPlacementList(shardPlacementList,
IsActiveTestShardPlacement);
List *activeShardPlacementList = shardPlacementList;
/* sort the lists to make the function more deterministic */
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
@ -358,8 +343,6 @@ JsonArrayToShardPlacementTestInfoList(ArrayType *shardPlacementJsonArrayObject)
placementJson, FIELD_NAME_SHARD_ID, placementIndex + 1);
uint64 shardLength = JsonFieldValueUInt64Default(
placementJson, FIELD_NAME_SHARD_LENGTH, 1);
int shardState = JsonFieldValueUInt32Default(
placementJson, FIELD_NAME_SHARD_STATE, SHARD_STATE_ACTIVE);
char *nodeName = JsonFieldValueString(placementJson, FIELD_NAME_NODE_NAME);
if (nodeName == NULL)
{
@ -379,7 +362,6 @@ JsonArrayToShardPlacementTestInfoList(ArrayType *shardPlacementJsonArrayObject)
placementTestInfo->placement = palloc0(sizeof(ShardPlacement));
placementTestInfo->placement->shardId = shardId;
placementTestInfo->placement->shardLength = shardLength;
placementTestInfo->placement->shardState = shardState;
placementTestInfo->placement->nodeName = pstrdup(nodeName);
placementTestInfo->placement->nodePort = nodePort;
placementTestInfo->placement->placementId = placementId;

View File

@ -221,7 +221,6 @@ CopyNodeShardPlacement(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(placementId);
COPY_SCALAR_FIELD(shardId);
COPY_SCALAR_FIELD(shardLength);
COPY_SCALAR_FIELD(shardState);
COPY_SCALAR_FIELD(groupId);
COPY_STRING_FIELD(nodeName);
COPY_SCALAR_FIELD(nodePort);
@ -240,7 +239,6 @@ CopyNodeGroupShardPlacement(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(placementId);
COPY_SCALAR_FIELD(shardId);
COPY_SCALAR_FIELD(shardLength);
COPY_SCALAR_FIELD(shardState);
COPY_SCALAR_FIELD(groupId);
}

View File

@ -425,7 +425,6 @@ OutShardPlacement(OUTFUNC_ARGS)
WRITE_UINT64_FIELD(placementId);
WRITE_UINT64_FIELD(shardId);
WRITE_UINT64_FIELD(shardLength);
WRITE_ENUM_FIELD(shardState, ShardState);
WRITE_INT_FIELD(groupId);
WRITE_STRING_FIELD(nodeName);
WRITE_UINT_FIELD(nodePort);
@ -446,7 +445,6 @@ OutGroupShardPlacement(OUTFUNC_ARGS)
WRITE_UINT64_FIELD(placementId);
WRITE_UINT64_FIELD(shardId);
WRITE_UINT64_FIELD(shardLength);
WRITE_ENUM_FIELD(shardState, ShardState);
WRITE_INT_FIELD(groupId);
}

View File

@ -399,16 +399,6 @@ ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId)
leftShardId, leftRelationName,
rightShardId, rightRelationName)));
}
/* we also don't allow colocated shards to be in different shard states */
if (leftPlacement->shardState != rightPlacement->shardState)
{
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
leftRelationName, rightRelationName),
errdetail("%s and %s have shard placements in "
"different shard states.",
leftRelationName, rightRelationName)));
}
}
}
}

View File

@ -77,7 +77,6 @@ typedef struct GroupShardPlacement
uint64 placementId; /* sequence that implies this placement creation order */
uint64 shardId;
uint64 shardLength;
ShardState shardState;
int32 groupId;
} GroupShardPlacement;
@ -92,7 +91,6 @@ typedef struct ShardPlacement
uint64 placementId;
uint64 shardId;
uint64 shardLength;
ShardState shardState;
int32 groupId;
/* the rest of the fields aren't from pg_dist_placement */
@ -286,8 +284,7 @@ extern int ShardIntervalCount(Oid relationId);
extern List * LoadShardList(Oid relationId);
extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeGroupHasShardPlacements(int32 groupId,
bool onlyConsiderActivePlacements);
extern bool NodeGroupHasShardPlacements(int32 groupId);
extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement);
extern bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);
extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)(

View File

@ -25,18 +25,6 @@
#define INVALID_SHARD_ID 0
#define INVALID_PLACEMENT_ID 0
/*
* ShardState represents last known states of shards on a given node.
*
* The numbers assigned per state used for historical reason and should
* not be changed since they correspond to shardstate in pg_dist_placement.
*/
typedef enum
{
SHARD_STATE_INVALID_FIRST = 0,
SHARD_STATE_ACTIVE = 1,
} ShardState;
/* Function declarations to extend names in DDL commands */
extern void RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId);

View File

@ -252,6 +252,9 @@ s/NOTICE: issuing WITH placement_data\(shardid, shardlength, groupid, placement
s/pg_cancel_backend\('[0-9]+'::bigint\)/pg_cancel_backend('xxxxx'::bigint)/g
s/issuing SELECT pg_cancel_backend\([0-9]+::integer\)/issuing SELECT pg_cancel_backend(xxxxx::integer)/g
# shard_rebalancer output for flaky nodeIds
s/issuing SELECT citus_copy_shard_placement\(43[0-9]+,[0-9]+,[0-9]+,'block_writes'\)/issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')/g
# node id in run_command_on_all_nodes warning
s/Error on node with node id [0-9]+/Error on node with node id xxxxx/g

View File

@ -26,7 +26,7 @@ test_file_name = args['test_name']
use_base_schedule = args['use_base_schedule']
use_whole_schedule_line = args['use_whole_schedule_line']
test_files_to_skip = ['multi_cluster_management', 'multi_extension', 'multi_test_helpers']
test_files_to_skip = ['multi_cluster_management', 'multi_extension', 'multi_test_helpers', 'multi_insert_select']
test_files_to_run_without_schedule = ['single_node_enterprise']
if not (test_file_name or test_file_path):
@ -81,6 +81,8 @@ elif "isolation" in test_schedule:
test_schedule = 'base_isolation_schedule'
elif "failure" in test_schedule:
test_schedule = 'failure_base_schedule'
elif "enterprise" in test_schedule:
test_schedule = 'enterprise_minimal_schedule'
elif "split" in test_schedule:
test_schedule = 'minimal_schedule'
elif "mx" in test_schedule:

View File

@ -0,0 +1,5 @@
test: single_node_enterprise
test: multi_test_helpers multi_test_helpers_superuser
test: multi_cluster_management
test: multi_test_catalog_views
test: multi_data_types

View File

@ -114,13 +114,6 @@ SELECT load_shard_placement_array(540001, false);
{localhost:xxxxx,localhost:xxxxx}
(1 row)
-- only one of which is active
SELECT load_shard_placement_array(540001, true);
load_shard_placement_array
---------------------------------------------------------------------
{localhost:xxxxx}
(1 row)
-- should see error for non-existent shard
SELECT load_shard_placement_array(540001, false);
load_shard_placement_array

View File

@ -2025,47 +2025,6 @@ DEBUG: Skipping target shard interval 13300001 since SELECT query for it pruned
DEBUG: Skipping target shard interval 13300002 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300003 since SELECT query for it pruned away
SET client_min_messages TO INFO;
-- if a single shard of the SELECT is unhealty, the query should fail
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 13300004 AND nodeport = :worker_1_port;
truncate raw_events_first;
SET client_min_messages TO DEBUG2;
-- this should fail
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Insert query cannot be executed on all placements for shard xxxxx
-- this should also fail
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 5;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Insert query cannot be executed on all placements for shard xxxxx
-- but this should work given that it hits different shard
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 6;
DEBUG: Skipping target shard interval 13300000 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300001 since SELECT query for it pruned away
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT raw_events_second.user_id, raw_events_second."time", raw_events_second.value_1, raw_events_second.value_2, raw_events_second.value_3, raw_events_second.value_4 FROM public.raw_events_second_13300006 raw_events_second WHERE ((raw_events_second.user_id OPERATOR(pg_catalog.=) 6) AND (raw_events_second.user_id IS NOT NULL))
DEBUG: Skipping target shard interval 13300003 since SELECT query for it pruned away
SET client_min_messages TO INFO;
-- mark the unhealthy placement as healthy again for the next tests
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 13300004 AND nodeport = :worker_1_port;
-- now that we should show that it works if one of the target shard interval is not healthy
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 13300000 AND nodeport = :worker_1_port;
truncate raw_events_first;
SET client_min_messages TO DEBUG2;
-- this should work
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT raw_events_second.user_id, raw_events_second."time", raw_events_second.value_1, raw_events_second.value_2, raw_events_second.value_3, raw_events_second.value_4 FROM public.raw_events_second_13300004 raw_events_second WHERE (raw_events_second.user_id IS NOT NULL)
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300001 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT raw_events_second.user_id, raw_events_second."time", raw_events_second.value_1, raw_events_second.value_2, raw_events_second.value_3, raw_events_second.value_4 FROM public.raw_events_second_13300005 raw_events_second WHERE (raw_events_second.user_id IS NOT NULL)
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT raw_events_second.user_id, raw_events_second."time", raw_events_second.value_1, raw_events_second.value_2, raw_events_second.value_3, raw_events_second.value_4 FROM public.raw_events_second_13300006 raw_events_second WHERE (raw_events_second.user_id IS NOT NULL)
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300003 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT raw_events_second.user_id, raw_events_second."time", raw_events_second.value_1, raw_events_second.value_2, raw_events_second.value_3, raw_events_second.value_4 FROM public.raw_events_second_13300007 raw_events_second WHERE (raw_events_second.user_id IS NOT NULL)
SET client_min_messages TO INFO;
truncate raw_events_first;
SET client_min_messages TO DEBUG2;
-- this should also work
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 5;
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT raw_events_second.user_id, raw_events_second."time", raw_events_second.value_1, raw_events_second.value_2, raw_events_second.value_3, raw_events_second.value_4 FROM public.raw_events_second_13300004 raw_events_second WHERE ((raw_events_second.user_id OPERATOR(pg_catalog.=) 5) AND (raw_events_second.user_id IS NOT NULL))
DEBUG: Skipping target shard interval 13300001 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300002 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300003 since SELECT query for it pruned away
SET client_min_messages TO INFO;
-- now do some tests with varchars
INSERT INTO insert_select_varchar_test VALUES ('test_1', 10);
INSERT INTO insert_select_varchar_test VALUES ('test_2', 30);

View File

@ -2025,47 +2025,6 @@ DEBUG: Skipping target shard interval 13300001 since SELECT query for it pruned
DEBUG: Skipping target shard interval 13300002 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300003 since SELECT query for it pruned away
SET client_min_messages TO INFO;
-- if a single shard of the SELECT is unhealty, the query should fail
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 13300004 AND nodeport = :worker_1_port;
truncate raw_events_first;
SET client_min_messages TO DEBUG2;
-- this should fail
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Insert query cannot be executed on all placements for shard xxxxx
-- this should also fail
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 5;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Insert query cannot be executed on all placements for shard xxxxx
-- but this should work given that it hits different shard
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 6;
DEBUG: Skipping target shard interval 13300000 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300001 since SELECT query for it pruned away
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_second_13300006 raw_events_second WHERE ((user_id OPERATOR(pg_catalog.=) 6) AND (user_id IS NOT NULL))
DEBUG: Skipping target shard interval 13300003 since SELECT query for it pruned away
SET client_min_messages TO INFO;
-- mark the unhealthy placement as healthy again for the next tests
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 13300004 AND nodeport = :worker_1_port;
-- now that we should show that it works if one of the target shard interval is not healthy
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 13300000 AND nodeport = :worker_1_port;
truncate raw_events_first;
SET client_min_messages TO DEBUG2;
-- this should work
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_second_13300004 raw_events_second WHERE (user_id IS NOT NULL)
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300001 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_second_13300005 raw_events_second WHERE (user_id IS NOT NULL)
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_second_13300006 raw_events_second WHERE (user_id IS NOT NULL)
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300003 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_second_13300007 raw_events_second WHERE (user_id IS NOT NULL)
SET client_min_messages TO INFO;
truncate raw_events_first;
SET client_min_messages TO DEBUG2;
-- this should also work
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 5;
DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id, "time", value_1, value_2, value_3, value_4) SELECT user_id, "time", value_1, value_2, value_3, value_4 FROM public.raw_events_second_13300004 raw_events_second WHERE ((user_id OPERATOR(pg_catalog.=) 5) AND (user_id IS NOT NULL))
DEBUG: Skipping target shard interval 13300001 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300002 since SELECT query for it pruned away
DEBUG: Skipping target shard interval 13300003 since SELECT query for it pruned away
SET client_min_messages TO INFO;
-- now do some tests with varchars
INSERT INTO insert_select_varchar_test VALUES ('test_1', 10);
INSERT INTO insert_select_varchar_test VALUES ('test_2', 30);

View File

@ -577,23 +577,6 @@ UPDATE users_test_table
SET value_1 = 7
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id AND TRUE;
-- Test with inactive shard-placement
-- manually set shardstate of one placement of users_test_table as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440000;
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot find a worker that has active placements for all shards in the query
-- manually set shardstate of one placement of events_test_table as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440004;
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: cannot find a worker that has active placements for all shards in the query
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440000;
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440004;
-- Subquery must return single value to use it with comparison operators
UPDATE users_test_table as utt
SET value_1 = 3
@ -901,6 +884,8 @@ SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
(1 row)
DROP TABLE users_test_table;
DROP TABLE events_test_table_local;
DROP TABLE events_test_table;
DROP TABLE events_test_table_2;
DROP TABLE events_reference_copy_table;
DROP TABLE users_reference_copy_table;

View File

@ -415,7 +415,7 @@ SELECT * FROM pg_dist_shard
orders_streaming | 1230047 | t | 2147483647 | 2147483647
(24 rows)
SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport, shardid;
SELECT * FROM pg_dist_shard_placement WHERE shardid BETWEEN 1230000 AND 1399999 ORDER BY nodeport, shardid;
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------------------------------------------------------------------
1230011 | 1 | 0 | localhost | 57637 | 100011
@ -665,13 +665,6 @@ SELECT count(*) FROM lineitem_date WHERE l_shipdate = '1997-08-08';
1
(1 row)
-- test with invalid shard placements
\c - postgres - :master_port
SET search_path to "Tenant Isolation";
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port;
SELECT isolate_tenant_to_new_shard('lineitem_date', '1997-08-08', shard_transfer_mode => 'block_writes');
ERROR: cannot isolate tenant because relation "lineitem_date" has an inactive shard placement for the shard xxxxx
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE nodeport = :worker_1_port;
\c - mx_isolation_role_ent - :master_port
SET search_path to "Tenant Isolation";
DROP TABLE lineitem_date;

View File

@ -427,7 +427,7 @@ SELECT * FROM pg_dist_shard
orders_streaming | 1230047 | t | 2147483647 | 2147483647
(24 rows)
SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport, shardid;
SELECT * FROM pg_dist_shard_placement WHERE shardid BETWEEN 1230000 AND 1399999 ORDER BY nodeport, shardid;
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------------------------------------------------------------------
1230011 | 1 | 0 | localhost | 57637 | 100011
@ -709,13 +709,6 @@ SELECT public.wait_for_resource_cleanup();
(1 row)
-- test with invalid shard placements
\c - postgres - :master_port
SET search_path to "Tenant Isolation";
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port;
SELECT isolate_tenant_to_new_shard('lineitem_date', '1997-08-08', shard_transfer_mode => 'force_logical');
ERROR: cannot isolate tenant because relation "lineitem_date" has an inactive shard placement for the shard xxxxx
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE nodeport = :worker_1_port;
\c - mx_isolation_role_ent - :master_port
SET search_path to "Tenant Isolation";
DROP TABLE lineitem_date;

View File

@ -221,7 +221,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433101,16,18,'block_writes')
NOTICE: issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -244,7 +244,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433102,18,16,'block_writes')
NOTICE: issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -267,7 +267,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433103,16,18,'block_writes')
NOTICE: issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -290,7 +290,7 @@ NOTICE: issuing SET LOCAL citus.shard_count TO '4';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET LOCAL citus.shard_replication_factor TO '2';
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_copy_shard_placement(433104,18,16,'block_writes')
NOTICE: issuing SELECT citus_copy_shard_placement(43xxxx,xx,xx,'block_writes')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing COMMIT
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
@ -459,23 +459,6 @@ SELECT unnest(shard_placement_replication_array(
{"updatetype":2,"shardid":2,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname1","targetport":5432}
(2 rows)
-- Check that shard_placement_replication_array returns a shard copy operation
-- for each of the inactive shards.
SELECT unnest(shard_placement_replication_array(
ARRAY['{"node_name": "hostname1"}',
'{"node_name": "hostname2"}']::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "shardstate":3, "nodename":"hostname1"}',
'{"shardid":1, "shardstate":3, "nodename":"hostname2"}',
'{"shardid":2, "nodename":"hostname2"}']::json[],
2
));
unnest
---------------------------------------------------------------------
{"updatetype":2,"shardid":1,"sourcename":"hostname1","sourceport":5432,"targetname":"hostname2","targetport":5432}
{"updatetype":2,"shardid":2,"sourcename":"hostname2","sourceport":5432,"targetname":"hostname1","targetport":5432}
(2 rows)
-- Check that shard_placement_replication_array errors out if all placements of
-- a shard are placed on inactive nodes.
SELECT unnest(shard_placement_replication_array(

View File

@ -102,9 +102,6 @@ SELECT load_shard_interval_array(540005, 0);
-- should see two placements
SELECT load_shard_placement_array(540001, false);
-- only one of which is active
SELECT load_shard_placement_array(540001, true);
-- should see error for non-existent shard
SELECT load_shard_placement_array(540001, false);

View File

@ -1560,42 +1560,6 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 5;
SET client_min_messages TO INFO;
-- if a single shard of the SELECT is unhealty, the query should fail
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 13300004 AND nodeport = :worker_1_port;
truncate raw_events_first;
SET client_min_messages TO DEBUG2;
-- this should fail
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
-- this should also fail
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 5;
-- but this should work given that it hits different shard
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 6;
SET client_min_messages TO INFO;
-- mark the unhealthy placement as healthy again for the next tests
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 13300004 AND nodeport = :worker_1_port;
-- now that we should show that it works if one of the target shard interval is not healthy
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 13300000 AND nodeport = :worker_1_port;
truncate raw_events_first;
SET client_min_messages TO DEBUG2;
-- this should work
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
SET client_min_messages TO INFO;
truncate raw_events_first;
SET client_min_messages TO DEBUG2;
-- this should also work
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 5;
SET client_min_messages TO INFO;
-- now do some tests with varchars
INSERT INTO insert_select_varchar_test VALUES ('test_1', 10);
INSERT INTO insert_select_varchar_test VALUES ('test_2', 30);

View File

@ -476,24 +476,6 @@ SET value_1 = 7
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id AND TRUE;
-- Test with inactive shard-placement
-- manually set shardstate of one placement of users_test_table as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440000;
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
-- manually set shardstate of one placement of events_test_table as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1440004;
UPDATE users_test_table
SET value_2 = 5
FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id;
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440000;
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE shardid = 1440004;
-- Subquery must return single value to use it with comparison operators
UPDATE users_test_table as utt
SET value_1 = 3
@ -735,6 +717,8 @@ DELETE FROM users_test_table WHERE user_id = 3 or user_id = 5;
SELECT COUNT(*) FROM users_test_table WHERE user_id = 3 or user_id = 5;
DROP TABLE users_test_table;
DROP TABLE events_test_table_local;
DROP TABLE events_test_table;
DROP TABLE events_test_table_2;
DROP TABLE events_reference_copy_table;
DROP TABLE users_reference_copy_table;

View File

@ -216,7 +216,7 @@ SELECT * FROM pg_dist_shard
WHERE logicalrelid = 'lineitem_streaming'::regclass OR logicalrelid = 'orders_streaming'::regclass
ORDER BY shardminvalue::BIGINT, logicalrelid;
SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport, shardid;
SELECT * FROM pg_dist_shard_placement WHERE shardid BETWEEN 1230000 AND 1399999 ORDER BY nodeport, shardid;
-- test failing foreign constraints after multiple tenant isolation
\COPY lineitem_streaming FROM STDIN WITH DELIMITER '|'
@ -317,15 +317,6 @@ SELECT count(*) FROM lineitem_date WHERE l_shipdate = '1997-07-30';
SELECT count(*) FROM lineitem_date WHERE l_shipdate = '1998-01-15';
SELECT count(*) FROM lineitem_date WHERE l_shipdate = '1997-08-08';
-- test with invalid shard placements
\c - postgres - :master_port
SET search_path to "Tenant Isolation";
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port;
SELECT isolate_tenant_to_new_shard('lineitem_date', '1997-08-08', shard_transfer_mode => 'block_writes');
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE nodeport = :worker_1_port;
\c - mx_isolation_role_ent - :master_port
SET search_path to "Tenant Isolation";
DROP TABLE lineitem_date;

View File

@ -221,7 +221,7 @@ SELECT * FROM pg_dist_shard
WHERE logicalrelid = 'lineitem_streaming'::regclass OR logicalrelid = 'orders_streaming'::regclass
ORDER BY shardminvalue::BIGINT, logicalrelid;
SELECT * FROM pg_dist_shard_placement WHERE shardid >= 1230000 ORDER BY nodeport, shardid;
SELECT * FROM pg_dist_shard_placement WHERE shardid BETWEEN 1230000 AND 1399999 ORDER BY nodeport, shardid;
-- test failing foreign constraints after multiple tenant isolation
\COPY lineitem_streaming FROM STDIN WITH DELIMITER '|'
@ -331,15 +331,6 @@ SELECT isolate_tenant_to_new_shard('text_column', 'hello', shard_transfer_mode =
SELECT * FROM text_column;
SELECT public.wait_for_resource_cleanup();
-- test with invalid shard placements
\c - postgres - :master_port
SET search_path to "Tenant Isolation";
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port;
SELECT isolate_tenant_to_new_shard('lineitem_date', '1997-08-08', shard_transfer_mode => 'force_logical');
UPDATE pg_dist_shard_placement SET shardstate = 1 WHERE nodeport = :worker_1_port;
\c - mx_isolation_role_ent - :master_port
SET search_path to "Tenant Isolation";
DROP TABLE lineitem_date;

View File

@ -247,19 +247,6 @@ SELECT unnest(shard_placement_replication_array(
2
));
-- Check that shard_placement_replication_array returns a shard copy operation
-- for each of the inactive shards.
SELECT unnest(shard_placement_replication_array(
ARRAY['{"node_name": "hostname1"}',
'{"node_name": "hostname2"}']::json[],
ARRAY['{"shardid":1, "nodename":"hostname1"}',
'{"shardid":2, "shardstate":3, "nodename":"hostname1"}',
'{"shardid":1, "shardstate":3, "nodename":"hostname2"}',
'{"shardid":2, "nodename":"hostname2"}']::json[],
2
));
-- Check that shard_placement_replication_array errors out if all placements of
-- a shard are placed on inactive nodes.