activate_node_snapshot

velioglu/wo_seq_test_1
Burak Velioglu 2022-01-17 10:50:21 +03:00
parent 6fd5f8e658
commit 1ebc2fd6c5
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
6 changed files with 59 additions and 55 deletions

View File

@ -107,12 +107,9 @@ static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata); *nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport); static void DeleteNodeRow(char *nodename, int32 nodeport);
static List * ResyncMetadataCommandList();
static List * MetadataSetupCommandList(); static List * MetadataSetupCommandList();
static List * ClearMetadataCommandList(); static List * ClearMetadataCommandList();
static List * ClearShellTablesCommandList(); static List * ClearShellTablesCommandList();
static List * RecreateDistributedTablesWithDependenciesCommandList(
WorkerNode *workerNode);
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode); static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
static List * MultipleDistributedTableIntegrationsCommandList(); static List * MultipleDistributedTableIntegrationsCommandList();
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@ -778,7 +775,7 @@ PropagateNodeWideObjectsCommandList()
* RecreateDistributedTablesWithDependenciesCommandList return command list to recreate * RecreateDistributedTablesWithDependenciesCommandList return command list to recreate
* distributed tables with command list. * distributed tables with command list.
*/ */
static List * List *
RecreateDistributedTablesWithDependenciesCommandList(WorkerNode *workerNode) RecreateDistributedTablesWithDependenciesCommandList(WorkerNode *workerNode)
{ {
List *commandList = NIL; List *commandList = NIL;
@ -788,8 +785,10 @@ RecreateDistributedTablesWithDependenciesCommandList(WorkerNode *workerNode)
commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList()); commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList());
commandList = list_concat(commandList, list_make1(LocalGroupIdUpdateCommand( commandList = list_concat(commandList, list_make1(LocalGroupIdUpdateCommand(
workerNode->groupId))); workerNode->groupId)));
commandList = list_concat(commandList, ReplicateAllDependenciesToNodeCommandList( commandList = list_concat(commandList,
workerNode->workerName, workerNode->workerPort)); ReplicateAllDependenciesToNodeCommandList(
workerNode->workerName,
workerNode->workerPort));
commandList = list_concat(commandList, commandList = list_concat(commandList,
MultipleDistributedTableIntegrationsCommandList()); MultipleDistributedTableIntegrationsCommandList());
@ -831,8 +830,8 @@ ClearMetadataCommandList()
* ResyncMetadataCommandList returns the command list to resync all the * ResyncMetadataCommandList returns the command list to resync all the
* distributed table related metadata. * distributed table related metadata.
*/ */
static List * List *
ResyncMetadataCommandList() ResyncMetadataCommandList(void)
{ {
List *resyncMetadataCommandList = NIL; List *resyncMetadataCommandList = NIL;
@ -876,8 +875,7 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
Assert(superuser()); Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction( SendMetadataCommandListToWorkerInCoordinatedTransaction(
newWorkerNode->workerName, newWorkerNode->workerName,
newWorkerNode-> newWorkerNode->workerPort,
workerPort,
CurrentUserName(), CurrentUserName(),
commandList); commandList);
} }

View File

@ -20,6 +20,7 @@
#include "distributed/maintenanced.h" #include "distributed/maintenanced.h"
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/worker_manager.h"
#include "postmaster/postmaster.h" #include "postmaster/postmaster.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/latch.h" #include "storage/latch.h"
@ -28,45 +29,63 @@
/* declarations for dynamic loading */ /* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_metadata_snapshot); PG_FUNCTION_INFO_V1(activate_node_snapshot);
PG_FUNCTION_INFO_V1(wait_until_metadata_sync); PG_FUNCTION_INFO_V1(wait_until_metadata_sync);
PG_FUNCTION_INFO_V1(trigger_metadata_sync); PG_FUNCTION_INFO_V1(trigger_metadata_sync);
PG_FUNCTION_INFO_V1(raise_error_in_metadata_sync); PG_FUNCTION_INFO_V1(raise_error_in_metadata_sync);
/* /*
* master_metadata_snapshot prints all the queries that are required * activate_node_snapshot prints all the queries that are required
* to generate a metadata snapshot. * to activate a node.
*/ */
Datum Datum
master_metadata_snapshot(PG_FUNCTION_ARGS) activate_node_snapshot(PG_FUNCTION_ARGS)
{ {
/*
* Activate node commands are created using the given worker node,
* so we are using first primary worker node just for test purposes.
*/
WorkerNode *dummyWorkerNode = GetFirstPrimaryWorkerNode();
List *recreateTablesCommands = RecreateDistributedTablesWithDependenciesCommandList(
dummyWorkerNode);
List *dropSnapshotCommands = MetadataDropCommands(); List *dropSnapshotCommands = MetadataDropCommands();
List *createSnapshotCommands = MetadataCreateCommands(); List *createSnapshotCommands = MetadataCreateCommands();
List *snapshotCommandList = NIL; List *metadataUpdateCommandList = ResyncMetadataCommandList();
int snapshotCommandIndex = 0; List *activateNodeCommandList = NIL;
int activateNodeCommandIndex = 0;
Oid ddlCommandTypeId = TEXTOID; Oid ddlCommandTypeId = TEXTOID;
snapshotCommandList = list_concat(snapshotCommandList, dropSnapshotCommands); activateNodeCommandList = list_concat(activateNodeCommandList,
snapshotCommandList = list_concat(snapshotCommandList, createSnapshotCommands); recreateTablesCommands);
activateNodeCommandList = list_concat(activateNodeCommandList, dropSnapshotCommands);
activateNodeCommandList = list_concat(activateNodeCommandList,
createSnapshotCommands);
activateNodeCommandList = list_concat(activateNodeCommandList,
metadataUpdateCommandList);
int snapshotCommandCount = list_length(snapshotCommandList); int activateNodeCommandCount = list_length(activateNodeCommandList);
Datum *snapshotCommandDatumArray = palloc0(snapshotCommandCount * sizeof(Datum)); Datum *activateNodeCommandDatumArray = palloc0(activateNodeCommandCount *
sizeof(Datum));
const char *metadataSnapshotCommand = NULL; const char *activateNodeSnapshotCommand = NULL;
foreach_ptr(metadataSnapshotCommand, snapshotCommandList) foreach_ptr(activateNodeSnapshotCommand, activateNodeCommandList)
{ {
Datum metadataSnapshotCommandDatum = CStringGetTextDatum(metadataSnapshotCommand); Datum activateNodeSnapshotCommandDatum = CStringGetTextDatum(
activateNodeSnapshotCommand);
snapshotCommandDatumArray[snapshotCommandIndex] = metadataSnapshotCommandDatum; activateNodeCommandDatumArray[activateNodeCommandIndex] =
snapshotCommandIndex++; activateNodeSnapshotCommandDatum;
activateNodeCommandIndex++;
} }
ArrayType *snapshotCommandArrayType = DatumArrayToArrayType(snapshotCommandDatumArray, ArrayType *activateNodeCommandArrayType = DatumArrayToArrayType(
snapshotCommandCount, activateNodeCommandDatumArray,
ddlCommandTypeId); activateNodeCommandCount,
ddlCommandTypeId);
PG_RETURN_ARRAYTYPE_P(snapshotCommandArrayType); PG_RETURN_ARRAYTYPE_P(activateNodeCommandArrayType);
} }

View File

@ -103,6 +103,9 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI
Datum value); Datum value);
extern uint32 CountPrimariesWithMetadata(void); extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void); extern WorkerNode * GetFirstPrimaryWorkerNode(void);
extern List * RecreateDistributedTablesWithDependenciesCommandList(
WorkerNode *workerNode);
extern List * ResyncMetadataCommandList(void);
/* Function declarations for worker node utilities */ /* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement); extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);

View File

@ -25,13 +25,6 @@ SELECT create_distributed_table('notices', 'id');
(1 row) (1 row)
INSERT INTO notices VALUES (1, 'hello world'); INSERT INTO notices VALUES (1, 'hello world');
-- Create the necessary test utility function
CREATE OR REPLACE FUNCTION master_metadata_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION master_metadata_snapshot()
IS 'commands to create the metadata snapshot';
CREATE FUNCTION notice(text) CREATE FUNCTION notice(text)
RETURNS void RETURNS void
LANGUAGE plpgsql AS $$ LANGUAGE plpgsql AS $$

View File

@ -19,15 +19,6 @@ CREATE TABLE notices (
SELECT create_distributed_table('notices', 'id'); SELECT create_distributed_table('notices', 'id');
INSERT INTO notices VALUES (1, 'hello world'); INSERT INTO notices VALUES (1, 'hello world');
-- Create the necessary test utility function
CREATE OR REPLACE FUNCTION master_metadata_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION master_metadata_snapshot()
IS 'commands to create the metadata snapshot';
CREATE FUNCTION notice(text) CREATE FUNCTION notice(text)
RETURNS void RETURNS void
LANGUAGE plpgsql AS $$ LANGUAGE plpgsql AS $$

View File

@ -20,20 +20,20 @@ SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
-- Create the necessary test utility function -- Create the necessary test utility function
CREATE FUNCTION master_metadata_snapshot() CREATE FUNCTION activate_node_snapshot()
RETURNS text[] RETURNS text[]
LANGUAGE C STRICT LANGUAGE C STRICT
AS 'citus'; AS 'citus';
COMMENT ON FUNCTION master_metadata_snapshot() COMMENT ON FUNCTION activate_node_snapshot()
IS 'commands to create the metadata snapshot'; IS 'commands to activate node snapshot';
-- Show that none of the existing tables are qualified to be MX tables -- Show that none of the existing tables are qualified to be MX tables
SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
-- Show that, with no MX tables, metadata snapshot contains only the delete commands, -- Show that, with no MX tables, metadata snapshot contains only the delete commands,
-- pg_dist_node entries and reference tables -- pg_dist_node entries and reference tables
SELECT unnest(master_metadata_snapshot()) order by 1; SELECT unnest(activate_node_snapshot()) order by 1;
-- this function is dropped in Citus10, added here for tests -- this function is dropped in Citus10, added here for tests
CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass, CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
@ -61,26 +61,26 @@ reset citus.shard_replication_factor;
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::regclass; UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::regclass;
-- Show that the created MX table is included in the metadata snapshot -- Show that the created MX table is included in the metadata snapshot
SELECT unnest(master_metadata_snapshot()) order by 1; SELECT unnest(activate_node_snapshot()) order by 1;
-- Show that CREATE INDEX commands are included in the metadata snapshot -- Show that CREATE INDEX commands are included in the metadata snapshot
CREATE INDEX mx_index ON mx_test_table(col_2); CREATE INDEX mx_index ON mx_test_table(col_2);
SELECT unnest(master_metadata_snapshot()) order by 1; SELECT unnest(activate_node_snapshot()) order by 1;
-- Show that schema changes are included in the metadata snapshot -- Show that schema changes are included in the metadata snapshot
CREATE SCHEMA mx_testing_schema; CREATE SCHEMA mx_testing_schema;
ALTER TABLE mx_test_table SET SCHEMA mx_testing_schema; ALTER TABLE mx_test_table SET SCHEMA mx_testing_schema;
SELECT unnest(master_metadata_snapshot()) order by 1; SELECT unnest(activate_node_snapshot()) order by 1;
-- Show that append distributed tables are not included in the metadata snapshot -- Show that append distributed tables are not included in the metadata snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text); CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
SELECT master_create_distributed_table('non_mx_test_table', 'col_1', 'append'); SELECT master_create_distributed_table('non_mx_test_table', 'col_1', 'append');
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='non_mx_test_table'::regclass; UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='non_mx_test_table'::regclass;
SELECT unnest(master_metadata_snapshot()) order by 1; SELECT unnest(activate_node_snapshot()) order by 1;
-- Show that range distributed tables are not included in the metadata snapshot -- Show that range distributed tables are not included in the metadata snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass; UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
SELECT unnest(master_metadata_snapshot()) order by 1; SELECT unnest(activate_node_snapshot()) order by 1;
-- Test start_metadata_sync_to_node and citus_activate_node UDFs -- Test start_metadata_sync_to_node and citus_activate_node UDFs
@ -784,7 +784,7 @@ ALTER TABLE test_table ADD COLUMN id2 int DEFAULT nextval('mx_test_sequence_1');
ALTER TABLE test_table ALTER COLUMN id2 DROP DEFAULT; ALTER TABLE test_table ALTER COLUMN id2 DROP DEFAULT;
ALTER TABLE test_table ALTER COLUMN id2 SET DEFAULT nextval('mx_test_sequence_1'); ALTER TABLE test_table ALTER COLUMN id2 SET DEFAULT nextval('mx_test_sequence_1');
SELECT unnest(master_metadata_snapshot()) order by 1; SELECT unnest(activate_node_snapshot()) order by 1;
-- shouldn't work since test_table is MX -- shouldn't work since test_table is MX
ALTER TABLE test_table ADD COLUMN id3 bigserial; ALTER TABLE test_table ADD COLUMN id3 bigserial;