Finish VACUUM implementation

Required adding a new variable to store last MultiShardCommitProtocol
for restore at Xact end, but otherwise pretty clean.
pull/1013/head
Jason Petersen 2016-12-05 12:30:37 -07:00
parent 3189c2256d
commit 10236564fa
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
4 changed files with 22 additions and 8 deletions

View File

@ -46,6 +46,7 @@
#include "distributed/multi_utility.h" /* IWYU pragma: keep */ #include "distributed/multi_utility.h" /* IWYU pragma: keep */
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/transmit.h" #include "distributed/transmit.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "executor/executor.h" #include "executor/executor.h"
@ -920,6 +921,10 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand, bool isTopL
taskList = VacuumTaskList(relationId, vacuumStmt); taskList = VacuumTaskList(relationId, vacuumStmt);
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
ExecuteModifyTasksWithoutResults(taskList);
return (Node *) vacuumStmt; return (Node *) vacuumStmt;
} }
@ -967,9 +972,6 @@ VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
taskList = lappend(taskList, task); taskList = lappend(taskList, task);
} }
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("VACUUM of distributed tables is not supported")));
return taskList; return taskList;
} }

View File

@ -175,9 +175,11 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
* transaction to fail. * transaction to fail.
*/ */
MarkRemoteTransactionCritical(connection); MarkRemoteTransactionCritical(connection);
if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE)
/* issue BEGIN */ {
RemoteTransactionBegin(connection); /* issue BEGIN */
RemoteTransactionBegin(connection);
}
} }
} }
@ -270,6 +272,11 @@ ResetShardPlacementTransactionState(void)
* round. * round.
*/ */
shardConnectionHash = NULL; shardConnectionHash = NULL;
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
{
MultiShardCommitProtocol = SavedMultiShardCommitProtocol;
}
} }

View File

@ -32,6 +32,7 @@ CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NON
/* GUC, the commit protocol to use for commands affecting more than one connection */ /* GUC, the commit protocol to use for commands affecting more than one connection */
int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC; int MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
int SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
/* state needed to keep track of operations used during a transaction */ /* state needed to keep track of operations used during a transaction */
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE; XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;

View File

@ -47,13 +47,17 @@ typedef enum CoordinatedTransactionState
/* Enumeration that defines the different commit protocols available */ /* Enumeration that defines the different commit protocols available */
typedef enum typedef enum
{ {
COMMIT_PROTOCOL_1PC = 0, COMMIT_PROTOCOL_BARE = 0,
COMMIT_PROTOCOL_2PC = 1 COMMIT_PROTOCOL_1PC = 1,
COMMIT_PROTOCOL_2PC = 2
} CommitProtocolType; } CommitProtocolType;
/* config variable managed via guc.c */ /* config variable managed via guc.c */
extern int MultiShardCommitProtocol; extern int MultiShardCommitProtocol;
/* state needed to restore multi-shard commit protocol during VACUUM/ANALYZE */
extern int SavedMultiShardCommitProtocol;
/* state needed to prevent new connections during modifying transactions */ /* state needed to prevent new connections during modifying transactions */
extern XactModificationType XactModificationLevel; extern XactModificationType XactModificationLevel;