mirror of https://github.com/citusdata/citus.git

Merge pull request #4604 from citusdata/copy_single_node

Adaptive connection management for COPY on local nodes

commit fcb1b7f7d5
@@ -79,6 +79,7 @@
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/multi_executor.h"
 #include "distributed/listutils.h"
 #include "distributed/locally_reserved_shared_connections.h"
 #include "distributed/placement_connection.h"
 #include "distributed/relation_access_tracking.h"
@@ -214,6 +215,18 @@ typedef struct ShardConnections
 } ShardConnections;
 
 
+/*
+ * Represents the state for allowing copy via local
+ * execution.
+ */
+typedef enum LocalCopyStatus
+{
+    LOCAL_COPY_REQUIRED,
+    LOCAL_COPY_OPTIONAL,
+    LOCAL_COPY_DISABLED
+} LocalCopyStatus;
+
+
 /* Local functions forward declarations */
 static void CopyToExistingShards(CopyStmt *copyStatement,
                                  QueryCompletionCompat *completionTag);
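As a reading aid: the function that computes these three values (GetLocalCopyStatus) is split across two hunks further down, so here is a condensed, self-contained restatement of its decision ladder in plain C. This is a sketch, not the Citus function; the enum stand-ins and boolean parameters replace the EnableLocalExecution GUC, GetCurrentLocalExecutionStatus() and IsMultiStatementTransaction().

#include <stdbool.h>

/* stand-ins for Citus' LocalExecutionStatus and the LocalCopyStatus enum above */
typedef enum { LOCAL_EXECUTION_OPTIONAL, LOCAL_EXECUTION_REQUIRED, LOCAL_EXECUTION_DISABLED } LocalExecutionStatus;
typedef enum { LOCAL_COPY_REQUIRED, LOCAL_COPY_OPTIONAL, LOCAL_COPY_DISABLED } LocalCopyStatus;

static LocalCopyStatus
LocalCopyStatusSketch(bool enableLocalExecution, LocalExecutionStatus execStatus,
                      bool isIntermediateResult, bool isMultiStatementTransaction)
{
    if (!enableLocalExecution || execStatus == LOCAL_EXECUTION_DISABLED)
    {
        /* local execution is switched off, never copy locally */
        return LOCAL_COPY_DISABLED;
    }
    else if (isIntermediateResult)
    {
        /* intermediate results stay on the existing file-based path */
        return LOCAL_COPY_DISABLED;
    }
    else if (execStatus == LOCAL_EXECUTION_REQUIRED || isMultiStatementTransaction)
    {
        /* earlier local access or visibility rules force local copy */
        return LOCAL_COPY_REQUIRED;
    }

    /* otherwise the caller is free to pick local or remote execution */
    return LOCAL_COPY_OPTIONAL;
}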
@@ -323,7 +336,9 @@ static void CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uin
                                        processedRowCount);
 static void FinishLocalCopy(CitusCopyDestReceiver *copyDest);
 static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to);
-static bool ShouldExecuteCopyLocally(bool isIntermediateResult);
+static LocalCopyStatus GetLocalCopyStatus(List *shardIntervalList, bool
+                                          isIntermediateResult);
+static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList);
 static void LogLocalCopyExecution(uint64 shardId);
 
 
@@ -2076,28 +2091,29 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
 
 
 /*
- * ShouldExecuteCopyLocally returns true if the current copy
- * operation should be done locally for local placements.
+ * GetLocalCopyStatus returns the status for executing copy locally.
+ * If LOCAL_COPY_DISABLED or LOCAL_COPY_REQUIRED, the caller has to
+ * follow that. Else, the caller may decide to use local or remote
+ * execution depending on other information.
 */
-static bool
-ShouldExecuteCopyLocally(bool isIntermediateResult)
+static LocalCopyStatus
+GetLocalCopyStatus(List *shardIntervalList, bool isIntermediateResult)
 {
-    if (!EnableLocalExecution)
+    if (!EnableLocalExecution ||
+        GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
     {
-        return false;
+        return LOCAL_COPY_DISABLED;
     }
-
-    /*
-     * Intermediate files are written to a file, and files are visible to all
-     * transactions, and we use a custom copy format for copy therefore we will
-     * use the existing logic for that.
-     */
-    if (isIntermediateResult)
+    else if (isIntermediateResult)
     {
-        return false;
+        /*
+         * Intermediate files are written to a file, and files are visible to all
+         * transactions, and we use a custom copy format for copy therefore we will
+         * use the existing logic for that.
+         */
+        return LOCAL_COPY_DISABLED;
     }
-
-    if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
+    else if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
     {
         /*
          * For various reasons, including the transaction visibility
@@ -2116,12 +2132,35 @@ ShouldExecuteCopyLocally(bool isIntermediateResult)
          * those placements. That'd help to benefit more from parallelism.
          */
-
-        return true;
+        return LOCAL_COPY_REQUIRED;
     }
+    else if (IsMultiStatementTransaction())
+    {
+        return LOCAL_COPY_REQUIRED;
+    }
 
-    /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */
-    return GetCurrentLocalExecutionStatus() != LOCAL_EXECUTION_DISABLED &&
-           IsMultiStatementTransaction();
+    return LOCAL_COPY_OPTIONAL;
+}
+
+
+/*
+ * ShardIntervalListHasLocalPlacements returns true if any of the input
+ * shard intervals has a local placement.
+ */
+static bool
+ShardIntervalListHasLocalPlacements(List *shardIntervalList)
+{
+    int32 localGroupId = GetLocalGroupId();
+    ShardInterval *shardInterval = NULL;
+    foreach_ptr(shardInterval, shardIntervalList)
+    {
+        if (FindShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL)
+        {
+            return true;
+        }
+    }
+
+    return false;
 }
 
 
@@ -2136,8 +2175,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
 {
     CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest;
 
-    bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL;
-    copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(isIntermediateResult);
     Oid tableId = copyDest->distributedRelationId;
 
     char *relationName = get_rel_name(tableId);
@@ -2291,13 +2328,53 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
     RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML);
 
     /*
-     * For all the primary (e.g., writable) nodes, reserve a shared connection.
-     * We do this upfront because we cannot know which nodes are going to be
-     * accessed. Since the order of the reservation is important, we need to
-     * do it right here. For the details on why the order important, see
-     * the function.
+     * For all the primary (e.g., writable) remote nodes, reserve a shared
+     * connection. We do this upfront because we cannot know which nodes
+     * are going to be accessed. Since the order of the reservation is
+     * important, we need to do it right here. For the details on why the
+     * order is important, see EnsureConnectionPossibilityForNodeList().
+     *
+     * We don't need to care about the local node because we either get a
+     * connection to it or switch to local execution, so it cannot be part of
+     * the starvation. As an edge case, if it cannot get a connection
+     * and cannot switch to local execution (e.g., disabled by the user),
+     * COPY would fail, hinting the user to change the relevant setting.
     */
-    EnsureConnectionPossibilityForPrimaryNodes();
+    EnsureConnectionPossibilityForRemotePrimaryNodes();
+
+    bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL;
+    LocalCopyStatus localCopyStatus =
+        GetLocalCopyStatus(shardIntervalList, isIntermediateResult);
+    if (localCopyStatus == LOCAL_COPY_DISABLED)
+    {
+        copyDest->shouldUseLocalCopy = false;
+    }
+    else if (localCopyStatus == LOCAL_COPY_REQUIRED)
+    {
+        copyDest->shouldUseLocalCopy = true;
+    }
+    else if (localCopyStatus == LOCAL_COPY_OPTIONAL)
+    {
+        /*
+         * At this point, there are no requirements for doing the copy locally.
+         * However, if there are local placements, we can try to reserve
+         * a connection to the local node. If we cannot reserve, we can still use
+         * local execution.
+         *
+         * NB: It is not advantageous to use remote execution just with a
+         * single remote connection. In other words, a single remote connection
+         * would not perform better than local execution. However, we prefer to
+         * do this because it is likely that the COPY would get more connections
+         * to parallelize the operation. In the future, we might relax this
+         * requirement and fail over to local execution on connection attempt
+         * failures, as the executor does.
+         */
+        if (ShardIntervalListHasLocalPlacements(shardIntervalList))
+        {
+            bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode();
+            copyDest->shouldUseLocalCopy = !reservedConnection;
+        }
+    }
 }
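Taken together, the branches above reduce the three-way status to the single shouldUseLocalCopy flag on the dest receiver. A minimal sketch of that mapping, with ShardIntervalListHasLocalPlacements() and TryConnectionPossibilityForLocalPrimaryNode() reduced to plain booleans (illustrative stand-ins, not the Citus API):

#include <stdbool.h>

typedef enum { LOCAL_COPY_REQUIRED, LOCAL_COPY_OPTIONAL, LOCAL_COPY_DISABLED } LocalCopyStatus;

/*
 * DISABLED and REQUIRED are binding; OPTIONAL prefers a reserved connection to
 * the local node and only falls back to local copy when no slot could be
 * reserved (or keeps the default when there are no local placements at all).
 */
static bool
ShouldUseLocalCopySketch(LocalCopyStatus status, bool hasLocalPlacements,
                         bool reservedLocalConnection)
{
    if (status == LOCAL_COPY_DISABLED)
    {
        return false;
    }
    if (status == LOCAL_COPY_REQUIRED)
    {
        return true;
    }

    /* LOCAL_COPY_OPTIONAL */
    return hasLocalPlacements && !reservedLocalConnection;
}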
@@ -3424,6 +3501,7 @@ InitializeCopyShardState(CopyShardState *shardState,
             continue;
         }
 
 
         if (placement->groupId == GetLocalGroupId())
         {
             /*
@@ -3445,7 +3523,6 @@ InitializeCopyShardState(CopyShardState *shardState,
             continue;
         }
 
 
         CopyConnectionState *connectionState = GetConnectionState(connectionStateHash,
                                                                    connection);
 
@@ -89,12 +89,15 @@ typedef struct ReservedConnectionHashEntry
 
 static void StoreAllReservedConnections(Tuplestorestate *tupleStore,
                                         TupleDesc tupleDescriptor);
-static ReservedConnectionHashEntry * AllocateOrGetReservedConectionEntry(char *hostName,
-                                                                         int nodePort, Oid
-                                                                         userId, Oid
-                                                                         databaseOid,
-                                                                         bool *found);
+static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char *hostName,
+                                                                          int nodePort,
+                                                                          Oid
+                                                                          userId, Oid
+                                                                          databaseOid,
+                                                                          bool *found);
 static void EnsureConnectionPossibilityForNodeList(List *nodeList);
+static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode,
+                                               bool waitForConnection);
 static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize);
 static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize);
 
@@ -294,11 +297,11 @@ MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId,
 
 
 /*
- * EnsureConnectionPossibilityForPrimaryNodes is a wrapper around
+ * EnsureConnectionPossibilityForRemotePrimaryNodes is a wrapper around
 * EnsureConnectionPossibilityForNodeList.
 */
 void
-EnsureConnectionPossibilityForPrimaryNodes(void)
+EnsureConnectionPossibilityForRemotePrimaryNodes(void)
 {
     /*
     * By using NoLock there is a tiny risk of that we miss to reserve a
@@ -306,17 +309,42 @@ EnsureConnectionPossibilityForPrimaryNodes(void)
     * seem to cause any problems as none of the placements that we are
     * going to access would be on the new node.
     */
-    List *primaryNodeList = ActivePrimaryNodeList(NoLock);
-
+    List *primaryNodeList = ActivePrimaryRemoteNodeList(NoLock);
     EnsureConnectionPossibilityForNodeList(primaryNodeList);
 }
 
 
+/*
+ * TryConnectionPossibilityForLocalPrimaryNode returns true if the primary
+ * local node is in the metadata and we can reserve a connection for the node.
+ * If not, the function returns false.
+ */
+bool
+TryConnectionPossibilityForLocalPrimaryNode(void)
+{
+    bool nodeIsInMetadata = false;
+    WorkerNode *localNode =
+        PrimaryNodeForGroup(GetLocalGroupId(), &nodeIsInMetadata);
+
+    if (localNode == NULL)
+    {
+        /*
+         * If the local node is not a primary node, we should not try to
+         * reserve a connection as there cannot be any shards.
+         */
+        return false;
+    }
+
+    bool waitForConnection = false;
+    return EnsureConnectionPossibilityForNode(localNode, waitForConnection);
+}
+
+
 /*
 * EnsureConnectionPossibilityForNodeList reserves a shared connection
 * counter per node in the nodeList unless:
- *  - Reservation is needed (see IsReservationPossible())
- *  - there is at least one connection to the node so that we are guranteed
+ *  - Reservation is possible/allowed (see IsReservationPossible())
+ *  - there is at least one connection to the node so that we are guaranteed
 *    to get a connection
 *  - An earlier call already reserved a connection (e.g., we allow only a
 *    single reservation per backend)
@@ -324,11 +352,6 @@ EnsureConnectionPossibilityForPrimaryNodes(void)
 static void
 EnsureConnectionPossibilityForNodeList(List *nodeList)
 {
-    if (!IsReservationPossible())
-    {
-        return;
-    }
-
     /*
     * We sort the workerList because adaptive connection management
     * (e.g., OPTIONAL_CONNECTION) requires any concurrent executions
@@ -342,62 +365,114 @@ EnsureConnectionPossibilityForNodeList(List *nodeList)
     */
    nodeList = SortList(nodeList, CompareWorkerNodes);
 
+    WorkerNode *workerNode = NULL;
+    foreach_ptr(workerNode, nodeList)
+    {
+        bool waitForConnection = true;
+        EnsureConnectionPossibilityForNode(workerNode, waitForConnection);
+    }
+}
+
+
+/*
+ * EnsureConnectionPossibilityForNode reserves a shared connection
+ * counter for the given node unless:
+ *  - Reservation is possible/allowed (see IsReservationPossible())
+ *  - there is at least one connection to the node so that we are guaranteed
+ *    to get a connection
+ *  - An earlier call already reserved a connection (e.g., we allow only a
+ *    single reservation per backend)
+ *  - waitForConnection is false. When this is false, the function still tries
+ *    to ensure connection possibility. If it fails (e.g., we
+ *    reached max_shared_pool_size), it doesn't wait to get the connection.
+ *    Instead, it returns false.
+ */
+static bool
+EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection)
+{
+    if (!IsReservationPossible())
+    {
+        return false;
+    }
+
     char *databaseName = get_database_name(MyDatabaseId);
     Oid userId = GetUserId();
     char *userName = GetUserNameFromId(userId, false);
 
-    WorkerNode *workerNode = NULL;
-    foreach_ptr(workerNode, nodeList)
+    if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort,
+                                  userName, databaseName) != NULL)
     {
-        if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort,
-                                      userName, databaseName) != NULL)
-        {
-            /*
-             * The same user has already an active connection for the node. It
-             * means that the execution can use the same connection, so reservation
-             * is not necessary.
-             */
-            continue;
-        }
-
         /*
-         * We are trying to be defensive here by ensuring that the required hash
-         * table entry can be allocated. The main goal is that we don't want to be
-         * in a situation where shared connection counter is incremented but not
-         * the local reserved counter due to out-of-memory.
-         *
-         * Note that shared connection stats operate on the shared memory, and we
-         * pre-allocate all the necessary memory. In other words, it would never
-         * throw out of memory error.
+         * The same user has already an active connection for the node. It
+         * means that the execution can use the same connection, so reservation
+         * is not necessary.
         */
-        bool found = false;
-        ReservedConnectionHashEntry *hashEntry =
-            AllocateOrGetReservedConectionEntry(workerNode->workerName,
-                                                workerNode->workerPort,
-                                                userId, MyDatabaseId, &found);
+        return true;
+    }
 
-        if (found)
-        {
-            /*
-             * We have already reserved a connection for this user and database
-             * on the worker. We only allow a single reservation per
-             * transaction block. The reason is that the earlier command (either in
-             * a transaction block or a function call triggered by a single command)
-             * was able to reserve or establish a connection. That connection is
-             * guranteed to be avaliable for us.
-             */
-            continue;
-        }
+    /*
+     * We are trying to be defensive here by ensuring that the required hash
+     * table entry can be allocated. The main goal is that we don't want to be
+     * in a situation where shared connection counter is incremented but not
+     * the local reserved counter due to out-of-memory.
+     *
+     * Note that shared connection stats operate on the shared memory, and we
+     * pre-allocate all the necessary memory. In other words, it would never
+     * throw out of memory error.
+     */
+    bool found = false;
+    ReservedConnectionHashEntry *hashEntry =
+        AllocateOrGetReservedConnectionEntry(workerNode->workerName,
+                                             workerNode->workerPort,
+                                             userId, MyDatabaseId, &found);
 
+    if (found)
+    {
+        /*
+         * We have already reserved a connection for this user and database
+         * on the worker. We only allow a single reservation per
+         * transaction block. The reason is that the earlier command (either in
+         * a transaction block or a function call triggered by a single command)
+         * was able to reserve or establish a connection. That connection is
+         * guaranteed to be available for us.
+         */
+        return true;
+    }
+
+    if (waitForConnection)
+    {
         /*
         * Increment the shared counter, we may need to wait if there are
         * no space left.
         */
         WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort);
-
-        /* locally mark that we have one connection reserved */
-        hashEntry->usedReservation = false;
     }
+    else
+    {
+        bool incremented =
+            TryToIncrementSharedConnectionCounter(workerNode->workerName,
+                                                  workerNode->workerPort);
+        if (!incremented)
+        {
+            /*
+             * We could not reserve a connection. First, remove the entry from the
+             * hash. The reason is that we allow single reservation per transaction
+             * block and leaving the entry in the hash would be qualified as there is a
+             * reserved connection to the node.
+             */
+            bool foundForRemove = false;
+            hash_search(SessionLocalReservedConnections, hashEntry, HASH_REMOVE,
+                        &foundForRemove);
+            Assert(foundForRemove);
+
+            return false;
+        }
+    }
+
+    /* locally mark that we have one connection reserved */
+    hashEntry->usedReservation = false;
+
+    return true;
 }
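The waitForConnection flag added here is the adaptive part: reservations for remote primaries keep the old blocking behavior, while the local primary only tries once and reports failure so COPY can switch to local execution instead of waiting. A self-contained sketch of that contrast; the two helpers are mocks standing in for WaitLoopForSharedConnection() and TryToIncrementSharedConnectionCounter():

#include <stdbool.h>

/* mock: blocks until a shared-pool slot for host:port frees up */
static void WaitForSharedSlotMock(const char *host, int port) { (void) host; (void) port; }

/* mock: non-blocking attempt; pretend the pool is currently full */
static bool TryGetSharedSlotMock(const char *host, int port) { (void) host; (void) port; return false; }

static bool
ReserveSlotSketch(const char *host, int port, bool waitForConnection)
{
    if (waitForConnection)
    {
        /* remote primaries: wait, as the pre-existing code path did */
        WaitForSharedSlotMock(host, port);
        return true;
    }

    /* local primary: do not wait; the caller falls back to local copy on false */
    return TryGetSharedSlotMock(host, port);
}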
@@ -442,8 +517,8 @@ IsReservationPossible(void)
 * the entry.
 */
 static ReservedConnectionHashEntry *
-AllocateOrGetReservedConectionEntry(char *hostName, int nodePort, Oid userId,
-                                    Oid databaseOid, bool *found)
+AllocateOrGetReservedConnectionEntry(char *hostName, int nodePort, Oid userId,
                                      Oid databaseOid, bool *found)
 {
     ReservedConnectionHashKey key;
 
@@ -105,6 +105,7 @@ static void SetUpDistributedTableDependencies(WorkerNode *workerNode);
 static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
 static void PropagateNodeWideObjects(WorkerNode *newWorkerNode);
 static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
+static bool NodeIsLocal(WorkerNode *worker);
 static void SetLockTimeoutLocally(int32 lock_cooldown);
 static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
 static bool UnsetMetadataSyncedForAll(void);
@@ -695,6 +696,17 @@ GroupForNode(char *nodeName, int nodePort)
 }
 
 
+/*
+ * NodeIsPrimaryAndRemote returns whether the argument represents a remote
+ * primary node.
+ */
+bool
+NodeIsPrimaryAndRemote(WorkerNode *worker)
+{
+    return NodeIsPrimary(worker) && !NodeIsLocal(worker);
+}
+
+
 /*
 * NodeIsPrimary returns whether the argument represents a primary node.
 */
@@ -713,6 +725,16 @@ NodeIsPrimary(WorkerNode *worker)
 }
 
 
+/*
+ * NodeIsLocal returns whether the argument represents the local node.
+ */
+static bool
+NodeIsLocal(WorkerNode *worker)
+{
+    return worker->groupId == GetLocalGroupId();
+}
+
+
 /*
 * NodeIsSecondary returns whether the argument represents a secondary node.
 */
@@ -392,6 +392,18 @@ ActivePrimaryNodeList(LOCKMODE lockMode)
 }
 
 
+/*
+ * ActivePrimaryRemoteNodeList returns a list of all active remote primary
+ * nodes in workerNodeHash.
+ */
+List *
+ActivePrimaryRemoteNodeList(LOCKMODE lockMode)
+{
+    EnsureModificationsCanRun();
+    return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryAndRemote);
+}
+
+
 /*
 * NodeIsPrimaryWorker returns true if the node is a primary worker node.
 */
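The list helper added in this hunk is the existing active-node filter applied with a stricter predicate. A self-contained sketch of the predicate composition; WorkerNode is reduced to the two fields the check needs, and GetLocalGroupId() is mocked:

#include <stdbool.h>

typedef struct WorkerNodeSketch
{
    int groupId;     /* which group the node belongs to */
    bool isPrimary;  /* stand-in for Citus' role check in NodeIsPrimary() */
} WorkerNodeSketch;

/* mock for GetLocalGroupId(): the group id of the node we are running on */
static int LocalGroupIdMock(void) { return 0; }

static bool NodeIsLocalSketch(const WorkerNodeSketch *node)
{
    return node->groupId == LocalGroupIdMock();
}

/* NodeIsPrimaryAndRemote composes exactly these two checks; the new
 * ActivePrimaryRemoteNodeList() filters the node hash with it, so COPY only
 * reserves shared connections for primaries other than the local node. */
static bool NodeIsPrimaryAndRemoteSketch(const WorkerNodeSketch *node)
{
    return node->isPrimary && !NodeIsLocalSketch(node);
}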
@@ -20,7 +20,8 @@ extern bool CanUseReservedConnection(const char *hostName, int nodePort,
 extern void MarkReservedConnectionUsed(const char *hostName, int nodePort,
                                        Oid userId, Oid databaseOid);
 extern void DeallocateReservedConnections(void);
-extern void EnsureConnectionPossibilityForPrimaryNodes(void);
+extern void EnsureConnectionPossibilityForRemotePrimaryNodes(void);
+extern bool TryConnectionPossibilityForLocalPrimaryNode(void);
 extern bool IsReservationPossible(void);
 
 #endif   /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */
@@ -74,6 +74,7 @@ extern uint32 ActivePrimaryNonCoordinatorNodeCount(void);
 extern uint32 ActivePrimaryNodeCount(void);
 extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode);
 extern List * ActivePrimaryNodeList(LOCKMODE lockMode);
+extern List * ActivePrimaryRemoteNodeList(LOCKMODE lockMode);
 extern bool CoordinatorAddedAsWorkerNode(void);
 extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode);
 extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void);

@@ -90,6 +91,7 @@ extern void EnsureCoordinator(void);
 extern void InsertCoordinatorIfClusterEmpty(void);
 extern uint32 GroupForNode(char *nodeName, int32 nodePort);
 extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes);
+extern bool NodeIsPrimaryAndRemote(WorkerNode *worker);
 extern bool NodeIsPrimary(WorkerNode *worker);
 extern bool NodeIsSecondary(WorkerNode *worker);
 extern bool NodeIsReadable(WorkerNode *worker);
@@ -1591,6 +1591,16 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in
  7
 (1 row)
 
+-- copy can use local execution even if there is no connection available
+COPY another_schema_table(a) FROM PROGRAM 'seq 32';
+NOTICE: executing the copy locally for shard xxxxx
+CONTEXT: COPY another_schema_table, line 1: "1"
+NOTICE: executing the copy locally for shard xxxxx
+CONTEXT: COPY another_schema_table, line 2: "2"
+NOTICE: executing the copy locally for shard xxxxx
+CONTEXT: COPY another_schema_table, line 3: "3"
+NOTICE: executing the copy locally for shard xxxxx
+CONTEXT: COPY another_schema_table, line 6: "6"
 -- if the local execution is disabled, we cannot failover to
 -- local execution and the queries would fail
 SET citus.enable_local_execution TO false;

@@ -1613,6 +1623,11 @@ HINT: Enable local execution via SET citus.enable_local_execution TO true;
 INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
 ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
 HINT: Enable local execution via SET citus.enable_local_execution TO true;
+-- copy fails if local execution is disabled and there is no connection slot
+COPY another_schema_table(a) FROM PROGRAM 'seq 32';
+ERROR: could not find an available connection
+HINT: Set citus.max_shared_pool_size TO -1 to let COPY command finish
+CONTEXT: COPY another_schema_table, line 1: "1"
 -- set the values to originals back
 ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
 ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
@@ -22,6 +22,25 @@ SET search_path TO public;
 \COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
 \COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
 
+-- get ready for the next test
+TRUNCATE orders_mx;
+
+\c - - - :worker_2_port
+SET citus.log_local_commands TO ON;
+-- simulate the case where there are no connection slots available
+ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+show citus.local_shared_pool_size;
+\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
+\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
+
+-- set it back
+ALTER SYSTEM RESET citus.local_shared_pool_size;
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+show citus.local_shared_pool_size;
+
 -- These copies were intended to test copying data to single sharded table from
 -- worker nodes, yet in order to remove broadcast logic related codes we change
 -- the table to reference table and copy data from master. Should be updated
@@ -16,6 +16,84 @@ SET search_path TO public;
 -- and use second worker as well
 \COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
 \COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
+-- get ready for the next test
+TRUNCATE orders_mx;
+\c - - - :worker_2_port
+SET citus.log_local_commands TO ON;
+-- simulate the case where there are no connection slots available
+ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+----------
+
+(1 row)
+
+show citus.local_shared_pool_size;
+ citus.local_shared_pool_size
+------------------------------
+ -1
+(1 row)
+
+\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
+NOTICE: executing the copy locally for shard 1220075
+CONTEXT: COPY orders_mx, line 3: "3|1234|F|205654.30|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular id..."
+NOTICE: executing the copy locally for shard 1220071
+CONTEXT: COPY orders_mx, line 5: "5|445|F|105367.67|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages us..."
+NOTICE: executing the copy locally for shard 1220069
+CONTEXT: COPY orders_mx, line 9: "33|670|F|146567.24|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. furiously final request"
+NOTICE: executing the copy locally for shard 1220079
+CONTEXT: COPY orders_mx, line 15: "39|818|O|326565.37|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir"
+NOTICE: executing the copy locally for shard 1220083
+CONTEXT: COPY orders_mx, line 19: "67|568|O|182481.16|1996-12-19|4-NOT SPECIFIED|Clerk#000000547|0|symptotes haggle slyly around the fu..."
+NOTICE: executing the copy locally for shard 1220073
+CONTEXT: COPY orders_mx, line 24: "96|1078|F|64364.30|1994-04-17|2-HIGH|Clerk#000000395|0|oost furiously. pinto"
+NOTICE: executing the copy locally for shard 1220077
+CONTEXT: COPY orders_mx, line 25: "97|211|F|100572.55|1993-01-29|3-MEDIUM|Clerk#000000547|0|hang blithely along the regular accounts. f..."
+NOTICE: executing the copy locally for shard 1220081
+CONTEXT: COPY orders_mx, line 38: "134|62|F|208201.46|1992-05-01|4-NOT SPECIFIED|Clerk#000000711|0|lar theodolites boos"
+\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
+NOTICE: executing the copy locally for shard 1220079
+CONTEXT: COPY orders_mx, line 2: "8998|80|F|147264.16|1993-01-04|5-LOW|Clerk#000000733|0| fluffily pending sauternes cajo"
+NOTICE: executing the copy locally for shard 1220077
+CONTEXT: COPY orders_mx, line 4: "9024|1469|F|298241.36|1992-06-03|3-MEDIUM|Clerk#000000901|0|ar the theodolites. fluffily stealthy re..."
+NOTICE: executing the copy locally for shard 1220073
+CONTEXT: COPY orders_mx, line 6: "9026|677|O|63256.87|1996-07-24|5-LOW|Clerk#000000320|0|ironic escapades would wake carefully "
+NOTICE: executing the copy locally for shard 1220071
+CONTEXT: COPY orders_mx, line 9: "9029|1213|F|78703.86|1992-11-20|3-MEDIUM|Clerk#000000965|0| excuses nag quickly carefully unusual ex..."
+NOTICE: executing the copy locally for shard 1220083
+CONTEXT: COPY orders_mx, line 14: "9058|403|F|63464.13|1993-06-29|2-HIGH|Clerk#000000376|0|ealthily special deposits. quickly regular r..."
+NOTICE: executing the copy locally for shard 1220081
+CONTEXT: COPY orders_mx, line 16: "9060|463|O|45295.71|1996-06-09|1-URGENT|Clerk#000000438|0|iously. slyly regular dol"
+NOTICE: executing the copy locally for shard 1220075
+CONTEXT: COPY orders_mx, line 43: "9159|1135|O|99594.61|1995-07-26|1-URGENT|Clerk#000000892|0|xcuses. quickly ironic deposits wake alon..."
+NOTICE: executing the copy locally for shard 1220069
+CONTEXT: COPY orders_mx, line 69: "9281|904|F|173278.28|1992-02-24|1-URGENT|Clerk#000000530|0|eep furiously according to the requests; ..."
+-- set it back
+ALTER SYSTEM RESET citus.local_shared_pool_size;
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+----------
+
+(1 row)
+
+show citus.local_shared_pool_size;
+ citus.local_shared_pool_size
+------------------------------
+ 50
+(1 row)
+
 -- These copies were intended to test copying data to single sharded table from
 -- worker nodes, yet in order to remove broadcast logic related codes we change
 -- the table to reference table and copy data from master. Should be updated
@@ -817,6 +817,9 @@ ROLLBACK;
 WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
 SELECT count(*) FROM cte_1;
 
+-- copy can use local execution even if there is no connection available
+COPY another_schema_table(a) FROM PROGRAM 'seq 32';
+
 -- if the local execution is disabled, we cannot failover to
 -- local execution and the queries would fail
 SET citus.enable_local_execution TO false;

@@ -829,6 +832,9 @@ WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
 
 INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
 
+-- copy fails if local execution is disabled and there is no connection slot
+COPY another_schema_table(a) FROM PROGRAM 'seq 32';
+
 -- set the values to originals back
 ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
 ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;