Sequence and add node fix

velioglu/wo_seq_test_1
Burak Velioglu 2022-01-12 15:24:09 +03:00
parent 5204db187b
commit ad67942cda
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
13 changed files with 92 additions and 166 deletions

View File

@@ -86,7 +86,6 @@ char *EnableManualMetadataChangesForUser = "";
static List * GetDistributedTableMetadataEvents(Oid relationId);
static void EnsureObjectMetadataIsSane(int distributionArgumentIndex,
int colocationId);
static char * LocalGroupIdUpdateCommand(int32 groupId);
static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void);
static bool ShouldSyncTableMetadataInternal(bool hashDistributed,
@@ -1188,7 +1187,7 @@ PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
* LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
* of a worker and returns the command in a string.
*/
static char *
char *
LocalGroupIdUpdateCommand(int32 groupId)
{
StringInfo updateCommand = makeStringInfo();
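The rest of the function body is truncated in this view. Based on its comment, and on pg_dist_local_group being the target (see UpdatePgDistLocalGroupOnNode below), the generated command presumably looks like the following sketch, with a hypothetical group id:

-- sketch (assumption): the SQL that LocalGroupIdUpdateCommand renders
UPDATE pg_dist_local_group SET groupid = 14;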

View File

@@ -110,9 +110,9 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SetUpObjectMetadata(WorkerNode *workerNode);
static void AdjustSequenceLimits(WorkerNode *workerNode);
static void ClearDistributedObjectsFromNode(WorkerNode *workerNode);
static void ClearDistributedTablesFromNode(WorkerNode *workerNode);
static void UpdatePgDistLocalGroupOnNode(WorkerNode *workerNode);
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@@ -247,12 +247,6 @@ citus_add_node(PG_FUNCTION_ARGS)
bool nodeAlreadyExists = false;
nodeMetadata.groupId = PG_GETARG_INT32(2);
if (!EnableDependencyCreation)
{
ereport(ERROR, (errmsg("citus.enable_object_propagation must be on to "
"add node")));
}
/*
* During tests this function is called before nodeRole and nodeCluster have been
* created.
@@ -734,49 +728,6 @@ SetUpObjectMetadata(WorkerNode *workerNode)
}
/*
* AdjustSequenceLimits adjusts the limits of sequences on the given node
*/
static void
AdjustSequenceLimits(WorkerNode *workerNode)
{
List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL;
List *metadataSnapshotCommandList = NIL;
/* create the list of tables whose metadata will be created */
CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList)
{
if (ShouldSyncTableMetadata(cacheEntry->relationId))
{
propagatedTableList = lappend(propagatedTableList, cacheEntry);
}
}
/* after all tables are created, create the metadata */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid relationId = cacheEntry->relationId;
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
workerSequenceDDLCommands);
}
metadataSnapshotCommandList = lcons(DISABLE_DDL_PROPAGATION,
metadataSnapshotCommandList);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
ENABLE_DDL_PROPAGATION);
char *currentUser = CurrentUserName();
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
currentUser,
metadataSnapshotCommandList);
}
/*
* DistributedObjectMetadataSyncCommandList returns the necessary commands to create
* pg_dist_object entries on the new node.
@@ -866,6 +817,24 @@ DistributedObjectMetadataSyncCommandList(void)
}
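The body of DistributedObjectMetadataSyncCommandList is elided in this hunk, but the NOTICE lines in this commit's expected test output (the fix_idx_names hunks below) show the shape of the commands it returns, roughly:

-- sketch, reconstructed from the NOTICE lines later in this commit; the
-- commands are wrapped between SET citus.enable_ddl_propagation TO 'off' / 'on'
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid)
AS (VALUES ('table', ARRAY['fix_idx_names', 'p2']::text[], ARRAY[]::text[], -1, 0))
SELECT citus_internal_add_object_metadata(typetext, objnames, objargs,
distargumentindex::int, colocationid::int)
FROM distributed_object_data;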
/*
* UpdatePgDistLocalGroupOnNode updates the pg_dist_local_group table on the given node.
*/
static void
UpdatePgDistLocalGroupOnNode(WorkerNode *workerNode)
{
/* generate and add the local group id's update query */
char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
List *localGroupIdUpdateCommandList = list_make1(localGroupIdUpdateCommand);
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
CitusExtensionOwnerName(),
localGroupIdUpdateCommandList);
}
/*
* ClearDistributedTablesFromNode clears (shell) distributed tables from the given node.
*/
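The function body is not shown here. Judging from the single-node test changes later in this commit, clearing one leftover shell table amounts to something like this sketch (the table name is only an example borrowed from those tests):

-- sketch (assumption): what clearing a leftover shell table boils down to
DROP TABLE IF EXISTS single_node.test;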
@@ -951,6 +920,7 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
{
ClearDistributedTablesFromNode(newWorkerNode);
PropagateNodeWideObjects(newWorkerNode);
UpdatePgDistLocalGroupOnNode(newWorkerNode);
ReplicateAllDependenciesToNode(newWorkerNode->workerName,
newWorkerNode->workerPort);
SetUpMultipleDistributedTableIntegrations(newWorkerNode);
@@ -1035,12 +1005,6 @@ citus_activate_node(PG_FUNCTION_ARGS)
text *nodeNameText = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
if (!EnableDependencyCreation)
{
ereport(ERROR, (errmsg("citus.enable_object_propagation must be on to "
"activate node")));
}
WorkerNode *workerNode = ModifiableWorkerNode(text_to_cstring(nodeNameText),
nodePort);
ActivateNode(workerNode->workerName, workerNode->workerPort);
@@ -1222,6 +1186,12 @@ ActivateNode(char *nodeName, int nodePort)
*/
EnsureSuperUser();
if (!EnableDependencyCreation)
{
ereport(ERROR, (errmsg("citus.enable_object_propagation must be on to "
"add an active node")));
}
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
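Since the check now lives in ActivateNode, every activation path fails fast when object propagation is disabled. A hedged illustration of the expected behavior (the port is a placeholder):

-- assumed behavior with the relocated check
SET citus.enable_object_propagation TO off;
SELECT citus_activate_node('localhost', 57637);
-- ERROR:  citus.enable_object_propagation must be on to add an active node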
@@ -1255,6 +1225,7 @@ ActivateNode(char *nodeName, int nodePort)
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
BoolGetDatum(isActive));
// TODO: Once all tests are enabled for MX, we can remove the sync-by-default check
bool syncMetadata = EnableMetadataSyncByDefault && NodeIsPrimary(workerNode);
if (syncMetadata)
@@ -1273,11 +1244,10 @@ ActivateNode(char *nodeName, int nodePort)
{
StartMetadataSyncToNode(nodeName, nodePort);
if (!NodeIsCoordinator(workerNode) && NodeIsPrimary(workerNode))
if (!NodeIsCoordinator(workerNode))
{
ClearDistributedObjectsFromNode(workerNode);
SetUpObjectMetadata(workerNode);
AdjustSequenceLimits(workerNode);
}
}

View File

@@ -35,6 +35,7 @@
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_partitioning_utils.h"
@@ -45,6 +46,7 @@
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "executor/spi.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
#include "storage/lmgr.h"
@@ -461,18 +463,38 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
" SEQUENCE command string")));
}
/*
* If a sequence with the same name already exists with a different type, it must
* have been left on that node by a rolled-back create_distributed_table operation.
* We must drop it first to create the sequence with the correct type.
*/
CreateSeqStmt *createSequenceStatement = (CreateSeqStmt *) commandNode;
char *sequenceName = createSequenceStatement->sequence->relname;
char *sequenceSchema = createSequenceStatement->sequence->schemaname;
RangeVar *sequenceRange = makeRangeVar(sequenceSchema, sequenceName, -1);
Oid sequenceRelationId = RangeVarGetRelid(sequenceRange, AccessShareLock, true);
if (sequenceRelationId != InvalidOid)
{
Form_pg_sequence pgSequenceForm = pg_get_sequencedef(sequenceRelationId);
if (pgSequenceForm->seqtypid != sequenceTypeId)
{
StringInfo dropSequenceString = makeStringInfo();
char *qualifiedSequenceName = quote_qualified_identifier(sequenceSchema, sequenceName);
appendStringInfoString(dropSequenceString, "DROP SEQUENCE ");
appendStringInfoString(dropSequenceString, qualifiedSequenceName);
appendStringInfoString(dropSequenceString, ";");
ExecuteQueryViaSPI(dropSequenceString->data, SPI_OK_UTILITY);
}
}
/* run the CREATE SEQUENCE command */
ProcessUtilityParseTree(commandNode, commandString, PROCESS_UTILITY_QUERY, NULL,
None_Receiver, NULL);
CommandCounterIncrement();
CreateSeqStmt *createSequenceStatement = (CreateSeqStmt *) commandNode;
char *sequenceName = createSequenceStatement->sequence->relname;
char *sequenceSchema = createSequenceStatement->sequence->schemaname;
createSequenceStatement = (CreateSeqStmt *) commandNode;
Oid sequenceRelationId = RangeVarGetRelid(createSequenceStatement->sequence,
sequenceRelationId = RangeVarGetRelid(createSequenceStatement->sequence,
AccessShareLock, false);
Assert(sequenceRelationId != InvalidOid);
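To make the drop-first logic above concrete, here is a hedged sketch of the failure mode it guards against, modeled on the sequence_default test this commit touches (names and statements are illustrative, not taken from the test verbatim):

-- hypothetical repro: the first attempt is rolled back, but the sequence it
-- shipped can survive on the worker with the old (integer) type; the retry
-- re-applies CREATE SEQUENCE, which now drops the stale sequence first
CREATE SEQUENCE seq_0;
CREATE TABLE seq_test (x int, y int, z int DEFAULT nextval('seq_0'));
BEGIN;
SELECT create_distributed_table('seq_test', 'x');  -- ships seq_0 to workers as integer
ROLLBACK;                                          -- seq_0 can stay behind on the worker
ALTER TABLE seq_test ALTER COLUMN z TYPE smallint; -- allowed: table is not distributed
SELECT create_distributed_table('seq_test', 'x');  -- succeeds: stale seq_0 is dropped first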

View File

@@ -31,6 +31,7 @@ typedef enum
extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort);
extern void EnsureSequentialModeMetadataOperations(void);
extern bool ClusterHasKnownMetadataWorkers(void);
extern char * LocalGroupIdUpdateCommand(int32 groupId);
extern bool ShouldSyncTableMetadata(Oid relationId);
extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId);
extern List * MetadataCreateCommands(void);

View File

@@ -256,7 +256,6 @@ SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_
ERROR: permission denied for function master_update_node
-- try to manipulate node metadata via privileged user
SET ROLE node_metadata_user;
SET citus.enable_object_propagation TO off; -- prevent master activate node to actually connect for this test
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
@@ -474,8 +473,7 @@ SELECT run_command_on_workers('DELETE FROM pg_dist_node WHERE nodeport=' || :'wo
(1 row)
SELECT * FROM cluster_management_test;
NOTICE: there is a shard placement in node group 6 but there are no nodes in that group
ERROR: no active placements were found for shard 1220001
ERROR: there is a shard placement in node group 6 but there are no nodes in that group
-- clean-up
SELECT * INTO old_placements FROM pg_dist_placement WHERE groupid = :worker_2_group;
DELETE FROM pg_dist_placement WHERE groupid = :worker_2_group;

View File

@@ -669,15 +669,15 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER TABLE fix_idx_names.p2 OWNER TO postgres
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing ALTER TABLE fix_idx_names.p2 OWNER TO postgres
+NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('table', ARRAY['fix_idx_names', 'p2']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
+DETAIL: on server postgres@localhost:57638 connectionId: 1
+NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('table', ARRAY['fix_idx_names', 'p2']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
+DETAIL: on server postgres@localhost:57637 connectionId: 2
+NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
+DETAIL: on server postgres@localhost:57638 connectionId: 1
+NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
+DETAIL: on server postgres@localhost:57637 connectionId: 2
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('table', ARRAY['fix_idx_names', 'p2']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
DETAIL: on server postgres@localhost:57638 connectionId: xxxxxxx
NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('table', ARRAY['fix_idx_names', 'p2']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
DETAIL: on server postgres@localhost:57637 connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
DETAIL: on server postgres@localhost:57638 connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
DETAIL: on server postgres@localhost:57637 connectionId: xxxxxxx
NOTICE: issuing SELECT citus_internal_add_partition_metadata ('fix_idx_names.p2'::regclass, 'h', 'dist_col', 1370000, 's')
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT citus_internal_add_partition_metadata ('fix_idx_names.p2'::regclass, 'h', 'dist_col', 1370000, 's')

View File

@@ -130,72 +130,6 @@ ALTER SEQUENCE seq_0 AS bigint;
ERROR: Altering a distributed sequence is currently not supported.
ALTER SEQUENCE seq_0_local_table AS bigint;
ERROR: Altering a distributed sequence is currently not supported.
-- we can change other things like increment
-- if metadata is not synced to workers
BEGIN;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
CREATE SEQUENCE seq_13;
CREATE SEQUENCE seq_13_local_table;
CREATE TABLE seq_test_13 (x int, y int);
CREATE TABLE seq_test_13_local_table (x int, y int);
SELECT create_distributed_table('seq_test_13','x');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('seq_test_13_local_table');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
ALTER TABLE seq_test_13 ADD COLUMN z int DEFAULT nextval('seq_13');
ALTER TABLE seq_test_13_local_table ADD COLUMN z int DEFAULT nextval('seq_13_local_table');
ALTER SEQUENCE seq_13 INCREMENT BY 2;
ALTER SEQUENCE seq_13_local_table INCREMENT BY 2;
\d seq_13
Sequence "sequence_default.seq_13"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
integer | 1 | 1 | 2147483647 | 2 | no | 1
\d seq_13_local_table
Sequence "sequence_default.seq_13_local_table"
Type | Start | Minimum | Maximum | Increment | Cycles? | Cache
---------------------------------------------------------------------
integer | 1 | 1 | 2147483647 | 2 | no | 1
-- check that we can add serial pseudo-type columns
-- when metadata is not synced to workers
TRUNCATE seq_test_0;
ALTER TABLE seq_test_0 ADD COLUMN w00 smallserial;
ALTER TABLE seq_test_0 ADD COLUMN w01 serial2;
ALTER TABLE seq_test_0 ADD COLUMN w10 serial;
ALTER TABLE seq_test_0 ADD COLUMN w11 serial4;
ALTER TABLE seq_test_0 ADD COLUMN w20 bigserial;
ALTER TABLE seq_test_0 ADD COLUMN w21 serial8;
TRUNCATE seq_test_0_local_table;
ALTER TABLE seq_test_0_local_table ADD COLUMN w00 smallserial;
ALTER TABLE seq_test_0_local_table ADD COLUMN w01 serial2;
ALTER TABLE seq_test_0_local_table ADD COLUMN w10 serial;
ALTER TABLE seq_test_0_local_table ADD COLUMN w11 serial4;
ALTER TABLE seq_test_0_local_table ADD COLUMN w20 bigserial;
ALTER TABLE seq_test_0_local_table ADD COLUMN w21 serial8;
ROLLBACK;
-- check alter column type precaution
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE bigint;
ERROR: cannot execute ALTER COLUMN TYPE .. command because the column involves a default coming from a sequence
@@ -216,12 +150,6 @@ SELECT create_distributed_table('seq_test_4','x');
CREATE SEQUENCE seq_4;
ALTER TABLE seq_test_4 ADD COLUMN a bigint DEFAULT nextval('seq_4');
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
DROP SEQUENCE seq_4 CASCADE;
NOTICE: drop cascades to default value for column a of table seq_test_4
TRUNCATE seq_test_4;
@@ -770,10 +698,10 @@ SELECT create_reference_table('seq_test_10');
INSERT INTO seq_test_10 VALUES (0);
CREATE TABLE seq_test_11 (col0 int, col1 bigint DEFAULT nextval('seq_11'::text));
-- works but doesn't create seq_11 in the workers
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
SELECT citus_activate_node('localhost', :worker_1_port);
citus_activate_node
---------------------------------------------------------------------
1
(1 row)
-- works because there is no dependency created between seq_11 and seq_test_10
@@ -812,10 +740,10 @@ SELECT create_distributed_table('seq_test_12', 'col0');
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
SELECT citus_activate_node('localhost', :worker_1_port);
citus_activate_node
---------------------------------------------------------------------
1
(1 row)
INSERT INTO seq_test_12 VALUES ('hello0') RETURNING *;
@@ -918,10 +846,10 @@ ERROR: nextval: reached maximum value of sequence "seq_14" (32767)
\c - - - :master_port
SET citus.shard_replication_factor TO 1;
SET search_path = sequence_default, public;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
SELECT citus_activate_node('localhost', :worker_1_port);
citus_activate_node
---------------------------------------------------------------------
1
(1 row)
SELECT undistribute_table('seq_test_12');

View File

@@ -164,6 +164,15 @@ HINT: Use SELECT rebalance_table_shards(); to balance shards data between worke
(1 row)
COMMIT;
-- After adding and removing a non-metadata-synced worker node, the shell
-- table can stay on the remote node, so we drop it manually.
-- TODO: Update the test once the sync-by-default GUC is removed
SELECT run_command_on_workers($$DROP TABLE single_node.test$$);
run_command_on_workers
----------------------------------
(localhost,57637,t,"DROP TABLE")
(1 row)
-- we don't need this node anymore
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);
?column?

View File

@@ -467,7 +467,6 @@ SELECT shardid, nodename, nodeport
-- disable the first node
SET client_min_messages TO ERROR;
DROP FOREIGN TABLE foreign_table_to_distribute;
\set VERBOSITY terse
SELECT master_disable_node('localhost', :worker_1_port);
SELECT public.wait_until_metadata_sync(30000);

View File

@@ -625,7 +625,6 @@ SELECT shardid, nodename, nodeport
(6 rows)
-- add the node back
SET client_min_messages TO ERROR;
SELECT 1 FROM master_activate_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------

View File

@@ -120,9 +120,8 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port + 1);
SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_2_port);
SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port;
-- show that non-admin role can not activate a node
-- try to manipulate node metadata via privileged user
SET ROLE node_metadata_user;
SET citus.enable_object_propagation TO off; -- prevent master activate node to actually connect for this test
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
BEGIN;
SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);

View File

@@ -55,9 +55,6 @@ ALTER SEQUENCE seq_0_local_table AS bigint;
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE bigint;
ALTER TABLE seq_test_0 ALTER COLUMN z TYPE smallint;
-- TODO: Sequences stay there after rollback!
-- TODO: Talk with Onder about adjusting sequence limit
ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE bigint;
ALTER TABLE seq_test_0_local_table ALTER COLUMN z TYPE smallint;

View File

@@ -87,6 +87,11 @@ BEGIN;
SELECT 1 FROM master_add_node('localhost', :worker_1_port);
COMMIT;
-- After adding and removing a non-metadata-synced worker node, the shell
-- table can stay on the remote node, so we drop it manually.
-- TODO: Update the test once the sync-by-default GUC is removed
SELECT run_command_on_workers($$DROP TABLE single_node.test$$);
-- we don't need this node anymore
SELECT 1 FROM master_remove_node('localhost', :worker_1_port);