Merge pull request #857 from citusdata/feature/placementid

Introduce placement IDs.
pull/821/merge
Andres Freund 2016-10-07 12:54:12 -07:00 committed by GitHub
commit 85075b7c28
32 changed files with 409 additions and 239 deletions

View File

@ -8,7 +8,7 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -66,6 +66,8 @@ $(EXTENSION)--6.0-3.sql: $(EXTENSION)--6.0-2.sql $(EXTENSION)--6.0-2--6.0-3.sql
cat $^ > $@
$(EXTENSION)--6.0-4.sql: $(EXTENSION)--6.0-3.sql $(EXTENSION)--6.0-3--6.0-4.sql
cat $^ > $@
$(EXTENSION)--6.0-5.sql: $(EXTENSION)--6.0-4.sql $(EXTENSION)--6.0-4--6.0-5.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,28 @@
/* citus--5.2-1--5.2-2.sql */
/*
* Replace oid column in pg_dist_shard_placement with an sequence column.
*/
CREATE SEQUENCE citus.pg_dist_shard_placement_placementid_seq
NO CYCLE;
ALTER SEQUENCE citus.pg_dist_shard_placement_placementid_seq
SET SCHEMA pg_catalog;
ALTER TABLE pg_catalog.pg_dist_shard_placement
ADD COLUMN placementid bigint;
-- keep existing oids, and update sequence to match max.
UPDATE pg_catalog.pg_dist_shard_placement SET placementid = oid;
ALTER TABLE pg_catalog.pg_dist_shard_placement
ALTER COLUMN placementid SET DEFAULT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq'),
ALTER COLUMN placementid SET NOT NULL,
SET WITHOUT OIDS;
CREATE UNIQUE INDEX pg_dist_shard_placement_placementid_index
ON pg_catalog.pg_dist_shard_placement using btree(placementid);
SELECT setval('pg_catalog.pg_dist_shard_placement_placementid_seq', max(placementid))
FROM pg_catalog.pg_dist_shard_placement;
CREATE FUNCTION master_get_new_placementid()
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_get_new_placementid$$;
COMMENT ON FUNCTION master_get_new_placementid()
IS 'fetch unique placementid';

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.0-4'
default_version = '6.0-5'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -1026,12 +1026,8 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
foreach(failedPlacementCell, failedPlacementList)
{
ShardPlacement *failedPlacement = (ShardPlacement *) lfirst(failedPlacementCell);
uint64 shardLength = 0;
DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName,
failedPlacement->nodePort);
InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength,
failedPlacement->nodeName, failedPlacement->nodePort);
UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
}
shardConnections->connectionList = connectionList;

View File

@ -605,12 +605,8 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
{
ShardPlacement *failedPlacement =
(ShardPlacement *) lfirst(failedPlacementCell);
uint64 shardLength = 0;
DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName,
failedPlacement->nodePort);
InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength,
failedPlacement->nodeName, failedPlacement->nodePort);
UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
}
executorState->es_processed = affectedTupleCount;
@ -1406,9 +1402,11 @@ MarkRemainingInactivePlacements(void)
uint64 shardId = shardConnSet->shardId;
NodeConnectionKey *nodeKey = &participant->cacheKey;
uint64 shardLength = 0;
uint64 placementId = INVALID_PLACEMENT_ID;
DeleteShardPlacementRow(shardId, nodeKey->nodeName, nodeKey->nodePort);
InsertShardPlacementRow(shardId, FILE_INACTIVE, shardLength,
placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName,
nodeKey->nodePort);
InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength,
nodeKey->nodeName, nodeKey->nodePort);
}
}

View File

@ -376,12 +376,13 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
foreach(lingeringPlacementCell, lingeringPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(lingeringPlacementCell);
uint64 placementId = placement->placementId;
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
uint64 oldShardLength = placement->shardLength;
DeleteShardPlacementRow(shardId, workerName, workerPort);
InsertShardPlacementRow(shardId, FILE_TO_DELETE, oldShardLength,
InsertShardPlacementRow(shardId, placementId, FILE_TO_DELETE, oldShardLength,
workerName, workerPort);
ereport(WARNING, (errmsg("could not delete shard \"%s\" on node \"%s:%u\"",

View File

@ -20,6 +20,7 @@
#include "catalog/pg_type.h"
#include "distributed/citus_nodes.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h"
@ -315,9 +316,9 @@ ShardPlacementList(uint64 shardId)
/*
* TupleToShardPlacement takes in a heap tuple from pg_dist_shard_placement, and
* converts this tuple to an equivalent struct in memory. The function assumes
* the caller already has locks on the tuple, and doesn't perform any locking.
* TupleToShardPlacement takes in a heap tuple from pg_dist_shard_placement,
* and converts this tuple to in-memory struct. The function assumes the
* caller already has locks on the tuple, and doesn't perform any locking.
*/
ShardPlacement *
TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
@ -325,7 +326,8 @@ TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
ShardPlacement *shardPlacement = NULL;
bool isNull = false;
Oid tupleOid = HeapTupleGetOid(heapTuple);
Datum placementId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_placementid,
tupleDescriptor, &isNull);
Datum shardId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardid,
tupleDescriptor, &isNull);
Datum shardLength = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardlength,
@ -336,11 +338,14 @@ TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
tupleDescriptor, &isNull);
Datum nodePort = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_nodeport,
tupleDescriptor, &isNull);
Assert(!HeapTupleHasNulls(heapTuple));
if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_shard_placement ||
HeapTupleHasNulls(heapTuple))
{
ereport(ERROR, (errmsg("unexpected null in pg_dist_shard_placement_tuple")));
}
shardPlacement = CitusMakeNode(ShardPlacement);
shardPlacement->tupleOid = tupleOid;
shardPlacement->placementId = DatumGetInt64(placementId);
shardPlacement->shardId = DatumGetInt64(shardId);
shardPlacement->shardLength = DatumGetInt64(shardLength);
shardPlacement->shardState = DatumGetUInt32(shardState);
@ -408,10 +413,12 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
/*
* InsertShardPlacementRow opens the shard placement system catalog, and inserts
* a new row with the given values into that system catalog.
* a new row with the given values into that system catalog. If placementId is
* INVALID_PLACEMENT_ID, a new placement id will be assigned.
*/
void
InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength,
char *nodeName, uint32 nodePort)
{
Relation pgDistShardPlacement = NULL;
@ -424,11 +431,16 @@ InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
if (placementId == INVALID_PLACEMENT_ID)
{
placementId = master_get_new_placementid(NULL);
}
values[Anum_pg_dist_shard_placement_shardid - 1] = Int64GetDatum(shardId);
values[Anum_pg_dist_shard_placement_shardstate - 1] = CharGetDatum(shardState);
values[Anum_pg_dist_shard_placement_shardlength - 1] = Int64GetDatum(shardLength);
values[Anum_pg_dist_shard_placement_nodename - 1] = CStringGetTextDatum(nodeName);
values[Anum_pg_dist_shard_placement_nodeport - 1] = Int64GetDatum(nodePort);
values[Anum_pg_dist_shard_placement_placementid - 1] = Int64GetDatum(placementId);
/* open shard placement relation and insert new tuple */
pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
@ -496,7 +508,7 @@ DeleteShardRow(uint64 shardId)
* first (unique) row that corresponds to the given shardId and worker node, and
* deletes this row.
*/
void
uint64
DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
{
Relation pgDistShardPlacement = NULL;
@ -506,8 +518,12 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
bool indexOK = true;
HeapTuple heapTuple = NULL;
bool heapTupleFound = false;
TupleDesc tupleDescriptor = NULL;
int64 placementId = INVALID_PLACEMENT_ID;
bool isNull = false;
pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_shardid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
@ -519,8 +535,6 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
TupleDesc tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
ShardPlacement *placement = TupleToShardPlacement(tupleDescriptor, heapTuple);
if (strncmp(placement->nodeName, workerName, WORKER_LENGTH) == 0 &&
placement->nodePort == workerPort)
@ -540,11 +554,73 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
shardId, workerName, workerPort)));
}
placementId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_placementid,
tupleDescriptor, &isNull);
if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_shard_placement ||
HeapTupleHasNulls(heapTuple))
{
ereport(ERROR, (errmsg("unexpected null in pg_dist_shard_placement_tuple")));
}
simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self);
systable_endscan(scanDescriptor);
CommandCounterIncrement();
heap_close(pgDistShardPlacement, RowExclusiveLock);
return placementId;
}
/*
* UpdateShardPlacementState sets the shardState for the placement identified
* by placementId.
*/
void
UpdateShardPlacementState(uint64 placementId, char shardState)
{
Relation pgDistShardPlacement = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL;
Datum values[Natts_pg_dist_shard_placement];
bool isnull[Natts_pg_dist_shard_placement];
bool replace[Natts_pg_dist_shard_placement];
pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_placementid,
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId));
scanDescriptor = systable_beginscan(pgDistShardPlacement,
DistShardPlacementPlacementidIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for shard placement "
UINT64_FORMAT,
placementId)));
}
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_shard_placement_shardstate - 1] = CharGetDatum(shardState);
isnull[Anum_pg_dist_shard_placement_shardstate - 1] = false;
replace[Anum_pg_dist_shard_placement_shardstate - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
simple_heap_update(pgDistShardPlacement, &heapTuple->t_self, heapTuple);
CatalogUpdateIndexes(pgDistShardPlacement, heapTuple);
CommandCounterIncrement();
systable_endscan(scanDescriptor);
heap_close(pgDistShardPlacement, NoLock);
}
@ -794,7 +870,8 @@ master_stage_shard_placement_row(PG_FUNCTION_ARGS)
EnsureTablePermissions(distributedRelationId, ACL_INSERT);
/* finally insert placement */
InsertShardPlacementRow(shardId, shardState, shardLength, nodeName, nodePort);
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardLength,
nodeName, nodePort);
heap_close(relation, NoLock);
heap_close(pgDistShard, NoLock);

View File

@ -67,6 +67,7 @@ static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescripto
PG_FUNCTION_INFO_V1(master_get_table_metadata);
PG_FUNCTION_INFO_V1(master_get_table_ddl_events);
PG_FUNCTION_INFO_V1(master_get_new_shardid);
PG_FUNCTION_INFO_V1(master_get_new_placementid);
PG_FUNCTION_INFO_V1(master_get_local_first_candidate_nodes);
PG_FUNCTION_INFO_V1(master_get_round_robin_candidate_nodes);
PG_FUNCTION_INFO_V1(master_get_active_worker_nodes);
@ -251,8 +252,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
* ahead logs; writing to logs avoids the risk of having shardId collisions.
*
* Please note that the caller is still responsible for finalizing shard data
* and the shardId with the master node. Further note that this function relies
* on an internal sequence created in initdb to generate unique identifiers.
* and the shardId with the master node.
*
* NB: This can be called by any user; for now we have decided that that's
* ok. We might want to restrict this to users part of a specific role or such
@ -280,6 +280,38 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
}
/*
* master_get_new_placementid allocates and returns a unique placementId for
* the placement to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having shardId
* collisions.
*
* NB: This can be called by any user; for now we have decided that that's
* ok. We might want to restrict this to users part of a specific role or such
* at some later point.
*/
Datum
master_get_new_placementid(PG_FUNCTION_ARGS)
{
text *sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum shardIdDatum = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique shardId from sequence */
shardIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
PG_RETURN_DATUM(shardIdDatum);
}
/*
* master_get_local_first_candidate_nodes returns a set of candidate host names
* and port numbers on which to place new shards. The function makes sure to

View File

@ -292,13 +292,11 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
foreach(failedPlacementCell, failedPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell);
uint64 placementId = placement->placementId;
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
uint64 oldShardLength = placement->shardLength;
DeleteShardPlacementRow(shardId, workerName, workerPort);
InsertShardPlacementRow(shardId, FILE_INACTIVE, oldShardLength,
workerName, workerPort);
UpdateShardPlacementState(placementId, FILE_INACTIVE);
ereport(WARNING, (errmsg("could not append table to shard \"%s\" on node "
"\"%s:%u\"", shardQualifiedName, workerName,
@ -398,7 +396,8 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
const RelayFileState shardState = FILE_FINALIZED;
const uint64 shardSize = 0;
InsertShardPlacementRow(shardId, shardState, shardSize, nodeName, nodePort);
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize,
nodeName, nodePort);
placementsCreated++;
}
else
@ -537,11 +536,12 @@ UpdateShardStatistics(int64 shardId)
foreach(shardPlacementCell, shardPlacementList)
{
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
uint64 placementId = placement->placementId;
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
DeleteShardPlacementRow(shardId, workerName, workerPort);
InsertShardPlacementRow(shardId, FILE_FINALIZED, shardSize,
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize,
workerName, workerPort);
}

View File

@ -5101,7 +5101,7 @@ CompareTasksByShardId(const void *leftElement, const void *rightElement)
/*
* ActiveShardPlacementLists finds the active shard placement list for each task in
* the given task list, sorts each shard placement list by tuple insertion time,
* the given task list, sorts each shard placement list by shard creation time,
* and adds the sorted placement list into a new list of lists. The function also
* ensures a one-to-one mapping between each placement list in the new list of
* lists and each task in the given task list.
@ -5122,7 +5122,7 @@ ActiveShardPlacementLists(List *taskList)
/* filter out shard placements that reside in inactive nodes */
List *activeShardPlacementList = ActivePlacementList(shardPlacementList);
/* sort shard placements by their insertion time */
/* sort shard placements by their creation time */
activeShardPlacementList = SortList(activeShardPlacementList,
CompareShardPlacements);
shardPlacementLists = lappend(shardPlacementLists, activeShardPlacementList);
@ -5142,12 +5142,21 @@ CompareShardPlacements(const void *leftElement, const void *rightElement)
const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement);
const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement);
Oid leftTupleOid = leftPlacement->tupleOid;
Oid rightTupleOid = rightPlacement->tupleOid;
uint64 leftPlacementId = leftPlacement->placementId;
uint64 rightPlacementId = rightPlacement->placementId;
/* tuples that are inserted earlier appear first */
int tupleOidDiff = leftTupleOid - rightTupleOid;
return tupleOidDiff;
if (leftPlacementId < rightPlacementId)
{
return -1;
}
else if (leftPlacementId > rightPlacementId)
{
return 1;
}
else
{
return 0;
}
}

View File

@ -309,7 +309,8 @@ create_healthy_local_shard_placement_row(PG_FUNCTION_ARGS)
int64 shardId = PG_GETARG_INT64(0);
int64 shardLength = 0;
InsertShardPlacementRow(shardId, FILE_FINALIZED, shardLength, "localhost", 5432);
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED, shardLength,
"localhost", 5432);
PG_RETURN_VOID();
}
@ -347,9 +348,11 @@ update_shard_placement_row_state(PG_FUNCTION_ARGS)
bool successful = true;
char *hostNameString = text_to_cstring(hostName);
uint64 shardLength = 0;
uint64 placementId = INVALID_PLACEMENT_ID;
DeleteShardPlacementRow(shardId, hostNameString, hostPort);
InsertShardPlacementRow(shardId, shardState, shardLength, hostNameString, hostPort);
placementId = DeleteShardPlacementRow(shardId, hostNameString, hostPort);
InsertShardPlacementRow(shardId, placementId, shardState, shardLength,
hostNameString, hostPort);
PG_RETURN_BOOL(successful);
}

View File

@ -431,7 +431,7 @@ _outShardPlacement(StringInfo str, const ShardPlacement *node)
{
WRITE_NODE_TYPE("SHARDPLACEMENT");
WRITE_OID_FIELD(tupleOid);
WRITE_UINT64_FIELD(placementId);
WRITE_UINT64_FIELD(shardId);
WRITE_UINT64_FIELD(shardLength);
WRITE_ENUM_FIELD(shardState, RelayFileState);

View File

@ -1478,7 +1478,7 @@ _readShardPlacement(void)
{
READ_LOCALS(ShardPlacement);
READ_OID_FIELD(tupleOid);
READ_UINT64_FIELD(placementId);
READ_UINT64_FIELD(shardId);
READ_UINT64_FIELD(shardLength);
READ_ENUM_FIELD(shardState, RelayFileState);

View File

@ -58,6 +58,7 @@ static Oid distPartitionColocationidIndexId = InvalidOid;
static Oid distShardLogicalRelidIndexId = InvalidOid;
static Oid distShardShardidIndexId = InvalidOid;
static Oid distShardPlacementShardidIndexId = InvalidOid;
static Oid distShardPlacementPlacementidIndexId = InvalidOid;
static Oid distShardPlacementNodeidIndexId = InvalidOid;
static Oid extraDataContainerFuncId = InvalidOid;
@ -702,6 +703,17 @@ DistShardPlacementShardidIndexId(void)
}
/* return oid of pg_dist_shard_placement_shardid_index */
Oid
DistShardPlacementPlacementidIndexId(void)
{
CachedRelationLookup("pg_dist_shard_placement_placementid_index",
&distShardPlacementPlacementidIndexId);
return distShardPlacementPlacementidIndexId;
}
/* return oid of pg_dist_shard_placement_nodeid_index */
Oid
DistShardPlacementNodeidIndexId(void)
@ -1234,6 +1246,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
distShardLogicalRelidIndexId = InvalidOid;
distShardShardidIndexId = InvalidOid;
distShardPlacementShardidIndexId = InvalidOid;
distShardPlacementPlacementidIndexId = InvalidOid;
distNodeRelationId = InvalidOid;
extraDataContainerFuncId = InvalidOid;
}

View File

@ -47,7 +47,7 @@ typedef struct ShardInterval
typedef struct ShardPlacement
{
CitusNodeTag type;
Oid tupleOid; /* unique oid that implies this row's insertion order */
uint64 placementId; /* sequence that implies this placement creation order */
uint64 shardId;
uint64 shardLength;
RelayFileState shardState;
@ -71,10 +71,14 @@ extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
/* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue);
extern void InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
extern void DeleteShardRow(uint64 shardId);
extern void InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength,
char *nodeName, uint32 nodePort);
extern void DeleteShardRow(uint64 shardId);
extern void DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort);
extern void UpdateShardPlacementState(uint64 placementId, char shardState);
extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32
workerPort);
/* Remaining metadata utility functions */
extern Node * BuildDistributionKeyFromColumnName(Relation distributedRelation,

View File

@ -48,8 +48,8 @@
/* Name of columnar foreign data wrapper */
#define CSTORE_FDW_NAME "cstore_fdw"
/* ShardId sequence name as defined in initdb.c */
#define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq"
#define PLACEMENTID_SEQUENCE_NAME "pg_dist_shard_placement_placementid_seq"
/* Remote call definitions to help with data staging and deletion */
#define WORKER_APPLY_SHARD_DDL_COMMAND \
@ -97,10 +97,11 @@ extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventL
int workerStartIndex, int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
/* Function declarations for generating metadata for shard creation */
/* Function declarations for generating metadata for shard and placement creation */
extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);
extern Datum master_get_table_ddl_events(PG_FUNCTION_ARGS);
extern Datum master_get_new_shardid(PG_FUNCTION_ARGS);
extern Datum master_get_new_placementid(PG_FUNCTION_ARGS);
extern Datum master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS);
extern Datum master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS);
extern Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS);

View File

@ -75,6 +75,7 @@ extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void);
extern Oid DistShardPlacementShardidIndexId(void);
extern Oid DistShardPlacementPlacementidIndexId(void);
extern Oid DistShardPlacementNodeidIndexId(void);
/* function oids */

View File

@ -29,6 +29,7 @@ typedef struct FormData_pg_dist_shard_placement
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text nodename; /* remote node's host name */
int32 nodeport; /* remote node's port number */
int64 placementid; /* global placementId on remote node (added later) */
#endif
} FormData_pg_dist_shard_placement;
@ -43,12 +44,13 @@ typedef FormData_pg_dist_shard_placement *Form_pg_dist_shard_placement;
* compiler constants for pg_dist_shard_placement
* ----------------
*/
#define Natts_pg_dist_shard_placement 5
#define Natts_pg_dist_shard_placement 6
#define Anum_pg_dist_shard_placement_shardid 1
#define Anum_pg_dist_shard_placement_shardstate 2
#define Anum_pg_dist_shard_placement_shardlength 3
#define Anum_pg_dist_shard_placement_nodename 4
#define Anum_pg_dist_shard_placement_nodeport 5
#define Anum_pg_dist_shard_placement_placementid 6
#endif /* PG_DIST_SHARD_PLACEMENT_H */

View File

@ -23,6 +23,7 @@
/* Shard name and identifier related defines */
#define SHARD_NAME_SEPARATOR '_'
#define INVALID_SHARD_ID 0
#define INVALID_PLACEMENT_ID 0
/*
* RelayFileState represents last known states of shards on a given node. We

View File

@ -98,10 +98,10 @@ SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM li
GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | count | avg | array_agg
------------+-------+-----------------------+--------------------------------------------------------------------------------------------------
1.00 | 17 | 1477.1258823529411765 | {8997,9026,9158,9184,9220,9222,9348,9383,9476,5543,5633,5634,5698,5766,5856,5857,5986}
2.00 | 19 | 3078.4242105263157895 | {9030,9058,9123,9124,9188,9344,9441,9476,5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923}
3.00 | 14 | 4714.0392857142857143 | {9124,9157,9184,9223,9254,9349,9414,9475,9477,5509,5543,5605,5606,5827}
4.00 | 19 | 5929.7136842105263158 | {9091,9120,9281,9347,9382,9440,9473,5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985}
1.00 | 17 | 1477.1258823529411765 | {5543,5633,5634,5698,5766,5856,5857,5986,8997,9026,9158,9184,9220,9222,9348,9383,9476}
2.00 | 19 | 3078.4242105263157895 | {5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923,9030,9058,9123,9124,9188,9344,9441,9476}
3.00 | 14 | 4714.0392857142857143 | {5509,5543,5605,5606,5827,9124,9157,9184,9223,9254,9349,9414,9475,9477}
4.00 | 19 | 5929.7136842105263158 | {5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985,9091,9120,9281,9347,9382,9440,9473}
(4 rows)
SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month
@ -109,10 +109,10 @@ SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | my_month
------------+------------------------------------------------
1.00 | {7,7,4,7,4,2,6,3,5,9,5,7,5,9,11,11,4}
2.00 | {7,6,6,10,1,12,6,5,11,10,8,5,5,12,3,11,7,11,5}
3.00 | {10,6,7,8,5,8,9,11,3,4,9,8,11,7}
4.00 | {11,6,2,8,2,6,10,1,5,6,11,12,10,9,6,1,2,5,1}
1.00 | {9,5,7,5,9,11,11,4,7,7,4,7,4,2,6,3,5}
2.00 | {11,10,8,5,5,12,3,11,7,11,5,7,6,6,10,1,12,6,5}
3.00 | {4,9,8,11,7,10,6,7,8,5,8,9,11,3}
4.00 | {1,5,6,11,12,10,9,6,1,2,5,1,11,6,2,8,2,6,10}
(4 rows)
SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5
@ -120,10 +120,10 @@ SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity
AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
l_quantity | array_agg
------------+---------------------------------------------
1.00 | {18317,18445,11269,11397,11713,11715,11973}
2.00 | {18061,18247,18953,11847}
1.00 | {11269,11397,11713,11715,11973,18317,18445}
2.00 | {11847,18061,18247,18953}
3.00 | {18249,18315,18699,18951,18955}
4.00 | {18241,18765,11653,11659}
4.00 | {11653,11659,18241,18765}
(4 rows)
-- Check that we can execute array_agg() with an expression containing NULL values

View File

@ -72,17 +72,17 @@ SELECT master_create_worker_shards('cluster_management_test', 16, 1);
(1 row)
-- see that there are some active placements in the candidate node
SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1220001 | 1 | 0 | localhost | 57638
1220003 | 1 | 0 | localhost | 57638
1220005 | 1 | 0 | localhost | 57638
1220007 | 1 | 0 | localhost | 57638
1220009 | 1 | 0 | localhost | 57638
1220011 | 1 | 0 | localhost | 57638
1220013 | 1 | 0 | localhost | 57638
1220015 | 1 | 0 | localhost | 57638
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1220001 | 1 | localhost | 57638
1220003 | 1 | localhost | 57638
1220005 | 1 | localhost | 57638
1220007 | 1 | localhost | 57638
1220009 | 1 | localhost | 57638
1220011 | 1 | localhost | 57638
1220013 | 1 | localhost | 57638
1220015 | 1 | localhost | 57638
(8 rows)
-- try to remove a node with active placements and see that node removal is failed
@ -97,17 +97,17 @@ SELECT master_get_active_worker_nodes();
-- mark all placements in the candidate node as inactive
UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port;
SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1220001 | 3 | 0 | localhost | 57638
1220003 | 3 | 0 | localhost | 57638
1220005 | 3 | 0 | localhost | 57638
1220007 | 3 | 0 | localhost | 57638
1220009 | 3 | 0 | localhost | 57638
1220011 | 3 | 0 | localhost | 57638
1220013 | 3 | 0 | localhost | 57638
1220015 | 3 | 0 | localhost | 57638
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
1220001 | 3 | localhost | 57638
1220003 | 3 | localhost | 57638
1220005 | 3 | localhost | 57638
1220007 | 3 | localhost | 57638
1220009 | 3 | localhost | 57638
1220011 | 3 | localhost | 57638
1220013 | 3 | localhost | 57638
1220015 | 3 | localhost | 57638
(8 rows)
-- try to remove a node with only inactive placements and see that node is removed

View File

@ -444,16 +444,16 @@ LIMIT 10 OFFSET 20;
DEBUG: push down of limit count: 30
l_partkey | o_custkey | l_quantity
-----------+-----------+------------
135912 | 509 | 26.00
75351 | 1261 | 26.00
199475 | 1052 | 26.00
91309 | 8 | 26.00
53624 | 400 | 26.00
182736 | 1048 | 26.00
59694 | 163 | 26.00
20481 | 173 | 26.00
78748 | 1499 | 26.00
7614 | 1397 | 26.00
154380 | 421 | 26.00
103981 | 325 | 26.00
77886 | 817 | 26.00
147369 | 755 | 26.00
78175 | 1075 | 26.00
109784 | 1268 | 26.00
28635 | 1207 | 26.00
188845 | 554 | 26.00
189398 | 844 | 26.00
71383 | 1397 | 26.00
(10 rows)
RESET client_min_messages;

View File

@ -40,7 +40,7 @@ Distributed Query into pg_merge_job_570000
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_290000 lineitem
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Sort
Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0
@ -71,7 +71,7 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "lineitem_290000",
"Relation Name": "lineitem_290001",
"Alias": "lineitem"
}
]
@ -140,7 +140,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>lineitem_290000</Relation-Name>
<Relation-Name>lineitem_290001</Relation-Name>
<Alias>lineitem</Alias>
</Plan>
</Plans>
@ -206,7 +206,7 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "lineitem_290000"
Relation Name: "lineitem_290001"
Alias: "lineitem"
Master Query:
@ -238,7 +238,7 @@ Distributed Query into pg_merge_job_570006
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_290000 lineitem
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Sort
Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0
@ -256,7 +256,7 @@ Distributed Query into pg_merge_job_570007
Node: host=localhost port=57637 dbname=regression
-> Aggregate
Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
-> Seq Scan on public.lineitem_290000 lineitem
-> Seq Scan on public.lineitem_290001 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
@ -279,7 +279,7 @@ Distributed Query into pg_merge_job_570008
Sort Key: lineitem.l_quantity
-> Hash Join
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Seq Scan on lineitem_290000 lineitem
-> Seq Scan on lineitem_290001 lineitem
Filter: (l_quantity < 5.0)
-> Hash
-> Seq Scan on orders_290008 orders
@ -296,7 +296,7 @@ Distributed Query
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
Node: host=localhost port=57638 dbname=regression
-> Insert on lineitem_290000
-> Result
-- Test update
@ -309,7 +309,7 @@ Distributed Query
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_290000
-> Bitmap Heap Scan on lineitem_290000
Recheck Cond: (l_orderkey = 1)
@ -325,7 +325,7 @@ Distributed Query
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_290000
-> Bitmap Heap Scan on lineitem_290000
Recheck Cond: (l_orderkey = 1)
@ -361,7 +361,7 @@ Distributed Query into pg_merge_job_570012
Tasks Shown: One of 8
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on lineitem_290000 lineitem
-> Seq Scan on lineitem_290001 lineitem
Master Query
-> Seq Scan on pg_merge_job_570012
-- Test all tasks output
@ -375,22 +375,22 @@ Distributed Query into pg_merge_job_570013
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290004 lineitem
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290005 lineitem
-> Seq Scan on lineitem_290004 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290006 lineitem
-> Seq Scan on lineitem_290007 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290007 lineitem
-> Seq Scan on lineitem_290006 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
@ -413,7 +413,7 @@ Distributed Query into pg_merge_job_570016
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_290004 lineitem
-> Seq Scan on lineitem_290005 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate

View File

@ -30,6 +30,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-1';
ALTER EXTENSION citus UPDATE TO '6.0-2';
ALTER EXTENSION citus UPDATE TO '6.0-3';
ALTER EXTENSION citus UPDATE TO '6.0-4';
ALTER EXTENSION citus UPDATE TO '6.0-5';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c

View File

@ -69,14 +69,14 @@ DEBUG: generated sql query for job 1250 and task 21
DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290006 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
DEBUG: generated sql query for job 1250 and task 24
DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290007 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 12 to node localhost:57638
DEBUG: assigned task 15 to node localhost:57637
DEBUG: assigned task 18 to node localhost:57638
DEBUG: assigned task 21 to node localhost:57637
DEBUG: assigned task 24 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 18 to node localhost:57637
DEBUG: assigned task 15 to node localhost:57638
DEBUG: assigned task 24 to node localhost:57637
DEBUG: assigned task 21 to node localhost:57638
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: generated sql query for job 1251 and task 3
@ -107,8 +107,8 @@ DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 10
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 13
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: completed cleanup query for job 1252
DEBUG: completed cleanup query for job 1252
@ -178,20 +178,20 @@ DEBUG: generated sql query for job 1253 and task 14
DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290006 lineitem WHERE (l_quantity < 5.0)"
DEBUG: generated sql query for job 1253 and task 16
DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290007 lineitem WHERE (l_quantity < 5.0)"
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 8 to node localhost:57638
DEBUG: assigned task 10 to node localhost:57637
DEBUG: assigned task 12 to node localhost:57638
DEBUG: assigned task 14 to node localhost:57637
DEBUG: assigned task 16 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 8 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: assigned task 10 to node localhost:57638
DEBUG: assigned task 16 to node localhost:57637
DEBUG: assigned task 14 to node localhost:57638
DEBUG: generated sql query for job 1254 and task 2
DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290008 orders WHERE (o_totalprice <> 4.0)"
DEBUG: generated sql query for job 1254 and task 4
DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290009 orders WHERE (o_totalprice <> 4.0)"
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2
DEBUG: join prunable for task partitionId 0 and 3

View File

@ -27,8 +27,8 @@ FROM
WHERE
o_custkey = c_custkey;
DEBUG: StartTransactionCommand
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: join prunable for intervals [1,1000] and [1001,2000]
DEBUG: join prunable for intervals [1,1000] and [6001,7000]
DEBUG: join prunable for intervals [1001,2000] and [1,1000]
@ -41,8 +41,8 @@ DEBUG: pruning merge fetch taskId 4
DETAIL: Creating dependency on merge taskId 8
DEBUG: pruning merge fetch taskId 7
DETAIL: Creating dependency on merge taskId 11
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: CommitTransactionCommand
count
@ -66,10 +66,10 @@ WHERE
o_custkey = c_custkey AND
o_orderkey = l_orderkey;
DEBUG: StartTransactionCommand
DEBUG: assigned task 15 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 18 to node localhost:57637
DEBUG: assigned task 12 to node localhost:57638
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 15 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: assigned task 18 to node localhost:57638
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: join prunable for intervals [1,1509] and [2951,4455]
@ -156,12 +156,12 @@ DEBUG: pruning merge fetch taskId 55
DETAIL: Creating dependency on merge taskId 68
DEBUG: pruning merge fetch taskId 58
DETAIL: Creating dependency on merge taskId 68
DEBUG: assigned task 3 to node localhost:57637
DEBUG: assigned task 21 to node localhost:57638
DEBUG: assigned task 9 to node localhost:57637
DEBUG: assigned task 27 to node localhost:57638
DEBUG: assigned task 33 to node localhost:57637
DEBUG: assigned task 48 to node localhost:57638
DEBUG: assigned task 21 to node localhost:57637
DEBUG: assigned task 3 to node localhost:57638
DEBUG: assigned task 27 to node localhost:57637
DEBUG: assigned task 9 to node localhost:57638
DEBUG: assigned task 48 to node localhost:57637
DEBUG: assigned task 33 to node localhost:57638
DEBUG: assigned task 39 to node localhost:57637
DEBUG: assigned task 57 to node localhost:57638
DEBUG: propagating assignment from merge task 19 to constrained sql task 6
@ -208,16 +208,16 @@ FROM
WHERE
l_partkey = c_nationkey;
DEBUG: StartTransactionCommand
DEBUG: assigned task 2 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 8 to node localhost:57638
DEBUG: assigned task 10 to node localhost:57637
DEBUG: assigned task 12 to node localhost:57638
DEBUG: assigned task 14 to node localhost:57637
DEBUG: assigned task 16 to node localhost:57638
DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 8 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 12 to node localhost:57637
DEBUG: assigned task 10 to node localhost:57638
DEBUG: assigned task 16 to node localhost:57637
DEBUG: assigned task 14 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57637
DEBUG: join prunable for task partitionId 0 and 1
DEBUG: join prunable for task partitionId 0 and 2

View File

@ -790,11 +790,11 @@ SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localh
(1 row)
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1190000 | 1 | 8192 | localhost | 57638
1190000 | 1 | 0 | localhost | 57637
SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000;
shardstate | nodename | nodeport
------------+-----------+----------
1 | localhost | 57638
1 | localhost | 57637
(2 rows)
--test with search_path is set
@ -808,11 +808,11 @@ SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localh
(1 row)
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1190000 | 1 | 8192 | localhost | 57638
1190000 | 1 | 0 | localhost | 57637
SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000;
shardstate | nodename | nodeport
------------+-----------+----------
1 | localhost | 57638
1 | localhost | 57637
(2 rows)
-- test master_apply_delete_command with schemas

View File

@ -58,8 +58,8 @@ SELECT * FROM pg_dist_shard;
(0 rows)
SELECT * FROM pg_dist_shard_placement;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+----------+----------+-------------
(0 rows)
-- check that the extension now can be dropped (and recreated)

View File

@ -353,16 +353,6 @@ FROM
LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ]
l_custkey | r_custkey | t_custkey
-----------+-----------+-----------
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
1 | |
2 | |
3 | |
@ -378,6 +368,16 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(25 rows)
-- Right join with single shard right most table should error out
@ -399,16 +399,6 @@ FROM
LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_left" ]
t_custkey | r_custkey | l_custkey
-----------+-----------+-----------
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
@ -419,6 +409,16 @@ LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join
18 | 18 |
19 | 19 |
20 | 20 |
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(20 rows)
-- Make it anti-join, should display values with l_custkey is null
@ -458,16 +458,6 @@ FROM
LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ]
l_custkey | r_custkey
-----------+-----------
21 | 21
22 | 22
23 | 23
24 | 24
25 | 25
26 | 26
27 | 27
28 | 28
29 | 29
30 | 30
1 |
2 |
3 |
@ -488,6 +478,16 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
| 18
| 19
| 16
21 | 21
22 | 22
23 | 23
24 | 24
25 | 25
26 | 26
27 | 27
28 | 28
29 | 29
30 | 30
(30 rows)
-- full outer join + anti (right) should work with 1-1 matched shards
@ -577,6 +577,11 @@ FROM
LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ]
l_custkey | r_custkey | t_custkey
-----------+-----------+-----------
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
@ -587,11 +592,6 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
11 | 11 | 11
12 | 12 | 12
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
(15 rows)
-- inner (broadcast) join + 2 shards left (local) join should work
@ -604,16 +604,6 @@ FROM
LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ]
l_custkey | t_custkey | r_custkey
-----------+-----------+-----------
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
1 | 1 |
2 | 2 |
3 | 3 |
@ -629,6 +619,16 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(25 rows)
-- inner (local) join + 2 shards left (dual partition) join should error out
@ -650,16 +650,6 @@ FROM
LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ]
l_custkey | t_custkey | r_custkey
-----------+-----------+-----------
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
1 | 1 |
2 | 2 |
3 | 3 |
@ -675,6 +665,16 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_
13 | 13 | 13
14 | 14 | 14
15 | 15 | 15
21 | 21 | 21
22 | 22 | 22
23 | 23 | 23
24 | 24 | 24
25 | 25 | 25
26 | 26 | 26
27 | 27 | 27
28 | 28 | 28
29 | 29 | 29
30 | 30 | 30
(25 rows)
-- inner (broadcast) join + 2 shards left (local) + anti join should work
@ -712,16 +712,6 @@ FROM
LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ]
t_custkey
-----------
21
22
23
24
25
26
27
28
29
30
11
12
13
@ -732,6 +722,16 @@ LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_oute
18
19
20
21
22
23
24
25
26
27
28
29
30
(20 rows)
-- flattened out subqueries with outer joins are not supported

View File

@ -29,7 +29,7 @@ SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash
SELECT master_create_worker_shards('cluster_management_test', 16, 1);
-- see that there are some active placements in the candidate node
SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
-- try to remove a node with active placements and see that node removal is failed
SELECT master_remove_node('localhost', :worker_2_port);
@ -37,7 +37,7 @@ SELECT master_get_active_worker_nodes();
-- mark all placements in the candidate node as inactive
UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port;
SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
-- try to remove a node with only inactive placements and see that node is removed
SELECT master_remove_node('localhost', :worker_2_port);

View File

@ -35,6 +35,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-1';
ALTER EXTENSION citus UPDATE TO '6.0-2';
ALTER EXTENSION citus UPDATE TO '6.0-3';
ALTER EXTENSION citus UPDATE TO '6.0-4';
ALTER EXTENSION citus UPDATE TO '6.0-5';
-- drop extension an re-create in newest version
DROP EXTENSION citus;

View File

@ -504,7 +504,7 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and no
SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000;
--test with search_path is set
@ -515,7 +515,7 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and no
SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000;
-- test master_apply_delete_command with schemas