From e13da6a343c44ef7c6bd453fe943e3589df397f7 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 1 Aug 2018 10:25:53 +0300 Subject: [PATCH] Add infrastructure to hide shards on MX worker nodes Add ability to understand whether a table is a known shard on MX workers. Note that this is only useful and applicable for hiding shards on MX worker nodes given that we can have metadata only there. --- .../worker/worker_data_fetch_protocol.c | 31 +++-- .../worker/worker_shard_visibility.c | 118 ++++++++++++++++++ src/include/distributed/worker_protocol.h | 2 + 3 files changed, 143 insertions(+), 8 deletions(-) create mode 100644 src/backend/distributed/worker/worker_shard_visibility.c diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index cb4939086..f7ef65f29 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -62,7 +62,6 @@ static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort, static void ReceiveResourceCleanup(int32 connectionId, const char *filename, int32 fileDescriptor); static void CitusDeleteFile(const char *filename); -static uint64 ExtractShardId(const char *tableName); static bool check_log_statement(List *stmt_list); static void AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName); static void SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg); @@ -526,9 +525,13 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS) } -/* Extracts shard id from the given table name, and returns it. */ -static uint64 -ExtractShardId(const char *tableName) +/* + * ExtractShardId tries to extract shard id from the given table name, + * and returns the shard id if table name is formatted as shard name. + * Else, the function returns INVALID_SHARD_ID. + */ +uint64 +ExtractShardId(const char *tableName, bool missingOk) { uint64 shardId = 0; char *shardIdString = NULL; @@ -536,11 +539,16 @@ ExtractShardId(const char *tableName) /* find the last underscore and increment for shardId string */ shardIdString = strrchr(tableName, SHARD_NAME_SEPARATOR); - if (shardIdString == NULL) + if (shardIdString == NULL && !missingOk) { ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"", tableName))); } + else if (shardIdString == NULL && missingOk) + { + return INVALID_SHARD_ID; + } + shardIdString++; errno = 0; @@ -548,8 +556,15 @@ ExtractShardId(const char *tableName) if (errno != 0 || (*shardIdStringEnd != '\0')) { - ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"", - tableName))); + if (!missingOk) + { + ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"", + tableName))); + } + else + { + return INVALID_SHARD_ID; + } } return shardId; @@ -761,7 +776,7 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) * the transaction for this function commits, this lock will automatically * be released. This ensures appends to a shard happen in a serial manner. */ - shardId = ExtractShardId(shardTableName); + shardId = ExtractShardId(shardTableName, false); LockShardResource(shardId, AccessExclusiveLock); /* copy remote table's data to this node */ diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c new file mode 100644 index 000000000..808f53cdb --- /dev/null +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -0,0 +1,118 @@ +/* + * worker_shard_visibility.c + * + * TODO: Write some meaningful comment + */ + +#include "postgres.h" + +#include "catalog/namespace.h" +#include "distributed/metadata_cache.h" +#include "distributed/worker_protocol.h" +#include "utils/lsyscache.h" + + +PG_FUNCTION_INFO_V1(relation_is_a_known_shard); + + +static bool RelationIsAKnownShard(Oid shardRelationId); + + +Datum +relation_is_a_known_shard(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + + PG_RETURN_BOOL(RelationIsAKnownShard(relationId)); +} + +/* + * Given a relationId, check whether it's a shard of any distributed table. + * We can only do that in MX since we've metadata there. We can actually + * implement for non-mx as well, but, there is currently no need for that. + * + * TODO: improve the comment + * TODO: Make sure that we're not missing any edge cases with our + * implementation + */ +bool +RelationIsAKnownShard(Oid shardRelationId) +{ + int localGroupId = -1; + char *shardRelationName = NULL; + char *relationName = NULL; + bool missingOk = true; + uint64 shardId = INVALID_SHARD_ID; + ShardInterval *shardInterval = NULL; + Oid relationId = InvalidOid; + char *shardIdString = NULL; + int relationNameLength = 0; + + + /* + * TODO: version check + */ + + if (!OidIsValid(shardRelationId)) + { + /* we cannot continue without a valid Oid */ + return false; + } + + localGroupId = GetLocalGroupId(); + if (localGroupId == 0) + { + /* + * We're not interested in shards in the coordinator + * or non-mx worker nodes. + */ + return false; + } + + shardRelationName = get_rel_name(shardRelationId); + + /* find the last underscore and increment for shardId string */ + shardIdString = strrchr(shardRelationName, SHARD_NAME_SEPARATOR); + if (shardIdString == NULL) + { + return false; + } + + relationNameLength = shardIdString - shardRelationName; + relationName = strndup(shardRelationName, relationNameLength); + + relationId = RelnameGetRelid(relationName); + if (!OidIsValid(relationId)) + { + /* there is no such relation */ + return false; + } + + if (!IsDistributedTable(relationId)) + { + /* we're obviously only interested in distributed tables */ + return false; + } + + shardId = ExtractShardId(shardRelationName, missingOk); + if (shardId == INVALID_SHARD_ID) + { + /* + * The format of the table name does not align with + * our shard name definition. + */ + return false; + } + + /* + * At this point we're sure that this is a shard of a + * distributed table. + */ + shardInterval = LoadShardInterval(shardId); + if (shardInterval->relationId == relationId) + { + return true; + } + + return false; +} diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 4321fab9c..77d5e1da0 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -120,6 +120,8 @@ extern int32 ArrayObjectCount(ArrayType *arrayObject); extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId); extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *tableName); +extern uint64 ExtractShardId(const char *tableName, bool missingOk); + /* Function declarations shared with the master planner */ extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);