Make shard transfer functions co-location aware

With this change, the master_copy_shard_placement and master_move_shard_placement functions
copy/move the given shard along with its co-located shards.
pull/840/head
Burak Yucesoy 2016-09-30 15:38:38 +03:00 committed by Onder Kalaci
parent 0fbd19550d
commit 6668d19a3b
16 changed files with 760 additions and 142 deletions
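
For context, the user-visible effect can be sketched with a short SQL session. This is a minimal sketch based on the regression tests included below; the worker ports (57637, 57638) and shard id 1300000 are simply the values those tests use.

-- mark one placement of a hash-distributed shard as inactive (shardstate = 3)
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1300000 AND nodeport = 57638;
-- repairing that single shard now also recreates and refreshes the placements
-- of every shard co-located with it on the same target node
SELECT master_copy_shard_placement(1300000, 'localhost', 57637, 'localhost', 57638);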


@ -8,7 +8,7 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -70,6 +70,8 @@ $(EXTENSION)--6.0-5.sql: $(EXTENSION)--6.0-4.sql $(EXTENSION)--6.0-4--6.0-5.sql
cat $^ > $@
$(EXTENSION)--6.0-6.sql: $(EXTENSION)--6.0-5.sql $(EXTENSION)--6.0-5--6.0-6.sql
cat $^ > $@
$(EXTENSION)--6.0-7.sql: $(EXTENSION)--6.0-6.sql $(EXTENSION)--6.0-6--6.0-7.sql
cat $^ > $@
NO_PGXS = 1


@ -26,3 +26,4 @@ CREATE FUNCTION master_get_new_placementid()
AS 'MODULE_PATHNAME', $$master_get_new_placementid$$;
COMMENT ON FUNCTION master_get_new_placementid()
IS 'fetch unique placementid';


@ -12,3 +12,4 @@ CREATE FUNCTION column_name_to_column(table_name regclass, column_name text)
AS 'MODULE_PATHNAME', $$column_name_to_column$$;
COMMENT ON FUNCTION column_name_to_column(table_name regclass, column_name text)
IS 'convert a column name to its textual Var representation';


@ -0,0 +1,21 @@
/* citus--6.0-6--6.0-7.sql */
CREATE FUNCTION pg_catalog.get_colocated_table_array(regclass)
RETURNS regclass[]
AS 'citus'
LANGUAGE C STRICT;
CREATE OR REPLACE FUNCTION pg_catalog.master_move_shard_placement(shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_move_shard_placement$$;
COMMENT ON FUNCTION pg_catalog.master_move_shard_placement(shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer)
IS 'move shard from remote node';
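
On an existing installation these definitions arrive through the usual extension upgrade path, which the multi_extension regression test below exercises. The get_colocated_table_array call is a hedged example (output omitted) that assumes a distributed table named table1_group1, as created in the co-location tests.

ALTER EXTENSION citus UPDATE TO '6.0-7';
-- list the tables co-located with a given table; returns a regclass[]
SELECT get_colocated_table_array('table1_group1'::regclass);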


@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.0-6'
default_version = '6.0-7'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog
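
After the upgrade, the installed version can be verified from the catalog; this is a plain PostgreSQL query rather than anything added by this commit.

SELECT extversion FROM pg_extension WHERE extname = 'citus';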


@ -18,7 +18,9 @@
#include <string.h>
#include "catalog/pg_class.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_cache.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_executor.h"
@ -36,68 +38,163 @@
/* local function forward declarations */
static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort);
static void EnsureShardCanBeMoved(int64 shardId, char *sourceNodeName,
int32 sourceNodePort);
static ShardPlacement * SearchShardPlacementInList(List *shardPlacementList,
text *nodeName, uint32 nodePort);
char *nodeName, uint32 nodePort,
bool missingOk);
static void CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort,
bool doRepair);
static List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName,
int32 sourceNodePort);
static char * ConstructQualifiedShardName(ShardInterval *shardInterval);
static List * RecreateTableDDLCommandList(Oid relationId);
static bool CopyDataFromFinalizedPlacement(Oid distributedTableId, int64 shardId,
ShardPlacement *healthyPlacement,
ShardPlacement *placementToRepair);
static void SendCommandListInSingleTransaction(char *nodeName, int32 nodePort,
List *commandList);
static char * CitusExtensionOwnerName(void);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_copy_shard_placement);
PG_FUNCTION_INFO_V1(master_move_shard_placement);
/*
* master_copy_shard_placement implements a user-facing UDF to copy data from
* master_copy_shard_placement implements a user-facing UDF to repair data from
* a healthy (source) node to an inactive (target) node. To accomplish this it
* entirely recreates the table structure before copying all data. During this
* time all modifications are paused to the shard. After successful repair, the
* inactive placement is marked healthy and modifications may continue. If the
* repair fails at any point, this function throws an error, leaving the node
* in an unhealthy state.
* in an unhealthy state. Please note that master_copy_shard_placement copies
* the given shard along with its co-located shards.
*/
Datum
master_copy_shard_placement(PG_FUNCTION_ARGS)
{
int64 shardId = PG_GETARG_INT64(0);
text *sourceNodeName = PG_GETARG_TEXT_P(1);
text *sourceNodeNameText = PG_GETARG_TEXT_P(1);
int32 sourceNodePort = PG_GETARG_INT32(2);
text *targetNodeName = PG_GETARG_TEXT_P(3);
text *targetNodeNameText = PG_GETARG_TEXT_P(3);
int32 targetNodePort = PG_GETARG_INT32(4);
bool doRepair = true;
char *sourceNodeName = text_to_cstring(sourceNodeNameText);
char *targetNodeName = text_to_cstring(targetNodeNameText);
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid distributedTableId = shardInterval->relationId;
List *colocatedTableList = ColocatedTableList(distributedTableId);
ListCell *colocatedTableCell = NULL;
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedShardCell = NULL;
char *relationOwner = NULL;
List *shardPlacementList = NIL;
ShardPlacement *sourcePlacement = NULL;
ShardPlacement *targetPlacement = NULL;
WorkerNode *targetNode = NULL;
List *ddlCommandList = NIL;
bool dataCopied = false;
foreach(colocatedTableCell, colocatedTableList)
{
Oid colocatedTableId = lfirst_oid(colocatedTableCell);
char relationKind = '\0';
EnsureTableOwner(distributedTableId);
/* check that user has owner rights in all co-located tables */
EnsureTableOwner(colocatedTableId);
/*
* By taking an exclusive lock on the shard, we both stop all modifications
* (INSERT, UPDATE, or DELETE) and prevent concurrent repair operations from
* being able to operate on this shard.
*/
LockShardResource(shardId, ExclusiveLock);
relationKind = get_rel_relkind(colocatedTableId);
if (relationKind == RELKIND_FOREIGN_TABLE)
{
char *relationName = get_rel_name(colocatedTableId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot repair shard"),
errdetail("Table %s is a foreign table. Repairing "
"shards backed by foreign tables is "
"not supported.", relationName)));
}
}
/* we sort colocatedShardList so that lock operations will not cause any deadlocks */
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
uint64 colocatedShardId = colocatedShard->shardId;
/*
* We've stopped data modifications of this shard, but we plan to move
* a placement to the healthy state, so we need to grab a shard metadata
* lock (in exclusive mode) as well.
*/
LockShardDistributionMetadata(shardId, ExclusiveLock);
LockShardDistributionMetadata(colocatedShardId, ExclusiveLock);
relationOwner = TableOwner(distributedTableId);
/*
* If our aim is repair, we must make sure that there is an unhealthy
* placement on the target node; we use EnsureShardCanBeRepaired for that
* check. If we just want to copy the shard without repairing anything, it
* is enough to use EnsureShardCanBeMoved, which only checks that there is
* a placement on the source node.
*/
if (doRepair)
{
/*
* After #810 is fixed, we should remove this check and call EnsureShardCanBeRepaired
* for all shard ids
*/
if (colocatedShardId == shardId)
{
EnsureShardCanBeRepaired(colocatedShardId, sourceNodeName, sourceNodePort,
targetNodeName, targetNodePort);
}
else
{
EnsureShardCanBeMoved(colocatedShardId, sourceNodeName, sourceNodePort);
}
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("master_copy_shard_placement() without repair option "
"is only supported on Citus Enterprise")));
}
}
/* CopyShardPlacement copies the given shard along with its co-located shards */
CopyShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
targetNodePort, doRepair);
PG_RETURN_VOID();
}
/*
* master_move_shard_placement moves given shard (and its co-located shards) from one
* node to the other node.
*/
Datum
master_move_shard_placement(PG_FUNCTION_ARGS)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("master_move_shard_placement() is only supported on "
"Citus Enterprise")));
}
/*
* EnsureShardCanBeRepaired checks that the given shard has a healthy placement on the
* source node and an inactive placement on the target node.
*/
static void
EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort)
{
List *shardPlacementList = ShardPlacementList(shardId);
ShardPlacement *sourcePlacement = NULL;
ShardPlacement *targetPlacement = NULL;
bool missingSourceOk = false;
bool missingTargetOk = false;
shardPlacementList = ShardPlacementList(shardId);
sourcePlacement = SearchShardPlacementInList(shardPlacementList, sourceNodeName,
sourceNodePort);
sourceNodePort, missingSourceOk);
if (sourcePlacement->shardState != FILE_FINALIZED)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@ -105,65 +202,44 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
}
targetPlacement = SearchShardPlacementInList(shardPlacementList, targetNodeName,
targetNodePort);
targetNodePort, missingTargetOk);
if (targetPlacement->shardState != FILE_INACTIVE)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("target placement must be in inactive state")));
}
relationKind = get_rel_relkind(distributedTableId);
if (relationKind == RELKIND_FOREIGN_TABLE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot repair shard"),
errdetail("Repairing shards backed by foreign tables is "
"not supported.")));
}
targetNode = palloc0(sizeof(WorkerNode));
strlcpy(targetNode->workerName, targetPlacement->nodeName, WORKER_LENGTH);
targetNode->workerPort = targetPlacement->nodePort;
/* retrieve DDL commands needed to drop and recreate table*/
ddlCommandList = RecreateTableDDLCommandList(distributedTableId);
/* remove existing (unhealthy) placement row; CreateShardPlacements will recreate */
DeleteShardPlacementRow(targetPlacement->shardId, targetPlacement->nodeName,
targetPlacement->nodePort);
/* finally, drop/recreate remote table and add back row (in healthy state) */
CreateShardPlacements(distributedTableId, shardId, ddlCommandList, relationOwner,
list_make1(targetNode), 0, 1);
HOLD_INTERRUPTS();
dataCopied = CopyDataFromFinalizedPlacement(distributedTableId, shardId,
sourcePlacement, targetPlacement);
if (!dataCopied)
{
ereport(ERROR, (errmsg("could not copy shard data"),
errhint("Consult recent messages in the server logs for "
"details.")));
}
RESUME_INTERRUPTS();
PG_RETURN_VOID();
}
/*
* SearchShardPlacementInList searches a provided list for a shard placement
* with the specified node name and port. This function throws an error if no
* such placement exists in the provided list.
* EnsureShardCanBeMoved checks that the given shard has a placement on the source node
* but not on the target node. Note that SearchShardPlacementInList already raises an
* error if the given shard has no placement on the source node, so we do not perform
* an extra check here.
*/
static void
EnsureShardCanBeMoved(int64 shardId, char *sourceNodeName, int32 sourceNodePort)
{
List *shardPlacementList = ShardPlacementList(shardId);
bool missingSourceOk = false;
/* Actual check is done in SearchShardPlacementInList */
SearchShardPlacementInList(shardPlacementList, sourceNodeName, sourceNodePort,
missingSourceOk);
}
/*
* SearchShardPlacementInList searches a provided list for a shard placement with the
* specified node name and port. If missingOk is set to true, this function returns NULL
* if no such placement exists in the provided list, otherwise it throws an error.
*/
static ShardPlacement *
SearchShardPlacementInList(List *shardPlacementList, text *nodeNameText, uint32 nodePort)
SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 nodePort, bool
missingOk)
{
ListCell *shardPlacementCell = NULL;
ShardPlacement *matchingPlacement = NULL;
char *nodeName = text_to_cstring(nodeNameText);
foreach(shardPlacementCell, shardPlacementList)
{
@ -179,6 +255,11 @@ SearchShardPlacementInList(List *shardPlacementList, text *nodeNameText, uint32
if (matchingPlacement == NULL)
{
if (missingOk)
{
return NULL;
}
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("could not find placement matching \"%s:%d\"",
nodeName, nodePort),
@ -189,6 +270,134 @@ SearchShardPlacementInList(List *shardPlacementList, text *nodeNameText, uint32
}
/*
* CopyShardPlacement copies a shard along with its co-located shards from a source node
* to a target node. CopyShardPlacement does not check the state of the shards; it is
* the caller's responsibility to perform those checks if they are necessary.
*/
static void
CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
char *targetNodeName, int32 targetNodePort, bool doRepair)
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedShardCell = NULL;
List *ddlCommandList = NIL;
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
List *colocatedShardDdlList = NIL;
colocatedShardDdlList = CopyShardCommandList(colocatedShard, sourceNodeName,
sourceNodePort);
ddlCommandList = list_concat(ddlCommandList, colocatedShardDdlList);
}
HOLD_INTERRUPTS();
SendCommandListInSingleTransaction(targetNodeName, targetNodePort, ddlCommandList);
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
uint64 colocatedShardId = colocatedShard->shardId;
/*
* If we call this function for repair purposes, the caller should have
* removed the old shard placement metadata.
*/
if (doRepair)
{
List *shardPlacementList = ShardPlacementList(colocatedShardId);
bool missingSourceOk = false;
ShardPlacement *placement = SearchShardPlacementInList(shardPlacementList,
targetNodeName,
targetNodePort,
missingSourceOk);
UpdateShardPlacementState(placement->placementId, FILE_FINALIZED);
}
else
{
InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID,
FILE_FINALIZED, ShardLength(colocatedShardId),
targetNodeName,
targetNodePort);
}
}
RESUME_INTERRUPTS();
}
/*
* CopyShardCommandList generates command list to copy the given shard placement
* from the source node to the target node.
*/
static List *
CopyShardCommandList(ShardInterval *shardInterval,
char *sourceNodeName, int32 sourceNodePort)
{
char *shardName = ConstructQualifiedShardName(shardInterval);
List *ddlCommandList = NIL;
ListCell *ddlCommandCell = NULL;
List *copyShardToNodeCommandsList = NIL;
StringInfo copyShardDataCommand = makeStringInfo();
ddlCommandList = RecreateTableDDLCommandList(shardInterval->relationId);
foreach(ddlCommandCell, ddlCommandList)
{
char *ddlCommand = lfirst(ddlCommandCell);
char *escapedDdlCommand = quote_literal_cstr(ddlCommand);
StringInfo applyDdlCommand = makeStringInfo();
appendStringInfo(applyDdlCommand,
WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA,
shardInterval->shardId, escapedDdlCommand);
copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList,
applyDdlCommand->data);
}
appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD,
quote_literal_cstr(shardName), /* table to append */
quote_literal_cstr(shardName), /* remote table name */
quote_literal_cstr(sourceNodeName), /* remote host */
sourceNodePort); /* remote port */
copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList,
copyShardDataCommand->data);
return copyShardToNodeCommandsList;
}
/*
* ConstructQualifiedShardName creates the fully qualified name string of the
* given shard in <schema>.<table_name>_<shard_id> format.
*
* FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus.
*/
static char *
ConstructQualifiedShardName(ShardInterval *shardInterval)
{
Oid schemaId = get_rel_namespace(shardInterval->relationId);
char *schemaName = get_namespace_name(schemaId);
char *tableName = get_rel_name(shardInterval->relationId);
char *shardName = NULL;
shardName = pstrdup(tableName);
AppendShardIdToName(&shardName, shardInterval->shardId);
shardName = quote_qualified_identifier(schemaName, shardName);
return shardName;
}
/*
* RecreateTableDDLCommandList returns a list of DDL statements similar to that
* returned by GetTableDDLEvents except that the list begins with a "DROP TABLE"
@ -237,45 +446,84 @@ RecreateTableDDLCommandList(Oid relationId)
/*
* CopyDataFromFinalizedPlacement copies the data for a shard (identified by
* a relation and shard identifier) from a healthy placement to one needing
* repair. The unhealthy placement must already have an empty relation in place
* to receive rows from the healthy placement. This function returns a boolean
* indicating success or failure.
* SendCommandListInSingleTransaction opens a connection to the node with the given
* nodeName and nodePort, starts a transaction on the remote node, and executes the
* given commands inside that transaction. The function raises an error if any of
* the queries fails.
*
* FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus.
*/
static bool
CopyDataFromFinalizedPlacement(Oid distributedTableId, int64 shardId,
ShardPlacement *healthyPlacement,
ShardPlacement *placementToRepair)
static void
SendCommandListInSingleTransaction(char *nodeName, int32 nodePort, List *commandList)
{
const char *shardTableName = NULL;
const char *shardQualifiedName = NULL;
StringInfo copyRelationQuery = makeStringInfo();
List *queryResultList = NIL;
bool copySuccessful = false;
char *nodeUser = CitusExtensionOwnerName();
PGconn *workerConnection = NULL;
PGresult *queryResult = NULL;
ListCell *commandCell = NULL;
char *relationName = get_rel_name(distributedTableId);
Oid shardSchemaOid = get_rel_namespace(distributedTableId);
const char *shardSchemaName = get_namespace_name(shardSchemaOid);
AppendShardIdToName(&relationName, shardId);
shardTableName = quote_identifier(relationName);
shardQualifiedName = quote_qualified_identifier(shardSchemaName, shardTableName);
appendStringInfo(copyRelationQuery, WORKER_APPEND_TABLE_TO_SHARD,
quote_literal_cstr(shardQualifiedName), /* table to append */
quote_literal_cstr(shardQualifiedName), /* remote table name */
quote_literal_cstr(healthyPlacement->nodeName), /* remote host */
healthyPlacement->nodePort); /* remote port */
queryResultList = ExecuteRemoteQuery(placementToRepair->nodeName,
placementToRepair->nodePort,
NULL, /* current user, just data manipulation */
copyRelationQuery);
if (queryResultList != NIL)
workerConnection = ConnectToNode(nodeName, nodePort, nodeUser);
if (workerConnection == NULL)
{
copySuccessful = true;
ereport(ERROR, (errmsg("could not open connection to %s:%d as %s",
nodeName, nodePort, nodeUser)));
}
return copySuccessful;
/* start the transaction on the worker node */
queryResult = PQexec(workerConnection, "BEGIN");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ReraiseRemoteError(workerConnection, queryResult);
}
PQclear(queryResult);
/* iterate over the commands and execute them in the same connection */
foreach(commandCell, commandList)
{
char *commandString = lfirst(commandCell);
ExecStatusType resultStatus = PGRES_EMPTY_QUERY;
queryResult = PQexec(workerConnection, commandString);
resultStatus = PQresultStatus(queryResult);
if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COMMAND_OK))
{
ReraiseRemoteError(workerConnection, queryResult);
}
PQclear(queryResult);
}
/* commit the transaction on the worker node */
queryResult = PQexec(workerConnection, "COMMIT");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ReraiseRemoteError(workerConnection, queryResult);
}
PQclear(queryResult);
/* clear NULL result */
PQgetResult(workerConnection);
/* we no longer need this connection */
PQfinish(workerConnection);
}
/*
* CitusExtensionOwnerName returns the name of the owner of the extension.
*
* FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus.
*/
static char *
CitusExtensionOwnerName(void)
{
Oid superUserId = CitusExtensionOwner();
#if (PG_VERSION_NUM < 90500)
return GetUserNameFromId(superUserId);
#else
return GetUserNameFromId(superUserId, false);
#endif
}
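
To make the flow above concrete: CopyShardCommandList builds a small command list per shard, the lists of all co-located shards are concatenated, and SendCommandListInSingleTransaction replays them on the target worker inside one BEGIN/COMMIT. The statements below are a rough sketch of what lands on the target for a single shard of the test table table1_group1, using the worker UDFs that the WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA and WORKER_APPEND_TABLE_TO_SHARD macros expand to; the exact DDL depends on the table definition, and the shard name follows the <schema>.<table_name>_<shard_id> convention described above.

BEGIN;
-- drop and recreate the shard relation (the DDL list starts with a DROP TABLE)
SELECT worker_apply_shard_ddl_command (1300000, 'DROP TABLE IF EXISTS public.table1_group1_1300000');
SELECT worker_apply_shard_ddl_command (1300000, 'CREATE TABLE public.table1_group1_1300000 (id integer)');
-- pull the rows over from the healthy source placement
SELECT worker_append_table_to_shard ('public.table1_group1_1300000', 'public.table1_group1_1300000', 'localhost', 57637);
-- ... the same steps repeat for every other shard in the co-location group ...
COMMIT;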


@ -0,0 +1,231 @@
--
-- MULTI_COLOCATED_SHARD_TRANSFER
--
-- These tables are created in multi_colocation_utils test
-- test repair
-- manually set shardstate as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND (shardid = 1300000 OR shardid = 1300004);
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND shardid = 1300016;
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND shardid = 1300020;
-- test repairing colocated shards
-- status before shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 3
1300000 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57638 | 1 | 3
1300004 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57638 | 1 | 1
(16 rows)
-- repair colocated shards
SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_copy_shard_placement
-----------------------------
(1 row)
-- status after shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 1
1300000 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57638 | 1 | 1
(16 rows)
-- test repairing NOT colocated shard
-- status before shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
p.logicalrelid = 'table5_groupX'::regclass
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300016 | table5_groupx | 57637 | 0 | 1
1300016 | table5_groupx | 57638 | 0 | 3
1300017 | table5_groupx | 57637 | 0 | 1
1300017 | table5_groupx | 57638 | 0 | 1
1300018 | table5_groupx | 57637 | 0 | 1
1300018 | table5_groupx | 57638 | 0 | 1
1300019 | table5_groupx | 57638 | 0 | 1
1300019 | table5_groupx | 57637 | 0 | 1
(8 rows)
-- repair NOT colocated shard
SELECT master_copy_shard_placement(1300016, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_copy_shard_placement
-----------------------------
(1 row)
-- status after shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
p.logicalrelid = 'table5_groupX'::regclass
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300016 | table5_groupx | 57637 | 0 | 1
1300016 | table5_groupx | 57638 | 0 | 1
1300017 | table5_groupx | 57637 | 0 | 1
1300017 | table5_groupx | 57638 | 0 | 1
1300018 | table5_groupx | 57637 | 0 | 1
1300018 | table5_groupx | 57638 | 0 | 1
1300019 | table5_groupx | 57638 | 0 | 1
1300019 | table5_groupx | 57637 | 0 | 1
(8 rows)
-- test repairing shard in append distributed table
-- status before shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
p.logicalrelid = 'table6_append'::regclass
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300020 | table6_append | 57637 | 0 | 1
1300020 | table6_append | 57638 | 0 | 3
1300021 | table6_append | 57637 | 0 | 1
1300021 | table6_append | 57638 | 0 | 1
(4 rows)
-- repair shard in append distributed table
SELECT master_copy_shard_placement(1300020, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
master_copy_shard_placement
-----------------------------
(1 row)
-- status after shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
p.logicalrelid = 'table6_append'::regclass
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300020 | table6_append | 57637 | 0 | 1
1300020 | table6_append | 57638 | 0 | 1
1300021 | table6_append | 57637 | 0 | 1
1300021 | table6_append | 57638 | 0 | 1
(4 rows)
-- test repair while all placements of one shard in the colocation group are unhealthy
-- manually set shardstate as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1300000;
-- status before shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 3
1300000 | table1_group1 | 57637 | 1 | 3
1300001 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57638 | 1 | 1
(16 rows)
-- repair while all placements of one shard in the colocation group are unhealthy
SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: source placement must be in finalized state
-- status after shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 3
1300000 | table1_group1 | 57637 | 1 | 3
1300001 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57638 | 1 | 1
(16 rows)
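
As a cross-check, the shard ids that are handled together by the repair above can be listed with the get_colocated_shard_array test helper defined in multi_colocation_utils (a hedged example; output omitted).

SELECT get_colocated_shard_array(1300000);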


@ -48,10 +48,6 @@ CREATE FUNCTION shards_colocated(bigint, bigint)
RETURNS bool
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION get_colocated_table_array(regclass)
RETURNS regclass[]
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION get_colocated_shard_array(bigint)
RETURNS BIGINT[]
AS 'citus'
@ -71,7 +67,7 @@ SELECT master_create_distributed_table('table1_group1', 'id', 'hash');
(1 row)
SELECT master_create_worker_shards('table1_group1', 4, 1);
SELECT master_create_worker_shards('table1_group1', 4, 2);
master_create_worker_shards
-----------------------------
@ -84,7 +80,7 @@ SELECT master_create_distributed_table('table2_group1', 'id', 'hash');
(1 row)
SELECT master_create_worker_shards('table2_group1', 4, 1);
SELECT master_create_worker_shards('table2_group1', 4, 2);
master_create_worker_shards
-----------------------------
@ -97,7 +93,7 @@ SELECT master_create_distributed_table('table3_group2', 'id', 'hash');
(1 row)
SELECT master_create_worker_shards('table3_group2', 4, 1);
SELECT master_create_worker_shards('table3_group2', 4, 2);
master_create_worker_shards
-----------------------------
@ -110,7 +106,7 @@ SELECT master_create_distributed_table('table4_group2', 'id', 'hash');
(1 row)
SELECT master_create_worker_shards('table4_group2', 4, 1);
SELECT master_create_worker_shards('table4_group2', 4, 2);
master_create_worker_shards
-----------------------------
@ -123,7 +119,7 @@ SELECT master_create_distributed_table('table5_groupX', 'id', 'hash');
(1 row)
SELECT master_create_worker_shards('table5_groupX', 4, 1);
SELECT master_create_worker_shards('table5_groupX', 4, 2);
master_create_worker_shards
-----------------------------


@ -32,6 +32,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-3';
ALTER EXTENSION citus UPDATE TO '6.0-4';
ALTER EXTENSION citus UPDATE TO '6.0-5';
ALTER EXTENSION citus UPDATE TO '6.0-6';
ALTER EXTENSION citus UPDATE TO '6.0-7';
-- drop extension and re-create in newest version
DROP EXTENSION citus;
\c


@ -58,7 +58,7 @@ SELECT unnest(master_metadata_snapshot());
ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE public.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557)
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 549),(1310001, 1, 0, 'localhost', 57638, 550),(1310002, 1, 0, 'localhost', 57637, 551),(1310003, 1, 0, 'localhost', 57638, 552),(1310004, 1, 0, 'localhost', 57637, 553),(1310005, 1, 0, 'localhost', 57638, 554),(1310006, 1, 0, 'localhost', 57637, 555),(1310007, 1, 0, 'localhost', 57638, 556)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(10 rows)
@ -78,7 +78,7 @@ SELECT unnest(master_metadata_snapshot());
ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE public.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557)
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 549),(1310001, 1, 0, 'localhost', 57638, 550),(1310002, 1, 0, 'localhost', 57637, 551),(1310003, 1, 0, 'localhost', 57638, 552),(1310004, 1, 0, 'localhost', 57637, 553),(1310005, 1, 0, 'localhost', 57638, 554),(1310006, 1, 0, 'localhost', 57637, 555),(1310007, 1, 0, 'localhost', 57638, 556)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(11 rows)
@ -100,7 +100,7 @@ SELECT unnest(master_metadata_snapshot());
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557)
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 549),(1310001, 1, 0, 'localhost', 57638, 550),(1310002, 1, 0, 'localhost', 57637, 551),(1310003, 1, 0, 'localhost', 57638, 552),(1310004, 1, 0, 'localhost', 57637, 553),(1310005, 1, 0, 'localhost', 57638, 554),(1310006, 1, 0, 'localhost', 57637, 555),(1310007, 1, 0, 'localhost', 57638, 556)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(12 rows)
@ -126,7 +126,7 @@ SELECT unnest(master_metadata_snapshot());
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557)
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 549),(1310001, 1, 0, 'localhost', 57638, 550),(1310002, 1, 0, 'localhost', 57637, 551),(1310003, 1, 0, 'localhost', 57638, 552),(1310004, 1, 0, 'localhost', 57637, 553),(1310005, 1, 0, 'localhost', 57638, 554),(1310006, 1, 0, 'localhost', 57637, 555),(1310007, 1, 0, 'localhost', 57638, 556)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(12 rows)
@ -145,7 +145,7 @@ SELECT unnest(master_metadata_snapshot());
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557)
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 549),(1310001, 1, 0, 'localhost', 57638, 550),(1310002, 1, 0, 'localhost', 57637, 551),(1310003, 1, 0, 'localhost', 57638, 552),(1310004, 1, 0, 'localhost', 57637, 553),(1310005, 1, 0, 'localhost', 57638, 554),(1310006, 1, 0, 'localhost', 57637, 555),(1310007, 1, 0, 'localhost', 57638, 556)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(12 rows)


@ -55,6 +55,8 @@ SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'lo
-- now, update first placement as unhealthy (and raise a notice) so that queries are not routed to there
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :newshardid AND nodeport = :worker_1_port;
-- we are done with dummyhost, it is safe to remove it
DELETE FROM pg_dist_shard_placement WHERE nodename = 'dummyhost';
-- get the data from the second placement
SELECT * FROM customer_engagements;
id | created_at | event_data
@ -93,4 +95,4 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :remotenewshar
-- oops! we don't support repairing shards backed by foreign tables
SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
ERROR: cannot repair shard
DETAIL: Repairing shards backed by foreign tables is not supported.
DETAIL: Table remote_engagements is a foreign table. Repairing shards backed by foreign tables is not supported.


@ -177,5 +177,7 @@ test: multi_expire_table_cache
# ----------
# multi_colocation_utils tests utility functions written for co-location feature & internal API
# multi_colocated_shard_transfer tests master_copy_shard_placement with colocated tables.
# ----------
test: multi_colocation_utils
test: multi_colocated_shard_transfer


@ -0,0 +1,114 @@
--
-- MULTI_COLOCATED_SHARD_TRANSFER
--
-- These tables are created in multi_colocation_utils test
-- test repair
-- manually set shardstate as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND (shardid = 1300000 OR shardid = 1300004);
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND shardid = 1300016;
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND shardid = 1300020;
-- test repairing colocated shards
-- status before shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
ORDER BY s.shardid;
-- repair colocated shards
SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
-- status after shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
ORDER BY s.shardid;
-- test repairing NOT colocated shard
-- status before shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
p.logicalrelid = 'table5_groupX'::regclass
ORDER BY s.shardid;
-- repair NOT colocated shard
SELECT master_copy_shard_placement(1300016, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
-- status after shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
p.logicalrelid = 'table5_groupX'::regclass
ORDER BY s.shardid;
-- test repairing shard in append distributed table
-- status before shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
p.logicalrelid = 'table6_append'::regclass
ORDER BY s.shardid;
-- repair shard in append distributed table
SELECT master_copy_shard_placement(1300020, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
-- status after shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
p.logicalrelid = 'table6_append'::regclass
ORDER BY s.shardid;
-- test repair while all placements of one shard in the colocation group are unhealthy
-- manually set shardstate as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1300000;
-- status before shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
ORDER BY s.shardid;
-- repair while all placements of one shard in the colocation group are unhealthy
SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port);
-- status after shard repair
SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate
FROM
pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp
WHERE
p.logicalrelid = s.logicalrelid AND
s.shardid = sp.shardid AND
colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass)
ORDER BY s.shardid;


@ -57,11 +57,6 @@ CREATE FUNCTION shards_colocated(bigint, bigint)
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION get_colocated_table_array(regclass)
RETURNS regclass[]
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION get_colocated_shard_array(bigint)
RETURNS BIGINT[]
AS 'citus'
@ -79,23 +74,23 @@ CREATE FUNCTION find_shard_interval_index(bigint)
-- create distributed table observe shard pruning
CREATE TABLE table1_group1 ( id int );
SELECT master_create_distributed_table('table1_group1', 'id', 'hash');
SELECT master_create_worker_shards('table1_group1', 4, 1);
SELECT master_create_worker_shards('table1_group1', 4, 2);
CREATE TABLE table2_group1 ( id int );
SELECT master_create_distributed_table('table2_group1', 'id', 'hash');
SELECT master_create_worker_shards('table2_group1', 4, 1);
SELECT master_create_worker_shards('table2_group1', 4, 2);
CREATE TABLE table3_group2 ( id int );
SELECT master_create_distributed_table('table3_group2', 'id', 'hash');
SELECT master_create_worker_shards('table3_group2', 4, 1);
SELECT master_create_worker_shards('table3_group2', 4, 2);
CREATE TABLE table4_group2 ( id int );
SELECT master_create_distributed_table('table4_group2', 'id', 'hash');
SELECT master_create_worker_shards('table4_group2', 4, 1);
SELECT master_create_worker_shards('table4_group2', 4, 2);
CREATE TABLE table5_groupX ( id int );
SELECT master_create_distributed_table('table5_groupX', 'id', 'hash');
SELECT master_create_worker_shards('table5_groupX', 4, 1);
SELECT master_create_worker_shards('table5_groupX', 4, 2);
CREATE TABLE table6_append ( id int );
SELECT master_create_distributed_table('table6_append', 'id', 'append');


@ -37,6 +37,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-3';
ALTER EXTENSION citus UPDATE TO '6.0-4';
ALTER EXTENSION citus UPDATE TO '6.0-5';
ALTER EXTENSION citus UPDATE TO '6.0-6';
ALTER EXTENSION citus UPDATE TO '6.0-7';
-- drop extension and re-create in newest version
DROP EXTENSION citus;


@ -55,6 +55,9 @@ SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'lo
-- now, update first placement as unhealthy (and raise a notice) so that queries are not routed to there
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :newshardid AND nodeport = :worker_1_port;
-- we are done with dummyhost, it is safe to remove it
DELETE FROM pg_dist_shard_placement WHERE nodename = 'dummyhost';
-- get the data from the second placement
SELECT * FROM customer_engagements;