mirror of https://github.com/citusdata/citus.git
commit
0d71fcd8af
|
@ -151,10 +151,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
|
||||||
bool isCopyFromWorker = false;
|
bool isCopyFromWorker = false;
|
||||||
|
|
||||||
BeginOrContinueCoordinatedTransaction();
|
BeginOrContinueCoordinatedTransaction();
|
||||||
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
|
|
||||||
{
|
|
||||||
CoordinatedTransactionUse2PC();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* disallow COPY to/from file or program except for superusers */
|
/* disallow COPY to/from file or program except for superusers */
|
||||||
if (copyStatement->filename != NULL && !superuser())
|
if (copyStatement->filename != NULL && !superuser())
|
||||||
|
@ -244,6 +240,7 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
|
||||||
uint32 connectionFlags = FOR_DML;
|
uint32 connectionFlags = FOR_DML;
|
||||||
|
|
||||||
masterConnection = GetNodeConnection(connectionFlags, nodeName, nodePort);
|
masterConnection = GetNodeConnection(connectionFlags, nodeName, nodePort);
|
||||||
|
MarkRemoteTransactionCritical(masterConnection);
|
||||||
ClaimConnectionExclusively(masterConnection);
|
ClaimConnectionExclusively(masterConnection);
|
||||||
|
|
||||||
RemoteTransactionBeginIfNecessary(masterConnection);
|
RemoteTransactionBeginIfNecessary(masterConnection);
|
||||||
|
@ -1834,7 +1831,10 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
/* keep the table metadata to avoid looking it up for every tuple */
|
/* keep the table metadata to avoid looking it up for every tuple */
|
||||||
copyDest->tableMetadata = cacheEntry;
|
copyDest->tableMetadata = cacheEntry;
|
||||||
|
|
||||||
if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
|
if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC ||
|
||||||
|
MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
|
||||||
{
|
{
|
||||||
CoordinatedTransactionUse2PC();
|
CoordinatedTransactionUse2PC();
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,8 +102,6 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
|
||||||
|
|
||||||
CitusCopyDestReceiver *copyDest = NULL;
|
CitusCopyDestReceiver *copyDest = NULL;
|
||||||
|
|
||||||
BeginOrContinueCoordinatedTransaction();
|
|
||||||
|
|
||||||
partitionMethod = PartitionMethod(targetRelationId);
|
partitionMethod = PartitionMethod(targetRelationId);
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
|
|
|
@ -483,6 +483,7 @@ void
|
||||||
CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
|
CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
|
||||||
bool useExclusiveConnection, bool colocatedShard)
|
bool useExclusiveConnection, bool colocatedShard)
|
||||||
{
|
{
|
||||||
|
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedRelationId);
|
||||||
char *placementOwner = TableOwner(distributedRelationId);
|
char *placementOwner = TableOwner(distributedRelationId);
|
||||||
bool includeSequenceDefaults = false;
|
bool includeSequenceDefaults = false;
|
||||||
List *ddlCommandList = GetTableDDLEvents(distributedRelationId,
|
List *ddlCommandList = GetTableDDLEvents(distributedRelationId,
|
||||||
|
@ -509,6 +510,12 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
|
||||||
|
|
||||||
BeginOrContinueCoordinatedTransaction();
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
|
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
|
||||||
|
cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
|
||||||
|
{
|
||||||
|
CoordinatedTransactionUse2PC();
|
||||||
|
}
|
||||||
|
|
||||||
foreach(shardPlacementCell, shardPlacements)
|
foreach(shardPlacementCell, shardPlacements)
|
||||||
{
|
{
|
||||||
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
|
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
|
||||||
|
|
|
@ -773,7 +773,8 @@ CoordinatedRemoteTransactionsCommit(void)
|
||||||
if (transaction->transactionState == REMOTE_TRANS_INVALID ||
|
if (transaction->transactionState == REMOTE_TRANS_INVALID ||
|
||||||
transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING ||
|
transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING ||
|
||||||
transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING ||
|
transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING ||
|
||||||
transaction->transactionState == REMOTE_TRANS_COMMITTED)
|
transaction->transactionState == REMOTE_TRANS_COMMITTED ||
|
||||||
|
transaction->transactionState == REMOTE_TRANS_ABORTED)
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,6 +63,7 @@ SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit';
|
||||||
SET citus.shard_replication_factor TO 2;
|
SET citus.shard_replication_factor TO 2;
|
||||||
SET citus.shard_count TO 2;
|
SET citus.shard_count TO 2;
|
||||||
SET citus.multi_shard_commit_protocol TO '2pc';
|
SET citus.multi_shard_commit_protocol TO '2pc';
|
||||||
|
-- create_distributed_table should add 2 recovery records (1 connection per node)
|
||||||
CREATE TABLE test_recovery (x text);
|
CREATE TABLE test_recovery (x text);
|
||||||
SELECT create_distributed_table('test_recovery', 'x');
|
SELECT create_distributed_table('test_recovery', 'x');
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
|
@ -70,7 +71,40 @@ SELECT create_distributed_table('test_recovery', 'x');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- create_reference_table should add another 2 recovery records
|
||||||
|
CREATE TABLE test_recovery_ref (x text);
|
||||||
|
SELECT create_reference_table('test_recovery_ref');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
-------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- plain INSERT does not use 2PC
|
||||||
INSERT INTO test_recovery VALUES ('hello');
|
INSERT INTO test_recovery VALUES ('hello');
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- Committed DDL commands should write 4 transaction recovery records
|
-- Committed DDL commands should write 4 transaction recovery records
|
||||||
BEGIN;
|
BEGIN;
|
||||||
ALTER TABLE test_recovery ADD COLUMN y text;
|
ALTER TABLE test_recovery ADD COLUMN y text;
|
||||||
|
@ -162,11 +196,33 @@ SELECT recover_prepared_transactions();
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records
|
||||||
|
INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
count
|
count
|
||||||
-------
|
-------
|
||||||
0
|
4
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
\c - - - :master_port
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
-------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Committed COPY should write 4 transaction records
|
||||||
|
COPY test_recovery (x) FROM STDIN CSV;
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
recover_prepared_transactions
|
||||||
|
-------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE test_recovery_ref;
|
||||||
DROP TABLE test_recovery;
|
DROP TABLE test_recovery;
|
||||||
|
|
|
@ -352,7 +352,12 @@ SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey
|
||||||
|
|
||||||
-- Test copy from the worker node
|
-- Test copy from the worker node
|
||||||
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
||||||
|
|
||||||
|
-- Make sure we don't use 2PC when connecting to master, even if requested
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.multi_shard_commit_protocol TO '2pc';
|
||||||
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
-- Test if there is no relation to copy data with the worker copy
|
-- Test if there is no relation to copy data with the worker copy
|
||||||
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
||||||
|
|
|
@ -468,7 +468,11 @@ HINT: Consider using hash partitioning.
|
||||||
\c - - - 57637
|
\c - - - 57637
|
||||||
-- Test copy from the worker node
|
-- Test copy from the worker node
|
||||||
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
||||||
|
-- Make sure we don't use 2PC when connecting to master, even if requested
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.multi_shard_commit_protocol TO '2pc';
|
||||||
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
||||||
|
COMMIT;
|
||||||
-- Test if there is no relation to copy data with the worker copy
|
-- Test if there is no relation to copy data with the worker copy
|
||||||
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
|
||||||
WARNING: relation "lineitem_copy_none" does not exist
|
WARNING: relation "lineitem_copy_none" does not exist
|
||||||
|
|
|
@ -40,9 +40,21 @@ SET citus.shard_replication_factor TO 2;
|
||||||
SET citus.shard_count TO 2;
|
SET citus.shard_count TO 2;
|
||||||
SET citus.multi_shard_commit_protocol TO '2pc';
|
SET citus.multi_shard_commit_protocol TO '2pc';
|
||||||
|
|
||||||
|
-- create_distributed_table should add 2 recovery records (1 connection per node)
|
||||||
CREATE TABLE test_recovery (x text);
|
CREATE TABLE test_recovery (x text);
|
||||||
SELECT create_distributed_table('test_recovery', 'x');
|
SELECT create_distributed_table('test_recovery', 'x');
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
|
-- create_reference_table should add another 2 recovery records
|
||||||
|
CREATE TABLE test_recovery_ref (x text);
|
||||||
|
SELECT create_reference_table('test_recovery_ref');
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
|
||||||
|
-- plain INSERT does not use 2PC
|
||||||
INSERT INTO test_recovery VALUES ('hello');
|
INSERT INTO test_recovery VALUES ('hello');
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
|
||||||
-- Committed DDL commands should write 4 transaction recovery records
|
-- Committed DDL commands should write 4 transaction recovery records
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
@ -78,7 +90,21 @@ INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery;
|
||||||
|
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
SELECT recover_prepared_transactions();
|
SELECT recover_prepared_transactions();
|
||||||
SELECT count(*) FROM pg_dist_transaction;
|
|
||||||
|
|
||||||
\c - - - :master_port
|
-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records
|
||||||
|
INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
|
||||||
|
-- Committed COPY should write 4 transaction records
|
||||||
|
COPY test_recovery (x) FROM STDIN CSV;
|
||||||
|
hello-0
|
||||||
|
hello-1
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_dist_transaction;
|
||||||
|
SELECT recover_prepared_transactions();
|
||||||
|
|
||||||
|
DROP TABLE test_recovery_ref;
|
||||||
DROP TABLE test_recovery;
|
DROP TABLE test_recovery;
|
||||||
|
|
Loading…
Reference in New Issue