mirror of https://github.com/citusdata/citus.git
Merge pull request #4136 from citusdata/fix/ensure-reference-transfer-mode
expose transfer mode for ensure reference table existencepull/4141/head
commit
959629d3f3
|
@ -44,7 +44,6 @@
|
||||||
#include "utils/palloc.h"
|
#include "utils/palloc.h"
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static char LookupShardTransferMode(Oid shardReplicationModeOid);
|
|
||||||
static void ErrorIfTableCannotBeReplicated(Oid relationId);
|
static void ErrorIfTableCannotBeReplicated(Oid relationId);
|
||||||
static void RepairShardPlacement(int64 shardId, const char *sourceNodeName,
|
static void RepairShardPlacement(int64 shardId, const char *sourceNodeName,
|
||||||
int32 sourceNodePort, const char *targetNodeName,
|
int32 sourceNodePort, const char *targetNodeName,
|
||||||
|
@ -226,7 +225,7 @@ ErrorIfTableCannotBeReplicated(Oid relationId)
|
||||||
* LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
|
* LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
|
||||||
* values to a char.
|
* values to a char.
|
||||||
*/
|
*/
|
||||||
static char
|
char
|
||||||
LookupShardTransferMode(Oid shardReplicationModeOid)
|
LookupShardTransferMode(Oid shardReplicationModeOid)
|
||||||
{
|
{
|
||||||
char shardReplicationMode = 0;
|
char shardReplicationMode = 0;
|
||||||
|
@ -420,7 +419,7 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
|
||||||
* Since this a long-running operation we do this after the error checks, but
|
* Since this a long-running operation we do this after the error checks, but
|
||||||
* before taking metadata locks.
|
* before taking metadata locks.
|
||||||
*/
|
*/
|
||||||
EnsureReferenceTablesExistOnAllNodes();
|
EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode);
|
||||||
}
|
}
|
||||||
|
|
||||||
CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
|
CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort,
|
||||||
|
|
|
@ -77,6 +77,22 @@ replicate_reference_tables(PG_FUNCTION_ARGS)
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
EnsureReferenceTablesExistOnAllNodes(void)
|
EnsureReferenceTablesExistOnAllNodes(void)
|
||||||
|
{
|
||||||
|
EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* EnsureReferenceTablesExistOnAllNodesExtended ensures that a shard placement for every
|
||||||
|
* reference table exists on all nodes. If a node does not have a set of shard placements,
|
||||||
|
* then master_copy_shard_placement is called in a subtransaction to pull the data to the
|
||||||
|
* new node.
|
||||||
|
*
|
||||||
|
* The transferMode is passed on to the implementation of the copy to control the locks
|
||||||
|
* and transferMode.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Prevent this function from running concurrently with itself.
|
* Prevent this function from running concurrently with itself.
|
||||||
|
@ -192,7 +208,7 @@ EnsureReferenceTablesExistOnAllNodes(void)
|
||||||
StringInfo placementCopyCommand =
|
StringInfo placementCopyCommand =
|
||||||
CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement,
|
CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement,
|
||||||
newWorkerNode,
|
newWorkerNode,
|
||||||
TRANSFER_MODE_BLOCK_WRITES);
|
transferMode);
|
||||||
ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data);
|
ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data);
|
||||||
RemoteTransactionCommit(connection);
|
RemoteTransactionCommit(connection);
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,5 +176,6 @@ extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList,
|
||||||
extern ShardPlacement * SearchShardPlacementInListOrError(List *shardPlacementList,
|
extern ShardPlacement * SearchShardPlacementInListOrError(List *shardPlacementList,
|
||||||
const char *nodeName,
|
const char *nodeName,
|
||||||
uint32 nodePort);
|
uint32 nodePort);
|
||||||
|
extern char LookupShardTransferMode(Oid shardReplicationModeOid);
|
||||||
|
|
||||||
#endif /* COORDINATOR_PROTOCOL_H */
|
#endif /* COORDINATOR_PROTOCOL_H */
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
|
||||||
extern void EnsureReferenceTablesExistOnAllNodes(void);
|
extern void EnsureReferenceTablesExistOnAllNodes(void);
|
||||||
|
extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
|
||||||
extern uint32 CreateReferenceTableColocationId(void);
|
extern uint32 CreateReferenceTableColocationId(void);
|
||||||
extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
|
extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
|
||||||
extern int CompareOids(const void *leftElement, const void *rightElement);
|
extern int CompareOids(const void *leftElement, const void *rightElement);
|
||||||
|
|
Loading…
Reference in New Issue