diff --git a/citus.control b/citus.control index cdd186d36..28db138ec 100644 --- a/citus.control +++ b/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '8.0-3' +default_version = '8.0-4' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index e7afea3e4..6e6787b06 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 7.3-1 7.3-2 7.3-3 \ 7.4-1 7.4-2 7.4-3 \ 7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \ - 8.0-1 8.0-2 8.0-3 + 8.0-1 8.0-2 8.0-3 8.0-4 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -221,6 +221,8 @@ $(EXTENSION)--8.0-2.sql: $(EXTENSION)--8.0-1.sql $(EXTENSION)--8.0-1--8.0-2.sql cat $^ > $@ $(EXTENSION)--8.0-3.sql: $(EXTENSION)--8.0-2.sql $(EXTENSION)--8.0-2--8.0-3.sql cat $^ > $@ +$(EXTENSION)--8.0-4.sql: $(EXTENSION)--8.0-3.sql $(EXTENSION)--8.0-3--8.0-4.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--8.0-3--8.0-4.sql b/src/backend/distributed/citus--8.0-3--8.0-4.sql new file mode 100644 index 000000000..7589055e9 --- /dev/null +++ b/src/backend/distributed/citus--8.0-3--8.0-4.sql @@ -0,0 +1,11 @@ +/* citus--8.0-3--8.0-4 */ +SET search_path = 'pg_catalog'; + +CREATE OR REPLACE FUNCTION lock_relation_if_exists(table_name text, lock_mode text) +RETURNS BOOL +LANGUAGE C STRICT as 'MODULE_PATHNAME', +$$lock_relation_if_exists$$; +COMMENT ON FUNCTION lock_relation_if_exists(table_name text, lock_mode text) +IS 'locks relation in the lock_mode if the relation exists'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index cdd186d36..28db138ec 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '8.0-3' +default_version = '8.0-4' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 0c2172255..3e3c4ca56 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -591,7 +591,7 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn, else { text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); - Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText); + Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false); EnsureTableCanBeColocatedWith(relationId, replicationModel, distributionColumnType, sourceRelationId); diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index d8b470475..78a4a00df 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -67,7 +67,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) int32 shardCount = PG_GETARG_INT32(1); int32 replicationFactor = PG_GETARG_INT32(2); - Oid distributedTableId = ResolveRelationId(tableNameText); + Oid distributedTableId = ResolveRelationId(tableNameText, false); /* do not add any data */ bool useExclusiveConnections = false; diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 3c1b8fea8..43c8c5397 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -56,14 +56,18 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" - +#if (PG_VERSION_NUM >= 100000) +#include "utils/varlena.h" +#endif static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, TaskType taskType); static bool ShouldExecuteTruncateStmtSequential(TruncateStmt *command); +static LOCKMODE LockModeTextToLockMode(const char *lockModeName); PG_FUNCTION_INFO_V1(master_modify_multiple_shards); +PG_FUNCTION_INFO_V1(lock_relation_if_exists); /* @@ -312,3 +316,106 @@ ShouldExecuteTruncateStmtSequential(TruncateStmt *command) return false; } + + +/* + * lock_relation_if_exists gets a relation name and lock mode + * and returns true if the relation exists and can be locked with + * the given lock mode. If the relation doesn't exists, the function + * return false. + * + * The relation name should be qualified with the schema name. + * + * The function errors out of the lockmode isn't defined in the PostgreSQL's + * explicit locking table. + */ +Datum +lock_relation_if_exists(PG_FUNCTION_ARGS) +{ + text *relationName = PG_GETARG_TEXT_P(0); + text *lockModeText = PG_GETARG_TEXT_P(1); + + Oid relationId = InvalidOid; + char *lockModeCString = text_to_cstring(lockModeText); + List *relationNameList = NIL; + RangeVar *relation = NULL; + LOCKMODE lockMode = NoLock; + + /* ensure that we're in a transaction block */ + RequireTransactionChain(true, "lock_relation_if_exists"); + + relationId = ResolveRelationId(relationName, true); + if (!OidIsValid(relationId)) + { + PG_RETURN_BOOL(false); + } + + /* get the lock mode */ + lockMode = LockModeTextToLockMode(lockModeCString); + + + /* resolve relationId from passed in schema and relation name */ + relationNameList = textToQualifiedNameList(relationName); + relation = makeRangeVarFromNameList(relationNameList); + + /* lock the relation with the lock mode */ + RangeVarGetRelid(relation, lockMode, false); + + PG_RETURN_BOOL(true); +} + + +/* + * LockModeTextToLockMode gets a lockMode name and returns its corresponding LOCKMODE. + * The function errors out if the input lock mode isn't defined in the PostgreSQL's + * explicit locking table. + */ +static LOCKMODE +LockModeTextToLockMode(const char *lockModeName) +{ + if (pg_strncasecmp("NoLock", lockModeName, NAMEDATALEN) == 0) + { + /* there is no explict call for NoLock, but keeping it here for convinience */ + return NoLock; + } + else if (pg_strncasecmp("ACCESS SHARE", lockModeName, NAMEDATALEN) == 0) + { + return AccessShareLock; + } + else if (pg_strncasecmp("ROW SHARE", lockModeName, NAMEDATALEN) == 0) + { + return RowShareLock; + } + else if (pg_strncasecmp("ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) + { + return RowExclusiveLock; + } + else if (pg_strncasecmp("SHARE UPDATE EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) + { + return ShareUpdateExclusiveLock; + } + else if (pg_strncasecmp("SHARE", lockModeName, NAMEDATALEN) == 0) + { + return ShareLock; + } + else if (pg_strncasecmp("SHARE ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) + { + return ShareRowExclusiveLock; + } + else if (pg_strncasecmp("EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) + { + return ExclusiveLock; + } + else if (pg_strncasecmp("ACCESS EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) + { + return AccessExclusiveLock; + } + else + { + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("unknown lock mode: %s", lockModeName))); + } + + return NoLock; +} diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index acff6fac4..14ce04950 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -98,7 +98,7 @@ Datum master_get_table_metadata(PG_FUNCTION_ARGS) { text *relationName = PG_GETARG_TEXT_P(0); - Oid relationId = ResolveRelationId(relationName); + Oid relationId = ResolveRelationId(relationName, false); DistTableCacheEntry *partitionEntry = NULL; char *partitionKeyString = NULL; @@ -215,7 +215,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) if (SRF_IS_FIRSTCALL()) { text *relationName = PG_GETARG_TEXT_P(0); - Oid relationId = ResolveRelationId(relationName); + Oid relationId = ResolveRelationId(relationName, false); bool includeSequenceDefaults = true; MemoryContext oldContext = NULL; @@ -320,7 +320,7 @@ GetNextShardId() } sequenceName = cstring_to_text(SHARDID_SEQUENCE_NAME); - sequenceId = ResolveRelationId(sequenceName); + sequenceId = ResolveRelationId(sequenceName, false); sequenceIdDatum = ObjectIdGetDatum(sequenceId); GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); @@ -399,7 +399,7 @@ GetNextPlacementId(void) } sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME); - sequenceId = ResolveRelationId(sequenceName); + sequenceId = ResolveRelationId(sequenceName, false); sequenceIdDatum = ObjectIdGetDatum(sequenceId); GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); @@ -527,17 +527,16 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS) /* Finds the relationId from a potentially qualified relation name. */ Oid -ResolveRelationId(text *relationName) +ResolveRelationId(text *relationName, bool missingOk) { List *relationNameList = NIL; RangeVar *relation = NULL; Oid relationId = InvalidOid; - bool failOK = false; /* error if relation cannot be found */ /* resolve relationId from passed in schema and relation name */ relationNameList = textToQualifiedNameList(relationName); relation = makeRangeVarFromNameList(relationNameList); - relationId = RangeVarGetRelid(relation, NoLock, failOK); + relationId = RangeVarGetRelid(relation, NoLock, missingOk); return relationId; } diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 9a21ff820..9bb487ecf 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -91,7 +91,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) char partitionMethod = 0; char storageType = SHARD_STORAGE_TABLE; - Oid relationId = ResolveRelationId(relationNameText); + Oid relationId = ResolveRelationId(relationNameText, false); char relationKind = get_rel_relkind(relationId); char replicationModel = REPLICATION_MODEL_INVALID; diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index eb3573672..d7c0e9a2a 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -546,7 +546,7 @@ uint32 GetNextColocationId() { text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); - Oid sequenceId = ResolveRelationId(sequenceName); + Oid sequenceId = ResolveRelationId(sequenceName, false); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); Oid savedUserId = InvalidOid; int savedSecurityContext = 0; diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 0be0cb807..a6420974c 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -1165,7 +1165,7 @@ int32 GetNextGroupId() { text *sequenceName = cstring_to_text(GROUPID_SEQUENCE_NAME); - Oid sequenceId = ResolveRelationId(sequenceName); + Oid sequenceId = ResolveRelationId(sequenceName, false); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); Oid savedUserId = InvalidOid; int savedSecurityContext = 0; @@ -1227,7 +1227,7 @@ int GetNextNodeId() { text *sequenceName = cstring_to_text(NODEID_SEQUENCE_NAME); - Oid sequenceId = ResolveRelationId(sequenceName); + Oid sequenceId = ResolveRelationId(sequenceName, false); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); Oid savedUserId = InvalidOid; int savedSecurityContext = 0; diff --git a/src/backend/distributed/worker/worker_file_access_protocol.c b/src/backend/distributed/worker/worker_file_access_protocol.c index 9ec8882b2..ee1aba168 100644 --- a/src/backend/distributed/worker/worker_file_access_protocol.c +++ b/src/backend/distributed/worker/worker_file_access_protocol.c @@ -36,7 +36,7 @@ worker_foreign_file_path(PG_FUNCTION_ARGS) { text *foreignTableName = PG_GETARG_TEXT_P(0); text *foreignFilePath = NULL; - Oid relationId = ResolveRelationId(foreignTableName); + Oid relationId = ResolveRelationId(foreignTableName, false); ForeignTable *foreignTable = GetForeignTable(relationId); ListCell *optionCell = NULL; diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index a9d2b4cb9..04f619d6a 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -103,7 +103,7 @@ extern bool IsCoordinator(void); extern bool CStoreTable(Oid relationId); extern uint64 GetNextShardId(void); extern uint64 GetNextPlacementId(void); -extern Oid ResolveRelationId(text *relationName); +extern Oid ResolveRelationId(text *relationName, bool failOK); extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation); extern List * GetTableCreationCommands(Oid relationId, bool forShardCreation); extern List * GetTableIndexAndConstraintCommands(Oid relationId); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 126a50d61..c14785c18 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -146,6 +146,7 @@ ALTER EXTENSION citus UPDATE TO '7.5-7'; ALTER EXTENSION citus UPDATE TO '8.0-1'; ALTER EXTENSION citus UPDATE TO '8.0-2'; ALTER EXTENSION citus UPDATE TO '8.0-3'; +ALTER EXTENSION citus UPDATE TO '8.0-4'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/expected/multi_mx_truncate_from_worker.out b/src/test/regress/expected/multi_mx_truncate_from_worker.out index 0d3b75bc4..314a5a505 100644 --- a/src/test/regress/expected/multi_mx_truncate_from_worker.out +++ b/src/test/regress/expected/multi_mx_truncate_from_worker.out @@ -129,6 +129,115 @@ BEGIN; ROLLBACK; RESET client_min_messages; \c - - - :master_port +-- also test the infrastructure that is used for supporting +-- TRUNCATE from worker nodes +-- should fail since it is not in transaction block +SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE'); +ERROR: lock_relation_if_exists can only be used in transaction blocks +BEGIN; + -- should fail since the schema is not provided + SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE'); + lock_relation_if_exists +------------------------- + f +(1 row) + +ROLLBACK; +BEGIN; + -- should work since the schema is in the search path + SET search_path TO 'truncate_from_workers'; + SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE'); + lock_relation_if_exists +------------------------- + t +(1 row) + +ROLLBACK; +BEGIN; + -- should return false since there is no such table + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_tableXXX', 'ACCESS SHARE'); + lock_relation_if_exists +------------------------- + f +(1 row) + +ROLLBACK; +BEGIN; + -- should fail since there is no such lock mode + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'MY LOCK MODE'); +ERROR: unknown lock mode: MY LOCK MODE +ROLLBACK; +BEGIN; + -- test all lock levels + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS SHARE'); + lock_relation_if_exists +------------------------- + t +(1 row) + + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW SHARE'); + lock_relation_if_exists +------------------------- + t +(1 row) + + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW EXCLUSIVE'); + lock_relation_if_exists +------------------------- + t +(1 row) + + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE UPDATE EXCLUSIVE'); + lock_relation_if_exists +------------------------- + t +(1 row) + + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE'); + lock_relation_if_exists +------------------------- + t +(1 row) + + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE'); + lock_relation_if_exists +------------------------- + t +(1 row) + + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE'); + lock_relation_if_exists +------------------------- + t +(1 row) + + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'EXCLUSIVE'); + lock_relation_if_exists +------------------------- + t +(1 row) + + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS EXCLUSIVE'); + lock_relation_if_exists +------------------------- + t +(1 row) + + -- see them all + SELECT relation::regclass, mode FROM pg_locks WHERE pid = pg_backend_pid() AND relation = 'truncate_from_workers.on_update_fkey_table'::regclass ORDER BY 2 DESC; + relation | mode +--------------------------------------------+-------------------------- + truncate_from_workers.on_update_fkey_table | ShareUpdateExclusiveLock + truncate_from_workers.on_update_fkey_table | ShareRowExclusiveLock + truncate_from_workers.on_update_fkey_table | ShareLock + truncate_from_workers.on_update_fkey_table | RowShareLock + truncate_from_workers.on_update_fkey_table | RowExclusiveLock + truncate_from_workers.on_update_fkey_table | ExclusiveLock + truncate_from_workers.on_update_fkey_table | AccessShareLock + truncate_from_workers.on_update_fkey_table | AccessExclusiveLock +(8 rows) + +COMMIT; DROP SCHEMA truncate_from_workers CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table truncate_from_workers.referece_table diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 84253991c..822dc0555 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -146,6 +146,7 @@ ALTER EXTENSION citus UPDATE TO '7.5-7'; ALTER EXTENSION citus UPDATE TO '8.0-1'; ALTER EXTENSION citus UPDATE TO '8.0-2'; ALTER EXTENSION citus UPDATE TO '8.0-3'; +ALTER EXTENSION citus UPDATE TO '8.0-4'; -- show running version SHOW citus.version; diff --git a/src/test/regress/sql/multi_mx_truncate_from_worker.sql b/src/test/regress/sql/multi_mx_truncate_from_worker.sql index 00bdd13e6..372a23c45 100644 --- a/src/test/regress/sql/multi_mx_truncate_from_worker.sql +++ b/src/test/regress/sql/multi_mx_truncate_from_worker.sql @@ -95,6 +95,50 @@ RESET client_min_messages; \c - - - :master_port +-- also test the infrastructure that is used for supporting +-- TRUNCATE from worker nodes + +-- should fail since it is not in transaction block +SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE'); + +BEGIN; + -- should fail since the schema is not provided + SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE'); +ROLLBACK; + +BEGIN; + -- should work since the schema is in the search path + SET search_path TO 'truncate_from_workers'; + SELECT lock_relation_if_exists('on_update_fkey_table', 'ACCESS SHARE'); +ROLLBACK; + +BEGIN; + -- should return false since there is no such table + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_tableXXX', 'ACCESS SHARE'); +ROLLBACK; + + +BEGIN; + -- should fail since there is no such lock mode + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'MY LOCK MODE'); +ROLLBACK; + +BEGIN; + -- test all lock levels + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS SHARE'); + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW SHARE'); + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ROW EXCLUSIVE'); + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE UPDATE EXCLUSIVE'); + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE'); + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE'); + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'SHARE ROW EXCLUSIVE'); + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'EXCLUSIVE'); + SELECT lock_relation_if_exists('truncate_from_workers.on_update_fkey_table', 'ACCESS EXCLUSIVE'); + + -- see them all + SELECT relation::regclass, mode FROM pg_locks WHERE pid = pg_backend_pid() AND relation = 'truncate_from_workers.on_update_fkey_table'::regclass ORDER BY 2 DESC; +COMMIT; + DROP SCHEMA truncate_from_workers CASCADE; SET search_path TO public;