mirror of https://github.com/citusdata/citus.git
Merge pull request #2370 from citusdata/fix_truncate_from_workers
Fix the bug introduced via allowing TRUNCATE from MX worker nodes (pull/2360/head)
commit df0ca4617f
citus.control
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '8.0-3'
+default_version = '8.0-4'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
Makefile
@@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
     7.3-1 7.3-2 7.3-3 \
     7.4-1 7.4-2 7.4-3 \
     7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \
-    8.0-1 8.0-2 8.0-3
+    8.0-1 8.0-2 8.0-3 8.0-4
 
 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@@ -221,6 +221,8 @@ $(EXTENSION)--8.0-2.sql: $(EXTENSION)--8.0-1.sql $(EXTENSION)--8.0-1--8.0-2.sql
 	cat $^ > $@
 $(EXTENSION)--8.0-3.sql: $(EXTENSION)--8.0-2.sql $(EXTENSION)--8.0-2--8.0-3.sql
 	cat $^ > $@
+$(EXTENSION)--8.0-4.sql: $(EXTENSION)--8.0-3.sql $(EXTENSION)--8.0-3--8.0-4.sql
+	cat $^ > $@
 
 NO_PGXS = 1
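The two Makefile hunks above extend the standard PostgreSQL extension versioning chain: each full creation script is the previous script concatenated with the new upgrade step. A minimal sketch of how the two install paths converge (assuming a database where the citus library is already preloaded):

    -- fresh install runs the concatenated citus--8.0-4.sql
    CREATE EXTENSION citus VERSION '8.0-4';

    -- an existing 8.0-3 install replays only citus--8.0-3--8.0-4.sql
    ALTER EXTENSION citus UPDATE TO '8.0-4';

Both paths end up with the same schema objects, including the new lock_relation_if_exists UDF defined in the upgrade script below.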
citus--8.0-3--8.0-4.sql (new file)
@@ -0,0 +1,11 @@
+/* citus--8.0-3--8.0-4 */
+SET search_path = 'pg_catalog';
+
+CREATE OR REPLACE FUNCTION lock_relation_if_exists(table_name text, lock_mode text)
+RETURNS BOOL
+LANGUAGE C STRICT AS 'MODULE_PATHNAME',
+$$lock_relation_if_exists$$;
+COMMENT ON FUNCTION lock_relation_if_exists(table_name text, lock_mode text)
+IS 'locks relation in the lock_mode if the relation exists';
+
+RESET search_path;
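For reference, a minimal usage sketch of the new UDF; the schema-qualified name below is taken from the regression tests at the end of this diff, and the function may only be called inside a transaction block:

    BEGIN;
    -- returns t and holds the lock until transaction end
    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table',
                                   'ACCESS EXCLUSIVE');
    COMMIT;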
@@ -591,7 +591,7 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
     else
     {
         text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
-        Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText);
+        Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false);
 
         EnsureTableCanBeColocatedWith(relationId, replicationModel,
                                       distributionColumnType, sourceRelationId);
@@ -67,7 +67,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
     int32 shardCount = PG_GETARG_INT32(1);
     int32 replicationFactor = PG_GETARG_INT32(2);
 
-    Oid distributedTableId = ResolveRelationId(tableNameText);
+    Oid distributedTableId = ResolveRelationId(tableNameText, false);
 
     /* do not add any data */
     bool useExclusiveConnections = false;
@@ -43,6 +43,7 @@
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/shard_pruning.h"
+#include "distributed/version_compat.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
 #include "optimizer/clauses.h"
@@ -56,14 +57,22 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#if (PG_VERSION_NUM >= 100000)
+#include "utils/varlena.h"
+#endif
 
+#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');"
+#define REMOTE_LOCK_MODE_FOR_TRUNCATE "ACCESS EXCLUSIVE"
+
 
 static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, TaskType
                                            taskType);
 static bool ShouldExecuteTruncateStmtSequential(TruncateStmt *command);
+static LOCKMODE LockModeTextToLockMode(const char *lockModeName);
 
 
 PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
+PG_FUNCTION_INFO_V1(lock_relation_if_exists);
 
 
 /*
@@ -204,27 +213,23 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
         ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType);
 
     /*
-     * We should execute "TRUNCATE table_name;" on the other worker nodes before
+     * We lock the relation we're TRUNCATING on the other worker nodes before
      * executing the truncate commands on the shards. This is necessary to prevent
      * distributed deadlocks where a concurrent operation on the same table (or a
      * cascading table) is executed on the other nodes.
      *
-     * Note that we should skip the current node to prevent a self-deadlock.
+     * Note that we should skip the current node to prevent a self-deadlock;
+     * that's why we use the OTHER_WORKERS tag.
      */
     if (truncateOperation && ShouldSyncTableMetadata(relationId))
     {
-        SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA,
-                             DISABLE_DDL_PROPAGATION);
+        char *qualifiedRelationName = generate_qualified_relation_name(relationId);
+        StringInfo lockRelation = makeStringInfo();
 
-        /*
-         * Note that here we ignore the schema and send the queryString as is
-         * since citus_truncate_trigger already uses qualified table name.
-         * If that was not the case, we should also had to set the search path
-         * as we do for regular DDLs.
-         */
-        SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA,
-                             queryString);
+        appendStringInfo(lockRelation, LOCK_RELATION_IF_EXISTS, qualifiedRelationName,
+                         REMOTE_LOCK_MODE_FOR_TRUNCATE);
+
+        SendCommandToWorkers(OTHER_WORKERS, lockRelation->data);
     }
 
     if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
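Concretely, given the LOCK_RELATION_IF_EXISTS format string and REMOTE_LOCK_MODE_FOR_TRUNCATE defined above, truncating a metadata-synced table dist_schema.dist_table (a hypothetical name for illustration) now sends each other worker the following command before the shard-level truncate commands, serializing concurrent distributed operations on the relation without deadlocking on the node that initiated the TRUNCATE:

    SELECT lock_relation_if_exists('dist_schema.dist_table', 'ACCESS EXCLUSIVE');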
@@ -312,3 +317,105 @@ ShouldExecuteTruncateStmtSequential(TruncateStmt *command)
 
     return false;
 }
+
+
+/*
+ * lock_relation_if_exists gets a relation name and lock mode
+ * and returns true if the relation exists and can be locked with
+ * the given lock mode. If the relation doesn't exist, the function
+ * returns false.
+ *
+ * The relation name should be qualified with the schema name.
+ *
+ * The function errors out if the lock mode isn't defined in PostgreSQL's
+ * explicit locking table.
+ */
+Datum
+lock_relation_if_exists(PG_FUNCTION_ARGS)
+{
+    text *relationName = PG_GETARG_TEXT_P(0);
+    text *lockModeText = PG_GETARG_TEXT_P(1);
+
+    Oid relationId = InvalidOid;
+    char *lockModeCString = text_to_cstring(lockModeText);
+    List *relationNameList = NIL;
+    RangeVar *relation = NULL;
+    LOCKMODE lockMode = NoLock;
+
+    /* ensure that we're in a transaction block */
+    RequireTransactionBlock(true, "lock_relation_if_exists");
+
+    relationId = ResolveRelationId(relationName, true);
+    if (!OidIsValid(relationId))
+    {
+        PG_RETURN_BOOL(false);
+    }
+
+    /* get the lock mode */
+    lockMode = LockModeTextToLockMode(lockModeCString);
+
+    /* resolve relationId from passed in schema and relation name */
+    relationNameList = textToQualifiedNameList(relationName);
+    relation = makeRangeVarFromNameList(relationNameList);
+
+    /* lock the relation with the lock mode */
+    RangeVarGetRelid(relation, lockMode, false);
+
+    PG_RETURN_BOOL(true);
+}
+
+
+/*
+ * LockModeTextToLockMode gets a lock mode name and returns its corresponding
+ * LOCKMODE. The function errors out if the input lock mode isn't defined in
+ * PostgreSQL's explicit locking table.
+ */
+static LOCKMODE
+LockModeTextToLockMode(const char *lockModeName)
+{
+    if (pg_strncasecmp("NoLock", lockModeName, NAMEDATALEN) == 0)
+    {
+        /* there is no explicit call for NoLock, but keeping it here for convenience */
+        return NoLock;
+    }
+    else if (pg_strncasecmp("ACCESS SHARE", lockModeName, NAMEDATALEN) == 0)
+    {
+        return AccessShareLock;
+    }
+    else if (pg_strncasecmp("ROW SHARE", lockModeName, NAMEDATALEN) == 0)
+    {
+        return RowShareLock;
+    }
+    else if (pg_strncasecmp("ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
+    {
+        return RowExclusiveLock;
+    }
+    else if (pg_strncasecmp("SHARE UPDATE EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
+    {
+        return ShareUpdateExclusiveLock;
+    }
+    else if (pg_strncasecmp("SHARE", lockModeName, NAMEDATALEN) == 0)
+    {
+        return ShareLock;
+    }
+    else if (pg_strncasecmp("SHARE ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
+    {
+        return ShareRowExclusiveLock;
+    }
+    else if (pg_strncasecmp("EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
+    {
+        return ExclusiveLock;
+    }
+    else if (pg_strncasecmp("ACCESS EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
+    {
+        return AccessExclusiveLock;
+    }
+    else
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+                 errmsg("unknown lock mode: %s", lockModeName)));
+    }
+
+    return NoLock;
+}
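Since the comparisons use pg_strncasecmp, the lock mode names are matched case-insensitively, and anything outside PostgreSQL's explicit lock mode table is rejected. A short sketch of the expected behavior (mirroring the regression tests below):

    BEGIN;
    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'row share');
    -- returns t; 'ROW SHARE' and 'row share' behave the same
    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'MY LOCK MODE');
    -- ERROR:  unknown lock mode: MY LOCK MODE
    ROLLBACK;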
@@ -98,7 +98,7 @@ Datum
 master_get_table_metadata(PG_FUNCTION_ARGS)
 {
     text *relationName = PG_GETARG_TEXT_P(0);
-    Oid relationId = ResolveRelationId(relationName);
+    Oid relationId = ResolveRelationId(relationName, false);
 
     DistTableCacheEntry *partitionEntry = NULL;
     char *partitionKeyString = NULL;
@@ -215,7 +215,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
     if (SRF_IS_FIRSTCALL())
     {
         text *relationName = PG_GETARG_TEXT_P(0);
-        Oid relationId = ResolveRelationId(relationName);
+        Oid relationId = ResolveRelationId(relationName, false);
         bool includeSequenceDefaults = true;
 
         MemoryContext oldContext = NULL;
@@ -320,7 +320,7 @@ GetNextShardId()
     }
 
     sequenceName = cstring_to_text(SHARDID_SEQUENCE_NAME);
-    sequenceId = ResolveRelationId(sequenceName);
+    sequenceId = ResolveRelationId(sequenceName, false);
     sequenceIdDatum = ObjectIdGetDatum(sequenceId);
 
     GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
@@ -399,7 +399,7 @@ GetNextPlacementId(void)
     }
 
     sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME);
-    sequenceId = ResolveRelationId(sequenceName);
+    sequenceId = ResolveRelationId(sequenceName, false);
     sequenceIdDatum = ObjectIdGetDatum(sequenceId);
 
     GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
@@ -527,17 +527,16 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
 
 /* Finds the relationId from a potentially qualified relation name. */
 Oid
-ResolveRelationId(text *relationName)
+ResolveRelationId(text *relationName, bool missingOk)
 {
     List *relationNameList = NIL;
     RangeVar *relation = NULL;
     Oid relationId = InvalidOid;
-    bool failOK = false;        /* error if relation cannot be found */
 
     /* resolve relationId from passed in schema and relation name */
     relationNameList = textToQualifiedNameList(relationName);
     relation = makeRangeVarFromNameList(relationNameList);
-    relationId = RangeVarGetRelid(relation, NoLock, failOK);
+    relationId = RangeVarGetRelid(relation, NoLock, missingOk);
 
     return relationId;
 }
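The new missingOk parameter is what lets lock_relation_if_exists probe for a relation without erroring; every pre-existing caller passes false and keeps the old behavior of erroring out when the relation cannot be found. Observed through the UDF (per the regression tests below):

    BEGIN;
    -- missingOk = true path: no error, just false for a nonexistent relation
    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_tableXXX', 'ACCESS SHARE');
    -- returns f
    ROLLBACK;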
@@ -91,7 +91,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
     char partitionMethod = 0;
     char storageType = SHARD_STORAGE_TABLE;
 
-    Oid relationId = ResolveRelationId(relationNameText);
+    Oid relationId = ResolveRelationId(relationNameText, false);
     char relationKind = get_rel_relkind(relationId);
     char replicationModel = REPLICATION_MODEL_INVALID;
 
@@ -117,14 +117,13 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
         int nodePort = workerNode->workerPort;
         int connectionFlags = FORCE_NEW_CONNECTION;
 
-        if ((targetWorkerSet == WORKERS_WITH_METADATA ||
-             targetWorkerSet == OTHER_WORKERS_WITH_METADATA) &&
+        if (targetWorkerSet == WORKERS_WITH_METADATA &&
             !workerNode->hasMetadata)
         {
             continue;
         }
 
-        if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA &&
+        if (targetWorkerSet == OTHER_WORKERS &&
             workerNode->groupId == GetLocalGroupId())
         {
             continue;
@@ -177,14 +176,13 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
         MultiConnection *connection = NULL;
         int connectionFlags = 0;
 
-        if ((targetWorkerSet == WORKERS_WITH_METADATA ||
-             targetWorkerSet == OTHER_WORKERS_WITH_METADATA) &&
+        if (targetWorkerSet == WORKERS_WITH_METADATA &&
            !workerNode->hasMetadata)
        {
            continue;
        }
 
-        if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA &&
+        if (targetWorkerSet == OTHER_WORKERS &&
             workerNode->groupId == GetLocalGroupId())
         {
             continue;
@@ -546,7 +546,7 @@ uint32
 GetNextColocationId()
 {
     text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
-    Oid sequenceId = ResolveRelationId(sequenceName);
+    Oid sequenceId = ResolveRelationId(sequenceName, false);
     Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
     Oid savedUserId = InvalidOid;
     int savedSecurityContext = 0;
@@ -1165,7 +1165,7 @@ int32
 GetNextGroupId()
 {
     text *sequenceName = cstring_to_text(GROUPID_SEQUENCE_NAME);
-    Oid sequenceId = ResolveRelationId(sequenceName);
+    Oid sequenceId = ResolveRelationId(sequenceName, false);
     Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
     Oid savedUserId = InvalidOid;
     int savedSecurityContext = 0;
|
||||||
GetNextNodeId()
|
GetNextNodeId()
|
||||||
{
|
{
|
||||||
text *sequenceName = cstring_to_text(NODEID_SEQUENCE_NAME);
|
text *sequenceName = cstring_to_text(NODEID_SEQUENCE_NAME);
|
||||||
Oid sequenceId = ResolveRelationId(sequenceName);
|
Oid sequenceId = ResolveRelationId(sequenceName, false);
|
||||||
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
|
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
|
||||||
Oid savedUserId = InvalidOid;
|
Oid savedUserId = InvalidOid;
|
||||||
int savedSecurityContext = 0;
|
int savedSecurityContext = 0;
|
||||||
|
|
|
@@ -36,7 +36,7 @@ worker_foreign_file_path(PG_FUNCTION_ARGS)
 {
     text *foreignTableName = PG_GETARG_TEXT_P(0);
     text *foreignFilePath = NULL;
-    Oid relationId = ResolveRelationId(foreignTableName);
+    Oid relationId = ResolveRelationId(foreignTableName, false);
     ForeignTable *foreignTable = GetForeignTable(relationId);
 
     ListCell *optionCell = NULL;
@@ -56,7 +56,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons
         RaiseDeferredErrorInternal(error, elevel); \
         if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \
             pg_unreachable(); } \
-    } while (0)
+    } \
+    while (0)
 #else /* !HAVE_BUILTIN_CONSTANT_P */
 #define RaiseDeferredError(error, elevel) \
     do { \
@@ -64,7 +65,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons
         RaiseDeferredErrorInternal(error, elevel_); \
         if (elevel_ >= ERROR) { \
             pg_unreachable(); } \
-    } while (0)
+    } \
+    while (0)
 #endif /* HAVE_BUILTIN_CONSTANT_P */
 
 void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel);
@@ -103,7 +103,7 @@ extern bool IsCoordinator(void);
 extern bool CStoreTable(Oid relationId);
 extern uint64 GetNextShardId(void);
 extern uint64 GetNextPlacementId(void);
-extern Oid ResolveRelationId(text *relationName);
+extern Oid ResolveRelationId(text *relationName, bool missingOk);
 extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation);
 extern List * GetTableCreationCommands(Oid relationId, bool forShardCreation);
 extern List * GetTableIndexAndConstraintCommands(Oid relationId);
@@ -151,6 +151,7 @@ extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS);
 extern Datum master_apply_delete_command(PG_FUNCTION_ARGS);
 extern Datum master_drop_sequences(PG_FUNCTION_ARGS);
 extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS);
+extern Datum lock_relation_if_exists(PG_FUNCTION_ARGS);
 extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);
 
 /* function declarations for shard creation functionality */
@@ -37,6 +37,7 @@
 /* following functions are renamed in PG11 */
 #define PreventInTransactionBlock PreventTransactionChain
 #define DatumGetJsonbP(d) DatumGetJsonb(d)
+#define RequireTransactionBlock RequireTransactionChain
 
 /* following defines also exist for PG11 */
 #define RELATION_OBJECT_TYPE ACL_OBJECT_RELATION
@@ -22,7 +22,7 @@
 typedef enum TargetWorkerSet
 {
     WORKERS_WITH_METADATA,
-    OTHER_WORKERS_WITH_METADATA,
+    OTHER_WORKERS,
     ALL_WORKERS
 } TargetWorkerSet;
 
@@ -295,8 +295,7 @@ DEBUG: common table expressions are not supported in distributed modifications
 DEBUG: generating subplan 20_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
 DEBUG: common table expressions are not supported in distributed modifications
 DEBUG: Plan 20 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
-SET citus.explain_all_tasks TO ON;
-EXPLAIN (COSTS FALSE) WITH cte_1 AS (
+WITH cte_1 AS (
 WITH cte_2 AS (
 SELECT tenant_id as cte2_id
 FROM second_distributed_table
@@ -315,65 +314,6 @@ DEBUG: common table expressions are not supported in distributed modifications
 DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
 DEBUG: common table expressions are not supported in distributed modifications
 DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
-                                           QUERY PLAN
-------------------------------------------------------------------------------------------------
- Custom Scan (Citus Router)
-   ->  Distributed Subplan 22_1
-         ->  Custom Scan (Citus Router)
-               Task Count: 4
-               Tasks Shown: All
-               ->  Task
-                     Node: host=localhost port=57638 dbname=regression
-                     ->  Update on distributed_table_2370000 distributed_table
-                           ->  Seq Scan on distributed_table_2370000 distributed_table
-               ->  Task
-                     Node: host=localhost port=57637 dbname=regression
-                     ->  Update on distributed_table_2370001 distributed_table
-                           ->  Seq Scan on distributed_table_2370001 distributed_table
-               ->  Task
-                     Node: host=localhost port=57638 dbname=regression
-                     ->  Update on distributed_table_2370002 distributed_table
-                           ->  Seq Scan on distributed_table_2370002 distributed_table
-               ->  Task
-                     Node: host=localhost port=57637 dbname=regression
-                     ->  Update on distributed_table_2370003 distributed_table
-                           ->  Seq Scan on distributed_table_2370003 distributed_table
-   Task Count: 4
-   Tasks Shown: All
-   ->  Task
-         Node: host=localhost port=57638 dbname=regression
-         ->  Update on distributed_table_2370000 distributed_table
-               ->  Nested Loop
-                     Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
-                     ->  Function Scan on read_intermediate_result intermediate_result
-                     ->  Materialize
-                           ->  Seq Scan on distributed_table_2370000 distributed_table
-   ->  Task
-         Node: host=localhost port=57637 dbname=regression
-         ->  Update on distributed_table_2370001 distributed_table
-               ->  Nested Loop
-                     Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
-                     ->  Function Scan on read_intermediate_result intermediate_result
-                     ->  Materialize
-                           ->  Seq Scan on distributed_table_2370001 distributed_table
-   ->  Task
-         Node: host=localhost port=57638 dbname=regression
-         ->  Update on distributed_table_2370002 distributed_table
-               ->  Nested Loop
-                     Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
-                     ->  Function Scan on read_intermediate_result intermediate_result
-                     ->  Materialize
-                           ->  Seq Scan on distributed_table_2370002 distributed_table
-   ->  Task
-         Node: host=localhost port=57637 dbname=regression
-         ->  Update on distributed_table_2370003 distributed_table
-               ->  Nested Loop
-                     Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
-                     ->  Function Scan on read_intermediate_result intermediate_result
-                     ->  Materialize
-                           ->  Seq Scan on distributed_table_2370003 distributed_table
-(55 rows)
-
 -- we don't support updating local table with a join with
 -- distributed tables
 UPDATE
@@ -146,6 +146,7 @@ ALTER EXTENSION citus UPDATE TO '7.5-7';
 ALTER EXTENSION citus UPDATE TO '8.0-1';
 ALTER EXTENSION citus UPDATE TO '8.0-2';
 ALTER EXTENSION citus UPDATE TO '8.0-3';
+ALTER EXTENSION citus UPDATE TO '8.0-4';
 -- show running version
 SHOW citus.version;
  citus.version
@@ -129,6 +129,115 @@ BEGIN;
 ROLLBACK;
 RESET client_min_messages;
 \c - - - :master_port
+-- also test the infrastructure that is used for supporting
+-- TRUNCATE from worker nodes
+-- should fail since it is not in transaction block
+SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+ERROR:  lock_relation_if_exists can only be used in transaction blocks
+BEGIN;
+-- should fail since the schema is not provided
+SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+ lock_relation_if_exists
+-------------------------
+ f
+(1 row)
+
+ROLLBACK;
+BEGIN;
+-- should work since the schema is in the search path
+SET search_path TO 'truncate_from_workers';
+SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+ROLLBACK;
+BEGIN;
+-- should return false since there is no such table
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_tableXXX', 'ACCESS SHARE');
+ lock_relation_if_exists
+-------------------------
+ f
+(1 row)
+
+ROLLBACK;
+BEGIN;
+-- should fail since there is no such lock mode
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'MY LOCK MODE');
+ERROR:  unknown lock mode: MY LOCK MODE
+ROLLBACK;
+BEGIN;
+-- test all lock levels
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS SHARE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW SHARE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE UPDATE EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+-- see them all
+SELECT relation::regclass, mode FROM pg_locks WHERE pid = pg_backend_pid() AND relation = 'truncate_from_workers.on_update_fkey_table'::regclass ORDER BY 2 DESC;
+                  relation                   |           mode
+---------------------------------------------+--------------------------
+ truncate_from_workers.on_update_fkey_table  | ShareUpdateExclusiveLock
+ truncate_from_workers.on_update_fkey_table  | ShareRowExclusiveLock
+ truncate_from_workers.on_update_fkey_table  | ShareLock
+ truncate_from_workers.on_update_fkey_table  | RowShareLock
+ truncate_from_workers.on_update_fkey_table  | RowExclusiveLock
+ truncate_from_workers.on_update_fkey_table  | ExclusiveLock
+ truncate_from_workers.on_update_fkey_table  | AccessShareLock
+ truncate_from_workers.on_update_fkey_table  | AccessExclusiveLock
+(8 rows)
+
+COMMIT;
 DROP SCHEMA truncate_from_workers CASCADE;
 NOTICE:  drop cascades to 2 other objects
 DETAIL:  drop cascades to table truncate_from_workers.referece_table
@@ -234,8 +234,7 @@ SET dept = 5
 FROM cte_1
 WHERE distributed_table.tenant_id < cte_1.tenant_id;
 
-SET citus.explain_all_tasks TO ON;
-EXPLAIN (COSTS FALSE) WITH cte_1 AS (
+WITH cte_1 AS (
 WITH cte_2 AS (
 SELECT tenant_id as cte2_id
 FROM second_distributed_table
@@ -146,6 +146,7 @@ ALTER EXTENSION citus UPDATE TO '7.5-7';
 ALTER EXTENSION citus UPDATE TO '8.0-1';
 ALTER EXTENSION citus UPDATE TO '8.0-2';
 ALTER EXTENSION citus UPDATE TO '8.0-3';
+ALTER EXTENSION citus UPDATE TO '8.0-4';
 
 -- show running version
 SHOW citus.version;
@@ -95,6 +95,50 @@ RESET client_min_messages;
 
 \c - - - :master_port
 
+-- also test the infrastructure that is used for supporting
+-- TRUNCATE from worker nodes
+
+-- should fail since it is not in transaction block
+SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+
+BEGIN;
+    -- should fail since the schema is not provided
+    SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+ROLLBACK;
+
+BEGIN;
+    -- should work since the schema is in the search path
+    SET search_path TO 'truncate_from_workers';
+    SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+ROLLBACK;
+
+BEGIN;
+    -- should return false since there is no such table
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_tableXXX', 'ACCESS SHARE');
+ROLLBACK;
+
+
+BEGIN;
+    -- should fail since there is no such lock mode
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'MY LOCK MODE');
+ROLLBACK;
+
+BEGIN;
+    -- test all lock levels
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS SHARE');
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW SHARE');
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW EXCLUSIVE');
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE UPDATE EXCLUSIVE');
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE');
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE');
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE');
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'EXCLUSIVE');
+    SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS EXCLUSIVE');
+
+    -- see them all
+    SELECT relation::regclass, mode FROM pg_locks WHERE pid = pg_backend_pid() AND relation = 'truncate_from_workers.on_update_fkey_table'::regclass ORDER BY 2 DESC;
+COMMIT;
+
 DROP SCHEMA truncate_from_workers CASCADE;
 
 SET search_path TO public;