mirror of https://github.com/citusdata/citus.git
wip
parent
bb381e7eee
commit
e07fe819c7
|
@ -349,7 +349,7 @@ ShouldSyncTableMetadata(Oid relationId)
|
|||
static bool
|
||||
SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
|
||||
{
|
||||
char *currentUser = CurrentUserName();
|
||||
bool failed = false;
|
||||
|
||||
/* generate and add the local group id's update query */
|
||||
char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
|
||||
|
@ -375,9 +375,14 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
|
|||
}
|
||||
else
|
||||
{
|
||||
SendOptionalCommandToWorkersWithMetadata(command);
|
||||
bool commandFailed =
|
||||
SendOptionalCommandToWorkersWithMetadata(command);
|
||||
|
||||
failed |= commandFailed;
|
||||
}
|
||||
}
|
||||
|
||||
return failed;
|
||||
}
|
||||
|
||||
|
||||
|
@ -388,8 +393,6 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
|
|||
static void
|
||||
DropMetadataSnapshotOnNode(WorkerNode *workerNode)
|
||||
{
|
||||
char *userName = CurrentUserName();
|
||||
|
||||
/* generate the queries which drop the metadata */
|
||||
List *dropMetadataCommandList = MetadataDropCommands();
|
||||
|
||||
|
|
|
@ -33,12 +33,12 @@
|
|||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
static void SendCommandToMetadataWorkersParams(const char *command,
|
||||
static bool SendCommandToMetadataWorkersParams(const char *command,
|
||||
const char *user, int parameterCount,
|
||||
const Oid *parameterTypes,
|
||||
const char *const *parameterValues,
|
||||
bool raiseInterrupts);
|
||||
static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
|
||||
static bool SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
|
||||
const char *command, const char *user,
|
||||
int parameterCount,
|
||||
const Oid *parameterTypes,
|
||||
|
@ -129,13 +129,14 @@ SendCommandToWorkersWithMetadata(const char *command)
|
|||
* SendOptionalCommandToWorkersWithMetadata sends a command to
|
||||
* all workers in parallel. Commands are committed on the workers
|
||||
* when the local transaction commits. Failures do not abort
|
||||
* the current transaction.
|
||||
* the current transaction. Instead, the function returns false
|
||||
* if any command fails.
|
||||
*/
|
||||
void
|
||||
bool
|
||||
SendOptionalCommandToWorkersWithMetadata(const char *command)
|
||||
{
|
||||
bool raiseInterrupts = false;
|
||||
SendCommandToMetadataWorkersParams(command, CurrentUserName(),
|
||||
return SendCommandToMetadataWorkersParams(command, CurrentUserName(),
|
||||
0, NULL, NULL, raiseInterrupts);
|
||||
}
|
||||
|
||||
|
@ -218,7 +219,7 @@ SendBareCommandListToMetadataWorkers(List *commandList)
|
|||
* SendCommandToMetadataWorkersParams is a wrapper around
|
||||
* SendCommandToWorkersParamsInternal() enforcing some extra checks.
|
||||
*/
|
||||
static void
|
||||
static bool
|
||||
SendCommandToMetadataWorkersParams(const char *command,
|
||||
const char *user, int parameterCount,
|
||||
const Oid *parameterTypes,
|
||||
|
@ -230,7 +231,7 @@ SendCommandToMetadataWorkersParams(const char *command,
|
|||
|
||||
ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
|
||||
|
||||
SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user,
|
||||
return SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user,
|
||||
parameterCount, parameterTypes,
|
||||
parameterValues, raiseInterrupts);
|
||||
}
|
||||
|
@ -367,7 +368,7 @@ GetConnectionsResults(List *connectionList, bool failOnError)
|
|||
* paramLengths, paramFormats and resultFormat are hard-coded to NULL, NULL and 0
|
||||
* respectively.
|
||||
*/
|
||||
static void
|
||||
static bool
|
||||
SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *command,
|
||||
const char *user, int parameterCount,
|
||||
const Oid *parameterTypes,
|
||||
|
@ -381,6 +382,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *
|
|||
Use2PCForCoordinatedTransaction();
|
||||
|
||||
int logLevel = raiseInterrupts ? ERROR : WARNING;
|
||||
bool failed = false;
|
||||
|
||||
/* open connections in parallel */
|
||||
WorkerNode *workerNode = NULL;
|
||||
|
@ -419,6 +421,8 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *
|
|||
if (querySent == 0)
|
||||
{
|
||||
ReportConnectionError(connection, logLevel);
|
||||
|
||||
failed = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -429,12 +433,16 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *
|
|||
if (!IsResponseOK(result))
|
||||
{
|
||||
ReportResultError(connection, result, logLevel);
|
||||
|
||||
failed = true;
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
|
||||
ForgetResults(connection);
|
||||
}
|
||||
|
||||
return failed;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, i
|
|||
const char *nodeUser,
|
||||
List *commandList);
|
||||
extern void SendCommandToWorkersWithMetadata(const char *command);
|
||||
extern void SendOptionalCommandToWorkersWithMetadata(const char *command);
|
||||
extern bool SendOptionalCommandToWorkersWithMetadata(const char *command);
|
||||
extern void SendBareCommandListToMetadataWorkers(List *commandList);
|
||||
extern void EnsureNoModificationsHaveBeenDone(void);
|
||||
extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName,
|
||||
|
|
Loading…
Reference in New Issue