From 0f72d7f706cff4c13fd284cdbf5047bffb9df5ee Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Sun, 24 Jul 2022 16:34:16 +0530 Subject: [PATCH] Rename connection params --- .../connection/connection_configuration.c | 2 +- .../connection/connection_management.c | 18 +++++++----------- .../distributed/operations/shard_split.c | 5 ++--- .../replication/multi_logical_replication.c | 2 +- .../shardsplit_logical_replication.c | 2 ++ .../distributed/connection_management.h | 14 ++++++++++---- .../shardsplit_logical_replication.h | 2 ++ 7 files changed, 25 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index ff4e7d04e..63dfebf8b 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -377,7 +377,7 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, authParamsIdx++; } - if (key->replication) + if (key->replicationConnParam) { connKeywords[authParamsIdx] = MemoryContextStrdup(context, "replication"); connValues[authParamsIdx] = MemoryContextStrdup(context, "database"); diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index ce177e978..56e84d063 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -290,13 +290,13 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN); } - if (flags & EXCLUSIVE_AND_REPLICATION) + if (flags & REQUIRE_REPLICATION_CONNECTION_PARAM) { - key.replication = true; + key.replicationConnParam = true; } else { - key.replication = false; + key.replicationConnParam = false; } if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) @@ -356,10 +356,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, MultiConnection *connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection)); connection->initilizationState = POOL_STATE_NOT_INITIALIZED; - if (flags & EXCLUSIVE_AND_REPLICATION) - { - connection->claimedExclusively = true; - } dlist_push_tail(entry->connections, &connection->connectionNode); /* these two flags are by nature cannot happen at the same time */ @@ -679,7 +675,7 @@ CloseConnection(MultiConnection *connection) strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); key.port = connection->port; - key.replication = connection->replication; + key.replicationConnParam = connection->requiresReplicationOption; strlcpy(key.user, connection->user, NAMEDATALEN); strlcpy(key.database, connection->database, NAMEDATALEN); @@ -1224,7 +1220,7 @@ ConnectionHashHash(const void *key, Size keysize) hash = hash_combine(hash, hash_uint32(entry->port)); hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN)); hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN)); - hash = hash_combine(hash, hash_uint32(entry->replication)); + hash = hash_combine(hash, hash_uint32(entry->replicationConnParam)); return hash; } @@ -1238,7 +1234,7 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize) if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || ca->port != cb->port || - ca->replication != cb->replication || + ca->replicationConnParam != cb->replicationConnParam || strncmp(ca->user, cb->user, NAMEDATALEN) != 0 || strncmp(ca->database, cb->database, NAMEDATALEN) != 0) { @@ -1266,7 +1262,7 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key connection->port = key->port; strlcpy(connection->database, key->database, NAMEDATALEN); strlcpy(connection->user, key->user, NAMEDATALEN); - connection->replication = key->replication; + connection->requiresReplicationOption = key->replicationConnParam; connection->pgConn = PQconnectStartParams((const char **) entry->keywords, (const char **) entry->values, diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index dab28ac29..153f29729 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -771,7 +771,7 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;"); ddlCommandList = lappend(ddlCommandList, beginTransaction->data); - /* Set snapshot */ + /* Set snapshot for non-blocking shard split. */ if (snapShotName != NULL) { StringInfo snapShotString = makeStringInfo(); @@ -779,7 +779,6 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, quote_literal_cstr( snapShotName)); ddlCommandList = lappend(ddlCommandList, snapShotString->data); - printf("Sameer final string snapshotted:%s\n", snapShotString->data); } ddlCommandList = lappend(ddlCommandList, splitCopyUdfCommand->data); @@ -1584,7 +1583,7 @@ CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, { /*Create Template replication slot */ int connectionFlags = FORCE_NEW_CONNECTION; - connectionFlags |= EXCLUSIVE_AND_REPLICATION; + connectionFlags |= REQUIRE_REPLICATION_CONNECTION_PARAM; MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, sourceWorkerNode-> diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index c631f234d..8f150c9ac 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -2068,7 +2068,7 @@ GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds, } -/*Refactor this for ShardMove too.*/ +/*TODO(saawasek):Refactor this for ShardMove too.*/ void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, int sourceNodePort, char *userName, char *databaseName, diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index 11b4c8808..6813fe824 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -2,6 +2,8 @@ * * shardsplit_logical_replication.c * + * Function definitions for logically replicating split children. + * * Copyright (c) Citus Data, Inc. * *------------------------------------------------------------------------- diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 2e6b62e71..627a59865 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -100,9 +100,15 @@ enum MultiConnectionMode WAIT_FOR_CONNECTION = 1 << 7, /* - * Force Replication flags + * Use the flag to start a connection for streaming replication. + * This flag constructs additional libpq connection parameters needed for streaming + * replication protocol. It adds 'replication=database' param which instructs + * the backend to go into logical replication walsender mode. + * https://www.postgresql.org/docs/current/protocol-replication.html + * + * This is need to run 'CREATE_REPLICATION_SLOT' command. */ - EXCLUSIVE_AND_REPLICATION = 1 << 8 + REQUIRE_REPLICATION_CONNECTION_PARAM = 1 << 8 }; @@ -193,7 +199,7 @@ typedef struct MultiConnection uint64 copyBytesWrittenSinceLastFlush; /* replication option */ - bool replication; + bool requiresReplicationOption; MultiConnectionStructInitializationState initilizationState; } MultiConnection; @@ -215,7 +221,7 @@ typedef struct ConnectionHashKey int32 port; char user[NAMEDATALEN]; char database[NAMEDATALEN]; - bool replication; + bool replicationConnParam; } ConnectionHashKey; /* hash entry */ diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h index 3c2a2b5e6..58801401f 100644 --- a/src/include/distributed/shardsplit_logical_replication.h +++ b/src/include/distributed/shardsplit_logical_replication.h @@ -2,6 +2,8 @@ * * shardsplit_logical_replication.h * + * Function declarations for logically replicating split children. + * * Copyright (c) Citus Data, Inc. * *-------------------------------------------------------------------------