Switch to using LOCK instead of lock_relation_if_exists in TRUNCATE (#5930)

Breaking down #5899 into smaller PRs

This particular PR changes the way TRUNCATE acquires distributed locks on the relations it is truncating: it now uses the LOCK command instead of lock_relation_if_exists. This has the benefit of relying on the recursive locking logic pg implements for the LOCK command, instead of us having to resolve relation dependencies and lock them explicitly. While this does not directly affect truncate, it will allow us to generalize this locking logic to lock other kinds of relations, where the pg recursive locking will become useful (e.g. locking views).
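
As a rough illustration of what pg's recursive locking buys us, here is a minimal sketch (the events tables below are hypothetical and not part of this PR): a single LOCK on a partitioned table also locks its partitions, which lock_relation_if_exists would have required us to enumerate and lock one by one:

```sql
-- hypothetical example; the events tables do not exist in this PR
BEGIN;
CREATE TABLE events (id int, day date) PARTITION BY RANGE (day);
CREATE TABLE events_2022 PARTITION OF events
    FOR VALUES FROM ('2022-01-01') TO ('2023-01-01');
-- pg recursively locks the partition together with its parent
LOCK events IN ACCESS EXCLUSIVE MODE;
-- both relations now hold AccessExclusiveLock
SELECT relation::regclass AS relation, mode
FROM pg_locks
WHERE relation IN ('events'::regclass, 'events_2022'::regclass);
ROLLBACK;
```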

This implementation is a bit more complex than it needs to be because pg does not support locking foreign tables. We can, however, still lock foreign tables with lock_relation_if_exists. So for a command:

TRUNCATE dist_table_1, dist_table_2, foreign_table_1, foreign_table_2, dist_table_3;

We generate and send the following command to all the workers in the metadata:
```sql
SET citus.enable_ddl_propagation TO FALSE;
LOCK dist_table_1, dist_table_2 IN ACCESS EXCLUSIVE MODE;
SELECT lock_relation_if_exists('foreign_table_1', 'ACCESS EXCLUSIVE');
SELECT lock_relation_if_exists('foreign_table_2', 'ACCESS EXCLUSIVE');
LOCK dist_table_3 IN ACCESS EXCLUSIVE MODE;
SET citus.enable_ddl_propagation TO TRUE;
```

Note that we need to alternate between the LOCK command and lock_relation_if_exists in order to preserve the TRUNCATE order of relations.
When pg supports locking foreign tables, we will be able to massively simplify this logic and send a single LOCK command, along the lines of the sketch below.
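
A minimal sketch of that future simplification, assuming pg eventually accepts foreign tables in LOCK (this statement is illustrative, not generated by this PR):

```sql
SET citus.enable_ddl_propagation TO FALSE;
LOCK dist_table_1, dist_table_2, foreign_table_1, foreign_table_2, dist_table_3
    IN ACCESS EXCLUSIVE MODE;
SET citus.enable_ddl_propagation TO TRUE;
```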

(cherry picked from commit 4c6f62efc6)
release-11.0-onder-23-may
Gledis Zeneli 2022-05-11 18:38:48 +03:00 committed by gledis69
parent 73fd4f7ded
commit 3f282c660b
19 changed files with 292 additions and 234 deletions

View File

@@ -40,15 +40,11 @@
#include "utils/rel.h"
#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists(%s, '%s');"
/* Local functions forward declarations for unsupported command checks */
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
static void ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command);
static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement);
static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement);
static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode);
static List * TruncateTaskList(Oid relationId);
@@ -363,113 +359,51 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
static void
LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
{
List *distributedRelationList = NIL;
/* nothing to do if there is no metadata at worker nodes */
if (!ClusterHasKnownMetadataWorkers())
if (!ClusterHasKnownMetadataWorkers() || !EnableMetadataSync)
{
return;
}
List *distributedRelationList = NIL;
/*
* this is used to enforce the lock order:
* [...TruncatedTables], [...TablesTruncatedFromCascadingOnTruncatedTables]
*/
List *referencingRelationIds = NIL;
RangeVar *rangeVar = NULL;
foreach_ptr(rangeVar, truncateStatement->relations)
{
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
Oid referencingRelationId = InvalidOid;
if (!IsCitusTable(relationId))
{
continue;
}
if (list_member_oid(distributedRelationList, relationId))
{
continue;
}
distributedRelationList = lappend_oid(distributedRelationList, relationId);
distributedRelationList = list_append_unique_oid(distributedRelationList,
relationId);
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
Assert(cacheEntry != NULL);
List *referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
Oid referencingRelationId = InvalidOid;
foreach_oid(referencingRelationId, referencingTableList)
{
distributedRelationList = list_append_unique_oid(distributedRelationList,
referencingRelationId);
referencingRelationIds = lappend_oid(referencingRelationIds,
referencingRelationId);
}
}
distributedRelationList = list_concat_unique_oid(distributedRelationList,
referencingRelationIds);
if (distributedRelationList != NIL)
{
AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock);
}
}
/*
* AcquireDistributedLockOnRelations acquires a distributed lock on worker nodes
* for the given list of relation ids. The relation id list and worker node list
* are sorted so that the lock is acquired in the same order regardless of which
* node it was run on. Notice that no lock is acquired on the coordinator node.
*
* Notice that the locking function call is sent to all workers regardless of
* whether they have metadata or not. This is because a worker node only knows
* about itself and the workers that had metadata sync turned on before it. The
* node does not know about other nodes that have metadata sync turned on afterwards.
*/
static void
AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
{
Oid relationId = InvalidOid;
List *workerNodeList = ActivePrimaryNodeList(NoLock);
const char *lockModeText = LockModeToLockModeText(lockMode);
/*
* We want to acquire locks in the same order across the nodes.
* Although relation ids may change, their ordering will not.
*/
relationIdList = SortList(relationIdList, CompareOids);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
UseCoordinatedTransaction();
int32 localGroupId = GetLocalGroupId();
foreach_oid(relationId, relationIdList)
{
/*
* We only acquire distributed lock on relation if
* the relation is sync'ed between mx nodes.
*
* Even if users disable metadata sync, we cannot
* allow them not to acquire the remote locks.
* Hence, we have !IsCoordinator() check.
*/
if (ShouldSyncTableMetadata(relationId) || !IsCoordinator())
{
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
StringInfo lockRelationCommand = makeStringInfo();
appendStringInfo(lockRelationCommand, LOCK_RELATION_IF_EXISTS,
quote_literal_cstr(qualifiedRelationName),
lockModeText);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
const char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
/* if local node is one of the targets, acquire the lock locally */
if (workerNode->groupId == localGroupId)
{
LockRelationOid(relationId, lockMode);
continue;
}
SendCommandToWorker(nodeName, nodePort, lockRelationCommand->data);
}
}
}
}

View File

@@ -6,6 +6,6 @@ RETURNS BOOL
LANGUAGE C STRICT as 'MODULE_PATHNAME',
$$lock_relation_if_exists$$;
COMMENT ON FUNCTION lock_relation_if_exists(table_name text, lock_mode text)
IS 'locks relation in the lock_mode if the relation exists';
IS 'used internally to lock the relation in the given lock_mode if the relation exists, without throwing errors; consider using LOCK * IN * MODE instead';
RESET search_path;

View File

@@ -149,7 +149,7 @@ List *
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{
List *workerNodeList = NIL;
if (targetWorkerSet == ALL_SHARD_NODES)
if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES)
{
workerNodeList = ActivePrimaryNodeList(lockMode);
}
@@ -162,7 +162,9 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES && !workerNode->hasMetadata)
if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES || targetWorkerSet ==
METADATA_NODES) &&
!workerNode->hasMetadata)
{
continue;
}

View File

@@ -37,11 +37,13 @@
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "distributed/local_executor.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/varlena.h"
#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists(%s, %s);"
/* static definition and declarations */
struct LockModeToStringType
@@ -974,9 +976,6 @@ lock_relation_if_exists(PG_FUNCTION_ARGS)
text *lockModeText = PG_GETARG_TEXT_P(1);
char *lockModeCString = text_to_cstring(lockModeText);
/* ensure that we're in a transaction block */
RequireTransactionBlock(true, "lock_relation_if_exists");
/* get the lock mode */
LOCKMODE lockMode = LockModeTextToLockMode(lockModeCString);
@@ -1059,3 +1058,140 @@ CitusLockTableAclCheck(Oid relationId, LOCKMODE lockmode, Oid userId)
return aclResult;
}
/*
* EnsureCanAcquireLock checks whether the current user has the permissions
* to acquire a lock on the table and throws an error if the user does
* not have the permissions
*/
static void
EnsureCanAcquireLock(Oid relationId, LOCKMODE lockMode)
{
AclResult aclResult = CitusLockTableAclCheck(relationId, lockMode,
GetUserId());
if (aclResult != ACLCHECK_OK)
{
aclcheck_error(aclResult,
get_relkind_objtype(get_rel_relkind(relationId)),
get_rel_name(relationId));
}
}
/*
* AcquireDistributedLockOnRelations acquires a distributed lock on worker nodes
* for the given list of relation ids. The worker node list is sorted so that the
* lock is acquired in the same order regardless of which node it was run on.
* Notice that no lock is acquired on the coordinator node if the coordinator is
* not added to the metadata.
*/
void
AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
{
const char *lockModeText = LockModeToLockModeText(lockMode);
UseCoordinatedTransaction();
StringInfo lockRelationsCommand = makeStringInfo();
appendStringInfo(lockRelationsCommand, "%s;\n", DISABLE_DDL_PROPAGATION);
/*
* In the following loop, when there are foreign tables, we need to switch from
* the LOCK statement to lock_relation_if_exists() and vice versa, since pg
* does not support using LOCK on foreign tables.
*/
bool startedLockCommand = false;
int lockedRelations = 0;
Oid relationId = InvalidOid;
foreach_oid(relationId, relationIdList)
{
/*
* we want to prevent under privileged users to trigger establishing connections,
* that's why we have this additional check. Otherwise, we'd get the errors as
* soon as we execute the command on any node
*/
EnsureCanAcquireLock(relationId, lockMode);
if (!ShouldSyncTableMetadata(relationId))
{
continue;
}
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
/*
* As of pg14 we cannot use LOCK to lock a FOREIGN TABLE
* so we have to use the lock_relation_if_exists udf
*/
if (get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE)
{
/* finish the partial lock statement */
if (startedLockCommand)
{
appendStringInfo(lockRelationsCommand, " IN %s MODE;\n", lockModeText);
startedLockCommand = false;
}
/* use lock_relation_if_exists to lock the foreign table */
appendStringInfo(lockRelationsCommand, LOCK_RELATION_IF_EXISTS,
quote_literal_cstr(qualifiedRelationName),
quote_literal_cstr(lockModeText));
appendStringInfoChar(lockRelationsCommand, '\n');
}
else if (startedLockCommand)
{
/* append relation to partial lock statement */
appendStringInfo(lockRelationsCommand, ", %s", qualifiedRelationName);
}
else
{
/* start a new partial lock statement */
appendStringInfo(lockRelationsCommand, "LOCK %s", qualifiedRelationName);
startedLockCommand = true;
}
lockedRelations++;
}
if (lockedRelations == 0)
{
return;
}
if (startedLockCommand)
{
appendStringInfo(lockRelationsCommand, " IN %s MODE;\n", lockModeText);
}
appendStringInfo(lockRelationsCommand, ENABLE_DDL_PROPAGATION);
const char *lockCommand = lockRelationsCommand->data;
List *workerNodeList = TargetWorkerSetNodeList(METADATA_NODES, NoLock);
/*
* We want to acquire locks in the same order across the nodes.
* Although relation ids may change, their ordering will not.
*/
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
int32 localGroupId = GetLocalGroupId();
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
const char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
/* if local node is one of the targets, acquire the lock locally */
if (workerNode->groupId == localGroupId)
{
ExecuteUtilityCommand(lockCommand);
continue;
}
SendCommandToWorker(nodeName, nodePort, lockCommand);
}
}

View File

@@ -117,6 +117,7 @@ extern bool TryLockPlacementCleanup(void);
extern void EnsureShardOwner(uint64 shardId, bool missingOk);
extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList);
extern void BlockWritesToShardList(List *shardList);
extern void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode);
/* Lock shard/relation metadata of the referenced reference table if exists */
extern void LockReferencedReferenceShardDistributionMetadata(uint64 shardId,

View File

@@ -22,9 +22,27 @@
*/
typedef enum TargetWorkerSet
{
/*
* All the active primary nodes in the metadata which have metadata,
* except the coordinator
*/
NON_COORDINATOR_METADATA_NODES,
/*
* All the active primary nodes in the metadata except the coordinator
*/
NON_COORDINATOR_NODES,
ALL_SHARD_NODES
/*
* All active primary nodes in the metadata
*/
ALL_SHARD_NODES,
/*
* All the active primary nodes in the metadata which have metadata
* (includes the coordinator if it is added)
*/
METADATA_NODES
} TargetWorkerSet;

View File

@@ -1,7 +1,7 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-insert s2-update s1-commit
master_create_worker_shards
create_distributed_table
---------------------------------------------------------------------
(1 row)
@@ -26,7 +26,7 @@ restore_isolation_tester_func
starting permutation: s1-insert s2-update
master_create_worker_shards
create_distributed_table
---------------------------------------------------------------------
(1 row)
@@ -44,7 +44,7 @@ restore_isolation_tester_func
starting permutation: s1-begin s1-multi-insert s2-update s1-commit
master_create_worker_shards
create_distributed_table
---------------------------------------------------------------------
(1 row)
@@ -69,7 +69,7 @@ restore_isolation_tester_func
starting permutation: s1-begin s1-multi-insert s2-multi-insert-overlap s1-commit
master_create_worker_shards
create_distributed_table
---------------------------------------------------------------------
(1 row)
@@ -93,7 +93,7 @@ restore_isolation_tester_func
starting permutation: s1-begin s2-begin s1-multi-insert s2-multi-insert s1-commit s2-commit
master_create_worker_shards
create_distributed_table
---------------------------------------------------------------------
(1 row)

View File

@@ -1,7 +1,7 @@
Parsed test spec with 2 sessions
starting permutation: s2-invalidate-57637 s1-begin s1-insertone s2-repair s1-commit
master_create_worker_shards
create_distributed_table
---------------------------------------------------------------------
(1 row)
@@ -29,7 +29,7 @@ master_copy_shard_placement
starting permutation: s1-insertone s2-invalidate-57637 s1-begin s1-insertall s2-repair s1-commit
master_create_worker_shards
create_distributed_table
---------------------------------------------------------------------
(1 row)
@@ -60,7 +60,7 @@ master_copy_shard_placement
starting permutation: s2-invalidate-57637 s2-begin s2-repair s1-insertone s2-commit s2-invalidate-57638 s1-display s2-invalidate-57637 s2-revalidate-57638 s1-display
master_create_worker_shards
create_distributed_table
---------------------------------------------------------------------
(1 row)
@@ -113,7 +113,7 @@ test_id|data
starting permutation: s2-invalidate-57637 s1-prepared-insertone s2-begin s2-repair s1-prepared-insertone s2-commit s2-invalidate-57638 s1-display s2-invalidate-57637 s2-revalidate-57638 s1-display
master_create_worker_shards
create_distributed_table
---------------------------------------------------------------------
(1 row)
@@ -171,7 +171,7 @@ test_id|data
starting permutation: s2-invalidate-57637 s1-insertone s1-prepared-insertall s2-begin s2-repair s1-prepared-insertall s2-commit s2-invalidate-57638 s1-display s2-invalidate-57637 s2-revalidate-57638 s1-display
master_create_worker_shards
create_distributed_table
---------------------------------------------------------------------
(1 row)

View File

@@ -251,13 +251,19 @@ step s2-truncate:
step s1-commit:
COMMIT;
s2: WARNING: relation "public.dist_table" does not exist
s2: WARNING: relation "public.dist_table" does not exist
step s2-truncate: <... completed>
ERROR: failure on connection marked as essential: localhost:xxxxx
step s2-select:
SELECT * FROM dist_table ORDER BY 1, 2;
a|b
---------------------------------------------------------------------
(0 rows)
1|2
3|4
5|6
(3 rows)
restore_isolation_tester_func
---------------------------------------------------------------------

View File

@@ -282,14 +282,11 @@ DELETE FROM the_table;
ERROR: cannot assign TransactionIds during recovery
-- DDL is not possible
TRUNCATE the_table;
ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
ERROR: cannot execute LOCK TABLE during recovery
TRUNCATE reference_table;
ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
ERROR: cannot execute LOCK TABLE during recovery
TRUNCATE citus_local_table;
ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
ERROR: cannot execute LOCK TABLE during recovery
ALTER TABLE the_table ADD COLUMN c int;
ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.

View File

@@ -215,9 +215,13 @@ RESET client_min_messages;
\c - - - :master_port
-- also test the infrastructure that is used for supporting
-- TRUNCATE from worker nodes
-- should fail since it is not in transaction block
SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
ERROR: lock_relation_if_exists can only be used in transaction blocks
-- should pass since we don't check for xact block in lock_relation_if_exists
SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS SHARE');
lock_relation_if_exists
---------------------------------------------------------------------
t
(1 row)
BEGIN;
-- should fail since the schema is not provided
SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');

View File

@@ -161,51 +161,8 @@ DROP TABLE test_truncate_range;
-- expect shard to be present, data to be truncated
--
CREATE TABLE test_truncate_hash(a int);
SELECT master_create_distributed_table('test_truncate_hash', 'a', 'hash');
master_create_distributed_table
---------------------------------------------------------------------
(1 row)
-- verify no error is thrown when no shards are present
TRUNCATE TABLE test_truncate_hash;
SELECT count(*) FROM test_truncate_hash;
count
---------------------------------------------------------------------
0
(1 row)
INSERT INTO test_truncate_hash values (1);
ERROR: could not find any shards
DETAIL: No shards exist for distributed table "test_truncate_hash".
HINT: Run master_create_worker_shards to create shards and try again.
INSERT INTO test_truncate_hash values (1001);
ERROR: could not find any shards
DETAIL: No shards exist for distributed table "test_truncate_hash".
HINT: Run master_create_worker_shards to create shards and try again.
INSERT INTO test_truncate_hash values (2000);
ERROR: could not find any shards
DETAIL: No shards exist for distributed table "test_truncate_hash".
HINT: Run master_create_worker_shards to create shards and try again.
INSERT INTO test_truncate_hash values (100);
ERROR: could not find any shards
DETAIL: No shards exist for distributed table "test_truncate_hash".
HINT: Run master_create_worker_shards to create shards and try again.
SELECT count(*) FROM test_truncate_hash;
count
---------------------------------------------------------------------
0
(1 row)
-- verify 4 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
shardid
---------------------------------------------------------------------
(0 rows)
TRUNCATE TABLE test_truncate_hash;
SELECT master_create_worker_shards('test_truncate_hash', 4, 1);
master_create_worker_shards
SELECT create_distributed_table('test_truncate_hash', 'a', 'hash');
create_distributed_table
---------------------------------------------------------------------
(1 row)
@@ -214,37 +171,13 @@ INSERT INTO test_truncate_hash values (1);
INSERT INTO test_truncate_hash values (1001);
INSERT INTO test_truncate_hash values (2000);
INSERT INTO test_truncate_hash values (100);
SELECT count(*) FROM test_truncate_hash;
count
---------------------------------------------------------------------
4
(1 row)
TRUNCATE TABLE test_truncate_hash;
-- verify data is truncated from the table
SELECT count(*) FROM test_truncate_hash;
count
---------------------------------------------------------------------
0
(1 row)
-- verify 4 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
shardid
---------------------------------------------------------------------
1210006
1210007
1210008
1210009
(4 rows)
-- verify that truncate can be aborted
INSERT INTO test_truncate_hash VALUES (1);
BEGIN; TRUNCATE TABLE test_truncate_hash; ROLLBACK;
SELECT count(*) FROM test_truncate_hash;
count
---------------------------------------------------------------------
1
5
(1 row)
DROP TABLE test_truncate_hash;

View File

@@ -89,7 +89,6 @@ INSERT INTO super_user_owned_regular_user_granted VALUES (1, 1), (2, 1) ON CONFL
ERROR: permission denied for table super_user_owned_regular_user_granted
TRUNCATE super_user_owned_regular_user_granted;
ERROR: permission denied for table super_user_owned_regular_user_granted
CONTEXT: while executing command on localhost:xxxxx
DELETE FROM super_user_owned_regular_user_granted;
ERROR: permission denied for table super_user_owned_regular_user_granted
UPDATE super_user_owned_regular_user_granted SET a = 1;

View File

@@ -1319,6 +1319,49 @@ SELECT count(*) FROM foreign_table;
(1 row)
TRUNCATE foreign_table;
-- test truncating foreign tables in the same statement with
-- other distributed tables
CREATE TABLE foreign_table_test_2 (id integer NOT NULL, data text, a bigserial);
CREATE FOREIGN TABLE foreign_table_2
(
id integer NOT NULL,
data text,
a bigserial
)
SERVER foreign_server
OPTIONS (schema_name 'pg14', table_name 'foreign_table_test_2');
SELECT citus_add_local_table_to_metadata('foreign_table_2');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
CREATE TABLE dist_table_1(a int);
CREATE TABLE dist_table_2(a int);
CREATE TABLE dist_table_3(a int);
SELECT create_distributed_table('dist_table_1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_table_2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('dist_table_3');
create_reference_table
---------------------------------------------------------------------
(1 row)
TRUNCATE foreign_table, foreign_table_2;
TRUNCATE dist_table_1, foreign_table, dist_table_2, foreign_table_2, dist_table_3;
TRUNCATE dist_table_1, dist_table_2, foreign_table, dist_table_3;
TRUNCATE dist_table_1, foreign_table, foreign_table_2, dist_table_3;
TRUNCATE dist_table_1, foreign_table, foreign_table_2, dist_table_3, dist_table_2;
\c - - - :worker_1_port
set search_path to pg14;
-- verify the foreign table is truncated

View File

@@ -3,8 +3,8 @@ setup
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
CREATE TABLE test_concurrent_dml (test_id integer NOT NULL, data text);
SELECT master_create_distributed_table('test_concurrent_dml', 'test_id', 'hash');
SELECT master_create_worker_shards('test_concurrent_dml', 4, 2);
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('test_concurrent_dml', 'test_id', 'hash', shard_count:=4);
}
teardown

View File

@@ -1,24 +1,8 @@
setup
{
CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
distribution_column text,
distribution_method citus.distribution_type)
RETURNS void
LANGUAGE C STRICT
AS 'citus', $$master_create_distributed_table$$;
COMMENT ON FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
distribution_column text,
distribution_method citus.distribution_type)
IS 'define the table distribution functions';
-- this function is dropped in Citus10, added here for tests
CREATE OR REPLACE FUNCTION pg_catalog.master_create_worker_shards(table_name text, shard_count integer,
replication_factor integer DEFAULT 2)
RETURNS void
AS 'citus', $$master_create_worker_shards$$
LANGUAGE C STRICT;
CREATE TABLE test_dml_vs_repair (test_id integer NOT NULL, data int);
SELECT master_create_distributed_table('test_dml_vs_repair', 'test_id', 'hash');
SELECT master_create_worker_shards('test_dml_vs_repair', 1, 2);
SET citus.shard_replication_factor TO 2;
SELECT create_distributed_table('test_dml_vs_repair', 'test_id', 'hash', shard_count:=1);
}
teardown

View File

@@ -146,8 +146,8 @@ RESET client_min_messages;
-- also test the infrastructure that is used for supporting
-- TRUNCATE from worker nodes
-- should fail since it is not in transaction block
SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
-- should pass since we don't check for xact block in lock_relation_if_exists
SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS SHARE');
BEGIN;
-- should fail since the schema is not provided

View File

@@ -114,42 +114,13 @@ DROP TABLE test_truncate_range;
-- expect shard to be present, data to be truncated
--
CREATE TABLE test_truncate_hash(a int);
SELECT master_create_distributed_table('test_truncate_hash', 'a', 'hash');
-- verify no error is thrown when no shards are present
TRUNCATE TABLE test_truncate_hash;
SELECT count(*) FROM test_truncate_hash;
SELECT create_distributed_table('test_truncate_hash', 'a', 'hash');
INSERT INTO test_truncate_hash values (1);
INSERT INTO test_truncate_hash values (1001);
INSERT INTO test_truncate_hash values (2000);
INSERT INTO test_truncate_hash values (100);
SELECT count(*) FROM test_truncate_hash;
-- verify 4 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
TRUNCATE TABLE test_truncate_hash;
SELECT master_create_worker_shards('test_truncate_hash', 4, 1);
INSERT INTO test_truncate_hash values (1);
INSERT INTO test_truncate_hash values (1001);
INSERT INTO test_truncate_hash values (2000);
INSERT INTO test_truncate_hash values (100);
SELECT count(*) FROM test_truncate_hash;
TRUNCATE TABLE test_truncate_hash;
-- verify data is truncated from the table
SELECT count(*) FROM test_truncate_hash;
-- verify 4 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
-- verify that truncate can be aborted
INSERT INTO test_truncate_hash VALUES (1);
BEGIN; TRUNCATE TABLE test_truncate_hash; ROLLBACK;

View File

@@ -671,6 +671,36 @@ SELECT citus_add_local_table_to_metadata('foreign_table');
SELECT count(*) FROM foreign_table;
TRUNCATE foreign_table;
-- test truncating foreign tables in the same statement with
-- other distributed tables
CREATE TABLE foreign_table_test_2 (id integer NOT NULL, data text, a bigserial);
CREATE FOREIGN TABLE foreign_table_2
(
id integer NOT NULL,
data text,
a bigserial
)
SERVER foreign_server
OPTIONS (schema_name 'pg14', table_name 'foreign_table_test_2');
SELECT citus_add_local_table_to_metadata('foreign_table_2');
CREATE TABLE dist_table_1(a int);
CREATE TABLE dist_table_2(a int);
CREATE TABLE dist_table_3(a int);
SELECT create_distributed_table('dist_table_1', 'a');
SELECT create_distributed_table('dist_table_2', 'a');
SELECT create_reference_table('dist_table_3');
TRUNCATE foreign_table, foreign_table_2;
TRUNCATE dist_table_1, foreign_table, dist_table_2, foreign_table_2, dist_table_3;
TRUNCATE dist_table_1, dist_table_2, foreign_table, dist_table_3;
TRUNCATE dist_table_1, foreign_table, foreign_table_2, dist_table_3;
TRUNCATE dist_table_1, foreign_table, foreign_table_2, dist_table_3, dist_table_2;
\c - - - :worker_1_port
set search_path to pg14;
-- verify the foreign table is truncated