From 5cf8fbe7b607cb2fd29f66f749d8dfe15fe03382 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 7 Sep 2018 14:19:43 +0300
Subject: [PATCH 1/3] Add infrastructure to lock relation if exists

---
 citus.control | 2 +-
 src/backend/distributed/Makefile | 4 +-
 .../distributed/citus--8.0-3--8.0-4.sql | 11 ++
 src/backend/distributed/citus.control | 2 +-
 .../commands/create_distributed_table.c | 2 +-
 .../distributed/master/master_create_shards.c | 2 +-
 .../master/master_modify_multiple_shards.c | 109 +++++++++++++++++-
 .../distributed/master/master_node_protocol.c | 13 +--
 .../master/master_stage_protocol.c | 2 +-
 .../distributed/utils/colocation_utils.c | 2 +-
 src/backend/distributed/utils/node_metadata.c | 4 +-
 .../worker/worker_file_access_protocol.c | 2 +-
 src/include/distributed/master_protocol.h | 2 +-
 src/test/regress/expected/multi_extension.out | 1 +
 .../multi_mx_truncate_from_worker.out | 109 ++++++++++++++++++
 src/test/regress/sql/multi_extension.sql | 1 +
 .../sql/multi_mx_truncate_from_worker.sql | 44 +++++++
 17 files changed, 293 insertions(+), 19 deletions(-)
 create mode 100644 src/backend/distributed/citus--8.0-3--8.0-4.sql

diff --git a/citus.control b/citus.control
index cdd186d36..28db138ec 100644
--- a/citus.control
+++ b/citus.control
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '8.0-3'
+default_version = '8.0-4'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index e7afea3e4..6e6787b06 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
 	7.3-1 7.3-2 7.3-3 \
 	7.4-1 7.4-2 7.4-3 \
 	7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \
-	8.0-1 8.0-2 8.0-3
+	8.0-1 8.0-2 8.0-3 8.0-4

 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@@ -221,6 +221,8 @@ $(EXTENSION)--8.0-2.sql: $(EXTENSION)--8.0-1.sql $(EXTENSION)--8.0-1--8.0-2.sql
 	cat $^ > $@
 $(EXTENSION)--8.0-3.sql: $(EXTENSION)--8.0-2.sql $(EXTENSION)--8.0-2--8.0-3.sql
 	cat $^ > $@
+$(EXTENSION)--8.0-4.sql: $(EXTENSION)--8.0-3.sql $(EXTENSION)--8.0-3--8.0-4.sql
+	cat $^ > $@

 NO_PGXS = 1
diff --git a/src/backend/distributed/citus--8.0-3--8.0-4.sql b/src/backend/distributed/citus--8.0-3--8.0-4.sql
new file mode 100644
index 000000000..7589055e9
--- /dev/null
+++ b/src/backend/distributed/citus--8.0-3--8.0-4.sql
@@ -0,0 +1,11 @@
+/* citus--8.0-3--8.0-4 */
+SET search_path = 'pg_catalog';
+
+CREATE OR REPLACE FUNCTION lock_relation_if_exists(table_name text, lock_mode text)
+RETURNS BOOL
+LANGUAGE C STRICT as 'MODULE_PATHNAME',
+$$lock_relation_if_exists$$;
+COMMENT ON FUNCTION lock_relation_if_exists(table_name text, lock_mode text)
+IS 'locks relation in the lock_mode if the relation exists';
+
+RESET search_path;
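
A minimal usage sketch of the new UDF, with a hypothetical table name: it must be called inside a transaction block with a schema-qualified relation name, takes the requested lock when the relation exists, and returns false instead of erroring when it does not:

	BEGIN;
	-- takes the lock and returns true if my_schema.my_table exists; returns false otherwise
	SELECT lock_relation_if_exists('my_schema.my_table', 'ACCESS EXCLUSIVE');
	COMMIT;
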
diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control
index cdd186d36..28db138ec 100644
--- a/src/backend/distributed/citus.control
+++ b/src/backend/distributed/citus.control
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '8.0-3'
+default_version = '8.0-4'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 0c2172255..3e3c4ca56 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -591,7 +591,7 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
 	else
 	{
 		text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
-		Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText);
+		Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false);

 		EnsureTableCanBeColocatedWith(relationId, replicationModel,
 									  distributionColumnType, sourceRelationId);
diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c
index d8b470475..78a4a00df 100644
--- a/src/backend/distributed/master/master_create_shards.c
+++ b/src/backend/distributed/master/master_create_shards.c
@@ -67,7 +67,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
 	int32 shardCount = PG_GETARG_INT32(1);
 	int32 replicationFactor = PG_GETARG_INT32(2);

-	Oid distributedTableId = ResolveRelationId(tableNameText);
+	Oid distributedTableId = ResolveRelationId(tableNameText, false);

 	/* do not add any data */
 	bool useExclusiveConnections = false;
diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
index 3c1b8fea8..43c8c5397 100644
--- a/src/backend/distributed/master/master_modify_multiple_shards.c
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -56,14 +56,18 @@
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
-
+#if (PG_VERSION_NUM >= 100000)
+#include "utils/varlena.h"
+#endif

 static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList,
 										   TaskType taskType);
 static bool ShouldExecuteTruncateStmtSequential(TruncateStmt *command);
+static LOCKMODE LockModeTextToLockMode(const char *lockModeName);


 PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
+PG_FUNCTION_INFO_V1(lock_relation_if_exists);


 /*
@@ -312,3 +316,106 @@ ShouldExecuteTruncateStmtSequential(TruncateStmt *command)

 	return false;
 }
+
+
+/*
+ * lock_relation_if_exists gets a relation name and lock mode
+ * and returns true if the relation exists and can be locked with
+ * the given lock mode. If the relation doesn't exist, the function
+ * returns false.
+ *
+ * The relation name should be qualified with the schema name.
+ *
+ * The function errors out if the lock mode isn't defined in PostgreSQL's
+ * explicit locking table.
+ */
+Datum
+lock_relation_if_exists(PG_FUNCTION_ARGS)
+{
+	text *relationName = PG_GETARG_TEXT_P(0);
+	text *lockModeText = PG_GETARG_TEXT_P(1);
+
+	Oid relationId = InvalidOid;
+	char *lockModeCString = text_to_cstring(lockModeText);
+	List *relationNameList = NIL;
+	RangeVar *relation = NULL;
+	LOCKMODE lockMode = NoLock;
+
+	/* ensure that we're in a transaction block */
+	RequireTransactionChain(true, "lock_relation_if_exists");
+
+	relationId = ResolveRelationId(relationName, true);
+	if (!OidIsValid(relationId))
+	{
+		PG_RETURN_BOOL(false);
+	}
+
+	/* get the lock mode */
+	lockMode = LockModeTextToLockMode(lockModeCString);
+
+
+	/* resolve relationId from passed in schema and relation name */
+	relationNameList = textToQualifiedNameList(relationName);
+	relation = makeRangeVarFromNameList(relationNameList);
+
+	/* lock the relation with the lock mode */
+	RangeVarGetRelid(relation, lockMode, false);
+
+	PG_RETURN_BOOL(true);
+}
+
+
+/*
+ * LockModeTextToLockMode gets a lockMode name and returns its corresponding LOCKMODE.
+ * The function errors out if the input lock mode isn't defined in PostgreSQL's
+ * explicit locking table.
+ */
+static LOCKMODE
+LockModeTextToLockMode(const char *lockModeName)
+{
+	if (pg_strncasecmp("NoLock", lockModeName, NAMEDATALEN) == 0)
+	{
+		/* there is no explicit call for NoLock, but we keep it here for convenience */
+		return NoLock;
+	}
+	else if (pg_strncasecmp("ACCESS SHARE", lockModeName, NAMEDATALEN) == 0)
+	{
+		return AccessShareLock;
+	}
+	else if (pg_strncasecmp("ROW SHARE", lockModeName, NAMEDATALEN) == 0)
+	{
+		return RowShareLock;
+	}
+	else if (pg_strncasecmp("ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
+	{
+		return RowExclusiveLock;
+	}
+	else if (pg_strncasecmp("SHARE UPDATE EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
+	{
+		return ShareUpdateExclusiveLock;
+	}
+	else if (pg_strncasecmp("SHARE", lockModeName, NAMEDATALEN) == 0)
+	{
+		return ShareLock;
+	}
+	else if (pg_strncasecmp("SHARE ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
+	{
+		return ShareRowExclusiveLock;
+	}
+	else if (pg_strncasecmp("EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
+	{
+		return ExclusiveLock;
+	}
+	else if (pg_strncasecmp("ACCESS EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
+	{
+		return AccessExclusiveLock;
+	}
+	else
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+				 errmsg("unknown lock mode: %s", lockModeName)));
+	}
+
+	return NoLock;
+}
diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c
index acff6fac4..14ce04950 100644
--- a/src/backend/distributed/master/master_node_protocol.c
+++ b/src/backend/distributed/master/master_node_protocol.c
@@ -98,7 +98,7 @@ Datum
 master_get_table_metadata(PG_FUNCTION_ARGS)
 {
 	text *relationName = PG_GETARG_TEXT_P(0);
-	Oid relationId = ResolveRelationId(relationName);
+	Oid relationId = ResolveRelationId(relationName, false);

 	DistTableCacheEntry *partitionEntry = NULL;
 	char *partitionKeyString = NULL;
@@ -215,7 +215,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
 	if (SRF_IS_FIRSTCALL())
 	{
 		text *relationName = PG_GETARG_TEXT_P(0);
-		Oid relationId = ResolveRelationId(relationName);
+		Oid relationId = ResolveRelationId(relationName, false);
 		bool includeSequenceDefaults = true;

 		MemoryContext oldContext = NULL;
@@ -320,7 +320,7 @@ GetNextShardId()
 	}

 	sequenceName = cstring_to_text(SHARDID_SEQUENCE_NAME);
-	sequenceId = ResolveRelationId(sequenceName);
+	sequenceId = ResolveRelationId(sequenceName, false);
 	sequenceIdDatum = ObjectIdGetDatum(sequenceId);

 	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
@@ -399,7 +399,7 @@ GetNextPlacementId(void)
 	}

 	sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME);
-	sequenceId = ResolveRelationId(sequenceName);
+	sequenceId = ResolveRelationId(sequenceName, false);
 	sequenceIdDatum = ObjectIdGetDatum(sequenceId);

 	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
@@ -527,17 +527,16 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)

 /* Finds the relationId from a potentially qualified relation name. */
 Oid
-ResolveRelationId(text *relationName)
+ResolveRelationId(text *relationName, bool missingOk)
 {
 	List *relationNameList = NIL;
 	RangeVar *relation = NULL;
 	Oid relationId = InvalidOid;
-	bool failOK = false;	/* error if relation cannot be found */

 	/* resolve relationId from passed in schema and relation name */
 	relationNameList = textToQualifiedNameList(relationName);
 	relation = makeRangeVarFromNameList(relationNameList);
-	relationId = RangeVarGetRelid(relation, NoLock, failOK);
+	relationId = RangeVarGetRelid(relation, NoLock, missingOk);

 	return relationId;
 }
diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c
index 9a21ff820..9bb487ecf 100644
--- a/src/backend/distributed/master/master_stage_protocol.c
+++ b/src/backend/distributed/master/master_stage_protocol.c
@@ -91,7 +91,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 	char partitionMethod = 0;
 	char storageType = SHARD_STORAGE_TABLE;

-	Oid relationId = ResolveRelationId(relationNameText);
+	Oid relationId = ResolveRelationId(relationNameText, false);
 	char relationKind = get_rel_relkind(relationId);
 	char replicationModel = REPLICATION_MODEL_INVALID;
diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c
index eb3573672..d7c0e9a2a 100644
--- a/src/backend/distributed/utils/colocation_utils.c
+++ b/src/backend/distributed/utils/colocation_utils.c
@@ -546,7 +546,7 @@ uint32
 GetNextColocationId()
 {
 	text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
-	Oid sequenceId = ResolveRelationId(sequenceName);
+	Oid sequenceId = ResolveRelationId(sequenceName, false);
 	Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
 	Oid savedUserId = InvalidOid;
 	int savedSecurityContext = 0;
diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c
index 0be0cb807..a6420974c 100644
--- a/src/backend/distributed/utils/node_metadata.c
+++ b/src/backend/distributed/utils/node_metadata.c
@@ -1165,7 +1165,7 @@ int32
 GetNextGroupId()
 {
 	text *sequenceName = cstring_to_text(GROUPID_SEQUENCE_NAME);
-	Oid sequenceId = ResolveRelationId(sequenceName);
+	Oid sequenceId = ResolveRelationId(sequenceName, false);
 	Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
 	Oid savedUserId = InvalidOid;
 	int savedSecurityContext = 0;
@@ -1227,7 +1227,7 @@ int
 GetNextNodeId()
 {
 	text *sequenceName = cstring_to_text(NODEID_SEQUENCE_NAME);
-	Oid sequenceId = ResolveRelationId(sequenceName);
+	Oid sequenceId = ResolveRelationId(sequenceName, false);
 	Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
 	Oid savedUserId = InvalidOid;
 	int savedSecurityContext = 0;
diff --git a/src/backend/distributed/worker/worker_file_access_protocol.c b/src/backend/distributed/worker/worker_file_access_protocol.c
index 9ec8882b2..ee1aba168 100644
--- a/src/backend/distributed/worker/worker_file_access_protocol.c
+++ b/src/backend/distributed/worker/worker_file_access_protocol.c
@@ -36,7 +36,7 @@ worker_foreign_file_path(PG_FUNCTION_ARGS)
 {
 	text *foreignTableName = PG_GETARG_TEXT_P(0);
 	text *foreignFilePath = NULL;
-	Oid relationId = ResolveRelationId(foreignTableName);
+	Oid relationId = ResolveRelationId(foreignTableName, false);
 	ForeignTable *foreignTable = GetForeignTable(relationId);
 	ListCell *optionCell = NULL;
diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h
index a9d2b4cb9..04f619d6a 100644
--- a/src/include/distributed/master_protocol.h
+++ b/src/include/distributed/master_protocol.h
@@ -103,7 +103,7 @@ extern bool IsCoordinator(void);
 extern bool CStoreTable(Oid relationId);
 extern uint64 GetNextShardId(void);
 extern uint64 GetNextPlacementId(void);
-extern Oid ResolveRelationId(text *relationName);
+extern Oid ResolveRelationId(text *relationName, bool failOK);
 extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation);
 extern List * GetTableCreationCommands(Oid relationId, bool forShardCreation);
 extern List * GetTableIndexAndConstraintCommands(Oid relationId);
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index 126a50d61..c14785c18 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -146,6 +146,7 @@ ALTER EXTENSION citus UPDATE TO '7.5-7';
 ALTER EXTENSION citus UPDATE TO '8.0-1';
 ALTER EXTENSION citus UPDATE TO '8.0-2';
 ALTER EXTENSION citus UPDATE TO '8.0-3';
+ALTER EXTENSION citus UPDATE TO '8.0-4';
 -- show running version
 SHOW citus.version;
  citus.version
diff --git a/src/test/regress/expected/multi_mx_truncate_from_worker.out b/src/test/regress/expected/multi_mx_truncate_from_worker.out
index 0d3b75bc4..314a5a505 100644
--- a/src/test/regress/expected/multi_mx_truncate_from_worker.out
+++ b/src/test/regress/expected/multi_mx_truncate_from_worker.out
@@ -129,6 +129,115 @@ BEGIN;
 ROLLBACK;
 RESET client_min_messages;
 \c - - - :master_port
+-- also test the infrastructure that is used for supporting
+-- TRUNCATE from worker nodes
+-- should fail since it is not in a transaction block
+SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+ERROR:  lock_relation_if_exists can only be used in transaction blocks
+BEGIN;
+	-- should return false since the schema is not provided
+	SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+ lock_relation_if_exists
+-------------------------
+ f
+(1 row)
+
+ROLLBACK;
+BEGIN;
+	-- should work since the schema is in the search path
+	SET search_path TO 'truncate_from_workers';
+	SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+ROLLBACK;
+BEGIN;
+	-- should return false since there is no such table
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_tableXXX', 'ACCESS SHARE');
+ lock_relation_if_exists
+-------------------------
+ f
+(1 row)
+
+ROLLBACK;
+BEGIN;
+	-- should fail since there is no such lock mode
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'MY LOCK MODE');
+ERROR:  unknown lock mode: MY LOCK MODE
+ROLLBACK;
+BEGIN;
+	-- test all lock levels
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS SHARE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW SHARE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE UPDATE EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS EXCLUSIVE');
+ lock_relation_if_exists
+-------------------------
+ t
+(1 row)
+
+	-- see them all
+	SELECT relation::regclass, mode FROM pg_locks WHERE pid = pg_backend_pid() AND relation = 'truncate_from_workers.on_update_fkey_table'::regclass ORDER BY 2 DESC;
+                  relation                  |           mode
+--------------------------------------------+--------------------------
+ truncate_from_workers.on_update_fkey_table | ShareUpdateExclusiveLock
+ truncate_from_workers.on_update_fkey_table | ShareRowExclusiveLock
+ truncate_from_workers.on_update_fkey_table | ShareLock
+ truncate_from_workers.on_update_fkey_table | RowShareLock
+ truncate_from_workers.on_update_fkey_table | RowExclusiveLock
+ truncate_from_workers.on_update_fkey_table | ExclusiveLock
+ truncate_from_workers.on_update_fkey_table | AccessShareLock
+ truncate_from_workers.on_update_fkey_table | AccessExclusiveLock
+(8 rows)
+
+COMMIT;
 DROP SCHEMA truncate_from_workers CASCADE;
 NOTICE:  drop cascades to 2 other objects
 DETAIL:  drop cascades to table truncate_from_workers.referece_table
diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql
index 84253991c..822dc0555 100644
--- a/src/test/regress/sql/multi_extension.sql
+++ b/src/test/regress/sql/multi_extension.sql
@@ -146,6 +146,7 @@ ALTER EXTENSION citus UPDATE TO '7.5-7';
 ALTER EXTENSION citus UPDATE TO '8.0-1';
 ALTER EXTENSION citus UPDATE TO '8.0-2';
 ALTER EXTENSION citus UPDATE TO '8.0-3';
+ALTER EXTENSION citus UPDATE TO '8.0-4';

 -- show running version
 SHOW citus.version;
diff --git a/src/test/regress/sql/multi_mx_truncate_from_worker.sql b/src/test/regress/sql/multi_mx_truncate_from_worker.sql
index 00bdd13e6..372a23c45 100644
--- a/src/test/regress/sql/multi_mx_truncate_from_worker.sql
+++ b/src/test/regress/sql/multi_mx_truncate_from_worker.sql
@@ -95,6 +95,50 @@ RESET client_min_messages;

 \c - - - :master_port

+-- also test the infrastructure that is used for supporting
+-- TRUNCATE from worker nodes
+
+-- should fail since it is not in a transaction block
+SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+
+BEGIN;
+	-- should return false since the schema is not provided
+	SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+ROLLBACK;
+
+BEGIN;
+	-- should work since the schema is in the search path
+	SET search_path TO 'truncate_from_workers';
+	SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE');
+ROLLBACK;
+
+BEGIN;
+	-- should return false since there is no such table
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_tableXXX', 'ACCESS SHARE');
+ROLLBACK;
+
+
+BEGIN;
+	-- should fail since there is no such lock mode
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'MY LOCK MODE');
+ROLLBACK;
+
+BEGIN;
+	-- test all lock levels
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS SHARE');
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW SHARE');
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW EXCLUSIVE');
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE UPDATE EXCLUSIVE');
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE');
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE');
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE');
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'EXCLUSIVE');
+	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS EXCLUSIVE');
+
+	-- see them all
+	SELECT relation::regclass, mode FROM pg_locks WHERE pid = pg_backend_pid() AND relation = 'truncate_from_workers.on_update_fkey_table'::regclass ORDER BY 2 DESC;
+COMMIT;
+
 DROP SCHEMA truncate_from_workers CASCADE;

 SET search_path TO public;

From 76aa6951c202a8a4fc09d311aeef9e79ec420f38 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 7 Sep 2018 14:42:30 +0300
Subject: [PATCH 2/3] Properly send commands to other nodes

We previously implemented the OTHER_WORKERS_WITH_METADATA tag. However,
that was wrong. See the related discussion:
https://github.com/citusdata/citus/issues/2320

Instead, we switched to using OTHER_WORKERS and made the command we run
tolerate a missing relation, so that we won't get into trouble even when
the node is not a metadata node.
---
 .../master/master_modify_multiple_shards.c | 28 +++++++++----------
 .../transaction/worker_transaction.c | 10 +++----
 src/include/distributed/errormessage.h | 6 ++--
 src/include/distributed/master_protocol.h | 3 +-
 src/include/distributed/version_compat.h | 1 +
 src/include/distributed/worker_transaction.h | 2 +-
 6 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c
index 43c8c5397..885d8a10e 100644
--- a/src/backend/distributed/master/master_modify_multiple_shards.c
+++ b/src/backend/distributed/master/master_modify_multiple_shards.c
@@ -43,6 +43,7 @@
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/shard_pruning.h"
+#include "distributed/version_compat.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
 #include "optimizer/clauses.h"
@@ -60,6 +61,10 @@
 #include "utils/varlena.h"
 #endif

+#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');"
+#define REMOTE_LOCK_MODE_FOR_TRUNCATE "ACCESS EXCLUSIVE"
+
+
 static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList,
 										   TaskType taskType);
 static bool ShouldExecuteTruncateStmtSequential(TruncateStmt *command);
@@ -208,27 +213,23 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
 		ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType);

 	/*
-	 * We should execute "TRUNCATE table_name;" on the other worker nodes before
+	 * We lock the relation we're TRUNCATING on the other worker nodes before
 	 * executing the truncate commands on the shards. This is necessary to prevent
 	 * distributed deadlocks where a concurrent operation on the same table (or a
 	 * cascading table) is executed on the other nodes.
 	 *
-	 * Note that we should skip the current node to prevent a self-deadlock.
+	 * Note that we should skip the current node to prevent a self-deadlock; that's
+	 * why we use the OTHER_WORKERS tag.
 	 */
 	if (truncateOperation && ShouldSyncTableMetadata(relationId))
 	{
-		SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA,
-							 DISABLE_DDL_PROPAGATION);
+		char *qualifiedRelationName = generate_qualified_relation_name(relationId);
+		StringInfo lockRelation = makeStringInfo();
+		appendStringInfo(lockRelation, LOCK_RELATION_IF_EXISTS, qualifiedRelationName,
+						 REMOTE_LOCK_MODE_FOR_TRUNCATE);

-		/*
-		 * Note that here we ignore the schema and send the queryString as is
-		 * since citus_truncate_trigger already uses qualified table name.
-		 * If that was not the case, we should also had to set the search path
-		 * as we do for regular DDLs.
-		 */
-		SendCommandToWorkers(OTHER_WORKERS_WITH_METADATA,
-							 queryString);
+		SendCommandToWorkers(OTHER_WORKERS, lockRelation->data);
 	}

 	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
@@ -342,7 +343,7 @@ lock_relation_if_exists(PG_FUNCTION_ARGS)
 	LOCKMODE lockMode = NoLock;

 	/* ensure that we're in a transaction block */
-	RequireTransactionChain(true, "lock_relation_if_exists");
+	RequireTransactionBlock(true, "lock_relation_if_exists");

 	relationId = ResolveRelationId(relationName, true);
 	if (!OidIsValid(relationId))
@@ -353,7 +354,7 @@ lock_relation_if_exists(PG_FUNCTION_ARGS)

 	/* get the lock mode */
 	lockMode = LockModeTextToLockMode(lockModeCString);

-
 	/* resolve relationId from passed in schema and relation name */
 	relationNameList = textToQualifiedNameList(relationName);
 	relation = makeRangeVarFromNameList(relationNameList);
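
For concreteness, the command built above and broadcast to the other worker nodes takes the following shape (the relation name is borrowed from the regression tests in the first patch):

	SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS EXCLUSIVE');

Since the UDF returns false rather than erroring when the relation is missing on the target node, the same command is safe to send to every other worker, with or without metadata.
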
diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c
index 4e4db59d2..152499843 100644
--- a/src/backend/distributed/transaction/worker_transaction.c
+++ b/src/backend/distributed/transaction/worker_transaction.c
@@ -117,14 +117,13 @@ SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
 		int nodePort = workerNode->workerPort;
 		int connectionFlags = FORCE_NEW_CONNECTION;

-		if ((targetWorkerSet == WORKERS_WITH_METADATA ||
-			 targetWorkerSet == OTHER_WORKERS_WITH_METADATA) &&
+		if (targetWorkerSet == WORKERS_WITH_METADATA &&
 			!workerNode->hasMetadata)
 		{
 			continue;
 		}

-		if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA &&
+		if (targetWorkerSet == OTHER_WORKERS &&
 			workerNode->groupId == GetLocalGroupId())
 		{
 			continue;
@@ -177,14 +176,13 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
 		MultiConnection *connection = NULL;
 		int connectionFlags = 0;

-		if ((targetWorkerSet == WORKERS_WITH_METADATA ||
-			 targetWorkerSet == OTHER_WORKERS_WITH_METADATA) &&
+		if (targetWorkerSet == WORKERS_WITH_METADATA &&
 			!workerNode->hasMetadata)
 		{
 			continue;
 		}

-		if (targetWorkerSet == OTHER_WORKERS_WITH_METADATA &&
+		if (targetWorkerSet == OTHER_WORKERS &&
 			workerNode->groupId == GetLocalGroupId())
 		{
 			continue;
diff --git a/src/include/distributed/errormessage.h b/src/include/distributed/errormessage.h
index 4626da6b6..0eeb7fd04 100644
--- a/src/include/distributed/errormessage.h
+++ b/src/include/distributed/errormessage.h
@@ -56,7 +56,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons
 		RaiseDeferredErrorInternal(error, elevel); \
 		if (__builtin_constant_p(elevel) && (elevel) >= ERROR) { \
 			pg_unreachable(); } \
-	} while (0)
+	} \
+	while (0)
 #else /* !HAVE_BUILTIN_CONSTANT_P */
 #define RaiseDeferredError(error, elevel) \
 	do { \
@@ -64,7 +65,8 @@ DeferredErrorMessage * DeferredErrorInternal(int code, const char *message, cons
 		RaiseDeferredErrorInternal(error, elevel_); \
 		if (elevel_ >= ERROR) { \
 			pg_unreachable(); } \
-	} while (0)
+	} \
+	while (0)
 #endif /* HAVE_BUILTIN_CONSTANT_P */

 void RaiseDeferredErrorInternal(DeferredErrorMessage *error, int elevel);
diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h
index 04f619d6a..f68051283 100644
--- a/src/include/distributed/master_protocol.h
+++ b/src/include/distributed/master_protocol.h
@@ -103,7 +103,7 @@ extern bool IsCoordinator(void);
 extern bool CStoreTable(Oid relationId);
 extern uint64 GetNextShardId(void);
 extern uint64 GetNextPlacementId(void);
-extern Oid ResolveRelationId(text *relationName, bool failOK);
+extern Oid ResolveRelationId(text *relationName, bool missingOk);
 extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation);
 extern List * GetTableCreationCommands(Oid relationId, bool forShardCreation);
 extern List * GetTableIndexAndConstraintCommands(Oid relationId);
@@ -151,6 +151,7 @@ extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS);
 extern Datum master_apply_delete_command(PG_FUNCTION_ARGS);
 extern Datum master_drop_sequences(PG_FUNCTION_ARGS);
 extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS);
+extern Datum lock_relation_if_exists(PG_FUNCTION_ARGS);
 extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);

 /* function declarations for shard creation functionality */
diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h
index 9589825d3..19c8025b9 100644
--- a/src/include/distributed/version_compat.h
+++ b/src/include/distributed/version_compat.h
@@ -37,6 +37,7 @@
 /* following functions are renamed in PG11 */
 #define PreventInTransactionBlock PreventTransactionChain
 #define DatumGetJsonbP(d) DatumGetJsonb(d)
+#define RequireTransactionBlock RequireTransactionChain

 /* following defines also exist for PG11 */
 #define RELATION_OBJECT_TYPE ACL_OBJECT_RELATION
diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h
index 048e684cc..73c1eb3a2 100644
--- a/src/include/distributed/worker_transaction.h
+++ b/src/include/distributed/worker_transaction.h
@@ -22,7 +22,7 @@
 typedef enum TargetWorkerSet
 {
 	WORKERS_WITH_METADATA,
-	OTHER_WORKERS_WITH_METADATA,
+	OTHER_WORKERS,
 	ALL_WORKERS
 } TargetWorkerSet;


From 7de5e304325bbdbd6c4c829288e7e3222ef0b51d Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Fri, 7 Sep 2018 15:34:02 +0300
Subject: [PATCH 3/3] Change flaky explain test to non-explain

This test's output changes depending on which worker is picked for
EXPLAIN (e.g., the worker port in the output changes). Given that the
test only aims to ensure that CTEs inside CTEs work fine in DML
queries, it should be fine to get rid of the EXPLAIN. The output is
verified to be correct as well.
---
 src/test/regress/expected/dml_recursive.out | 62 +--------------------
 src/test/regress/sql/dml_recursive.sql      |  3 +-
 2 files changed, 2 insertions(+), 63 deletions(-)

diff --git a/src/test/regress/expected/dml_recursive.out b/src/test/regress/expected/dml_recursive.out
index 5b1a09dc6..2299da913 100644
--- a/src/test/regress/expected/dml_recursive.out
+++ b/src/test/regress/expected/dml_recursive.out
@@ -295,8 +295,7 @@ DEBUG:  common table expressions are not supported in distributed modifications
 DEBUG:  generating subplan 20_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
 DEBUG:  common table expressions are not supported in distributed modifications
 DEBUG:  Plan 20 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
-SET citus.explain_all_tasks TO ON;
-EXPLAIN (COSTS FALSE) WITH cte_1 AS (
+WITH cte_1 AS (
 WITH cte_2 AS (
 	SELECT tenant_id as cte2_id
 	FROM second_distributed_table
@@ -315,65 +314,6 @@ DEBUG:  common table expressions are not supported in distributed modifications
 DEBUG:  generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.>=) 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
 DEBUG:  common table expressions are not supported in distributed modifications
 DEBUG:  Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) cte_1.tenant_id)
-                                           QUERY PLAN
-------------------------------------------------------------------------------------------------
- Custom Scan (Citus Router)
-   ->  Distributed Subplan 22_1
-         ->  Custom Scan (Citus Router)
-               Task Count: 4
-               Tasks Shown: All
-               ->  Task
-                     Node: host=localhost port=57638 dbname=regression
-                     ->  Update on distributed_table_2370000 distributed_table
-                           ->  Seq Scan on distributed_table_2370000 distributed_table
-               ->  Task
-                     Node: host=localhost port=57637 dbname=regression
-                     ->  Update on distributed_table_2370001 distributed_table
-                           ->  Seq Scan on distributed_table_2370001 distributed_table
-               ->  Task
-                     Node: host=localhost port=57638 dbname=regression
-                     ->  Update on distributed_table_2370002 distributed_table
-                           ->  Seq Scan on distributed_table_2370002 distributed_table
-               ->  Task
-                     Node: host=localhost port=57637 dbname=regression
-                     ->  Update on distributed_table_2370003 distributed_table
-                           ->  Seq Scan on distributed_table_2370003 distributed_table
-   Task Count: 4
-   Tasks Shown: All
-   ->  Task
-         Node: host=localhost port=57638 dbname=regression
-         ->  Update on distributed_table_2370000 distributed_table
-               ->  Nested Loop
-                     Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
-                     ->  Function Scan on read_intermediate_result intermediate_result
-                     ->  Materialize
-                           ->  Seq Scan on distributed_table_2370000 distributed_table
-   ->  Task
-         Node: host=localhost port=57637 dbname=regression
-         ->  Update on distributed_table_2370001 distributed_table
-               ->  Nested Loop
-                     Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
-                     ->  Function Scan on read_intermediate_result intermediate_result
-                     ->  Materialize
-                           ->  Seq Scan on distributed_table_2370001 distributed_table
-   ->  Task
-         Node: host=localhost port=57638 dbname=regression
-         ->  Update on distributed_table_2370002 distributed_table
-               ->  Nested Loop
-                     Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
-                     ->  Function Scan on read_intermediate_result intermediate_result
-                     ->  Materialize
-                           ->  Seq Scan on distributed_table_2370002 distributed_table
-   ->  Task
-         Node: host=localhost port=57637 dbname=regression
-         ->  Update on distributed_table_2370003 distributed_table
-               ->  Nested Loop
-                     Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
-                     ->  Function Scan on read_intermediate_result intermediate_result
-                     ->  Materialize
-                           ->  Seq Scan on distributed_table_2370003 distributed_table
-(55 rows)
-
 -- we don't support updating local table with a join with
 -- distributed tables
 UPDATE
diff --git a/src/test/regress/sql/dml_recursive.sql b/src/test/regress/sql/dml_recursive.sql
index 678aff0f2..4ee7a3121 100644
--- a/src/test/regress/sql/dml_recursive.sql
+++ b/src/test/regress/sql/dml_recursive.sql
@@ -234,8 +234,7 @@ SET dept = 5
 FROM cte_1
 WHERE distributed_table.tenant_id < cte_1.tenant_id;

-SET citus.explain_all_tasks TO ON;
-EXPLAIN (COSTS FALSE) WITH cte_1 AS (
+WITH cte_1 AS (
 WITH cte_2 AS (
 	SELECT tenant_id as cte2_id
 	FROM second_distributed_table