mirror of https://github.com/citusdata/citus.git
Add infrastructure to hide shards on MX worker nodes
Add ability to understand whether a table is a known shard on MX workers. Note that this is only useful and applicable for hiding shards on MX worker nodes given that we can have metadata only there.pull/2323/head
parent
28fd63bee2
commit
e13da6a343
|
@ -62,7 +62,6 @@ static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort,
|
||||||
static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
|
static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
|
||||||
int32 fileDescriptor);
|
int32 fileDescriptor);
|
||||||
static void CitusDeleteFile(const char *filename);
|
static void CitusDeleteFile(const char *filename);
|
||||||
static uint64 ExtractShardId(const char *tableName);
|
|
||||||
static bool check_log_statement(List *stmt_list);
|
static bool check_log_statement(List *stmt_list);
|
||||||
static void AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName);
|
static void AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName);
|
||||||
static void SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg);
|
static void SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg);
|
||||||
|
@ -526,9 +525,13 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Extracts shard id from the given table name, and returns it. */
|
/*
|
||||||
static uint64
|
* ExtractShardId tries to extract shard id from the given table name,
|
||||||
ExtractShardId(const char *tableName)
|
* and returns the shard id if table name is formatted as shard name.
|
||||||
|
* Else, the function returns INVALID_SHARD_ID.
|
||||||
|
*/
|
||||||
|
uint64
|
||||||
|
ExtractShardId(const char *tableName, bool missingOk)
|
||||||
{
|
{
|
||||||
uint64 shardId = 0;
|
uint64 shardId = 0;
|
||||||
char *shardIdString = NULL;
|
char *shardIdString = NULL;
|
||||||
|
@ -536,11 +539,16 @@ ExtractShardId(const char *tableName)
|
||||||
|
|
||||||
/* find the last underscore and increment for shardId string */
|
/* find the last underscore and increment for shardId string */
|
||||||
shardIdString = strrchr(tableName, SHARD_NAME_SEPARATOR);
|
shardIdString = strrchr(tableName, SHARD_NAME_SEPARATOR);
|
||||||
if (shardIdString == NULL)
|
if (shardIdString == NULL && !missingOk)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"",
|
ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"",
|
||||||
tableName)));
|
tableName)));
|
||||||
}
|
}
|
||||||
|
else if (shardIdString == NULL && missingOk)
|
||||||
|
{
|
||||||
|
return INVALID_SHARD_ID;
|
||||||
|
}
|
||||||
|
|
||||||
shardIdString++;
|
shardIdString++;
|
||||||
|
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -548,8 +556,15 @@ ExtractShardId(const char *tableName)
|
||||||
|
|
||||||
if (errno != 0 || (*shardIdStringEnd != '\0'))
|
if (errno != 0 || (*shardIdStringEnd != '\0'))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"",
|
if (!missingOk)
|
||||||
tableName)));
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"",
|
||||||
|
tableName)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return INVALID_SHARD_ID;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return shardId;
|
return shardId;
|
||||||
|
@ -761,7 +776,7 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
* the transaction for this function commits, this lock will automatically
|
* the transaction for this function commits, this lock will automatically
|
||||||
* be released. This ensures appends to a shard happen in a serial manner.
|
* be released. This ensures appends to a shard happen in a serial manner.
|
||||||
*/
|
*/
|
||||||
shardId = ExtractShardId(shardTableName);
|
shardId = ExtractShardId(shardTableName, false);
|
||||||
LockShardResource(shardId, AccessExclusiveLock);
|
LockShardResource(shardId, AccessExclusiveLock);
|
||||||
|
|
||||||
/* copy remote table's data to this node */
|
/* copy remote table's data to this node */
|
||||||
|
|
|
@ -0,0 +1,118 @@
|
||||||
|
/*
|
||||||
|
* worker_shard_visibility.c
|
||||||
|
*
|
||||||
|
* TODO: Write some meaningful comment
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "catalog/namespace.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/worker_protocol.h"
|
||||||
|
#include "utils/lsyscache.h"
|
||||||
|
|
||||||
|
|
||||||
|
PG_FUNCTION_INFO_V1(relation_is_a_known_shard);
|
||||||
|
|
||||||
|
|
||||||
|
static bool RelationIsAKnownShard(Oid shardRelationId);
|
||||||
|
|
||||||
|
|
||||||
|
Datum
|
||||||
|
relation_is_a_known_shard(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
|
|
||||||
|
PG_RETURN_BOOL(RelationIsAKnownShard(relationId));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Given a relationId, check whether it's a shard of any distributed table.
|
||||||
|
* We can only do that in MX since we've metadata there. We can actually
|
||||||
|
* implement for non-mx as well, but, there is currently no need for that.
|
||||||
|
*
|
||||||
|
* TODO: improve the comment
|
||||||
|
* TODO: Make sure that we're not missing any edge cases with our
|
||||||
|
* implementation
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
RelationIsAKnownShard(Oid shardRelationId)
|
||||||
|
{
|
||||||
|
int localGroupId = -1;
|
||||||
|
char *shardRelationName = NULL;
|
||||||
|
char *relationName = NULL;
|
||||||
|
bool missingOk = true;
|
||||||
|
uint64 shardId = INVALID_SHARD_ID;
|
||||||
|
ShardInterval *shardInterval = NULL;
|
||||||
|
Oid relationId = InvalidOid;
|
||||||
|
char *shardIdString = NULL;
|
||||||
|
int relationNameLength = 0;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TODO: version check
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (!OidIsValid(shardRelationId))
|
||||||
|
{
|
||||||
|
/* we cannot continue without a valid Oid */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
localGroupId = GetLocalGroupId();
|
||||||
|
if (localGroupId == 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We're not interested in shards in the coordinator
|
||||||
|
* or non-mx worker nodes.
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
shardRelationName = get_rel_name(shardRelationId);
|
||||||
|
|
||||||
|
/* find the last underscore and increment for shardId string */
|
||||||
|
shardIdString = strrchr(shardRelationName, SHARD_NAME_SEPARATOR);
|
||||||
|
if (shardIdString == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
relationNameLength = shardIdString - shardRelationName;
|
||||||
|
relationName = strndup(shardRelationName, relationNameLength);
|
||||||
|
|
||||||
|
relationId = RelnameGetRelid(relationName);
|
||||||
|
if (!OidIsValid(relationId))
|
||||||
|
{
|
||||||
|
/* there is no such relation */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!IsDistributedTable(relationId))
|
||||||
|
{
|
||||||
|
/* we're obviously only interested in distributed tables */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
shardId = ExtractShardId(shardRelationName, missingOk);
|
||||||
|
if (shardId == INVALID_SHARD_ID)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* The format of the table name does not align with
|
||||||
|
* our shard name definition.
|
||||||
|
*/
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* At this point we're sure that this is a shard of a
|
||||||
|
* distributed table.
|
||||||
|
*/
|
||||||
|
shardInterval = LoadShardInterval(shardId);
|
||||||
|
if (shardInterval->relationId == relationId)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
|
@ -120,6 +120,8 @@ extern int32 ArrayObjectCount(ArrayType *arrayObject);
|
||||||
extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId);
|
extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId);
|
||||||
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
|
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
|
||||||
const char *tableName);
|
const char *tableName);
|
||||||
|
extern uint64 ExtractShardId(const char *tableName, bool missingOk);
|
||||||
|
|
||||||
|
|
||||||
/* Function declarations shared with the master planner */
|
/* Function declarations shared with the master planner */
|
||||||
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);
|
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);
|
||||||
|
|
Loading…
Reference in New Issue