Merge remote-tracking branch 'upstream/multi-tenant-monitoring' into multi-tenant-monitoring-annotation-parsing

pull/6763/head
Gokhan Gulbiz 2023-03-10 16:29:12 +03:00
commit 64b6d7a44e
No known key found for this signature in database
GPG Key ID: 608EF06B6BD1B45B
15 changed files with 84 additions and 73 deletions

View File

@ -11,6 +11,7 @@
#include "postgres.h"
#include "unistd.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/log_utils.h"
#include "distributed/listutils.h"
#include "distributed/jsonbutils.h"
@ -43,6 +44,9 @@ clock_t attributeToTenantStart = { 0 };
const char *SharedMemoryNameForMultiTenantMonitor =
"Shared memory for multi tenant monitor";
char *tenantTrancheName = "Tenant Tranche";
char *monitorTrancheName = "Multi Tenant Monitor Tranche";
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static void UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor,
@ -95,7 +99,7 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
//!!!!!!!LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
monitor->periodStart = monitor->periodStart +
((monitoringTime - monitor->periodStart) /
@ -135,7 +139,7 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
//!!!!!!!LWLockRelease(&monitor->lock);
LWLockRelease(&monitor->lock);
PG_RETURN_VOID();
}
@ -178,7 +182,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
}
else
{
//Assert(attributeToTenant == NULL);
/*Assert(attributeToTenant == NULL); */
}
/*DetachSegment(); */
@ -252,7 +256,7 @@ AttributeMetricsIfApplicable()
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
//!!!!!!!LWLockAcquire(&monitor->lock, LW_SHARED);
LWLockAcquire(&monitor->lock, LW_SHARED);
monitor->periodStart = monitor->periodStart +
((queryTime - monitor->periodStart) /
@ -267,7 +271,7 @@ AttributeMetricsIfApplicable()
}
TenantStats *tenantStats = &monitor->tenants[tenantIndex];
//!!!!!!!LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
UpdatePeriodsIfNecessary(monitor, tenantStats);
tenantStats->lastQueryTime = queryTime;
@ -284,9 +288,10 @@ AttributeMetricsIfApplicable()
* After updating the score we might need to change the rank of the tenant in the monitor
*/
while (tenantIndex != 0 &&
monitor->tenants[tenantIndex - 1].score < tenantStats->score)
monitor->tenants[tenantIndex - 1].score <
monitor->tenants[tenantIndex].score)
{
//!!!!!!!LWLockAcquire(&monitor->tenants[tenantIndex - 1].lock, LW_EXCLUSIVE);
LWLockAcquire(&monitor->tenants[tenantIndex - 1].lock, LW_EXCLUSIVE);
ReduceScoreIfNecessary(monitor, &monitor->tenants[tenantIndex - 1],
queryTime);
@ -295,10 +300,11 @@ AttributeMetricsIfApplicable()
monitor->tenants[tenantIndex] = monitor->tenants[tenantIndex - 1];
monitor->tenants[tenantIndex - 1] = tempTenant;
//!!!!!!!LWLockRelease(&monitor->tenants[tenantIndex - 1].lock);
LWLockRelease(&monitor->tenants[tenantIndex].lock);
tenantIndex--;
}
tenantStats = &monitor->tenants[tenantIndex];
if (attributeCommandType == CMD_SELECT)
{
@ -313,8 +319,8 @@ AttributeMetricsIfApplicable()
tenantStats->totalInsertTime += cpu_time_used;
}
//!!!!!!!LWLockRelease(&tenantStats->lock);
//!!!!!!!LWLockRelease(&monitor->lock);
LWLockRelease(&monitor->lock);
LWLockRelease(&tenantStats->lock);
/*
* We keep up to CitusStatsTenantsLimit * 3 tenants instead of CitusStatsTenantsLimit,
@ -324,21 +330,22 @@ AttributeMetricsIfApplicable()
*/
if (monitor->tenantCount >= CitusStatsTenantsLimit * 3)
{
//!!!!!!!LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
monitor->tenantCount = CitusStatsTenantsLimit * 2;
//!!!!!!!LWLockRelease(&monitor->lock);
LWLockRelease(&monitor->lock);
}
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
{
ereport(NOTICE, (errmsg(
"total select count = %d, total CPU time = %f to tenant: %s",
ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f "
"to tenant: %s",
tenantStats->selectCount,
tenantStats->totalSelectTime,
tenantStats->tenantAttribute)));
}
}
//attributeToTenant = NULL;
/*attributeToTenant = NULL; */
}
@ -449,11 +456,11 @@ CreateSharedMemoryForMultiTenantMonitor()
return monitor;
}
char *trancheName = "Multi Tenant Monitor Tranche";
monitor->namedLockTranche.trancheId = LWLockNewTrancheId();
monitor->namedLockTranche.trancheName = monitorTrancheName;
LWLockRegisterTranche(monitor->namedLockTranche.trancheId, trancheName);
LWLockRegisterTranche(monitor->namedLockTranche.trancheId,
monitor->namedLockTranche.trancheName);
LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId);
return monitor;
@ -519,15 +526,15 @@ CreateTenantStats(MultiTenantMonitor *monitor)
{
int tenantIndex = monitor->tenantCount;
strcpy(monitor->tenants[tenantIndex].tenantAttribute, attributeToTenant);
strcpy_s(monitor->tenants[tenantIndex].tenantAttribute,
sizeof(monitor->tenants[tenantIndex].tenantAttribute), attributeToTenant);
monitor->tenants[tenantIndex].colocationGroupId = colocationGroupId;
char *trancheName = "Tenant Tranche";
monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId();
monitor->tenants[tenantIndex].namedLockTranche.trancheName = tenantTrancheName;
LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId,
trancheName);
monitor->tenants[tenantIndex].namedLockTranche.trancheName);
LWLockInitialize(&monitor->tenants[tenantIndex].lock,
monitor->tenants[tenantIndex].namedLockTranche.trancheId);

View File

@ -25,7 +25,7 @@ SELECT citus.clear_network_traffic();
---- test multiple statements spanning multiple shards,
---- at each significant point. These transactions are 2pc
-- fail at DELETE
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE").kill()');
mitmproxy
---------------------------------------------------------------------
@ -54,7 +54,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
(4 rows)
-- cancel at DELETE
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
@ -83,7 +83,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
(4 rows)
-- fail at INSERT
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
mitmproxy
---------------------------------------------------------------------
@ -110,7 +110,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
(4 rows)
-- cancel at INSERT
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
@ -137,7 +137,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
(4 rows)
-- fail at UPDATE
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
mitmproxy
---------------------------------------------------------------------
@ -163,7 +163,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
(4 rows)
-- cancel at UPDATE
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------

View File

@ -36,7 +36,7 @@ SELECT create_reference_table('reference_table');
-- (d) multi-row INSERT that hits multiple shards in multiple workers
-- (e) multi-row INSERT to a reference table
-- Failure and cancellation on multi-row INSERT that hits the same shard with the same value
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
mitmproxy
---------------------------------------------------------------------

View File

@ -26,7 +26,7 @@ SELECT COUNT(*) FROM ref_table;
(1 row)
-- verify behavior of single INSERT; should fail to execute
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
mitmproxy
---------------------------------------------------------------------
@ -41,7 +41,7 @@ SELECT COUNT(*) FROM ref_table WHERE key=5;
(1 row)
-- verify behavior of UPDATE ... RETURNING; should not execute
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
mitmproxy
---------------------------------------------------------------------
@ -56,7 +56,7 @@ SELECT COUNT(*) FROM ref_table WHERE key=7;
(1 row)
-- verify fix to #2214; should raise error and fail to execute
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
mitmproxy
---------------------------------------------------------------------

View File

@ -21,7 +21,7 @@ CREATE TABLE partitioned_table_0
PARTITION OF partitioned_table (dist_key, partition_id)
FOR VALUES IN ( 0 );
INSERT INTO partitioned_table VALUES (0, 0);
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
mitmproxy
---------------------------------------------------------------------

View File

@ -20,7 +20,7 @@ SELECT create_distributed_table('mod_test', 'key');
(1 row)
-- verify behavior of single INSERT; should mark shard as failed
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
mitmproxy
---------------------------------------------------------------------
@ -52,7 +52,7 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
INSERT INTO mod_test VALUES (2, 6);
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
mitmproxy
---------------------------------------------------------------------
@ -78,7 +78,7 @@ WHERE shardid IN (
TRUNCATE mod_test;
-- verify behavior of multi-statement modifications to a single shard
-- should fail the transaction and never mark placements inactive
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
mitmproxy
---------------------------------------------------------------------

View File

@ -23,7 +23,7 @@ SELECT create_distributed_table('select_test', 'key');
-- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
mitmproxy
---------------------------------------------------------------------
@ -45,7 +45,7 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin
-- kill after first SELECT; txn should fail as INSERT triggers
-- 2PC (and placementis not marked bad)
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
mitmproxy
---------------------------------------------------------------------
@ -66,7 +66,7 @@ TRUNCATE select_test;
-- now the same tests with query cancellation
-- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
@ -77,7 +77,7 @@ ERROR: canceling statement due to user request
SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request
-- cancel after first SELECT; txn should fail and nothing should be marked as invalid
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
@ -107,7 +107,7 @@ SELECT citus.mitmproxy('conn.allow()');
TRUNCATE select_test;
-- cancel the second query
-- error after second SELECT; txn should fail
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
@ -126,7 +126,7 @@ SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request
COMMIT;
-- error after second SELECT; txn should fails the transaction
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).reset()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).reset()');
mitmproxy
---------------------------------------------------------------------
@ -144,7 +144,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 3;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
COMMIT;
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*pg_prepared_xacts").after(2).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
mitmproxy
---------------------------------------------------------------------
@ -173,7 +173,7 @@ SELECT create_distributed_table('select_test', 'key');
SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached
INSERT INTO select_test VALUES (1, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).kill()');
mitmproxy
---------------------------------------------------------------------
@ -188,7 +188,7 @@ SELECT * FROM select_test WHERE key = 1;
SELECT * FROM select_test WHERE key = 1;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- now the same test with query cancellation
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------

View File

@ -121,6 +121,8 @@ ORDER BY 1;
function citus_stat_activity()
function citus_stat_statements()
function citus_stat_statements_reset()
function citus_stats_tenants(boolean)
function citus_stats_tenants_storage()
function citus_table_is_visible(oid)
function citus_table_size(regclass)
function citus_task_wait(bigint,citus_task_status)
@ -316,7 +318,9 @@ ORDER BY 1;
view citus_shards_on_worker
view citus_stat_activity
view citus_stat_statements
view citus_stats_tenants
view citus_stats_tenants_storage
view pg_dist_shard_placement
view time_partitions
(310 rows)
(314 rows)

View File

@ -21,7 +21,7 @@ SELECT citus.clear_network_traffic();
---- at each significant point. These transactions are 2pc
-- fail at DELETE
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE").kill()');
BEGIN;
DELETE FROM dml_test WHERE id = 1;
@ -35,7 +35,7 @@ COMMIT;
SELECT * FROM dml_test ORDER BY id ASC;
-- cancel at DELETE
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE").cancel(' || pg_backend_pid() || ')');
BEGIN;
DELETE FROM dml_test WHERE id = 1;
@ -49,7 +49,7 @@ COMMIT;
SELECT * FROM dml_test ORDER BY id ASC;
-- fail at INSERT
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
BEGIN;
DELETE FROM dml_test WHERE id = 1;
@ -63,7 +63,7 @@ COMMIT;
SELECT * FROM dml_test ORDER BY id ASC;
-- cancel at INSERT
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").cancel(' || pg_backend_pid() || ')');
BEGIN;
DELETE FROM dml_test WHERE id = 1;
@ -77,7 +77,7 @@ COMMIT;
SELECT * FROM dml_test ORDER BY id ASC;
-- fail at UPDATE
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
BEGIN;
DELETE FROM dml_test WHERE id = 1;
@ -91,7 +91,7 @@ COMMIT;
SELECT * FROM dml_test ORDER BY id ASC;
-- cancel at UPDATE
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").cancel(' || pg_backend_pid() || ')');
BEGIN;
DELETE FROM dml_test WHERE id = 1;

View File

@ -30,7 +30,7 @@ SELECT create_reference_table('reference_table');
-- Failure and cancellation on multi-row INSERT that hits the same shard with the same value
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
INSERT INTO distributed_table VALUES (1,1), (1,2), (1,3);
-- this test is broken, see https://github.com/citusdata/citus/issues/2460

View File

@ -17,19 +17,19 @@ SELECT citus.clear_network_traffic();
SELECT COUNT(*) FROM ref_table;
-- verify behavior of single INSERT; should fail to execute
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
INSERT INTO ref_table VALUES (5, 6);
SELECT COUNT(*) FROM ref_table WHERE key=5;
-- verify behavior of UPDATE ... RETURNING; should not execute
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
UPDATE ref_table SET key=7 RETURNING value;
SELECT COUNT(*) FROM ref_table WHERE key=7;
-- verify fix to #2214; should raise error and fail to execute
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
BEGIN;
DELETE FROM ref_table WHERE key=5;

View File

@ -19,7 +19,7 @@ CREATE TABLE partitioned_table_0
INSERT INTO partitioned_table VALUES (0, 0);
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
INSERT INTO partitioned_table VALUES (0, 0);

View File

@ -8,7 +8,7 @@ CREATE TABLE mod_test (key int, value text);
SELECT create_distributed_table('mod_test', 'key');
-- verify behavior of single INSERT; should mark shard as failed
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
INSERT INTO mod_test VALUES (2, 6);
SELECT COUNT(*) FROM mod_test WHERE key=2;
@ -24,7 +24,7 @@ TRUNCATE mod_test;
SELECT citus.mitmproxy('conn.allow()');
INSERT INTO mod_test VALUES (2, 6);
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
UPDATE mod_test SET value='ok' WHERE key=2 RETURNING key;
SELECT COUNT(*) FROM mod_test WHERE value='ok';
@ -38,7 +38,7 @@ TRUNCATE mod_test;
-- verify behavior of multi-statement modifications to a single shard
-- should fail the transaction and never mark placements inactive
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
BEGIN;
INSERT INTO mod_test VALUES (2, 6);

View File

@ -13,13 +13,13 @@ SELECT create_distributed_table('select_test', 'key');
-- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
SELECT * FROM select_test WHERE key = 3;
SELECT * FROM select_test WHERE key = 3;
-- kill after first SELECT; txn should fail as INSERT triggers
-- 2PC (and placementis not marked bad)
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
BEGIN;
INSERT INTO select_test VALUES (3, 'more data');
@ -35,12 +35,12 @@ TRUNCATE select_test;
-- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
SELECT * FROM select_test WHERE key = 3;
SELECT * FROM select_test WHERE key = 3;
-- cancel after first SELECT; txn should fail and nothing should be marked as invalid
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
BEGIN;
INSERT INTO select_test VALUES (3, 'more data');
@ -58,7 +58,7 @@ TRUNCATE select_test;
-- cancel the second query
-- error after second SELECT; txn should fail
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
BEGIN;
INSERT INTO select_test VALUES (3, 'more data');
@ -68,7 +68,7 @@ SELECT * FROM select_test WHERE key = 3;
COMMIT;
-- error after second SELECT; txn should fails the transaction
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).reset()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).reset()');
BEGIN;
INSERT INTO select_test VALUES (3, 'more data');
@ -77,7 +77,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 3;
COMMIT;
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*pg_prepared_xacts").after(2).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
SELECT recover_prepared_transactions();
SELECT recover_prepared_transactions();
@ -93,12 +93,12 @@ SELECT create_distributed_table('select_test', 'key');
SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached
INSERT INTO select_test VALUES (1, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).kill()');
SELECT * FROM select_test WHERE key = 1;
SELECT * FROM select_test WHERE key = 1;
-- now the same test with query cancellation
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
SELECT * FROM select_test WHERE key = 1;
SELECT * FROM select_test WHERE key = 1;