mirror of https://github.com/citusdata/citus.git
parent
a8bd2d58f5
commit
1b2c438e69
|
@ -3216,14 +3216,14 @@ GetConnectionState(HTAB *connectionStateHash, MultiConnection *connection)
|
||||||
CopyConnectionState *connectionState = NULL;
|
CopyConnectionState *connectionState = NULL;
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
|
||||||
int socket = PQsocket(connection->pgConn);
|
int sock = PQsocket(connection->pgConn);
|
||||||
Assert(socket != -1);
|
Assert(sock != -1);
|
||||||
|
|
||||||
connectionState = (CopyConnectionState *) hash_search(connectionStateHash, &socket,
|
connectionState = (CopyConnectionState *) hash_search(connectionStateHash, &sock,
|
||||||
HASH_ENTER, &found);
|
HASH_ENTER, &found);
|
||||||
if (!found)
|
if (!found)
|
||||||
{
|
{
|
||||||
connectionState->socket = socket;
|
connectionState->socket = sock;
|
||||||
connectionState->connection = connection;
|
connectionState->connection = connection;
|
||||||
connectionState->activePlacementState = NULL;
|
connectionState->activePlacementState = NULL;
|
||||||
dlist_init(&connectionState->bufferedPlacementList);
|
dlist_init(&connectionState->bufferedPlacementList);
|
||||||
|
|
|
@ -598,7 +598,7 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
|
||||||
{
|
{
|
||||||
MultiConnectionPollState *connectionState = (MultiConnectionPollState *) lfirst(
|
MultiConnectionPollState *connectionState = (MultiConnectionPollState *) lfirst(
|
||||||
connectionCell);
|
connectionCell);
|
||||||
int socket = 0;
|
int sock = 0;
|
||||||
int eventMask = 0;
|
int eventMask = 0;
|
||||||
|
|
||||||
if (numEventsAdded >= eventSetSize)
|
if (numEventsAdded >= eventSetSize)
|
||||||
|
@ -613,11 +613,11 @@ WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
socket = PQsocket(connectionState->connection->pgConn);
|
sock = PQsocket(connectionState->connection->pgConn);
|
||||||
|
|
||||||
eventMask = MultiConnectionStateEventMask(connectionState);
|
eventMask = MultiConnectionStateEventMask(connectionState);
|
||||||
|
|
||||||
AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, connectionState);
|
AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, connectionState);
|
||||||
numEventsAdded++;
|
numEventsAdded++;
|
||||||
|
|
||||||
if (waitCount)
|
if (waitCount)
|
||||||
|
|
|
@ -706,7 +706,7 @@ static bool
|
||||||
FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts)
|
FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts)
|
||||||
{
|
{
|
||||||
PGconn *pgConn = connection->pgConn;
|
PGconn *pgConn = connection->pgConn;
|
||||||
int socket = PQsocket(pgConn);
|
int sock = PQsocket(pgConn);
|
||||||
|
|
||||||
Assert(pgConn);
|
Assert(pgConn);
|
||||||
Assert(PQisnonblocking(pgConn));
|
Assert(PQisnonblocking(pgConn));
|
||||||
|
@ -752,7 +752,7 @@ FinishConnectionIO(MultiConnection *connection, bool raiseInterrupts)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
rc = WaitLatchOrSocket(MyLatch, waitFlags, socket, 0, PG_WAIT_EXTENSION);
|
rc = WaitLatchOrSocket(MyLatch, waitFlags, sock, 0, PG_WAIT_EXTENSION);
|
||||||
if (rc & WL_POSTMASTER_DEATH)
|
if (rc & WL_POSTMASTER_DEATH)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
|
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
|
||||||
|
@ -1047,7 +1047,7 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
|
||||||
{
|
{
|
||||||
MultiConnection *connection = allConnections[pendingConnectionsStartIndex +
|
MultiConnection *connection = allConnections[pendingConnectionsStartIndex +
|
||||||
connectionIndex];
|
connectionIndex];
|
||||||
int socket = PQsocket(connection->pgConn);
|
int sock = PQsocket(connection->pgConn);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Always start by polling for both readability (server sent bytes)
|
* Always start by polling for both readability (server sent bytes)
|
||||||
|
@ -1055,7 +1055,7 @@ BuildWaitEventSet(MultiConnection **allConnections, int totalConnectionCount,
|
||||||
*/
|
*/
|
||||||
int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
int eventMask = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
|
||||||
|
|
||||||
AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, (void *) connection);
|
AddWaitEventToSet(waitEventSet, eventMask, sock, NULL, (void *) connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -3611,7 +3611,7 @@ BuildWaitEventSet(List *sessionList)
|
||||||
{
|
{
|
||||||
WorkerSession *session = lfirst(sessionCell);
|
WorkerSession *session = lfirst(sessionCell);
|
||||||
MultiConnection *connection = session->connection;
|
MultiConnection *connection = session->connection;
|
||||||
int socket = 0;
|
int sock = 0;
|
||||||
int waitEventSetIndex = 0;
|
int waitEventSetIndex = 0;
|
||||||
|
|
||||||
if (connection->pgConn == NULL)
|
if (connection->pgConn == NULL)
|
||||||
|
@ -3626,14 +3626,14 @@ BuildWaitEventSet(List *sessionList)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
socket = PQsocket(connection->pgConn);
|
sock = PQsocket(connection->pgConn);
|
||||||
if (socket == -1)
|
if (sock == -1)
|
||||||
{
|
{
|
||||||
/* connection was closed */
|
/* connection was closed */
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags, socket,
|
waitEventSetIndex = AddWaitEventToSet(waitEventSet, connection->waitFlags, sock,
|
||||||
NULL, (void *) session);
|
NULL, (void *) session);
|
||||||
session->waitEventSetIndex = waitEventSetIndex;
|
session->waitEventSetIndex = waitEventSetIndex;
|
||||||
}
|
}
|
||||||
|
@ -3658,7 +3658,7 @@ UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
|
||||||
{
|
{
|
||||||
WorkerSession *session = lfirst(sessionCell);
|
WorkerSession *session = lfirst(sessionCell);
|
||||||
MultiConnection *connection = session->connection;
|
MultiConnection *connection = session->connection;
|
||||||
int socket = 0;
|
int sock = 0;
|
||||||
int waitEventSetIndex = session->waitEventSetIndex;
|
int waitEventSetIndex = session->waitEventSetIndex;
|
||||||
|
|
||||||
if (connection->pgConn == NULL)
|
if (connection->pgConn == NULL)
|
||||||
|
@ -3673,8 +3673,8 @@ UpdateWaitEventSetFlags(WaitEventSet *waitEventSet, List *sessionList)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
socket = PQsocket(connection->pgConn);
|
sock = PQsocket(connection->pgConn);
|
||||||
if (socket == -1)
|
if (sock == -1)
|
||||||
{
|
{
|
||||||
/* connection was closed */
|
/* connection was closed */
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -1348,8 +1348,8 @@ multi_join_restriction_hook(PlannerInfo *root,
|
||||||
* it to retrieve restrictions on relations.
|
* it to retrieve restrictions on relations.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index,
|
multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
RangeTblEntry *rte)
|
Index restrictionIndex, RangeTblEntry *rte)
|
||||||
{
|
{
|
||||||
PlannerRestrictionContext *plannerRestrictionContext = NULL;
|
PlannerRestrictionContext *plannerRestrictionContext = NULL;
|
||||||
RelationRestrictionContext *relationRestrictionContext = NULL;
|
RelationRestrictionContext *relationRestrictionContext = NULL;
|
||||||
|
@ -1379,7 +1379,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index
|
||||||
localTable = !distributedTable;
|
localTable = !distributedTable;
|
||||||
|
|
||||||
relationRestriction = palloc0(sizeof(RelationRestriction));
|
relationRestriction = palloc0(sizeof(RelationRestriction));
|
||||||
relationRestriction->index = index;
|
relationRestriction->index = restrictionIndex;
|
||||||
relationRestriction->relationId = rte->relid;
|
relationRestriction->relationId = rte->relid;
|
||||||
relationRestriction->rte = rte;
|
relationRestriction->rte = rte;
|
||||||
relationRestriction->relOptInfo = relOptInfo;
|
relationRestriction->relOptInfo = relOptInfo;
|
||||||
|
|
|
@ -457,7 +457,6 @@ FullCompositeFieldList(List *compositeFieldList)
|
||||||
foreach(fieldSelectCell, compositeFieldList)
|
foreach(fieldSelectCell, compositeFieldList)
|
||||||
{
|
{
|
||||||
FieldSelect *fieldSelect = (FieldSelect *) lfirst(fieldSelectCell);
|
FieldSelect *fieldSelect = (FieldSelect *) lfirst(fieldSelectCell);
|
||||||
uint32 compositeFieldIndex = 0;
|
|
||||||
|
|
||||||
Expr *fieldExpression = fieldSelect->arg;
|
Expr *fieldExpression = fieldSelect->arg;
|
||||||
if (!IsA(fieldExpression, Var))
|
if (!IsA(fieldExpression, Var))
|
||||||
|
@ -467,7 +466,6 @@ FullCompositeFieldList(List *compositeFieldList)
|
||||||
|
|
||||||
if (compositeFieldArray == NULL)
|
if (compositeFieldArray == NULL)
|
||||||
{
|
{
|
||||||
uint32 index = 0;
|
|
||||||
Var *compositeColumn = (Var *) fieldExpression;
|
Var *compositeColumn = (Var *) fieldExpression;
|
||||||
Oid compositeTypeId = compositeColumn->vartype;
|
Oid compositeTypeId = compositeColumn->vartype;
|
||||||
Oid compositeRelationId = get_typ_typrelid(compositeTypeId);
|
Oid compositeRelationId = get_typ_typrelid(compositeTypeId);
|
||||||
|
@ -478,13 +476,15 @@ FullCompositeFieldList(List *compositeFieldList)
|
||||||
compositeFieldArray = palloc0(compositeFieldCount * sizeof(bool));
|
compositeFieldArray = palloc0(compositeFieldCount * sizeof(bool));
|
||||||
relation_close(relation, AccessShareLock);
|
relation_close(relation, AccessShareLock);
|
||||||
|
|
||||||
for (index = 0; index < compositeFieldCount; index++)
|
for (uint32 compositeFieldIndex = 0;
|
||||||
|
compositeFieldIndex < compositeFieldCount;
|
||||||
|
compositeFieldIndex++)
|
||||||
{
|
{
|
||||||
compositeFieldArray[index] = false;
|
compositeFieldArray[compositeFieldIndex] = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
compositeFieldIndex = fieldSelect->fieldnum - 1;
|
uint32 compositeFieldIndex = fieldSelect->fieldnum - 1;
|
||||||
compositeFieldArray[compositeFieldIndex] = true;
|
compositeFieldArray[compositeFieldIndex] = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -108,7 +108,7 @@ extern List * ExtractRangeTableEntryList(Query *query);
|
||||||
extern bool NeedsDistributedPlanning(Query *query);
|
extern bool NeedsDistributedPlanning(Query *query);
|
||||||
extern struct DistributedPlan * GetDistributedPlan(CustomScan *node);
|
extern struct DistributedPlan * GetDistributedPlan(CustomScan *node);
|
||||||
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
Index index, RangeTblEntry *rte);
|
Index restrictionIndex, RangeTblEntry *rte);
|
||||||
extern void multi_join_restriction_hook(PlannerInfo *root,
|
extern void multi_join_restriction_hook(PlannerInfo *root,
|
||||||
RelOptInfo *joinrel,
|
RelOptInfo *joinrel,
|
||||||
RelOptInfo *outerrel,
|
RelOptInfo *outerrel,
|
||||||
|
|
Loading…
Reference in New Issue