mirror of https://github.com/citusdata/citus.git
Add adjust sequence settings and update tests
parent
8006765504
commit
76e1e1fd6b
|
@ -110,6 +110,7 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada
|
||||||
*nodeMetadata);
|
*nodeMetadata);
|
||||||
static void DeleteNodeRow(char *nodename, int32 nodeport);
|
static void DeleteNodeRow(char *nodename, int32 nodeport);
|
||||||
static void SetUpObjectMetadata(WorkerNode *workerNode);
|
static void SetUpObjectMetadata(WorkerNode *workerNode);
|
||||||
|
static void AdjustSequenceLimits(WorkerNode *workerNode);
|
||||||
static void ClearDistributedObjectsFromNode(WorkerNode *workerNode);
|
static void ClearDistributedObjectsFromNode(WorkerNode *workerNode);
|
||||||
static void ClearDistributedTablesFromNode(WorkerNode *workerNode);
|
static void ClearDistributedTablesFromNode(WorkerNode *workerNode);
|
||||||
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
|
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
|
||||||
|
@ -734,6 +735,49 @@ SetUpObjectMetadata(WorkerNode *workerNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AdjustSequenceLimits adjusts the limits of sequences on the given node
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
AdjustSequenceLimits(WorkerNode *workerNode)
|
||||||
|
{
|
||||||
|
List *distributedTableList = CitusTableList();
|
||||||
|
List *propagatedTableList = NIL;
|
||||||
|
List *metadataSnapshotCommandList = NIL;
|
||||||
|
|
||||||
|
/* create the list of tables whose metadata will be created */
|
||||||
|
CitusTableCacheEntry *cacheEntry = NULL;
|
||||||
|
foreach_ptr(cacheEntry, distributedTableList)
|
||||||
|
{
|
||||||
|
if (ShouldSyncTableMetadata(cacheEntry->relationId))
|
||||||
|
{
|
||||||
|
propagatedTableList = lappend(propagatedTableList, cacheEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* after all tables are created, create the metadata */
|
||||||
|
foreach_ptr(cacheEntry, propagatedTableList)
|
||||||
|
{
|
||||||
|
Oid relationId = cacheEntry->relationId;
|
||||||
|
|
||||||
|
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
|
||||||
|
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
||||||
|
workerSequenceDDLCommands);
|
||||||
|
}
|
||||||
|
|
||||||
|
metadataSnapshotCommandList = lcons(DISABLE_DDL_PROPAGATION,
|
||||||
|
metadataSnapshotCommandList);
|
||||||
|
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
||||||
|
ENABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
|
char *currentUser = CurrentUserName();
|
||||||
|
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
|
||||||
|
workerNode->workerPort,
|
||||||
|
currentUser,
|
||||||
|
metadataSnapshotCommandList);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DistributedObjectMetadataSyncCommandList returns the necessary commands to create
|
* DistributedObjectMetadataSyncCommandList returns the necessary commands to create
|
||||||
* pg_dist_object entries on the new node.
|
* pg_dist_object entries on the new node.
|
||||||
|
@ -1214,6 +1258,7 @@ ActivateNode(char *nodeName, int nodePort)
|
||||||
{
|
{
|
||||||
ClearDistributedObjectsFromNode(workerNode);
|
ClearDistributedObjectsFromNode(workerNode);
|
||||||
SetUpObjectMetadata(workerNode);
|
SetUpObjectMetadata(workerNode);
|
||||||
|
AdjustSequenceLimits(workerNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -595,13 +595,6 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', dist
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- show that we are able to propagate objects with multiple item on address arrays
|
|
||||||
SELECT * FROM (SELECT unnest(master_metadata_snapshot()) as metadata_command order by 1) as innerResult WHERE metadata_command like '%distributed_object_data%';
|
|
||||||
metadata_command
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('server', ARRAY['fake_fdw_server']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- valid distribution with distribution_arg_index
|
-- valid distribution with distribution_arg_index
|
||||||
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)','$1');
|
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)','$1');
|
||||||
create_distributed_function
|
create_distributed_function
|
||||||
|
|
|
@ -89,10 +89,10 @@ SELECT tablename, indexname FROM pg_indexes WHERE schemaname = 'fix_idx_names' A
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
-- this should work properly
|
-- this should work properly
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
SELECT citus_activate_node('localhost', :worker_1_port);
|
||||||
start_metadata_sync_to_node
|
citus_activate_node
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
@ -669,6 +669,14 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing ALTER TABLE fix_idx_names.p2 OWNER TO postgres
|
NOTICE: issuing ALTER TABLE fix_idx_names.p2 OWNER TO postgres
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing ALTER TABLE fix_idx_names.p2 OWNER TO postgres
|
NOTICE: issuing ALTER TABLE fix_idx_names.p2 OWNER TO postgres
|
||||||
|
+NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('table', ARRAY['fix_idx_names', 'p2']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
|
||||||
|
+DETAIL: on server postgres@localhost:57638 connectionId: 1
|
||||||
|
+NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('table', ARRAY['fix_idx_names', 'p2']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
|
||||||
|
+DETAIL: on server postgres@localhost:57637 connectionId: 2
|
||||||
|
+NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
|
||||||
|
+DETAIL: on server postgres@localhost:57638 connectionId: 1
|
||||||
|
+NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
|
||||||
|
+DETAIL: on server postgres@localhost:57637 connectionId: 2
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
NOTICE: issuing SELECT citus_internal_add_partition_metadata ('fix_idx_names.p2'::regclass, 'h', 'dist_col', 1370000, 's')
|
NOTICE: issuing SELECT citus_internal_add_partition_metadata ('fix_idx_names.p2'::regclass, 'h', 'dist_col', 1370000, 's')
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
|
|
@ -4301,10 +4301,10 @@ WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
-- should work properly - no names clashes
|
-- should work properly - no names clashes
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
SELECT citus_activate_node('localhost', :worker_1_port);
|
||||||
start_metadata_sync_to_node
|
citus_activate_node
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
|
|
@ -467,11 +467,10 @@ SELECT shardid, nodename, nodeport
|
||||||
|
|
||||||
-- disable the first node
|
-- disable the first node
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP FOREIGN TABLE foreign_table_to_distribute;
|
||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
table pg_dist_node;
|
|
||||||
SELECT master_disable_node('localhost', :worker_1_port);
|
SELECT master_disable_node('localhost', :worker_1_port);
|
||||||
SELECT public.wait_until_metadata_sync(30000);
|
SELECT public.wait_until_metadata_sync(30000);
|
||||||
table pg_dist_node;
|
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
\set VERBOSITY default
|
\set VERBOSITY default
|
||||||
|
@ -483,7 +482,6 @@ SET citus.shard_replication_factor TO 1;
|
||||||
-- add two new shards and verify they are created at the other node
|
-- add two new shards and verify they are created at the other node
|
||||||
SELECT master_create_empty_shard('numbers_append') AS shardid1 \gset
|
SELECT master_create_empty_shard('numbers_append') AS shardid1 \gset
|
||||||
SELECT master_create_empty_shard('numbers_append') AS shardid2 \gset
|
SELECT master_create_empty_shard('numbers_append') AS shardid2 \gset
|
||||||
table pg_dist_node;
|
|
||||||
|
|
||||||
COPY numbers_append FROM STDIN WITH (FORMAT 'csv', append_to_shard :shardid1);
|
COPY numbers_append FROM STDIN WITH (FORMAT 'csv', append_to_shard :shardid1);
|
||||||
5,7
|
5,7
|
||||||
|
|
|
@ -358,9 +358,6 @@ SELECT run_command_on_workers($$SELECT count(*) FROM pg_proc WHERE proname='eq_w
|
||||||
-- valid distribution with distribution_arg_name -- case insensitive
|
-- valid distribution with distribution_arg_name -- case insensitive
|
||||||
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='VaL1');
|
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', distribution_arg_name:='VaL1');
|
||||||
|
|
||||||
-- show that we are able to propagate objects with multiple item on address arrays
|
|
||||||
SELECT * FROM (SELECT unnest(master_metadata_snapshot()) as metadata_command order by 1) as innerResult WHERE metadata_command like '%distributed_object_data%';
|
|
||||||
|
|
||||||
-- valid distribution with distribution_arg_index
|
-- valid distribution with distribution_arg_index
|
||||||
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)','$1');
|
SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)','$1');
|
||||||
|
|
||||||
|
|
|
@ -51,24 +51,13 @@ SELECT * FROM seq_test_0_local_table ORDER BY 1, 2 LIMIT 5;
|
||||||
ALTER SEQUENCE seq_0 AS bigint;
|
ALTER SEQUENCE seq_0 AS bigint;
|
||||||
ALTER SEQUENCE seq_0_local_table AS bigint;
|
ALTER SEQUENCE seq_0_local_table AS bigint;
|
||||||
|
|
||||||
-- we can't change sequences as we mark them as distributed
|
|
||||||
-- even if metadata sync is stopped
|
|
||||||
BEGIN;
|
|
||||||
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
|
||||||
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
|
|
||||||
CREATE SEQUENCE seq_13;
|
|
||||||
CREATE TABLE seq_test_13 (x int, y int);
|
|
||||||
SELECT create_distributed_table('seq_test_13','x');
|
|
||||||
ALTER TABLE seq_test_13 ADD COLUMN z int DEFAULT nextval('seq_13');
|
|
||||||
|
|
||||||
ALTER SEQUENCE seq_13 INCREMENT BY 2;
|
|
||||||
|
|
||||||
ROLLBACK;
|
|
||||||
|
|
||||||
-- check alter column type precaution
|
-- check alter column type precaution
|
||||||
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE bigint;
|
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE bigint;
|
||||||
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE smallint;
|
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE smallint;
|
||||||
|
|
||||||
|
-- TODO: Sequences stay there after rollback!
|
||||||
|
-- TODO: Talk with Onder about adjusting sequence limit
|
||||||
|
|
||||||
ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE bigint;
|
ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE bigint;
|
||||||
ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE smallint;
|
ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE smallint;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue