do_not_have_single_tx_command
Onder Kalaci 2021-07-27 18:59:07 +02:00
parent 1cdd8ac512
commit 4d5a4e3a10
3 changed files with 31 additions and 11 deletions

View File

@ -214,6 +214,8 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
MarkNodeHasMetadata(nodeNameString, nodePort, true); MarkNodeHasMetadata(nodeNameString, nodePort, true);
MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true); MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true);
workerNode = FindWorkerNode(nodeNameString, nodePort);
/* fail if metadata synchronization doesn't succeed */ /* fail if metadata synchronization doesn't succeed */
bool raiseInterrupts = true; bool raiseInterrupts = true;
SyncMetadataSnapshotToNode(workerNode, raiseInterrupts); SyncMetadataSnapshotToNode(workerNode, raiseInterrupts);
@ -371,7 +373,7 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
{ {
if (raiseOnError) if (raiseOnError)
{ {
SendCommandToWorkersWithMetadata(command); SendCommandToWorkersWithMetadataNonStrict(command);
} }
else else
{ {
@ -1789,17 +1791,19 @@ SyncMetadataToNodes(void)
{ {
bool raiseInterrupts = false; bool raiseInterrupts = false;
MarkNodeMetadataSynced(workerNode->workerName,
workerNode->workerPort, true);
workerNode = FindWorkerNode(workerNode->workerName, workerNode->workerPort);
if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts)) if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
{ {
ereport(WARNING, (errmsg("failed to sync metadata to %s:%d", ereport(WARNING, (errmsg("failed to sync metadata to %s:%d",
workerNode->workerName, workerNode->workerName,
workerNode->workerPort))); workerNode->workerPort)));
result = METADATA_SYNC_FAILED_SYNC; result = METADATA_SYNC_FAILED_SYNC;
}
else
{
MarkNodeMetadataSynced(workerNode->workerName, MarkNodeMetadataSynced(workerNode->workerName,
workerNode->workerPort, true); workerNode->workerPort, false);
} }
} }
} }

View File

@ -37,7 +37,7 @@ static bool SendCommandToMetadataWorkersParams(const char *command,
const char *user, int parameterCount, const char *user, int parameterCount,
const Oid *parameterTypes, const Oid *parameterTypes,
const char *const *parameterValues, const char *const *parameterValues,
bool raiseInterrupts); bool raiseInterrupts, bool strictMode);
static bool SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, static bool SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
const char *command, const char *user, const char *command, const char *user,
int parameterCount, int parameterCount,
@ -120,8 +120,19 @@ void
SendCommandToWorkersWithMetadata(const char *command) SendCommandToWorkersWithMetadata(const char *command)
{ {
bool raiseInterrupts = true; bool raiseInterrupts = true;
bool strictMode = true;
SendCommandToMetadataWorkersParams(command, CurrentUserName(), SendCommandToMetadataWorkersParams(command, CurrentUserName(),
0, NULL, NULL, raiseInterrupts); 0, NULL, NULL, raiseInterrupts, strictMode);
}
void
SendCommandToWorkersWithMetadataNonStrict(const char *command)
{
bool raiseInterrupts = true;
bool strictMode = false;
SendCommandToMetadataWorkersParams(command, CurrentUserName(),
0, NULL, NULL, raiseInterrupts, strictMode);
} }
@ -136,8 +147,9 @@ bool
SendOptionalCommandToWorkersWithMetadata(const char *command) SendOptionalCommandToWorkersWithMetadata(const char *command)
{ {
bool raiseInterrupts = false; bool raiseInterrupts = false;
bool strictMode = false;
return SendCommandToMetadataWorkersParams(command, CurrentUserName(), return SendCommandToMetadataWorkersParams(command, CurrentUserName(),
0, NULL, NULL, raiseInterrupts); 0, NULL, NULL, raiseInterrupts, strictMode);
} }
@ -224,13 +236,16 @@ SendCommandToMetadataWorkersParams(const char *command,
const char *user, int parameterCount, const char *user, int parameterCount,
const Oid *parameterTypes, const Oid *parameterTypes,
const char *const *parameterValues, const char *const *parameterValues,
bool raiseInterrupts) bool raiseInterrupts,
bool strictMode)
{ {
List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
ShareLock); ShareLock);
if (strictMode)
{
ErrorIfAnyMetadataNodeOutOfSync(workerNodeList); ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
}
return SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, return SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command,
user, user,
parameterCount, parameterTypes, parameterCount, parameterTypes,

View File

@ -42,6 +42,7 @@ extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, i
const char *nodeUser, const char *nodeUser,
List *commandList); List *commandList);
extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendCommandToWorkersWithMetadata(const char *command);
extern void SendCommandToWorkersWithMetadataNonStrict(const char *command);
extern bool SendOptionalCommandToWorkersWithMetadata(const char *command); extern bool SendOptionalCommandToWorkersWithMetadata(const char *command);
extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void SendBareCommandListToMetadataWorkers(List *commandList);
extern void EnsureNoModificationsHaveBeenDone(void); extern void EnsureNoModificationsHaveBeenDone(void);