mirror of https://github.com/citusdata/citus.git
Add stop_metadata_sync_to_node UDF
parent
b94647c3bc
commit
afbb5ffb31
|
@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
||||||
5.2-1 5.2-2 5.2-3 5.2-4 \
|
5.2-1 5.2-2 5.2-3 5.2-4 \
|
||||||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
|
||||||
6.1-1 6.1-2
|
6.1-1 6.1-2 6.1-3
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -99,6 +99,8 @@ $(EXTENSION)--6.1-1.sql: $(EXTENSION)--6.0-18.sql $(EXTENSION)--6.0-18--6.1-1.sq
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--6.1-2.sql: $(EXTENSION)--6.1-1.sql $(EXTENSION)--6.1-1--6.1-2.sql
|
$(EXTENSION)--6.1-2.sql: $(EXTENSION)--6.1-1.sql $(EXTENSION)--6.1-1--6.1-2.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
/* citus--6.1-2--6.1-3.sql */
|
||||||
|
|
||||||
|
SET search_path = 'pg_catalog';
|
||||||
|
|
||||||
|
CREATE FUNCTION stop_metadata_sync_to_node(nodename text, nodeport integer)
|
||||||
|
RETURNS VOID
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$stop_metadata_sync_to_node$$;
|
||||||
|
COMMENT ON FUNCTION stop_metadata_sync_to_node(nodename text, nodeport integer)
|
||||||
|
IS 'stop metadata sync to node';
|
||||||
|
|
||||||
|
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '6.1-2'
|
default_version = '6.1-3'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -43,11 +43,12 @@
|
||||||
|
|
||||||
|
|
||||||
static char * LocalGroupIdUpdateCommand(uint32 groupId);
|
static char * LocalGroupIdUpdateCommand(uint32 groupId);
|
||||||
static void MarkNodeHasMetadata(char *nodeName, int32 nodePort);
|
static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata);
|
||||||
static char * TruncateTriggerCreateCommand(Oid relationId);
|
static char * TruncateTriggerCreateCommand(Oid relationId);
|
||||||
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
|
PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
|
||||||
|
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -111,7 +112,35 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
||||||
SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner,
|
SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner,
|
||||||
recreateMetadataSnapshotCommandList);
|
recreateMetadataSnapshotCommandList);
|
||||||
|
|
||||||
MarkNodeHasMetadata(nodeNameString, nodePort);
|
MarkNodeHasMetadata(nodeNameString, nodePort, true);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* stop_metadata_sync_to_node function sets the hasmetadata column of the specified node
|
||||||
|
* to false in pg_dist_node table, thus indicating that the specified worker node does not
|
||||||
|
* receive DDL changes anymore and cannot be used for issuing mx queries.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||||
|
int32 nodePort = PG_GETARG_INT32(1);
|
||||||
|
char *nodeNameString = text_to_cstring(nodeName);
|
||||||
|
WorkerNode *workerNode = NULL;
|
||||||
|
|
||||||
|
EnsureSuperUser();
|
||||||
|
|
||||||
|
workerNode = FindWorkerNode(nodeNameString, nodePort);
|
||||||
|
if (workerNode == NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
|
errmsg("node (%s,%d) does not exist", nodeNameString, nodePort)));
|
||||||
|
}
|
||||||
|
|
||||||
|
MarkNodeHasMetadata(nodeNameString, nodePort, false);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -539,7 +568,7 @@ LocalGroupIdUpdateCommand(uint32 groupId)
|
||||||
* pg_dist_node to true.
|
* pg_dist_node to true.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
MarkNodeHasMetadata(char *nodeName, int32 nodePort)
|
MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata)
|
||||||
{
|
{
|
||||||
const bool indexOK = false;
|
const bool indexOK = false;
|
||||||
const int scanKeyCount = 2;
|
const int scanKeyCount = 2;
|
||||||
|
@ -573,7 +602,7 @@ MarkNodeHasMetadata(char *nodeName, int32 nodePort)
|
||||||
|
|
||||||
memset(replace, 0, sizeof(replace));
|
memset(replace, 0, sizeof(replace));
|
||||||
|
|
||||||
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(true);
|
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata);
|
||||||
isnull[Anum_pg_dist_node_hasmetadata - 1] = false;
|
isnull[Anum_pg_dist_node_hasmetadata - 1] = false;
|
||||||
replace[Anum_pg_dist_node_hasmetadata - 1] = true;
|
replace[Anum_pg_dist_node_hasmetadata - 1] = true;
|
||||||
|
|
||||||
|
|
|
@ -60,6 +60,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-17';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-18';
|
ALTER EXTENSION citus UPDATE TO '6.0-18';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-1';
|
ALTER EXTENSION citus UPDATE TO '6.1-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-2';
|
ALTER EXTENSION citus UPDATE TO '6.1-2';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.1-3';
|
||||||
-- ensure no objects were created outside pg_catalog
|
-- ensure no objects were created outside pg_catalog
|
||||||
SELECT COUNT(*)
|
SELECT COUNT(*)
|
||||||
FROM pg_depend AS pgd,
|
FROM pg_depend AS pgd,
|
||||||
|
|
|
@ -382,6 +382,32 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
|
||||||
f
|
f
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false
|
||||||
|
\c - - - :master_port
|
||||||
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
start_metadata_sync_to_node
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port;
|
||||||
|
hasmetadata
|
||||||
|
-------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
stop_metadata_sync_to_node
|
||||||
|
----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port;
|
||||||
|
hasmetadata
|
||||||
|
-------------
|
||||||
|
f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- Cleanup
|
-- Cleanup
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
DROP TABLE mx_testing_schema.mx_test_table;
|
DROP TABLE mx_testing_schema.mx_test_table;
|
||||||
|
@ -391,6 +417,17 @@ DELETE FROM pg_dist_shard;
|
||||||
DELETE FROM pg_dist_shard_placement;
|
DELETE FROM pg_dist_shard_placement;
|
||||||
\d mx_testing_schema.mx_test_table
|
\d mx_testing_schema.mx_test_table
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
DROP TABLE mx_testing_schema.mx_test_table;
|
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
UPDATE pg_dist_node SET hasmetadata=false;
|
stop_metadata_sync_to_node
|
||||||
|
----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||||
|
stop_metadata_sync_to_node
|
||||||
|
----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE mx_testing_schema.mx_test_table CASCADE;
|
||||||
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;
|
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;
|
||||||
|
|
|
@ -60,6 +60,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-17';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-18';
|
ALTER EXTENSION citus UPDATE TO '6.0-18';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-1';
|
ALTER EXTENSION citus UPDATE TO '6.1-1';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.1-2';
|
ALTER EXTENSION citus UPDATE TO '6.1-2';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.1-3';
|
||||||
|
|
||||||
-- ensure no objects were created outside pg_catalog
|
-- ensure no objects were created outside pg_catalog
|
||||||
SELECT COUNT(*)
|
SELECT COUNT(*)
|
||||||
|
|
|
@ -131,6 +131,13 @@ ROLLBACK;
|
||||||
|
|
||||||
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
|
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
|
||||||
|
|
||||||
|
-- Check that stop_metadata_sync_to_node function sets hasmetadata of the node to false
|
||||||
|
\c - - - :master_port
|
||||||
|
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port;
|
||||||
|
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port;
|
||||||
|
|
||||||
-- Cleanup
|
-- Cleanup
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
DROP TABLE mx_testing_schema.mx_test_table;
|
DROP TABLE mx_testing_schema.mx_test_table;
|
||||||
|
@ -141,7 +148,8 @@ DELETE FROM pg_dist_shard_placement;
|
||||||
\d mx_testing_schema.mx_test_table
|
\d mx_testing_schema.mx_test_table
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
DROP TABLE mx_testing_schema.mx_test_table;
|
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
UPDATE pg_dist_node SET hasmetadata=false;
|
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||||
|
DROP TABLE mx_testing_schema.mx_test_table CASCADE;
|
||||||
|
|
||||||
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;
|
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;
|
||||||
|
|
Loading…
Reference in New Issue