Introduce placement IDs.

So far each placement was assigned an Oid, but that Oid was only used to
track insertion order. It did so incompletely, since it was not preserved
across changes of the shard state, and its behaviour around Oid wraparound
was also not entirely as intended.

The newly introduced, explicitly assigned IDs are preserved across
shard-state changes.

The prime goal of this change is not to improve the ordering used by task
assignment policies, but to make it easier to reference individual shard
placements. The newly introduced UpdateShardPlacementState() makes use of
that, and so will the in-progress connection and transaction management
changes.
Branch: pull/857/head
Author: Andres Freund
Date:   2016-09-06 19:23:17 -07:00
Parent: 7e8efbe540
Commit: 982ad66753

32 changed files with 413 additions and 228 deletions
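
Before the diffs, a minimal caller-side sketch of how the pieces introduced
here are meant to fit together, based only on the signatures added below. The
wrapper functions, header paths, and the choice of FILE_INACTIVE are
illustrative assumptions, not part of the commit:

#include "postgres.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/relay_utility.h" /* assumed location of FILE_INACTIVE and INVALID_PLACEMENT_ID */

/* delete + re-insert: the placement now keeps its ID across the state change */
static void
MarkPlacementInactivePreservingId(ShardPlacement *placement)
{
    uint64 placementId = DeleteShardPlacementRow(placement->shardId,
                                                 placement->nodeName,
                                                 placement->nodePort);

    InsertShardPlacementRow(placement->shardId, placementId, FILE_INACTIVE,
                            placement->shardLength, placement->nodeName,
                            placement->nodePort);
}

/* or, with the new ID, flip the state in place without re-inserting the row */
static void
MarkPlacementInactiveById(ShardPlacement *placement)
{
    UpdateShardPlacementState(placement->placementId, FILE_INACTIVE);
}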


@@ -8,7 +8,7 @@ EXTENSION = citus
 EXTVERSIONS = 5.0 5.0-1 5.0-2 \
 	5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
 	5.2-1 5.2-2 5.2-3 5.2-4 \
-	6.0-1 6.0-2 6.0-3 6.0-4
+	6.0-1 6.0-2 6.0-3 6.0-4 6.0-5
 
 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@@ -66,6 +66,8 @@ $(EXTENSION)--6.0-3.sql: $(EXTENSION)--6.0-2.sql $(EXTENSION)--6.0-2--6.0-3.sql
 	cat $^ > $@
 $(EXTENSION)--6.0-4.sql: $(EXTENSION)--6.0-3.sql $(EXTENSION)--6.0-3--6.0-4.sql
 	cat $^ > $@
+$(EXTENSION)--6.0-5.sql: $(EXTENSION)--6.0-4.sql $(EXTENSION)--6.0-4--6.0-5.sql
+	cat $^ > $@
 
 NO_PGXS = 1


@@ -0,0 +1,28 @@
+/* citus--6.0-4--6.0-5.sql */
+/*
+ * Replace oid column in pg_dist_shard_placement with a sequence column.
+ */
+CREATE SEQUENCE citus.pg_dist_shard_placement_placementid_seq
+    NO CYCLE;
+ALTER SEQUENCE citus.pg_dist_shard_placement_placementid_seq
+    SET SCHEMA pg_catalog;
+
+ALTER TABLE pg_catalog.pg_dist_shard_placement
+    ADD COLUMN placementid bigint;
+-- keep existing oids, and update sequence to match max.
+UPDATE pg_catalog.pg_dist_shard_placement SET placementid = oid;
+ALTER TABLE pg_catalog.pg_dist_shard_placement
+    ALTER COLUMN placementid SET DEFAULT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq'),
+    ALTER COLUMN placementid SET NOT NULL,
+    SET WITHOUT OIDS;
+CREATE UNIQUE INDEX pg_dist_shard_placement_placementid_index
+    ON pg_catalog.pg_dist_shard_placement using btree(placementid);
+SELECT setval('pg_catalog.pg_dist_shard_placement_placementid_seq', max(placementid))
+FROM pg_catalog.pg_dist_shard_placement;
+
+CREATE FUNCTION master_get_new_placementid()
+    RETURNS bigint
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME', $$master_get_new_placementid$$;
+COMMENT ON FUNCTION master_get_new_placementid()
+    IS 'fetch unique placementid';


@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '6.0-4'
+default_version = '6.0-5'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog


@@ -1030,7 +1030,8 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
 		DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName,
 								failedPlacement->nodePort);
-		InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength,
+		InsertShardPlacementRow(failedPlacement->shardId, failedPlacement->placementId,
+								FILE_INACTIVE, shardLength,
 								failedPlacement->nodeName, failedPlacement->nodePort);
 	}


@@ -609,7 +609,9 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,
 		DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName,
 								failedPlacement->nodePort);
-		InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength,
+		InsertShardPlacementRow(failedPlacement->shardId,
+								failedPlacement->placementId,
+								FILE_INACTIVE, shardLength,
 								failedPlacement->nodeName, failedPlacement->nodePort);
 	}
@@ -1406,9 +1408,11 @@ MarkRemainingInactivePlacements(void)
 			uint64 shardId = shardConnSet->shardId;
 			NodeConnectionKey *nodeKey = &participant->cacheKey;
 			uint64 shardLength = 0;
+			uint64 placementId = INVALID_PLACEMENT_ID;
 
-			DeleteShardPlacementRow(shardId, nodeKey->nodeName, nodeKey->nodePort);
-			InsertShardPlacementRow(shardId, FILE_INACTIVE, shardLength,
+			placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName,
+												  nodeKey->nodePort);
+			InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength,
 									nodeKey->nodeName, nodeKey->nodePort);
 		}
 	}


@@ -376,12 +376,13 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
 	foreach(lingeringPlacementCell, lingeringPlacementList)
 	{
 		ShardPlacement *placement = (ShardPlacement *) lfirst(lingeringPlacementCell);
+		uint64 placementId = placement->placementId;
 		char *workerName = placement->nodeName;
 		uint32 workerPort = placement->nodePort;
 		uint64 oldShardLength = placement->shardLength;
 
 		DeleteShardPlacementRow(shardId, workerName, workerPort);
-		InsertShardPlacementRow(shardId, FILE_TO_DELETE, oldShardLength,
+		InsertShardPlacementRow(shardId, placementId, FILE_TO_DELETE, oldShardLength,
 								workerName, workerPort);
 
 		ereport(WARNING, (errmsg("could not delete shard \"%s\" on node \"%s:%u\"",


@@ -20,6 +20,7 @@
 #include "catalog/pg_type.h"
 #include "distributed/citus_nodes.h"
 #include "distributed/master_metadata_utility.h"
+#include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_logical_optimizer.h"
@@ -315,9 +316,9 @@ ShardPlacementList(uint64 shardId)
 /*
- * TupleToShardPlacement takes in a heap tuple from pg_dist_shard_placement, and
- * converts this tuple to an equivalent struct in memory. The function assumes
- * the caller already has locks on the tuple, and doesn't perform any locking.
+ * TupleToShardPlacement takes in a heap tuple from pg_dist_shard_placement,
+ * and converts this tuple to an in-memory struct. The function assumes the
+ * caller already has locks on the tuple, and doesn't perform any locking.
  */
 ShardPlacement *
 TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
 {
@@ -325,7 +326,8 @@ TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
 	ShardPlacement *shardPlacement = NULL;
 	bool isNull = false;
-	Oid tupleOid = HeapTupleGetOid(heapTuple);
+	Datum placementId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_placementid,
+									 tupleDescriptor, &isNull);
 	Datum shardId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardid,
 								 tupleDescriptor, &isNull);
 	Datum shardLength = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardlength,
@@ -336,11 +338,14 @@ TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
 								 tupleDescriptor, &isNull);
 	Datum nodePort = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_nodeport,
 								  tupleDescriptor, &isNull);
 
-	Assert(!HeapTupleHasNulls(heapTuple));
+	if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_shard_placement ||
+		HeapTupleHasNulls(heapTuple))
+	{
+		ereport(ERROR, (errmsg("unexpected null in pg_dist_shard_placement_tuple")));
+	}
 
 	shardPlacement = CitusMakeNode(ShardPlacement);
-	shardPlacement->tupleOid = tupleOid;
+	shardPlacement->placementId = DatumGetInt64(placementId);
 	shardPlacement->shardId = DatumGetInt64(shardId);
 	shardPlacement->shardLength = DatumGetInt64(shardLength);
 	shardPlacement->shardState = DatumGetUInt32(shardState);
@@ -408,10 +413,12 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
 /*
  * InsertShardPlacementRow opens the shard placement system catalog, and inserts
- * a new row with the given values into that system catalog.
+ * a new row with the given values into that system catalog. If placementId is
+ * INVALID_PLACEMENT_ID, a new placement id will be assigned.
  */
 void
-InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
+InsertShardPlacementRow(uint64 shardId, uint64 placementId,
+						char shardState, uint64 shardLength,
 						char *nodeName, uint32 nodePort)
 {
 	Relation pgDistShardPlacement = NULL;
@@ -424,11 +431,16 @@ InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
 	memset(values, 0, sizeof(values));
 	memset(isNulls, false, sizeof(isNulls));
 
+	if (placementId == INVALID_PLACEMENT_ID)
+	{
+		placementId = master_get_new_placementid(NULL);
+	}
+
 	values[Anum_pg_dist_shard_placement_shardid - 1] = Int64GetDatum(shardId);
 	values[Anum_pg_dist_shard_placement_shardstate - 1] = CharGetDatum(shardState);
 	values[Anum_pg_dist_shard_placement_shardlength - 1] = Int64GetDatum(shardLength);
 	values[Anum_pg_dist_shard_placement_nodename - 1] = CStringGetTextDatum(nodeName);
 	values[Anum_pg_dist_shard_placement_nodeport - 1] = Int64GetDatum(nodePort);
+	values[Anum_pg_dist_shard_placement_placementid - 1] = Int64GetDatum(placementId);
 
 	/* open shard placement relation and insert new tuple */
 	pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
@@ -496,7 +508,7 @@ DeleteShardRow(uint64 shardId)
  * first (unique) row that corresponds to the given shardId and worker node, and
  * deletes this row.
  */
-void
+uint64
 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
 {
 	Relation pgDistShardPlacement = NULL;
@@ -506,8 +518,12 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
 	bool indexOK = true;
 	HeapTuple heapTuple = NULL;
 	bool heapTupleFound = false;
+	TupleDesc tupleDescriptor = NULL;
+	int64 placementId = INVALID_PLACEMENT_ID;
+	bool isNull = false;
 
 	pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
+	tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
 
 	ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_shardid,
 				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
@@ -519,8 +535,6 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
 	heapTuple = systable_getnext(scanDescriptor);
 	while (HeapTupleIsValid(heapTuple))
 	{
-		TupleDesc tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
-
 		ShardPlacement *placement = TupleToShardPlacement(tupleDescriptor, heapTuple);
 		if (strncmp(placement->nodeName, workerName, WORKER_LENGTH) == 0 &&
 			placement->nodePort == workerPort)
@@ -540,11 +554,73 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
 							   shardId, workerName, workerPort)));
 	}
 
+	placementId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_placementid,
+							   tupleDescriptor, &isNull);
+	if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_shard_placement ||
+		HeapTupleHasNulls(heapTuple))
+	{
+		ereport(ERROR, (errmsg("unexpected null in pg_dist_shard_placement_tuple")));
+	}
+
 	simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self);
 	systable_endscan(scanDescriptor);
 
 	CommandCounterIncrement();
 	heap_close(pgDistShardPlacement, RowExclusiveLock);
+
+	return placementId;
+}
+
+
+/*
+ * UpdateShardPlacementState sets the shardState for the placement identified
+ * by placementId.
+ */
+void
+UpdateShardPlacementState(uint64 placementId, char shardState)
+{
+	Relation pgDistShardPlacement = NULL;
+	SysScanDesc scanDescriptor = NULL;
+	ScanKeyData scanKey[1];
+	int scanKeyCount = 1;
+	bool indexOK = true;
+	HeapTuple heapTuple = NULL;
+	TupleDesc tupleDescriptor = NULL;
+	Datum values[Natts_pg_dist_shard_placement];
+	bool isnull[Natts_pg_dist_shard_placement];
+	bool replace[Natts_pg_dist_shard_placement];
+
+	pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
+	tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_placementid,
+				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId));
+
+	scanDescriptor = systable_beginscan(pgDistShardPlacement,
+										DistShardPlacementPlacementidIndexId(), indexOK,
+										NULL, scanKeyCount, scanKey);
+
+	heapTuple = systable_getnext(scanDescriptor);
+	if (!HeapTupleIsValid(heapTuple))
+	{
+		ereport(ERROR, (errmsg("could not find valid entry for shard placement "
+							   UINT64_FORMAT,
+							   placementId)));
+	}
+
+	memset(replace, 0, sizeof(replace));
+
+	values[Anum_pg_dist_shard_placement_shardstate - 1] = CharGetDatum(shardState);
+	isnull[Anum_pg_dist_shard_placement_shardstate - 1] = false;
+	replace[Anum_pg_dist_shard_placement_shardstate - 1] = true;
+
+	heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
+	simple_heap_update(pgDistShardPlacement, &heapTuple->t_self, heapTuple);
+
+	CatalogUpdateIndexes(pgDistShardPlacement, heapTuple);
+
+	CommandCounterIncrement();
+
+	systable_endscan(scanDescriptor);
+	heap_close(pgDistShardPlacement, NoLock);
 }
@@ -794,7 +870,8 @@ master_stage_shard_placement_row(PG_FUNCTION_ARGS)
 	EnsureTablePermissions(distributedRelationId, ACL_INSERT);
 
 	/* finally insert placement */
-	InsertShardPlacementRow(shardId, shardState, shardLength, nodeName, nodePort);
+	InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardLength,
+							nodeName, nodePort);
 
 	heap_close(relation, NoLock);
 	heap_close(pgDistShard, NoLock);
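
The hunks above change InsertShardPlacementRow() to take an explicit
placementId and to draw a fresh one from the new sequence when
INVALID_PLACEMENT_ID is passed. A minimal caller-side sketch of both styles,
under the same assumptions as the earlier sketch (shardId, shardSize, nodeName
and nodePort are assumed to be in scope; this is illustrative, not part of the
commit):

/* create a brand-new placement and let the catalog code assign its ID */
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED,
                        shardSize, nodeName, nodePort);

/* or allocate the ID up front, e.g. when the caller needs to remember it */
uint64 placementId = DatumGetInt64(master_get_new_placementid(NULL));
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize,
                        nodeName, nodePort);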


@@ -67,6 +67,7 @@ static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescripto
 PG_FUNCTION_INFO_V1(master_get_table_metadata);
 PG_FUNCTION_INFO_V1(master_get_table_ddl_events);
 PG_FUNCTION_INFO_V1(master_get_new_shardid);
+PG_FUNCTION_INFO_V1(master_get_new_placementid);
 PG_FUNCTION_INFO_V1(master_get_local_first_candidate_nodes);
 PG_FUNCTION_INFO_V1(master_get_round_robin_candidate_nodes);
 PG_FUNCTION_INFO_V1(master_get_active_worker_nodes);
@@ -251,8 +252,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
  * ahead logs; writing to logs avoids the risk of having shardId collisions.
  *
  * Please note that the caller is still responsible for finalizing shard data
- * and the shardId with the master node. Further note that this function relies
- * on an internal sequence created in initdb to generate unique identifiers.
+ * and the shardId with the master node.
  *
  * NB: This can be called by any user; for now we have decided that that's
  * ok. We might want to restrict this to users part of a specific role or such
@@ -280,6 +280,38 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * master_get_new_placementid allocates and returns a unique placementId for
+ * the placement to be created. This allocation occurs both in shared memory
+ * and in write ahead logs; writing to logs avoids the risk of having
+ * placementId collisions.
+ *
+ * NB: This can be called by any user; for now we have decided that that's
+ * ok. We might want to restrict this to users part of a specific role or such
+ * at some later point.
+ */
+Datum
+master_get_new_placementid(PG_FUNCTION_ARGS)
+{
+	text *sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME);
+	Oid sequenceId = ResolveRelationId(sequenceName);
+	Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
+	Oid savedUserId = InvalidOid;
+	int savedSecurityContext = 0;
+	Datum placementIdDatum = 0;
+
+	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
+	SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
+
+	/* generate new and unique placementId from sequence */
+	placementIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
+
+	SetUserIdAndSecContext(savedUserId, savedSecurityContext);
+
+	PG_RETURN_DATUM(placementIdDatum);
+}
+
+
 /*
  * master_get_local_first_candidate_nodes returns a set of candidate host names
  * and port numbers on which to place new shards. The function makes sure to


@@ -292,12 +292,14 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 	foreach(failedPlacementCell, failedPlacementList)
 	{
 		ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell);
+		uint64 placementId = placement->placementId;
 		char *workerName = placement->nodeName;
 		uint32 workerPort = placement->nodePort;
 		uint64 oldShardLength = placement->shardLength;
 
 		DeleteShardPlacementRow(shardId, workerName, workerPort);
-		InsertShardPlacementRow(shardId, FILE_INACTIVE, oldShardLength,
+		InsertShardPlacementRow(shardId, placementId,
+								FILE_INACTIVE, oldShardLength,
 								workerName, workerPort);
 
 		ereport(WARNING, (errmsg("could not append table to shard \"%s\" on node "
@@ -398,7 +400,8 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
 			const RelayFileState shardState = FILE_FINALIZED;
 			const uint64 shardSize = 0;
 
-			InsertShardPlacementRow(shardId, shardState, shardSize, nodeName, nodePort);
+			InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize,
+									nodeName, nodePort);
 			placementsCreated++;
 		}
 		else
@@ -537,11 +540,12 @@ UpdateShardStatistics(int64 shardId)
 	foreach(shardPlacementCell, shardPlacementList)
 	{
 		ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
+		uint64 placementId = placement->placementId;
 		char *workerName = placement->nodeName;
 		uint32 workerPort = placement->nodePort;
 
 		DeleteShardPlacementRow(shardId, workerName, workerPort);
-		InsertShardPlacementRow(shardId, FILE_FINALIZED, shardSize,
+		InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize,
 								workerName, workerPort);
 	}


@@ -5101,7 +5101,7 @@ CompareTasksByShardId(const void *leftElement, const void *rightElement)
 /*
  * ActiveShardPlacementLists finds the active shard placement list for each task in
- * the given task list, sorts each shard placement list by tuple insertion time,
+ * the given task list, sorts each shard placement list by shard creation time,
  * and adds the sorted placement list into a new list of lists. The function also
  * ensures a one-to-one mapping between each placement list in the new list of
  * lists and each task in the given task list.
@@ -5122,7 +5122,7 @@ ActiveShardPlacementLists(List *taskList)
 		/* filter out shard placements that reside in inactive nodes */
 		List *activeShardPlacementList = ActivePlacementList(shardPlacementList);
 
-		/* sort shard placements by their insertion time */
+		/* sort shard placements by their creation time */
 		activeShardPlacementList = SortList(activeShardPlacementList,
 											CompareShardPlacements);
 		shardPlacementLists = lappend(shardPlacementLists, activeShardPlacementList);
@@ -5142,12 +5142,21 @@ CompareShardPlacements(const void *leftElement, const void *rightElement)
 	const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement);
 	const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement);
 
-	Oid leftTupleOid = leftPlacement->tupleOid;
-	Oid rightTupleOid = rightPlacement->tupleOid;
+	uint64 leftPlacementId = leftPlacement->placementId;
+	uint64 rightPlacementId = rightPlacement->placementId;
 
-	/* tuples that are inserted earlier appear first */
-	int tupleOidDiff = leftTupleOid - rightTupleOid;
-
-	return tupleOidDiff;
+	if (leftPlacementId < rightPlacementId)
+	{
+		return -1;
+	}
+	else if (leftPlacementId > rightPlacementId)
+	{
+		return 1;
+	}
+	else
+	{
+		return 0;
+	}
 }


@@ -309,7 +309,8 @@ create_healthy_local_shard_placement_row(PG_FUNCTION_ARGS)
 	int64 shardId = PG_GETARG_INT64(0);
 	int64 shardLength = 0;
 
-	InsertShardPlacementRow(shardId, FILE_FINALIZED, shardLength, "localhost", 5432);
+	InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED, shardLength,
+							"localhost", 5432);
 
 	PG_RETURN_VOID();
 }
@@ -347,9 +348,11 @@ update_shard_placement_row_state(PG_FUNCTION_ARGS)
 	bool successful = true;
 	char *hostNameString = text_to_cstring(hostName);
 	uint64 shardLength = 0;
+	uint64 placementId = INVALID_PLACEMENT_ID;
 
-	DeleteShardPlacementRow(shardId, hostNameString, hostPort);
-	InsertShardPlacementRow(shardId, shardState, shardLength, hostNameString, hostPort);
+	placementId = DeleteShardPlacementRow(shardId, hostNameString, hostPort);
+	InsertShardPlacementRow(shardId, placementId, shardState, shardLength,
+							hostNameString, hostPort);
 
 	PG_RETURN_BOOL(successful);
 }


@@ -431,7 +431,7 @@ _outShardPlacement(StringInfo str, const ShardPlacement *node)
 {
 	WRITE_NODE_TYPE("SHARDPLACEMENT");
 
-	WRITE_OID_FIELD(tupleOid);
+	WRITE_UINT64_FIELD(placementId);
 	WRITE_UINT64_FIELD(shardId);
 	WRITE_UINT64_FIELD(shardLength);
 	WRITE_ENUM_FIELD(shardState, RelayFileState);


@@ -1478,7 +1478,7 @@ _readShardPlacement(void)
 {
 	READ_LOCALS(ShardPlacement);
 
-	READ_OID_FIELD(tupleOid);
+	READ_UINT64_FIELD(placementId);
 	READ_UINT64_FIELD(shardId);
 	READ_UINT64_FIELD(shardLength);
 	READ_ENUM_FIELD(shardState, RelayFileState);


@@ -58,6 +58,7 @@ static Oid distPartitionColocationidIndexId = InvalidOid;
 static Oid distShardLogicalRelidIndexId = InvalidOid;
 static Oid distShardShardidIndexId = InvalidOid;
 static Oid distShardPlacementShardidIndexId = InvalidOid;
+static Oid distShardPlacementPlacementidIndexId = InvalidOid;
 static Oid distShardPlacementNodeidIndexId = InvalidOid;
 static Oid extraDataContainerFuncId = InvalidOid;
@@ -702,6 +703,17 @@ DistShardPlacementShardidIndexId(void)
 }
 
 
+/* return oid of pg_dist_shard_placement_placementid_index */
+Oid
+DistShardPlacementPlacementidIndexId(void)
+{
+	CachedRelationLookup("pg_dist_shard_placement_placementid_index",
+						 &distShardPlacementPlacementidIndexId);
+
+	return distShardPlacementPlacementidIndexId;
+}
+
+
 /* return oid of pg_dist_shard_placement_nodeid_index */
 Oid
 DistShardPlacementNodeidIndexId(void)
@@ -1234,6 +1246,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
 	distShardLogicalRelidIndexId = InvalidOid;
 	distShardShardidIndexId = InvalidOid;
 	distShardPlacementShardidIndexId = InvalidOid;
+	distShardPlacementPlacementidIndexId = InvalidOid;
 	distNodeRelationId = InvalidOid;
 	extraDataContainerFuncId = InvalidOid;
 }


@@ -47,7 +47,7 @@ typedef struct ShardInterval
 typedef struct ShardPlacement
 {
 	CitusNodeTag type;
-	Oid tupleOid;        /* unique oid that implies this row's insertion order */
+	uint64 placementId;  /* sequence that implies this placement creation order */
 	uint64 shardId;
 	uint64 shardLength;
 	RelayFileState shardState;
@@ -71,10 +71,14 @@ extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,
 /* Function declarations to modify shard and shard placement data */
 extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
 						   text *shardMinValue, text *shardMaxValue);
-extern void InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
+extern void DeleteShardRow(uint64 shardId);
+extern void InsertShardPlacementRow(uint64 shardId, uint64 placementId,
+									char shardState, uint64 shardLength,
 									char *nodeName, uint32 nodePort);
-extern void DeleteShardRow(uint64 shardId);
-extern void DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort);
+extern void UpdateShardPlacementState(uint64 placementId, char shardState);
+extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32
+									  workerPort);
 
 /* Remaining metadata utility functions */
 extern Node * BuildDistributionKeyFromColumnName(Relation distributedRelation,


@@ -48,8 +48,8 @@
 /* Name of columnar foreign data wrapper */
 #define CSTORE_FDW_NAME "cstore_fdw"
 
-/* ShardId sequence name as defined in initdb.c */
 #define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq"
+#define PLACEMENTID_SEQUENCE_NAME "pg_dist_shard_placement_placementid_seq"
 
 /* Remote call definitions to help with data staging and deletion */
 #define WORKER_APPLY_SHARD_DDL_COMMAND \
@@ -97,10 +97,11 @@ extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventL
 								  int workerStartIndex, int replicationFactor);
 extern uint64 UpdateShardStatistics(int64 shardId);
 
-/* Function declarations for generating metadata for shard creation */
+/* Function declarations for generating metadata for shard and placement creation */
 extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);
 extern Datum master_get_table_ddl_events(PG_FUNCTION_ARGS);
 extern Datum master_get_new_shardid(PG_FUNCTION_ARGS);
+extern Datum master_get_new_placementid(PG_FUNCTION_ARGS);
 extern Datum master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS);
 extern Datum master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS);
 extern Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS);


@@ -75,6 +75,7 @@ extern Oid DistPartitionColocationidIndexId(void);
 extern Oid DistShardLogicalRelidIndexId(void);
 extern Oid DistShardShardidIndexId(void);
 extern Oid DistShardPlacementShardidIndexId(void);
+extern Oid DistShardPlacementPlacementidIndexId(void);
 extern Oid DistShardPlacementNodeidIndexId(void);
 
 /* function oids */


@@ -29,6 +29,7 @@ typedef struct FormData_pg_dist_shard_placement
 #ifdef CATALOG_VARLEN /* variable-length fields start here */
 	text nodename;     /* remote node's host name */
 	int32 nodeport;    /* remote node's port number */
+	int64 placementid; /* global placementId on remote node (added later) */
 #endif
 } FormData_pg_dist_shard_placement;
@@ -43,12 +44,13 @@ typedef FormData_pg_dist_shard_placement *Form_pg_dist_shard_placement;
  * compiler constants for pg_dist_shard_placement
  * ----------------
  */
-#define Natts_pg_dist_shard_placement 5
+#define Natts_pg_dist_shard_placement 6
 #define Anum_pg_dist_shard_placement_shardid 1
 #define Anum_pg_dist_shard_placement_shardstate 2
 #define Anum_pg_dist_shard_placement_shardlength 3
 #define Anum_pg_dist_shard_placement_nodename 4
 #define Anum_pg_dist_shard_placement_nodeport 5
+#define Anum_pg_dist_shard_placement_placementid 6
 
 #endif /* PG_DIST_SHARD_PLACEMENT_H */


@@ -23,6 +23,7 @@
 /* Shard name and identifier related defines */
 #define SHARD_NAME_SEPARATOR '_'
 #define INVALID_SHARD_ID 0
+#define INVALID_PLACEMENT_ID 0
 
 /*
  * RelayFileState represents last known states of shards on a given node. We


@@ -98,10 +98,10 @@ SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM li
 	GROUP BY l_quantity ORDER BY l_quantity;
 l_quantity | count | avg | array_agg
 ------------+-------+-----------------------+--------------------------------------------------------------------------------------------------
- 1.00 | 17 | 1477.1258823529411765 | {8997,9026,9158,9184,9220,9222,9348,9383,9476,5543,5633,5634,5698,5766,5856,5857,5986}
- 2.00 | 19 | 3078.4242105263157895 | {9030,9058,9123,9124,9188,9344,9441,9476,5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923}
- 3.00 | 14 | 4714.0392857142857143 | {9124,9157,9184,9223,9254,9349,9414,9475,9477,5509,5543,5605,5606,5827}
- 4.00 | 19 | 5929.7136842105263158 | {9091,9120,9281,9347,9382,9440,9473,5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985}
+ 1.00 | 17 | 1477.1258823529411765 | {5543,5633,5634,5698,5766,5856,5857,5986,8997,9026,9158,9184,9220,9222,9348,9383,9476}
+ 2.00 | 19 | 3078.4242105263157895 | {5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923,9030,9058,9123,9124,9188,9344,9441,9476}
+ 3.00 | 14 | 4714.0392857142857143 | {5509,5543,5605,5606,5827,9124,9157,9184,9223,9254,9349,9414,9475,9477}
+ 4.00 | 19 | 5929.7136842105263158 | {5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985,9091,9120,9281,9347,9382,9440,9473}
 (4 rows)
 
 SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month
@@ -109,10 +109,10 @@ SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month
 	AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
 l_quantity | my_month
 ------------+------------------------------------------------
- 1.00 | {7,7,4,7,4,2,6,3,5,9,5,7,5,9,11,11,4}
- 2.00 | {7,6,6,10,1,12,6,5,11,10,8,5,5,12,3,11,7,11,5}
- 3.00 | {10,6,7,8,5,8,9,11,3,4,9,8,11,7}
- 4.00 | {11,6,2,8,2,6,10,1,5,6,11,12,10,9,6,1,2,5,1}
+ 1.00 | {9,5,7,5,9,11,11,4,7,7,4,7,4,2,6,3,5}
+ 2.00 | {11,10,8,5,5,12,3,11,7,11,5,7,6,6,10,1,12,6,5}
+ 3.00 | {4,9,8,11,7,10,6,7,8,5,8,9,11,3}
+ 4.00 | {1,5,6,11,12,10,9,6,1,2,5,1,11,6,2,8,2,6,10}
 (4 rows)
 
 SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5
@@ -120,10 +120,10 @@ SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity
 	AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity;
 l_quantity | array_agg
 ------------+---------------------------------------------
- 1.00 | {18317,18445,11269,11397,11713,11715,11973}
- 2.00 | {18061,18247,18953,11847}
+ 1.00 | {11269,11397,11713,11715,11973,18317,18445}
+ 2.00 | {11847,18061,18247,18953}
 3.00 | {18249,18315,18699,18951,18955}
- 4.00 | {18241,18765,11653,11659}
+ 4.00 | {11653,11659,18241,18765}
 (4 rows)
 
 -- Check that we can execute array_agg() with an expression containing NULL values


@@ -72,17 +72,17 @@ SELECT master_create_worker_shards('cluster_management_test', 16, 1);
 (1 row)
 
 -- see that there are some active placements in the candidate node
-SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
- shardid | shardstate | shardlength | nodename | nodeport
----------+------------+-------------+-----------+----------
- 1220001 | 1 | 0 | localhost | 57638
- 1220003 | 1 | 0 | localhost | 57638
- 1220005 | 1 | 0 | localhost | 57638
- 1220007 | 1 | 0 | localhost | 57638
- 1220009 | 1 | 0 | localhost | 57638
- 1220011 | 1 | 0 | localhost | 57638
- 1220013 | 1 | 0 | localhost | 57638
- 1220015 | 1 | 0 | localhost | 57638
+SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
+ shardid | shardstate | nodename | nodeport
+---------+------------+-----------+----------
+ 1220001 | 1 | localhost | 57638
+ 1220003 | 1 | localhost | 57638
+ 1220005 | 1 | localhost | 57638
+ 1220007 | 1 | localhost | 57638
+ 1220009 | 1 | localhost | 57638
+ 1220011 | 1 | localhost | 57638
+ 1220013 | 1 | localhost | 57638
+ 1220015 | 1 | localhost | 57638
 (8 rows)
 
 -- try to remove a node with active placements and see that node removal is failed
@@ -97,17 +97,17 @@ SELECT master_get_active_worker_nodes();
 -- mark all placements in the candidate node as inactive
 UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port;
-SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
- shardid | shardstate | shardlength | nodename | nodeport
----------+------------+-------------+-----------+----------
- 1220001 | 3 | 0 | localhost | 57638
- 1220003 | 3 | 0 | localhost | 57638
- 1220005 | 3 | 0 | localhost | 57638
- 1220007 | 3 | 0 | localhost | 57638
- 1220009 | 3 | 0 | localhost | 57638
- 1220011 | 3 | 0 | localhost | 57638
- 1220013 | 3 | 0 | localhost | 57638
- 1220015 | 3 | 0 | localhost | 57638
+SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
+ shardid | shardstate | nodename | nodeport
+---------+------------+-----------+----------
+ 1220001 | 3 | localhost | 57638
+ 1220003 | 3 | localhost | 57638
+ 1220005 | 3 | localhost | 57638
+ 1220007 | 3 | localhost | 57638
+ 1220009 | 3 | localhost | 57638
+ 1220011 | 3 | localhost | 57638
+ 1220013 | 3 | localhost | 57638
+ 1220015 | 3 | localhost | 57638
 (8 rows)
 
 -- try to remove a node with only inactive placements and see that node is removed


@@ -444,16 +444,16 @@ LIMIT 10 OFFSET 20;
 DEBUG: push down of limit count: 30
 l_partkey | o_custkey | l_quantity
 -----------+-----------+------------
- 135912 | 509 | 26.00
- 75351 | 1261 | 26.00
- 199475 | 1052 | 26.00
- 91309 | 8 | 26.00
- 53624 | 400 | 26.00
- 182736 | 1048 | 26.00
- 59694 | 163 | 26.00
- 20481 | 173 | 26.00
- 78748 | 1499 | 26.00
- 7614 | 1397 | 26.00
+ 154380 | 421 | 26.00
+ 103981 | 325 | 26.00
+ 77886 | 817 | 26.00
+ 147369 | 755 | 26.00
+ 78175 | 1075 | 26.00
+ 109784 | 1268 | 26.00
+ 28635 | 1207 | 26.00
+ 188845 | 554 | 26.00
+ 189398 | 844 | 26.00
+ 71383 | 1397 | 26.00
 (10 rows)
 
 RESET client_min_messages;


@@ -40,7 +40,7 @@ Distributed Query into pg_merge_job_570000
 Node: host=localhost port=57637 dbname=regression
 -> HashAggregate
 Group Key: l_quantity
--> Seq Scan on lineitem_290000 lineitem
+-> Seq Scan on lineitem_290001 lineitem
 Master Query
 -> Sort
 Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0
@@ -71,7 +71,7 @@ EXPLAIN (COSTS FALSE, FORMAT JSON)
 {
 "Node Type": "Seq Scan",
 "Parent Relationship": "Outer",
-"Relation Name": "lineitem_290000",
+"Relation Name": "lineitem_290001",
 "Alias": "lineitem"
 }
 ]
@@ -140,7 +140,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML)
 <Plan>
 <Node-Type>Seq Scan</Node-Type>
 <Parent-Relationship>Outer</Parent-Relationship>
-<Relation-Name>lineitem_290000</Relation-Name>
+<Relation-Name>lineitem_290001</Relation-Name>
 <Alias>lineitem</Alias>
 </Plan>
 </Plans>
@@ -206,7 +206,7 @@ EXPLAIN (COSTS FALSE, FORMAT YAML)
 Plans:
 - Node Type: "Seq Scan"
 Parent Relationship: "Outer"
-Relation Name: "lineitem_290000"
+Relation Name: "lineitem_290001"
 Alias: "lineitem"
 Master Query:
@@ -238,7 +238,7 @@ Distributed Query into pg_merge_job_570006
 Node: host=localhost port=57637 dbname=regression
 -> HashAggregate
 Group Key: l_quantity
--> Seq Scan on lineitem_290000 lineitem
+-> Seq Scan on lineitem_290001 lineitem
 Master Query
 -> Sort
 Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0
@@ -256,7 +256,7 @@ Distributed Query into pg_merge_job_570007
 Node: host=localhost port=57637 dbname=regression
 -> Aggregate
 Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
--> Seq Scan on public.lineitem_290000 lineitem
+-> Seq Scan on public.lineitem_290001 lineitem
 Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
 Master Query
 -> Aggregate
@@ -279,7 +279,7 @@ Distributed Query into pg_merge_job_570008
 Sort Key: lineitem.l_quantity
 -> Hash Join
 Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
--> Seq Scan on lineitem_290000 lineitem
+-> Seq Scan on lineitem_290001 lineitem
 Filter: (l_quantity < 5.0)
 -> Hash
 -> Seq Scan on orders_290008 orders
@@ -296,7 +296,7 @@ Distributed Query
 Task Count: 1
 Tasks Shown: All
 -> Task
-Node: host=localhost port=57637 dbname=regression
+Node: host=localhost port=57638 dbname=regression
 -> Insert on lineitem_290000
 -> Result
 -- Test update
@@ -309,7 +309,7 @@ Distributed Query
 Task Count: 1
 Tasks Shown: All
 -> Task
-Node: host=localhost port=57637 dbname=regression
+Node: host=localhost port=57638 dbname=regression
 -> Update on lineitem_290000
 -> Bitmap Heap Scan on lineitem_290000
 Recheck Cond: (l_orderkey = 1)
@@ -325,7 +325,7 @@ Distributed Query
 Task Count: 1
 Tasks Shown: All
 -> Task
-Node: host=localhost port=57637 dbname=regression
+Node: host=localhost port=57638 dbname=regression
 -> Delete on lineitem_290000
 -> Bitmap Heap Scan on lineitem_290000
 Recheck Cond: (l_orderkey = 1)
@@ -361,7 +361,7 @@ Distributed Query into pg_merge_job_570012
 Tasks Shown: One of 8
 -> Task
 Node: host=localhost port=57637 dbname=regression
--> Seq Scan on lineitem_290000 lineitem
+-> Seq Scan on lineitem_290001 lineitem
 Master Query
 -> Seq Scan on pg_merge_job_570012
 -- Test all tasks output
@@ -375,22 +375,22 @@ Distributed Query into pg_merge_job_570013
 -> Task
 Node: host=localhost port=57637 dbname=regression
 -> Aggregate
--> Seq Scan on lineitem_290004 lineitem
+-> Seq Scan on lineitem_290005 lineitem
 Filter: (l_orderkey > 9030)
 -> Task
 Node: host=localhost port=57638 dbname=regression
 -> Aggregate
--> Seq Scan on lineitem_290005 lineitem
+-> Seq Scan on lineitem_290004 lineitem
 Filter: (l_orderkey > 9030)
 -> Task
 Node: host=localhost port=57637 dbname=regression
 -> Aggregate
--> Seq Scan on lineitem_290006 lineitem
+-> Seq Scan on lineitem_290007 lineitem
 Filter: (l_orderkey > 9030)
 -> Task
 Node: host=localhost port=57638 dbname=regression
 -> Aggregate
--> Seq Scan on lineitem_290007 lineitem
+-> Seq Scan on lineitem_290006 lineitem
 Filter: (l_orderkey > 9030)
 Master Query
 -> Aggregate
@@ -413,7 +413,7 @@ Distributed Query into pg_merge_job_570016
 -> Task
 Node: host=localhost port=57637 dbname=regression
 -> Aggregate
--> Seq Scan on lineitem_290004 lineitem
+-> Seq Scan on lineitem_290005 lineitem
 Filter: (l_orderkey > 9030)
 Master Query
 -> Aggregate


@@ -30,6 +30,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-1';
 ALTER EXTENSION citus UPDATE TO '6.0-2';
 ALTER EXTENSION citus UPDATE TO '6.0-3';
 ALTER EXTENSION citus UPDATE TO '6.0-4';
+ALTER EXTENSION citus UPDATE TO '6.0-5';
 -- drop extension an re-create in newest version
 DROP EXTENSION citus;
 \c


@@ -69,14 +69,14 @@ DEBUG: generated sql query for job 1250 and task 21
 DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290006 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
 DEBUG: generated sql query for job 1250 and task 24
 DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290007 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))"
-DEBUG: assigned task 3 to node localhost:57637
-DEBUG: assigned task 6 to node localhost:57638
-DEBUG: assigned task 9 to node localhost:57637
-DEBUG: assigned task 12 to node localhost:57638
-DEBUG: assigned task 15 to node localhost:57637
-DEBUG: assigned task 18 to node localhost:57638
-DEBUG: assigned task 21 to node localhost:57637
-DEBUG: assigned task 24 to node localhost:57638
+DEBUG: assigned task 6 to node localhost:57637
+DEBUG: assigned task 3 to node localhost:57638
+DEBUG: assigned task 12 to node localhost:57637
+DEBUG: assigned task 9 to node localhost:57638
+DEBUG: assigned task 18 to node localhost:57637
+DEBUG: assigned task 15 to node localhost:57638
+DEBUG: assigned task 24 to node localhost:57637
+DEBUG: assigned task 21 to node localhost:57638
 DEBUG: join prunable for intervals [1,1000] and [6001,7000]
 DEBUG: join prunable for intervals [6001,7000] and [1,1000]
 DEBUG: generated sql query for job 1251 and task 3
@@ -107,8 +107,8 @@ DEBUG: pruning merge fetch taskId 4
 DETAIL: Creating dependency on merge taskId 10
 DEBUG: pruning merge fetch taskId 7
 DETAIL: Creating dependency on merge taskId 13
-DEBUG: assigned task 9 to node localhost:57637
-DEBUG: assigned task 6 to node localhost:57638
+DEBUG: assigned task 6 to node localhost:57637
+DEBUG: assigned task 9 to node localhost:57638
 DEBUG: assigned task 3 to node localhost:57637
 DEBUG: completed cleanup query for job 1252
 DEBUG: completed cleanup query for job 1252
@@ -178,20 +178,20 @@ DEBUG: generated sql query for job 1253 and task 14
 DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290006 lineitem WHERE (l_quantity < 5.0)"
 DEBUG: generated sql query for job 1253 and task 16
 DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290007 lineitem WHERE (l_quantity < 5.0)"
-DEBUG: assigned task 2 to node localhost:57637
-DEBUG: assigned task 4 to node localhost:57638
-DEBUG: assigned task 6 to node localhost:57637
-DEBUG: assigned task 8 to node localhost:57638
-DEBUG: assigned task 10 to node localhost:57637
-DEBUG: assigned task 12 to node localhost:57638
-DEBUG: assigned task 14 to node localhost:57637
-DEBUG: assigned task 16 to node localhost:57638
+DEBUG: assigned task 4 to node localhost:57637
+DEBUG: assigned task 2 to node localhost:57638
+DEBUG: assigned task 8 to node localhost:57637
+DEBUG: assigned task 6 to node localhost:57638
+DEBUG: assigned task 12 to node localhost:57637
+DEBUG: assigned task 10 to node localhost:57638
+DEBUG: assigned task 16 to node localhost:57637
+DEBUG: assigned task 14 to node localhost:57638
 DEBUG: generated sql query for job 1254 and task 2
 DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290008 orders WHERE (o_totalprice <> 4.0)"
 DEBUG: generated sql query for job 1254 and task 4
 DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290009 orders WHERE (o_totalprice <> 4.0)"
-DEBUG: assigned task 2 to node localhost:57637
-DEBUG: assigned task 4 to node localhost:57638
+DEBUG: assigned task 4 to node localhost:57637
+DEBUG: assigned task 2 to node localhost:57638
 DEBUG: join prunable for task partitionId 0 and 1
 DEBUG: join prunable for task partitionId 0 and 2
 DEBUG: join prunable for task partitionId 0 and 3


@@ -27,8 +27,8 @@ FROM
 WHERE
 	o_custkey = c_custkey;
 DEBUG: StartTransactionCommand
-DEBUG: assigned task 2 to node localhost:57637
-DEBUG: assigned task 4 to node localhost:57638
+DEBUG: assigned task 4 to node localhost:57637
+DEBUG: assigned task 2 to node localhost:57638
 DEBUG: join prunable for intervals [1,1000] and [1001,2000]
 DEBUG: join prunable for intervals [1,1000] and [6001,7000]
 DEBUG: join prunable for intervals [1001,2000] and [1,1000]
@@ -41,8 +41,8 @@ DEBUG: pruning merge fetch taskId 4
 DETAIL: Creating dependency on merge taskId 8
 DEBUG: pruning merge fetch taskId 7
 DETAIL: Creating dependency on merge taskId 11
-DEBUG: assigned task 9 to node localhost:57637
-DEBUG: assigned task 6 to node localhost:57638
+DEBUG: assigned task 6 to node localhost:57637
+DEBUG: assigned task 9 to node localhost:57638
 DEBUG: assigned task 3 to node localhost:57637
 DEBUG: CommitTransactionCommand
 count
@@ -66,10 +66,10 @@ WHERE
 	o_custkey = c_custkey AND
 	o_orderkey = l_orderkey;
 DEBUG: StartTransactionCommand
-DEBUG: assigned task 15 to node localhost:57637
-DEBUG: assigned task 9 to node localhost:57638
-DEBUG: assigned task 18 to node localhost:57637
-DEBUG: assigned task 12 to node localhost:57638
+DEBUG: assigned task 9 to node localhost:57637
+DEBUG: assigned task 15 to node localhost:57638
+DEBUG: assigned task 12 to node localhost:57637
+DEBUG: assigned task 18 to node localhost:57638
 DEBUG: assigned task 3 to node localhost:57637
 DEBUG: assigned task 6 to node localhost:57638
 DEBUG: join prunable for intervals [1,1509] and [2951,4455]
@@ -156,12 +156,12 @@ DEBUG: pruning merge fetch taskId 55
 DETAIL: Creating dependency on merge taskId 68
 DEBUG: pruning merge fetch taskId 58
 DETAIL: Creating dependency on merge taskId 68
-DEBUG: assigned task 3 to node localhost:57637
-DEBUG: assigned task 21 to node localhost:57638
-DEBUG: assigned task 9 to node localhost:57637
-DEBUG: assigned task 27 to node localhost:57638
-DEBUG: assigned task 33 to node localhost:57637
-DEBUG: assigned task 48 to node localhost:57638
+DEBUG: assigned task 21 to node localhost:57637
+DEBUG: assigned task 3 to node localhost:57638
+DEBUG: assigned task 27 to node localhost:57637
+DEBUG: assigned task 9 to node localhost:57638
+DEBUG: assigned task 48 to node localhost:57637
+DEBUG: assigned task 33 to node localhost:57638
 DEBUG: assigned task 39 to node localhost:57637
 DEBUG: assigned task 57 to node localhost:57638
 DEBUG: propagating assignment from merge task 19 to constrained sql task 6
@@ -208,16 +208,16 @@ FROM
 WHERE
 	l_partkey = c_nationkey;
 DEBUG: StartTransactionCommand
-DEBUG: assigned task 2 to node localhost:57637
-DEBUG: assigned task 4 to node localhost:57638
-DEBUG: assigned task 6 to node localhost:57637
-DEBUG: assigned task 8 to node localhost:57638
-DEBUG: assigned task 10 to node localhost:57637
-DEBUG: assigned task 12 to node localhost:57638
-DEBUG: assigned task 14 to node localhost:57637
-DEBUG: assigned task 16 to node localhost:57638
-DEBUG: assigned task 6 to node localhost:57637
-DEBUG: assigned task 4 to node localhost:57638
+DEBUG: assigned task 4 to node localhost:57637
+DEBUG: assigned task 2 to node localhost:57638
+DEBUG: assigned task 8 to node localhost:57637
+DEBUG: assigned task 6 to node localhost:57638
+DEBUG: assigned task 12 to node localhost:57637
+DEBUG: assigned task 10 to node localhost:57638
+DEBUG: assigned task 16 to node localhost:57637
+DEBUG: assigned task 14 to node localhost:57638
+DEBUG: assigned task 4 to node localhost:57637
+DEBUG: assigned task 6 to node localhost:57638
 DEBUG: assigned task 2 to node localhost:57637
 DEBUG: join prunable for task partitionId 0 and 1
 DEBUG: join prunable for task partitionId 0 and 2


@@ -790,11 +790,11 @@ SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localh
 (1 row)
 -- verify shardstate
-SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
- shardid | shardstate | shardlength | nodename | nodeport
----------+------------+-------------+-----------+----------
- 1190000 | 1 | 8192 | localhost | 57638
- 1190000 | 1 | 0 | localhost | 57637
+SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000;
+ shardstate | nodename | nodeport
+------------+-----------+----------
+ 1 | localhost | 57638
+ 1 | localhost | 57637
 (2 rows)
 --test with search_path is set
@@ -808,11 +808,11 @@ SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localh
 (1 row)
 -- verify shardstate
-SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
- shardid | shardstate | shardlength | nodename | nodeport
----------+------------+-------------+-----------+----------
- 1190000 | 1 | 8192 | localhost | 57638
- 1190000 | 1 | 0 | localhost | 57637
+SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000;
+ shardstate | nodename | nodeport
+------------+-----------+----------
+ 1 | localhost | 57638
+ 1 | localhost | 57637
 (2 rows)
 -- test master_apply_delete_command with schemas


@@ -58,8 +58,8 @@ SELECT * FROM pg_dist_shard;
 (0 rows)
 SELECT * FROM pg_dist_shard_placement;
- shardid | shardstate | shardlength | nodename | nodeport
----------+------------+-------------+----------+----------
+ shardid | shardstate | shardlength | nodename | nodeport | placementid
+---------+------------+-------------+----------+----------+-------------
 (0 rows)
 -- check that the extension now can be dropped (and recreated)
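
Once pg_dist_shard_placement carries the placementid column shown above, individual placements can be referenced directly by their ID. The query below is only an illustrative sketch, not part of the patch; the ORDER BY is arbitrary and merely makes the new column visible.

-- Illustrative only: list placements together with their newly assigned IDs.
SELECT placementid, shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement
ORDER BY placementid;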


@@ -353,16 +353,6 @@ FROM
 LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ]
 l_custkey | r_custkey | t_custkey
 -----------+-----------+-----------
-21 | 21 | 21
-22 | 22 | 22
-23 | 23 | 23
-24 | 24 | 24
-25 | 25 | 25
-26 | 26 | 26
-27 | 27 | 27
-28 | 28 | 28
-29 | 29 | 29
-30 | 30 | 30
 1 | |
 2 | |
 3 | |
@@ -378,6 +368,16 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
 13 | 13 | 13
 14 | 14 | 14
 15 | 15 | 15
+21 | 21 | 21
+22 | 22 | 22
+23 | 23 | 23
+24 | 24 | 24
+25 | 25 | 25
+26 | 26 | 26
+27 | 27 | 27
+28 | 28 | 28
+29 | 29 | 29
+30 | 30 | 30
 (25 rows)
 -- Right join with single shard right most table should error out
@@ -399,16 +399,6 @@ FROM
 LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_left" ]
 t_custkey | r_custkey | l_custkey
 -----------+-----------+-----------
-21 | 21 | 21
-22 | 22 | 22
-23 | 23 | 23
-24 | 24 | 24
-25 | 25 | 25
-26 | 26 | 26
-27 | 27 | 27
-28 | 28 | 28
-29 | 29 | 29
-30 | 30 | 30
 11 | 11 | 11
 12 | 12 | 12
 13 | 13 | 13
@@ -419,6 +409,16 @@ LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join
 18 | 18 |
 19 | 19 |
 20 | 20 |
+21 | 21 | 21
+22 | 22 | 22
+23 | 23 | 23
+24 | 24 | 24
+25 | 25 | 25
+26 | 26 | 26
+27 | 27 | 27
+28 | 28 | 28
+29 | 29 | 29
+30 | 30 | 30
 (20 rows)
 -- Make it anti-join, should display values with l_custkey is null
@@ -458,16 +458,6 @@ FROM
 LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ]
 l_custkey | r_custkey
 -----------+-----------
-21 | 21
-22 | 22
-23 | 23
-24 | 24
-25 | 25
-26 | 26
-27 | 27
-28 | 28
-29 | 29
-30 | 30
 1 |
 2 |
 3 |
@@ -488,6 +478,16 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
 | 18
 | 19
 | 16
+21 | 21
+22 | 22
+23 | 23
+24 | 24
+25 | 25
+26 | 26
+27 | 27
+28 | 28
+29 | 29
+30 | 30
 (30 rows)
 -- full outer join + anti (right) should work with 1-1 matched shards
@@ -577,6 +577,11 @@ FROM
 LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ]
 l_custkey | r_custkey | t_custkey
 -----------+-----------+-----------
+11 | 11 | 11
+12 | 12 | 12
+13 | 13 | 13
+14 | 14 | 14
+15 | 15 | 15
 21 | 21 | 21
 22 | 22 | 22
 23 | 23 | 23
@@ -587,11 +592,6 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer
 28 | 28 | 28
 29 | 29 | 29
 30 | 30 | 30
-11 | 11 | 11
-12 | 12 | 12
-13 | 13 | 13
-14 | 14 | 14
-15 | 15 | 15
 (15 rows)
 -- inner (broadcast) join + 2 shards left (local) join should work
@@ -604,16 +604,6 @@ FROM
 LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ]
 l_custkey | t_custkey | r_custkey
 -----------+-----------+-----------
-21 | 21 | 21
-22 | 22 | 22
-23 | 23 | 23
-24 | 24 | 24
-25 | 25 | 25
-26 | 26 | 26
-27 | 27 | 27
-28 | 28 | 28
-29 | 29 | 29
-30 | 30 | 30
 1 | 1 |
 2 | 2 |
 3 | 3 |
@@ -629,6 +619,16 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_
 13 | 13 | 13
 14 | 14 | 14
 15 | 15 | 15
+21 | 21 | 21
+22 | 22 | 22
+23 | 23 | 23
+24 | 24 | 24
+25 | 25 | 25
+26 | 26 | 26
+27 | 27 | 27
+28 | 28 | 28
+29 | 29 | 29
+30 | 30 | 30
 (25 rows)
 -- inner (local) join + 2 shards left (dual partition) join should error out
@@ -650,16 +650,6 @@ FROM
 LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ]
 l_custkey | t_custkey | r_custkey
 -----------+-----------+-----------
-21 | 21 | 21
-22 | 22 | 22
-23 | 23 | 23
-24 | 24 | 24
-25 | 25 | 25
-26 | 26 | 26
-27 | 27 | 27
-28 | 28 | 28
-29 | 29 | 29
-30 | 30 | 30
 1 | 1 |
 2 | 2 |
 3 | 3 |
@@ -675,6 +665,16 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_
 13 | 13 | 13
 14 | 14 | 14
 15 | 15 | 15
+21 | 21 | 21
+22 | 22 | 22
+23 | 23 | 23
+24 | 24 | 24
+25 | 25 | 25
+26 | 26 | 26
+27 | 27 | 27
+28 | 28 | 28
+29 | 29 | 29
+30 | 30 | 30
 (25 rows)
 -- inner (broadcast) join + 2 shards left (local) + anti join should work
@@ -712,16 +712,6 @@ FROM
 LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ]
 t_custkey
 -----------
-21
-22
-23
-24
-25
-26
-27
-28
-29
-30
 11
 12
 13
@@ -732,6 +722,16 @@ LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_oute
 18
 19
 20
+21
+22
+23
+24
+25
+26
+27
+28
+29
+30
 (20 rows)
 -- flattened out subqueries with outer joins are not supported


@@ -29,7 +29,7 @@ SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash
 SELECT master_create_worker_shards('cluster_management_test', 16, 1);
 -- see that there are some active placements in the candidate node
-SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
+SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
 -- try to remove a node with active placements and see that node removal is failed
 SELECT master_remove_node('localhost', :worker_2_port);
@@ -37,7 +37,7 @@ SELECT master_get_active_worker_nodes();
 -- mark all placements in the candidate node as inactive
 UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port;
-SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
+SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port;
 -- try to remove a node with only inactive placements and see that node is removed
 SELECT master_remove_node('localhost', :worker_2_port);
@@ -46,4 +46,4 @@ SELECT master_get_active_worker_nodes();
 -- clean-up
 SELECT master_add_node('localhost', :worker_2_port);
 UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port;
 DROP TABLE cluster_management_test;


@@ -35,6 +35,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-1';
 ALTER EXTENSION citus UPDATE TO '6.0-2';
 ALTER EXTENSION citus UPDATE TO '6.0-3';
 ALTER EXTENSION citus UPDATE TO '6.0-4';
+ALTER EXTENSION citus UPDATE TO '6.0-5';
 -- drop extension an re-create in newest version
 DROP EXTENSION citus;


@@ -504,7 +504,7 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and no
 SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
 -- verify shardstate
-SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
+SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000;
 --test with search_path is set
@@ -515,7 +515,7 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and no
 SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
 -- verify shardstate
-SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
+SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000;
 -- test master_apply_delete_command with schemas