From 8d42d18c81e320b4f007d500583ea5abf0cb8029 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sun, 18 Sep 2016 19:23:06 -0700 Subject: [PATCH] Parallelize and transactionalize DDL. --- .../distributed/executor/multi_utility.c | 185 +++--------------- .../distributed/master/master_create_shards.c | 1 - .../expected/multi_index_statements.out | 8 +- .../expected/multi_join_order_additional.out | 4 - .../input/multi_alter_table_statements.source | 7 +- .../multi_alter_table_statements.source | 35 ++-- 6 files changed, 55 insertions(+), 185 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 8864a7b16..ee6f91e99 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -30,17 +30,15 @@ #include "commands/defrem.h" #include "commands/tablecmds.h" #include "distributed/citus_ruleutils.h" -#include "distributed/commit_protocol.h" -#include "distributed/connection_cache.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_copy.h" #include "distributed/multi_join_order.h" -#include "distributed/multi_shard_transaction.h" #include "distributed/multi_utility.h" /* IWYU pragma: keep */ #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" +#include "distributed/remote_commands.h" #include "distributed/transmit.h" #include "distributed/worker_protocol.h" #include "executor/executor.h" @@ -95,14 +93,13 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement); static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner); static Node * ProcessIndexStmt(IndexStmt *createIndexStatement, - const char *createIndexCommand, bool isTopLevel); + const char *createIndexCommand); static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement, - const char *dropIndexCommand, bool isTopLevel); + const char *dropIndexCommand); static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, - const char *alterTableCommand, bool isTopLevel); + const char *alterTableCommand); static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, - const char *alterObjectSchemaCommand, - bool isTopLevel); + const char *alterObjectSchemaCommand); /* Local functions forward declarations for unsupported command checks */ static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); @@ -117,12 +114,8 @@ static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement); /* Local functions forward declarations for helper functions */ static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort); static bool IsAlterTableRenameStmt(RenameStmt *renameStatement); -static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, - bool isTopLevel); +static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString); static void ShowNoticeIfNotUsing2PC(void); -static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString); -static void ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId, - ShardConnections *shardConnections); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); static void CheckCopyPermissions(CopyStmt *copyStatement); @@ -210,12 +203,9 @@ multi_ProcessUtility(Node *parsetree, /* ddl commands are propagated to workers only if EnableDDLPropagation is set */ if (EnableDDLPropagation) { - bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL); - if (IsA(parsetree, IndexStmt)) { - parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString, - isTopLevel); + parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString); } if (IsA(parsetree, DropStmt)) @@ -223,7 +213,7 @@ multi_ProcessUtility(Node *parsetree, DropStmt *dropStatement = (DropStmt *) parsetree; if (dropStatement->removeType == OBJECT_INDEX) { - parsetree = ProcessDropIndexStmt(dropStatement, queryString, isTopLevel); + parsetree = ProcessDropIndexStmt(dropStatement, queryString); } } @@ -232,8 +222,7 @@ multi_ProcessUtility(Node *parsetree, AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree; if (alterTableStmt->relkind == OBJECT_TABLE) { - parsetree = ProcessAlterTableStmt(alterTableStmt, queryString, - isTopLevel); + parsetree = ProcessAlterTableStmt(alterTableStmt, queryString); } } @@ -257,8 +246,7 @@ multi_ProcessUtility(Node *parsetree, if (IsA(parsetree, AlterObjectSchemaStmt)) { AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree; - parsetree = ProcessAlterObjectSchemaStmt(setSchemaStmt, queryString, - isTopLevel); + parsetree = ProcessAlterObjectSchemaStmt(setSchemaStmt, queryString); } /* @@ -538,8 +526,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR * master node table. */ static Node * -ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, - bool isTopLevel) +ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) { /* * We first check whether a distributed relation is affected. For that, we need to @@ -586,7 +573,7 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand ErrorIfUnsupportedIndexStmt(createIndexStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(relationId, createIndexCommand, isTopLevel); + ExecuteDistributedDDLCommand(relationId, createIndexCommand); } } @@ -603,8 +590,7 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand * master node table. */ static Node * -ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, - bool isTopLevel) +ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) { ListCell *dropObjectCell = NULL; Oid distributedIndexId = InvalidOid; @@ -673,7 +659,7 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand, isTopLevel); + ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand); } return (Node *) dropIndexStatement; @@ -689,8 +675,7 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, * master node table. */ static Node * -ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, - bool isTopLevel) +ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand) { /* first check whether a distributed relation is affected */ if (alterTableStatement->relation != NULL) @@ -705,7 +690,7 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl ErrorIfUnsupportedAlterTableStmt(alterTableStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(relationId, alterTableCommand, isTopLevel); + ExecuteDistributedDDLCommand(relationId, alterTableCommand); } } } @@ -722,7 +707,7 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl */ static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, - const char *alterObjectSchemaCommand, bool isTopLevel) + const char *alterObjectSchemaCommand) { Oid relationId = InvalidOid; bool noWait = false; @@ -1278,16 +1263,13 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt) * ExecuteDistributedDDLCommand applies a given DDL command to the given * distributed table in a distributed transaction. If the multi shard commit protocol is * in its default value of '1pc', then a notice message indicating that '2pc' might be - * used for extra safety. In the commit protocol, a BEGIN is sent after connection to - * each shard placement and COMMIT/ROLLBACK is handled by - * CompleteShardPlacementTransactions function. + * used for extra safety. + * + * DDL is executed, via worker_apply_shard_ddl_command(), on the workers. */ static void -ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, - bool isTopLevel) +ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString) { - bool executionOK = false; - if (XactModificationLevel == XACT_MODIFICATION_DATA) { ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), @@ -1295,16 +1277,14 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, "transaction blocks containing data modifications"))); } + BeginOrContinueCoordinatedTransaction(); + + /* FIXME: Move into ExecuteDDLOnRelationPlacements()? */ ShowNoticeIfNotUsing2PC(); - executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString); - - /* if command could not be executed on any finalized shard placement, error out */ - if (!executionOK) - { - ereport(ERROR, (errmsg("could not execute DDL command on worker node shards"))); - } + ExecuteDDLOnRelationPlacements(relationId, ddlCommandString); + /* FIXME: Move into ExecuteDDLOnRelationPlacements()? */ XactModificationLevel = XACT_MODIFICATION_SCHEMA; } @@ -1328,116 +1308,6 @@ ShowNoticeIfNotUsing2PC(void) } -/* - * ExecuteCommandOnWorkerShards executes a given command on all the finalized - * shard placements of the given table within a distributed transaction. The - * value of citus.multi_shard_commit_protocol is set to '2pc' by the caller - * ExecuteDistributedDDLCommand function so that two phase commit protocol is used. - * - * ExecuteCommandOnWorkerShards opens an individual connection for each of the - * shard placement. After all connections are opened, a BEGIN command followed by - * a proper "SELECT worker_apply_shard_ddl_command(, )" is - * sent to all open connections in a serial manner. - * - * The opened transactions are handled by the CompleteShardPlacementTransactions - * function. - * - * Note: There are certain errors which would occur on few nodes and not on the - * others. For example, adding a column with a type which exists on some nodes - * and not on the others. - * - * Note: The execution will be blocked if a prepared transaction from previous - * executions exist on the workers. In this case, those prepared transactions should - * be removed by either COMMIT PREPARED or ROLLBACK PREPARED. - */ -static bool -ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString) -{ - List *shardIntervalList = LoadShardIntervalList(relationId); - char *tableOwner = TableOwner(relationId); - ListCell *shardIntervalCell = NULL; - Oid schemaId = get_rel_namespace(relationId); - char *schemaName = get_namespace_name(schemaId); - - LockShards(shardIntervalList, ShareLock); - OpenTransactionsToAllShardPlacements(shardIntervalList, tableOwner); - - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - uint64 shardId = shardInterval->shardId; - ShardConnections *shardConnections = NULL; - bool shardConnectionsFound = false; - char *escapedSchemaName = quote_literal_cstr(schemaName); - char *escapedCommandString = quote_literal_cstr(commandString); - StringInfo applyCommand = makeStringInfo(); - - shardConnections = GetShardConnections(shardId, &shardConnectionsFound); - Assert(shardConnectionsFound); - - /* build the shard ddl command */ - appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId, - escapedSchemaName, escapedCommandString); - - ExecuteCommandOnShardPlacements(applyCommand, shardId, shardConnections); - - FreeStringInfo(applyCommand); - } - - /* check for cancellation one last time before returning */ - CHECK_FOR_INTERRUPTS(); - - return true; -} - - -/* - * ExecuteCommandOnShardPlacements executes the given ddl command on the - * placements of the given shard, using the given shard connections. - */ -static void -ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId, - ShardConnections *shardConnections) -{ - List *connectionList = shardConnections->connectionList; - ListCell *connectionCell = NULL; - - Assert(connectionList != NIL); - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - PGresult *result = NULL; - - /* send the query */ - result = PQexec(connection, applyCommand->data); - if (PQresultStatus(result) != PGRES_TUPLES_OK) - { - WarnRemoteError(connection, result); - ereport(ERROR, (errmsg("could not execute DDL command on worker " - "node shards"))); - } - else - { - char *workerName = ConnectionGetOptionValue(connection, "host"); - char *workerPort = ConnectionGetOptionValue(connection, "port"); - - ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT - " on node %s:%s", shardId, workerName, - workerPort))); - } - - PQclear(result); - - transactionConnection->transactionState = TRANSACTION_STATE_OPEN; - - CHECK_FOR_INTERRUPTS(); - } -} - - /* * Before acquiring a table lock, check whether we have sufficient rights. * In the case of DROP INDEX, also try to lock the table before the index. @@ -1761,7 +1631,6 @@ ReplicateGrantStmt(Node *parsetree) RangeVar *relvar = (RangeVar *) lfirst(objectCell); Oid relOid = RangeVarGetRelid(relvar, NoLock, false); const char *grantOption = ""; - bool isTopLevel = true; if (!IsDistributedTable(relOid)) { @@ -1794,7 +1663,7 @@ ReplicateGrantStmt(Node *parsetree) granteesString.data); } - ExecuteDistributedDDLCommand(relOid, ddlString.data, isTopLevel); + ExecuteDistributedDDLCommand(relOid, ddlString.data); resetStringInfo(&ddlString); } } diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index a55ed2842..f4d3fb788 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -26,7 +26,6 @@ #include "catalog/namespace.h" #include "catalog/pg_class.h" -#include "distributed/connection_cache.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out index 639d18ad0..1144d52fa 100644 --- a/src/test/regress/expected/multi_index_statements.out +++ b/src/test/regress/expected/multi_index_statements.out @@ -155,12 +155,12 @@ NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' WARNING: data type bigint has no default operator class for access method "gist" HINT: You must specify an operator class for the index or define a default operator class for the data type. -CONTEXT: while executing command on localhost:57638 -ERROR: could not execute DDL command on worker node shards +CONTEXT: while executing command on localhost:57637 +ERROR: failure on connection marked as essential: localhost:57637 CREATE INDEX try_index ON lineitem (non_existent_column); WARNING: column "non_existent_column" does not exist -CONTEXT: while executing command on localhost:57638 -ERROR: could not execute DDL command on worker node shards +CONTEXT: while executing command on localhost:57637 +ERROR: failure on connection marked as essential: localhost:57637 CREATE INDEX ON lineitem (l_orderkey); ERROR: creating index without a name on a distributed table is currently unsupported -- Verify that none of failed indexes got created on the master node diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 7254b56ba..7cebe1ec9 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -44,11 +44,7 @@ SELECT master_create_worker_shards('lineitem_hash', 2, 1); CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate); NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' -DEBUG: applied command on shard 650000 on node localhost:57637 -DEBUG: applied command on shard 650001 on node localhost:57638 DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash" -DEBUG: sent COMMIT over connection 650000 -DEBUG: sent COMMIT over connection 650001 CREATE TABLE orders_hash ( o_orderkey bigint not null, o_custkey integer not null, diff --git a/src/test/regress/input/multi_alter_table_statements.source b/src/test/regress/input/multi_alter_table_statements.source index 614327b0c..f14ee756b 100644 --- a/src/test/regress/input/multi_alter_table_statements.source +++ b/src/test/regress/input/multi_alter_table_statements.source @@ -230,7 +230,9 @@ COMMIT; -- Nothing from the block should have committed SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; - +SET client_min_messages = 'ERROR'; +ALTER TABLE lineitem_alter DROP COLUMN first; +RESET client_min_messages; -- Create single-shard table (to avoid deadlocks in the upcoming test hackery) CREATE TABLE single_shard_items (id integer, name text); SELECT master_create_distributed_table('single_shard_items', 'id', 'hash'); @@ -264,14 +266,13 @@ COMMIT; -- Nothing from the block should have committed SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items'; --- Now try with 2pc off +-- Now try with 2pc off, that'll result in invalid shards RESET citus.multi_shard_commit_protocol; BEGIN; CREATE INDEX single_index_2 ON single_shard_items(id); CREATE INDEX single_index_3 ON single_shard_items(name); COMMIT; --- The block should have committed with a warning SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items'; \c - - - :worker_2_port diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index 2e74da86d..cacf6fef0 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -262,8 +262,8 @@ NOTICE: relation "non_existent_table" does not exist, skipping ALTER TABLE IF EXISTS lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE INTEGER; ALTER TABLE lineitem_alter DROP COLUMN non_existent_column; WARNING: column "non_existent_column" of relation "lineitem_alter_220000" does not exist -CONTEXT: while executing command on localhost:57638 -ERROR: could not execute DDL command on worker node shards +CONTEXT: while executing command on localhost:57637 +ERROR: failure on connection marked as essential: localhost:57637 ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS non_existent_column; NOTICE: column "non_existent_column" of relation "lineitem_alter" does not exist, skipping ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS int_column2; @@ -362,16 +362,16 @@ DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT and TYPE subc -- types ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type; WARNING: type "non_existent_type" does not exist -CONTEXT: while executing command on localhost:57638 -ERROR: could not execute DDL command on worker node shards +CONTEXT: while executing command on localhost:57637 +ERROR: failure on connection marked as essential: localhost:57637 ALTER TABLE lineitem_alter ALTER COLUMN null_column SET NOT NULL; WARNING: column "null_column" contains null values -CONTEXT: while executing command on localhost:57638 -ERROR: could not execute DDL command on worker node shards +CONTEXT: while executing command on localhost:57637 +ERROR: failure on connection marked as essential: localhost:57637 ALTER TABLE lineitem_alter ALTER COLUMN l_partkey SET DEFAULT 'a'; WARNING: invalid input syntax for integer: "a" -CONTEXT: while executing command on localhost:57638 -ERROR: could not execute DDL command on worker node shards +CONTEXT: while executing command on localhost:57637 +ERROR: failure on connection marked as essential: localhost:57637 -- Verify that we error out on statements involving RENAME ALTER TABLE lineitem_alter RENAME TO lineitem_renamed; ERROR: renaming distributed tables or their objects is currently unsupported @@ -520,7 +520,7 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh ALTER TABLE lineitem_alter ADD COLUMN first integer; WARNING: column "first" of relation "lineitem_alter_220000" already exists CONTEXT: while executing command on localhost:57638 -ERROR: could not execute DDL command on worker node shards +ERROR: failure on connection marked as essential: localhost:57638 COMMIT; -- Nothing from the block should have committed SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; @@ -528,6 +528,10 @@ SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; -----------+----------- (0 rows) +SET client_min_messages = 'ERROR'; +ALTER TABLE lineitem_alter DROP COLUMN first; +ERROR: failure on connection marked as essential: localhost:57637 +RESET client_min_messages; -- Create single-shard table (to avoid deadlocks in the upcoming test hackery) CREATE TABLE single_shard_items (id integer, name text); SELECT master_create_distributed_table('single_shard_items', 'id', 'hash'); @@ -565,14 +569,14 @@ COMMIT; WARNING: duplicate key value violates unique constraint "ddl_commands_command_key" DETAIL: Key (command)=(CREATE INDEX) already exists. CONTEXT: while executing command on localhost:57638 -ERROR: failed to prepare transaction +ERROR: failure on connection marked as essential: localhost:57638 -- Nothing from the block should have committed SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items'; indexname | tablename -----------+----------- (0 rows) --- Now try with 2pc off +-- Now try with 2pc off, that'll result in invalid shards RESET citus.multi_shard_commit_protocol; BEGIN; CREATE INDEX single_index_2 ON single_shard_items(id); @@ -580,8 +584,9 @@ NOTICE: using one-phase commit for distributed DDL commands HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' CREATE INDEX single_index_3 ON single_shard_items(name); COMMIT; -WARNING: failed to commit transaction on localhost:57638 --- The block should have committed with a warning +WARNING: duplicate key value violates unique constraint "ddl_commands_command_key" +DETAIL: Key (command)=(CREATE INDEX) already exists. +CONTEXT: while executing command on localhost:57638 SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items'; indexname | tablename ----------------+-------------------- @@ -653,8 +658,8 @@ INSERT INTO test_ab VALUES (2, 11); CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a); WARNING: could not create unique index "temp_unique_index_1_220022" DETAIL: Key (a)=(2) is duplicated. -CONTEXT: while executing command on localhost:57638 -ERROR: could not execute DDL command on worker node shards +CONTEXT: while executing command on localhost:57637 +ERROR: failure on connection marked as essential: localhost:57637 SELECT shardid FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard WHERE logicalrelid='test_ab'::regclass AND shardstate=3; shardid