mirror of https://github.com/citusdata/citus.git
Merge pull request #2224 from citusdata/set_local_via_c
Implement C interface for setting GUCpull/2227/head
commit
0c47d16e8e
|
@ -47,10 +47,44 @@
|
||||||
int MultiShardConnectionType = PARALLEL_CONNECTION;
|
int MultiShardConnectionType = PARALLEL_CONNECTION;
|
||||||
|
|
||||||
|
|
||||||
/* ocal function forward declarations */
|
/* local function forward declarations */
|
||||||
|
static void SetLocalMultiShardModifyModeToSequential(void);
|
||||||
static Relation StubRelation(TupleDesc tupleDescriptor);
|
static Relation StubRelation(TupleDesc tupleDescriptor);
|
||||||
|
|
||||||
|
|
||||||
|
/* exports for SQL callable functions */
|
||||||
|
PG_FUNCTION_INFO_V1(set_local_multi_shard_modify_mode_to_sequential);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* set_local_multi_shard_modify_mode_to_sequential is a SQL
|
||||||
|
* interface for testing SetLocalMultiShardModifyModeToSequential().
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
set_local_multi_shard_modify_mode_to_sequential(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
SetLocalMultiShardModifyModeToSequential();
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SetLocalMultiShardModifyModeToSequential simply a C interface for
|
||||||
|
* setting the following:
|
||||||
|
* SET LOCAL citus.multi_shard_modify_mode = 'sequential';
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
SetLocalMultiShardModifyModeToSequential()
|
||||||
|
{
|
||||||
|
WarnNoTransactionChain(true, "SET LOCAL");
|
||||||
|
|
||||||
|
set_config_option("citus.multi_shard_modify_mode", "sequential",
|
||||||
|
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
|
||||||
|
GUC_ACTION_LOCAL, true, 0, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
|
* ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
|
||||||
* given Citus scan node and returns it. It returns null if all tuples are read
|
* given Citus scan node and returns it. It returns null if all tuples are read
|
||||||
|
|
|
@ -51,6 +51,10 @@ BEGIN
|
||||||
END;
|
END;
|
||||||
$$
|
$$
|
||||||
LANGUAGE 'plpgsql' IMMUTABLE;
|
LANGUAGE 'plpgsql' IMMUTABLE;
|
||||||
|
CREATE OR REPLACE FUNCTION set_local_multi_shard_modify_mode_to_sequential()
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STABLE STRICT
|
||||||
|
AS 'citus', $$set_local_multi_shard_modify_mode_to_sequential$$;
|
||||||
-- disbable 2PC recovery since our tests will check that
|
-- disbable 2PC recovery since our tests will check that
|
||||||
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
|
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
|
||||||
SELECT pg_reload_conf();
|
SELECT pg_reload_conf();
|
||||||
|
@ -66,6 +70,14 @@ SELECT create_distributed_table('test_table', 'a');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- not useful if not in transaction
|
||||||
|
SELECT set_local_multi_shard_modify_mode_to_sequential();
|
||||||
|
WARNING: SET LOCAL can only be used in transaction blocks
|
||||||
|
set_local_multi_shard_modify_mode_to_sequential
|
||||||
|
-------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- we should see #worker transactions
|
-- we should see #worker transactions
|
||||||
-- when sequential mode is used
|
-- when sequential mode is used
|
||||||
SET citus.multi_shard_modify_mode TO 'sequential';
|
SET citus.multi_shard_modify_mode TO 'sequential';
|
||||||
|
@ -406,7 +418,12 @@ INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
|
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
|
||||||
-- now switch to sequential mode to enable a successful TRUNCATE
|
-- now switch to sequential mode to enable a successful TRUNCATE
|
||||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
SELECT set_local_multi_shard_modify_mode_to_sequential();
|
||||||
|
set_local_multi_shard_modify_mode_to_sequential
|
||||||
|
-------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
TRUNCATE multi_shard_modify_test;
|
TRUNCATE multi_shard_modify_test;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- see that all the data successfully removed
|
-- see that all the data successfully removed
|
||||||
|
@ -451,7 +468,12 @@ INSERT INTO multi_shard_modify_test SELECT i, i::text, i FROM generate_series(0,
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
|
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
|
||||||
-- now switch to sequential mode to enable a successful INSERT .. SELECT
|
-- now switch to sequential mode to enable a successful INSERT .. SELECT
|
||||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
SELECT set_local_multi_shard_modify_mode_to_sequential();
|
||||||
|
set_local_multi_shard_modify_mode_to_sequential
|
||||||
|
-------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
|
INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- see that all the data successfully inserted
|
-- see that all the data successfully inserted
|
||||||
|
@ -471,10 +493,11 @@ SELECT pg_reload_conf();
|
||||||
|
|
||||||
SET search_path TO 'public';
|
SET search_path TO 'public';
|
||||||
DROP SCHEMA test_seq_ddl CASCADE;
|
DROP SCHEMA test_seq_ddl CASCADE;
|
||||||
NOTICE: drop cascades to 9 other objects
|
NOTICE: drop cascades to 10 other objects
|
||||||
DETAIL: drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count()
|
DETAIL: drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_worker_count()
|
||||||
drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count()
|
drop cascades to function test_seq_ddl.distributed_2pcs_are_equal_to_placement_count()
|
||||||
drop cascades to function test_seq_ddl.no_distributed_2pcs()
|
drop cascades to function test_seq_ddl.no_distributed_2pcs()
|
||||||
|
drop cascades to function test_seq_ddl.set_local_multi_shard_modify_mode_to_sequential()
|
||||||
drop cascades to table test_seq_ddl.test_table
|
drop cascades to table test_seq_ddl.test_table
|
||||||
drop cascades to table test_seq_ddl.ref_test
|
drop cascades to table test_seq_ddl.ref_test
|
||||||
drop cascades to table test_seq_ddl.test_table_rep_2
|
drop cascades to table test_seq_ddl.test_table_rep_2
|
||||||
|
|
|
@ -57,6 +57,11 @@ END;
|
||||||
$$
|
$$
|
||||||
LANGUAGE 'plpgsql' IMMUTABLE;
|
LANGUAGE 'plpgsql' IMMUTABLE;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION set_local_multi_shard_modify_mode_to_sequential()
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STABLE STRICT
|
||||||
|
AS 'citus', $$set_local_multi_shard_modify_mode_to_sequential$$;
|
||||||
|
|
||||||
-- disbable 2PC recovery since our tests will check that
|
-- disbable 2PC recovery since our tests will check that
|
||||||
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
|
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
|
||||||
SELECT pg_reload_conf();
|
SELECT pg_reload_conf();
|
||||||
|
@ -64,6 +69,9 @@ SELECT pg_reload_conf();
|
||||||
CREATE TABLE test_table(a int, b int);
|
CREATE TABLE test_table(a int, b int);
|
||||||
SELECT create_distributed_table('test_table', 'a');
|
SELECT create_distributed_table('test_table', 'a');
|
||||||
|
|
||||||
|
-- not useful if not in transaction
|
||||||
|
SELECT set_local_multi_shard_modify_mode_to_sequential();
|
||||||
|
|
||||||
-- we should see #worker transactions
|
-- we should see #worker transactions
|
||||||
-- when sequential mode is used
|
-- when sequential mode is used
|
||||||
SET citus.multi_shard_modify_mode TO 'sequential';
|
SET citus.multi_shard_modify_mode TO 'sequential';
|
||||||
|
@ -212,7 +220,7 @@ BEGIN;
|
||||||
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
|
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
|
||||||
|
|
||||||
-- now switch to sequential mode to enable a successful TRUNCATE
|
-- now switch to sequential mode to enable a successful TRUNCATE
|
||||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
SELECT set_local_multi_shard_modify_mode_to_sequential();
|
||||||
TRUNCATE multi_shard_modify_test;
|
TRUNCATE multi_shard_modify_test;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
@ -237,7 +245,7 @@ BEGIN;
|
||||||
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
|
INSERT INTO multi_shard_modify_test VALUES (1,'1',1), (2,'2',2), (3,'3',3), (4,'4',4);
|
||||||
|
|
||||||
-- now switch to sequential mode to enable a successful INSERT .. SELECT
|
-- now switch to sequential mode to enable a successful INSERT .. SELECT
|
||||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
SELECT set_local_multi_shard_modify_mode_to_sequential();
|
||||||
INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
|
INSERT INTO multi_shard_modify_test SELECT * FROM multi_shard_modify_test;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue