mirror of https://github.com/citusdata/citus.git
parent
e2a24b921e
commit
639588bee0
|
@ -745,19 +745,6 @@ GetForeignKeysFromLocalTables(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* HasForeignKeyToCitusLocalTable returns true if any of the foreign key constraints
|
|
||||||
* on the relation with relationId references to a citus local table.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
HasForeignKeyToCitusLocalTable(Oid relationId)
|
|
||||||
{
|
|
||||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_CITUS_LOCAL_TABLES;
|
|
||||||
List *foreignKeyOidList = GetForeignKeyOids(relationId, flags);
|
|
||||||
return list_length(foreignKeyOidList) > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* HasForeignKeyToReferenceTable returns true if any of the foreign key
|
* HasForeignKeyToReferenceTable returns true if any of the foreign key
|
||||||
* constraints on the relation with relationId references to a reference
|
* constraints on the relation with relationId references to a reference
|
||||||
|
|
|
@ -217,15 +217,6 @@ struct CopyShardState
|
||||||
List *placementStateList;
|
List *placementStateList;
|
||||||
};
|
};
|
||||||
|
|
||||||
/* ShardConnections represents a set of connections for each placement of a shard */
|
|
||||||
typedef struct ShardConnections
|
|
||||||
{
|
|
||||||
int64 shardId;
|
|
||||||
|
|
||||||
/* list of MultiConnection structs */
|
|
||||||
List *connectionList;
|
|
||||||
} ShardConnections;
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Represents the state for allowing copy via local
|
* Represents the state for allowing copy via local
|
||||||
|
@ -1195,7 +1186,7 @@ ReportCopyError(MultiConnection *connection, PGresult *result)
|
||||||
bool haveDetail = remoteDetail != NULL;
|
bool haveDetail = remoteDetail != NULL;
|
||||||
|
|
||||||
ereport(ERROR, (errmsg("%s", remoteMessage),
|
ereport(ERROR, (errmsg("%s", remoteMessage),
|
||||||
haveDetail ? errdetail("%s", ApplyLogRedaction(remoteDetail)) :
|
haveDetail ? errdetail("%s", remoteDetail) :
|
||||||
0));
|
0));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1206,7 +1197,7 @@ ReportCopyError(MultiConnection *connection, PGresult *result)
|
||||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||||
errmsg("failed to complete COPY on %s:%d", connection->hostname,
|
errmsg("failed to complete COPY on %s:%d", connection->hostname,
|
||||||
connection->port),
|
connection->port),
|
||||||
errdetail("%s", ApplyLogRedaction(remoteMessage))));
|
errdetail("%s", remoteMessage)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1415,53 +1415,6 @@ set_indexsafe_procflags(void)
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements
|
|
||||||
* of shards of a distributed table. The command to be applied is generated by the
|
|
||||||
* TableDDLCommand structure passed in.
|
|
||||||
*/
|
|
||||||
DDLJob *
|
|
||||||
CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command)
|
|
||||||
{
|
|
||||||
List *taskList = NIL;
|
|
||||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
|
||||||
uint64 jobId = INVALID_JOB_ID;
|
|
||||||
Oid namespace = get_rel_namespace(relationId);
|
|
||||||
char *namespaceName = get_namespace_name(namespace);
|
|
||||||
int taskId = 1;
|
|
||||||
|
|
||||||
/* lock metadata before getting placement lists */
|
|
||||||
LockShardListMetadata(shardIntervalList, ShareLock);
|
|
||||||
|
|
||||||
ShardInterval *shardInterval = NULL;
|
|
||||||
foreach_ptr(shardInterval, shardIntervalList)
|
|
||||||
{
|
|
||||||
uint64 shardId = shardInterval->shardId;
|
|
||||||
|
|
||||||
char *commandStr = GetShardedTableDDLCommand(command, shardId, namespaceName);
|
|
||||||
|
|
||||||
Task *task = CitusMakeNode(Task);
|
|
||||||
task->jobId = jobId;
|
|
||||||
task->taskId = taskId++;
|
|
||||||
task->taskType = DDL_TASK;
|
|
||||||
SetTaskQueryString(task, commandStr);
|
|
||||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
|
||||||
task->dependentTaskList = NULL;
|
|
||||||
task->anchorShardId = shardId;
|
|
||||||
task->taskPlacementList = ActiveShardPlacementList(shardId);
|
|
||||||
|
|
||||||
taskList = lappend(taskList, task);
|
|
||||||
}
|
|
||||||
|
|
||||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
|
||||||
ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
|
|
||||||
ddlJob->metadataSyncCommand = GetTableDDLCommand(command);
|
|
||||||
ddlJob->taskList = taskList;
|
|
||||||
|
|
||||||
return ddlJob;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SetSearchPathToCurrentSearchPathCommand generates a command which can
|
* SetSearchPathToCurrentSearchPathCommand generates a command which can
|
||||||
* set the search path to the exact same search path that the issueing node
|
* set the search path to the exact same search path that the issueing node
|
||||||
|
|
|
@ -258,10 +258,6 @@ ReportConnectionError(MultiConnection *connection, int elevel)
|
||||||
|
|
||||||
if (messageDetail)
|
if (messageDetail)
|
||||||
{
|
{
|
||||||
/*
|
|
||||||
* We don't use ApplyLogRedaction(messageDetail) as we expect any error
|
|
||||||
* detail that requires log reduction should have done it locally.
|
|
||||||
*/
|
|
||||||
ereport(elevel, (errcode(ERRCODE_CONNECTION_FAILURE),
|
ereport(elevel, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||||
errmsg("connection to the remote node %s:%d failed with the "
|
errmsg("connection to the remote node %s:%d failed with the "
|
||||||
"following error: %s", nodeName, nodePort,
|
"following error: %s", nodeName, nodePort,
|
||||||
|
@ -315,7 +311,7 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
|
||||||
|
|
||||||
ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
|
ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
|
||||||
messageDetail ?
|
messageDetail ?
|
||||||
errdetail("%s", ApplyLogRedaction(messageDetail)) : 0,
|
errdetail("%s", messageDetail) : 0,
|
||||||
messageHint ? errhint("%s", messageHint) : 0,
|
messageHint ? errhint("%s", messageHint) : 0,
|
||||||
messageContext ? errcontext("%s", messageContext) : 0,
|
messageContext ? errcontext("%s", messageContext) : 0,
|
||||||
errcontext("while executing command on %s:%d",
|
errcontext("while executing command on %s:%d",
|
||||||
|
@ -349,7 +345,7 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ereport(NOTICE, (errmsg("issuing %s", ApplyLogRedaction(command)),
|
ereport(NOTICE, (errmsg("issuing %s", command),
|
||||||
errdetail("on server %s@%s:%d connectionId: %ld", connection->user,
|
errdetail("on server %s@%s:%d connectionId: %ld", connection->user,
|
||||||
connection->hostname,
|
connection->hostname,
|
||||||
connection->port, connection->connectionId)));
|
connection->port, connection->connectionId)));
|
||||||
|
|
|
@ -173,7 +173,7 @@ DefaultCitusNoticeReceiver(void *arg, const PGresult *result)
|
||||||
|
|
||||||
ereport(logLevel,
|
ereport(logLevel,
|
||||||
(errcode(sqlState),
|
(errcode(sqlState),
|
||||||
errmsg("%s", ApplyLogRedaction(trimmedMessage)),
|
errmsg("%s", trimmedMessage),
|
||||||
errdetail("from %s:%d", nodeName, nodePort)));
|
errdetail("from %s:%d", nodeName, nodePort)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -519,7 +519,7 @@ LogLocalCommand(Task *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
ereport(NOTICE, (errmsg("executing the command locally: %s",
|
ereport(NOTICE, (errmsg("executing the command locally: %s",
|
||||||
ApplyLogRedaction(command))));
|
command)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -155,9 +155,9 @@ MultiClientSendQuery(int32 connectionId, const char *query)
|
||||||
* we cannot send the queries that Citus itself produced.
|
* we cannot send the queries that Citus itself produced.
|
||||||
*/
|
*/
|
||||||
ereport(WARNING, (errmsg("could not send remote query \"%s\"",
|
ereport(WARNING, (errmsg("could not send remote query \"%s\"",
|
||||||
ApplyLogRedaction(query)),
|
query),
|
||||||
errdetail("Client error: %s",
|
errdetail("Client error: %s",
|
||||||
ApplyLogRedaction(errorMessage))));
|
errorMessage)));
|
||||||
|
|
||||||
success = false;
|
success = false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2807,42 +2807,6 @@ TextOutFunctionId(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* PgTableVisibleFuncId returns oid of the pg_table_is_visible function.
|
|
||||||
*/
|
|
||||||
Oid
|
|
||||||
PgTableVisibleFuncId(void)
|
|
||||||
{
|
|
||||||
if (MetadataCache.pgTableIsVisibleFuncId == InvalidOid)
|
|
||||||
{
|
|
||||||
const int argCount = 1;
|
|
||||||
|
|
||||||
MetadataCache.pgTableIsVisibleFuncId =
|
|
||||||
FunctionOid("pg_catalog", "pg_table_is_visible", argCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
return MetadataCache.pgTableIsVisibleFuncId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CitusTableVisibleFuncId returns oid of the citus_table_is_visible function.
|
|
||||||
*/
|
|
||||||
Oid
|
|
||||||
CitusTableVisibleFuncId(void)
|
|
||||||
{
|
|
||||||
if (MetadataCache.citusTableIsVisibleFuncId == InvalidOid)
|
|
||||||
{
|
|
||||||
const int argCount = 1;
|
|
||||||
|
|
||||||
MetadataCache.citusTableIsVisibleFuncId =
|
|
||||||
FunctionOid("pg_catalog", "citus_table_is_visible", argCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
return MetadataCache.citusTableIsVisibleFuncId;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RelationIsAKnownShardFuncId returns oid of the relation_is_a_known_shard function.
|
* RelationIsAKnownShardFuncId returns oid of the relation_is_a_known_shard function.
|
||||||
*/
|
*/
|
||||||
|
@ -4400,17 +4364,6 @@ CitusTableTypeIdList(CitusTableType citusTableType)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ClusterHasReferenceTable returns true if the cluster has
|
|
||||||
* any reference table.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
ClusterHasReferenceTable(void)
|
|
||||||
{
|
|
||||||
return list_length(CitusTableTypeIdList(REFERENCE_TABLE)) > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* InvalidateNodeRelationCacheCallback destroys the WorkerNodeHash when
|
* InvalidateNodeRelationCacheCallback destroys the WorkerNodeHash when
|
||||||
* any change happens on pg_dist_node table. It also set WorkerNodeHash to
|
* any change happens on pg_dist_node table. It also set WorkerNodeHash to
|
||||||
|
|
|
@ -2135,21 +2135,6 @@ EnsureTableOwner(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Check that the current user has owner rights to the schema, error out if
|
|
||||||
* not. Superusers are regarded as owners.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
EnsureSchemaOwner(Oid schemaId)
|
|
||||||
{
|
|
||||||
if (!pg_namespace_ownercheck(schemaId, GetUserId()))
|
|
||||||
{
|
|
||||||
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
|
|
||||||
get_namespace_name(schemaId));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check that the current user has owner rights to functionId, error out if
|
* Check that the current user has owner rights to functionId, error out if
|
||||||
* not. Superusers are regarded as owners. Functions and procedures are
|
* not. Superusers are regarded as owners. Functions and procedures are
|
||||||
|
|
|
@ -78,7 +78,7 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
|
||||||
if (!IsA(queryTreeNode, DeleteStmt) && !IsA(queryTreeNode, UpdateStmt))
|
if (!IsA(queryTreeNode, DeleteStmt) && !IsA(queryTreeNode, UpdateStmt))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("query \"%s\" is not a delete or update "
|
ereport(ERROR, (errmsg("query \"%s\" is not a delete or update "
|
||||||
"statement", ApplyLogRedaction(queryString))));
|
"statement", queryString)));
|
||||||
}
|
}
|
||||||
|
|
||||||
ereport(WARNING, (errmsg("master_modify_multiple_shards is deprecated and will be "
|
ereport(WARNING, (errmsg("master_modify_multiple_shards is deprecated and will be "
|
||||||
|
|
|
@ -137,7 +137,7 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
ereport(DEBUG4, (errmsg("query before rebuilding: %s",
|
ereport(DEBUG4, (errmsg("query before rebuilding: %s",
|
||||||
!isQueryObjectOrText
|
!isQueryObjectOrText
|
||||||
? "(null)"
|
? "(null)"
|
||||||
: ApplyLogRedaction(TaskQueryString(task)))));
|
: TaskQueryString(task))));
|
||||||
|
|
||||||
UpdateTaskQueryString(query, task);
|
UpdateTaskQueryString(query, task);
|
||||||
|
|
||||||
|
@ -148,7 +148,7 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
|
task->parametersInQueryStringResolved = workerJob->parametersInJobQueryResolved;
|
||||||
|
|
||||||
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
|
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
|
||||||
ApplyLogRedaction(TaskQueryString(task)))));
|
TaskQueryString(task))));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -927,7 +927,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery,
|
||||||
deparse_shard_query(copiedQuery, distributedTableId, shardInterval->shardId,
|
deparse_shard_query(copiedQuery, distributedTableId, shardInterval->shardId,
|
||||||
queryString);
|
queryString);
|
||||||
ereport(DEBUG2, (errmsg("distributed statement: %s",
|
ereport(DEBUG2, (errmsg("distributed statement: %s",
|
||||||
ApplyLogRedaction(queryString->data))));
|
queryString->data)));
|
||||||
|
|
||||||
Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
|
Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK,
|
||||||
queryString->data);
|
queryString->data);
|
||||||
|
|
|
@ -627,7 +627,7 @@ PrintJoinOrderList(List *joinOrder)
|
||||||
}
|
}
|
||||||
|
|
||||||
ereport(LOG, (errmsg("join order: %s",
|
ereport(LOG, (errmsg("join order: %s",
|
||||||
ApplyLogRedaction(printBuffer->data))));
|
printBuffer->data)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2566,7 +2566,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
{
|
{
|
||||||
pg_get_query_def(taskQuery, queryString);
|
pg_get_query_def(taskQuery, queryString);
|
||||||
ereport(DEBUG4, (errmsg("distributed statement: %s",
|
ereport(DEBUG4, (errmsg("distributed statement: %s",
|
||||||
ApplyLogRedaction(queryString->data))));
|
queryString->data)));
|
||||||
SetTaskQueryString(subqueryTask, queryString->data);
|
SetTaskQueryString(subqueryTask, queryString->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2721,7 +2721,7 @@ SqlTaskList(Job *job)
|
||||||
/* log the query string we generated */
|
/* log the query string we generated */
|
||||||
ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
|
ereport(DEBUG4, (errmsg("generated sql query for task %d", sqlTask->taskId),
|
||||||
errdetail("query string: \"%s\"",
|
errdetail("query string: \"%s\"",
|
||||||
ApplyLogRedaction(sqlQueryString->data))));
|
sqlQueryString->data)));
|
||||||
|
|
||||||
sqlTask->anchorShardId = INVALID_SHARD_ID;
|
sqlTask->anchorShardId = INVALID_SHARD_ID;
|
||||||
if (anchorRangeTableBasedAssignment)
|
if (anchorRangeTableBasedAssignment)
|
||||||
|
@ -3236,45 +3236,6 @@ BinaryOpExpression(Expr *clause, Node **leftOperand, Node **rightOperand)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* SimpleOpExpression checks that given expression is a simple operator
|
|
||||||
* expression. A simple operator expression is a binary operator expression with
|
|
||||||
* operands of a var and a non-null constant.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
SimpleOpExpression(Expr *clause)
|
|
||||||
{
|
|
||||||
Const *constantClause = NULL;
|
|
||||||
|
|
||||||
Node *leftOperand;
|
|
||||||
Node *rightOperand;
|
|
||||||
if (!BinaryOpExpression(clause, &leftOperand, &rightOperand))
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (IsA(rightOperand, Const) && IsA(leftOperand, Var))
|
|
||||||
{
|
|
||||||
constantClause = (Const *) rightOperand;
|
|
||||||
}
|
|
||||||
else if (IsA(leftOperand, Const) && IsA(rightOperand, Var))
|
|
||||||
{
|
|
||||||
constantClause = (Const *) leftOperand;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (constantClause->constisnull)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MakeInt4Column creates a column of int4 type with invalid table id and max
|
* MakeInt4Column creates a column of int4 type with invalid table id and max
|
||||||
* attribute number.
|
* attribute number.
|
||||||
|
|
|
@ -242,7 +242,7 @@ GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
|
||||||
ereport(DEBUG1, (errmsg(
|
ereport(DEBUG1, (errmsg(
|
||||||
"Plan " UINT64_FORMAT
|
"Plan " UINT64_FORMAT
|
||||||
" query after replacing subqueries and CTEs: %s", planId,
|
" query after replacing subqueries and CTEs: %s", planId,
|
||||||
ApplyLogRedaction(subPlanString->data))));
|
subPlanString->data)));
|
||||||
}
|
}
|
||||||
|
|
||||||
recursivePlanningDepth--;
|
recursivePlanningDepth--;
|
||||||
|
@ -763,7 +763,7 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
|
||||||
ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT
|
ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT
|
||||||
"_%u for CTE %s: %s", planId, subPlanId,
|
"_%u for CTE %s: %s", planId, subPlanId,
|
||||||
cteName,
|
cteName,
|
||||||
ApplyLogRedaction(subPlanString->data))));
|
subPlanString->data)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* build a sub plan for the CTE */
|
/* build a sub plan for the CTE */
|
||||||
|
@ -1181,7 +1181,7 @@ RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningConte
|
||||||
|
|
||||||
ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT
|
ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT
|
||||||
"_%u for subquery %s", planId, subPlanId,
|
"_%u for subquery %s", planId, subPlanId,
|
||||||
ApplyLogRedaction(subqueryString->data))));
|
subqueryString->data)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* finally update the input subquery to point the result query */
|
/* finally update the input subquery to point the result query */
|
||||||
|
|
|
@ -657,7 +657,7 @@ LogDistributedDeadlockDebugMessage(const char *errorMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
ereport(LOG, (errmsg("[%s] %s", timestamptz_to_str(GetCurrentTimestamp()),
|
ereport(LOG, (errmsg("[%s] %s", timestamptz_to_str(GetCurrentTimestamp()),
|
||||||
ApplyLogRedaction(errorMessage))));
|
errorMessage)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -44,13 +44,7 @@ static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
|
||||||
const Oid *parameterTypes,
|
const Oid *parameterTypes,
|
||||||
const char *const *parameterValues);
|
const char *const *parameterValues);
|
||||||
static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList);
|
static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList);
|
||||||
static List * OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet,
|
|
||||||
const char *user);
|
|
||||||
static void GetConnectionsResults(List *connectionList, bool failOnError);
|
|
||||||
static void SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet,
|
|
||||||
const char *command, const char *user,
|
|
||||||
bool
|
|
||||||
failOnError);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendCommandToWorker sends a command to a particular worker as part of the
|
* SendCommandToWorker sends a command to a particular worker as part of the
|
||||||
|
@ -238,129 +232,6 @@ SendCommandToMetadataWorkersParams(const char *command,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* SendCommandToWorkersOptionalInParallel sends the given command to workers in parallel.
|
|
||||||
* It does error if there is a problem while sending the query, but it doesn't error
|
|
||||||
* if there is a problem while executing the query.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const
|
|
||||||
char *command,
|
|
||||||
const char *user)
|
|
||||||
{
|
|
||||||
bool failOnError = false;
|
|
||||||
SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user,
|
|
||||||
failOnError);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* SendCommandToWorkersInParallel sends the given command to workers in parallel.
|
|
||||||
* It does error if there is a problem while sending the query, it errors if there
|
|
||||||
* was any problem when sending/receiving.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet, const
|
|
||||||
char *command,
|
|
||||||
const char *user)
|
|
||||||
{
|
|
||||||
bool failOnError = true;
|
|
||||||
SendCommandToWorkersOutsideTransaction(targetWorkerSet, command, user,
|
|
||||||
failOnError);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* SendCommandToWorkersOutsideTransaction sends the given command to workers in parallel.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
SendCommandToWorkersOutsideTransaction(TargetWorkerSet targetWorkerSet, const
|
|
||||||
char *command, const char *user, bool
|
|
||||||
failOnError)
|
|
||||||
{
|
|
||||||
List *connectionList = OpenConnectionsToWorkersInParallel(targetWorkerSet, user);
|
|
||||||
|
|
||||||
/* finish opening connections */
|
|
||||||
FinishConnectionListEstablishment(connectionList);
|
|
||||||
|
|
||||||
/* send commands in parallel */
|
|
||||||
MultiConnection *connection = NULL;
|
|
||||||
foreach_ptr(connection, connectionList)
|
|
||||||
{
|
|
||||||
int querySent = SendRemoteCommand(connection, command);
|
|
||||||
if (failOnError && querySent == 0)
|
|
||||||
{
|
|
||||||
ReportConnectionError(connection, ERROR);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
GetConnectionsResults(connectionList, failOnError);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* OpenConnectionsToWorkersInParallel opens connections to the given target worker set in parallel,
|
|
||||||
* as the given user.
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
OpenConnectionsToWorkersInParallel(TargetWorkerSet targetWorkerSet, const char *user)
|
|
||||||
{
|
|
||||||
List *connectionList = NIL;
|
|
||||||
|
|
||||||
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, RowShareLock);
|
|
||||||
|
|
||||||
WorkerNode *workerNode = NULL;
|
|
||||||
foreach_ptr(workerNode, workerNodeList)
|
|
||||||
{
|
|
||||||
const char *nodeName = workerNode->workerName;
|
|
||||||
int nodePort = workerNode->workerPort;
|
|
||||||
int32 connectionFlags = OUTSIDE_TRANSACTION;
|
|
||||||
|
|
||||||
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
|
|
||||||
nodeName, nodePort,
|
|
||||||
user, NULL);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* connection can only be NULL for optional connections, which we don't
|
|
||||||
* support in this codepath.
|
|
||||||
*/
|
|
||||||
Assert((connectionFlags & OPTIONAL_CONNECTION) == 0);
|
|
||||||
Assert(connection != NULL);
|
|
||||||
connectionList = lappend(connectionList, connection);
|
|
||||||
}
|
|
||||||
return connectionList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* GetConnectionsResults gets remote command results
|
|
||||||
* for the given connections. It raises any error if failOnError is true.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
GetConnectionsResults(List *connectionList, bool failOnError)
|
|
||||||
{
|
|
||||||
MultiConnection *connection = NULL;
|
|
||||||
foreach_ptr(connection, connectionList)
|
|
||||||
{
|
|
||||||
bool raiseInterrupt = false;
|
|
||||||
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupt);
|
|
||||||
|
|
||||||
bool isResponseOK = result != NULL && IsResponseOK(result);
|
|
||||||
if (failOnError && !isResponseOK)
|
|
||||||
{
|
|
||||||
ReportResultError(connection, result, ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
PQclear(result);
|
|
||||||
|
|
||||||
if (isResponseOK)
|
|
||||||
{
|
|
||||||
ForgetResults(connection);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SendCommandToWorkersParamsInternal sends a command to all workers in parallel.
|
* SendCommandToWorkersParamsInternal sends a command to all workers in parallel.
|
||||||
* Commands are committed on the workers when the local transaction commits. The
|
* Commands are committed on the workers when the local transaction commits. The
|
||||||
|
|
|
@ -22,35 +22,6 @@
|
||||||
static bool FileIsLink(const char *filename, struct stat filestat);
|
static bool FileIsLink(const char *filename, struct stat filestat);
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CacheDirectoryElement takes in a filename, and checks if this name lives in
|
|
||||||
* the directory path that is used for job, task, table etc. files.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
CacheDirectoryElement(const char *filename)
|
|
||||||
{
|
|
||||||
bool directoryElement = false;
|
|
||||||
|
|
||||||
StringInfo directoryPath = makeStringInfo();
|
|
||||||
appendStringInfo(directoryPath, "base/%s/", PG_JOB_CACHE_DIR);
|
|
||||||
|
|
||||||
char *directoryPathFound = strstr(filename, directoryPath->data);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If directoryPath occurs at the beginning of the filename, then the
|
|
||||||
* pointers should now be equal.
|
|
||||||
*/
|
|
||||||
if (directoryPathFound == filename)
|
|
||||||
{
|
|
||||||
directoryElement = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
pfree(directoryPath);
|
|
||||||
|
|
||||||
return directoryElement;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CitusCreateDirectory creates a new directory with the given directory name.
|
* CitusCreateDirectory creates a new directory with the given directory name.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -39,14 +39,3 @@ IsLoggableLevel(int logLevel)
|
||||||
{
|
{
|
||||||
return log_min_messages <= logLevel || client_min_messages <= logLevel;
|
return log_min_messages <= logLevel || client_min_messages <= logLevel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* HashLogMessage is deprecated and doesn't do anything anymore. Its indirect
|
|
||||||
* usage will be removed later.
|
|
||||||
*/
|
|
||||||
char *
|
|
||||||
HashLogMessage(const char *logText)
|
|
||||||
{
|
|
||||||
return (char *) logText;
|
|
||||||
}
|
|
||||||
|
|
|
@ -431,7 +431,7 @@ ParseTreeRawStmt(const char *ddlCommand)
|
||||||
/* log immediately if dictated by log statement */
|
/* log immediately if dictated by log statement */
|
||||||
if (check_log_statement(parseTreeList))
|
if (check_log_statement(parseTreeList))
|
||||||
{
|
{
|
||||||
ereport(LOG, (errmsg("statement: %s", ApplyLogRedaction(ddlCommand)),
|
ereport(LOG, (errmsg("statement: %s", ddlCommand),
|
||||||
errhidestmt(true)));
|
errhidestmt(true)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -264,7 +264,6 @@ extern List * GetForeignConstraintFromDistributedTablesCommands(Oid relationId);
|
||||||
extern List * GetForeignConstraintCommandsInternal(Oid relationId, int flags);
|
extern List * GetForeignConstraintCommandsInternal(Oid relationId, int flags);
|
||||||
extern bool AnyForeignKeyDependsOnIndex(Oid indexId);
|
extern bool AnyForeignKeyDependsOnIndex(Oid indexId);
|
||||||
extern bool HasForeignKeyWithLocalTable(Oid relationId);
|
extern bool HasForeignKeyWithLocalTable(Oid relationId);
|
||||||
extern bool HasForeignKeyToCitusLocalTable(Oid relationId);
|
|
||||||
extern bool HasForeignKeyToReferenceTable(Oid relationOid);
|
extern bool HasForeignKeyToReferenceTable(Oid relationOid);
|
||||||
extern List * GetForeignKeysFromLocalTables(Oid relationId);
|
extern List * GetForeignKeysFromLocalTables(Oid relationId);
|
||||||
extern bool TableReferenced(Oid relationOid);
|
extern bool TableReferenced(Oid relationOid);
|
||||||
|
|
|
@ -104,7 +104,4 @@ extern void ResetConstraintDropped(void);
|
||||||
extern void ExecuteDistributedDDLJob(DDLJob *ddlJob);
|
extern void ExecuteDistributedDDLJob(DDLJob *ddlJob);
|
||||||
extern void ColumnarTableSetOptionsHook(Oid relationId, ColumnarOptions options);
|
extern void ColumnarTableSetOptionsHook(Oid relationId, ColumnarOptions options);
|
||||||
|
|
||||||
/* forward declarations for sending custom commands to a distributed table */
|
|
||||||
extern DDLJob * CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command);
|
|
||||||
|
|
||||||
#endif /* MULTI_UTILITY_H */
|
#endif /* MULTI_UTILITY_H */
|
||||||
|
|
|
@ -19,10 +19,6 @@
|
||||||
extern bool EnableUnsupportedFeatureMessages;
|
extern bool EnableUnsupportedFeatureMessages;
|
||||||
|
|
||||||
extern bool IsLoggableLevel(int logLevel);
|
extern bool IsLoggableLevel(int logLevel);
|
||||||
extern char * HashLogMessage(const char *text);
|
|
||||||
|
|
||||||
#define ApplyLogRedaction(text) \
|
|
||||||
(log_min_messages <= ereport_loglevel ? HashLogMessage(text) : text)
|
|
||||||
|
|
||||||
#undef ereport
|
#undef ereport
|
||||||
|
|
||||||
|
|
|
@ -180,7 +180,6 @@ extern void FlushDistTableCache(void);
|
||||||
extern void InvalidateMetadataSystemCache(void);
|
extern void InvalidateMetadataSystemCache(void);
|
||||||
extern List * CitusTableTypeIdList(CitusTableType citusTableType);
|
extern List * CitusTableTypeIdList(CitusTableType citusTableType);
|
||||||
extern Datum DistNodeMetadata(void);
|
extern Datum DistNodeMetadata(void);
|
||||||
extern bool ClusterHasReferenceTable(void);
|
|
||||||
extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
|
extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
|
||||||
int shardIntervalArrayLength);
|
int shardIntervalArrayLength);
|
||||||
extern bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
|
extern bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
|
||||||
|
@ -258,8 +257,6 @@ extern Oid CitusExtraDataContainerFuncId(void);
|
||||||
extern Oid CitusAnyValueFunctionId(void);
|
extern Oid CitusAnyValueFunctionId(void);
|
||||||
extern Oid CitusTextSendAsJsonbFunctionId(void);
|
extern Oid CitusTextSendAsJsonbFunctionId(void);
|
||||||
extern Oid TextOutFunctionId(void);
|
extern Oid TextOutFunctionId(void);
|
||||||
extern Oid PgTableVisibleFuncId(void);
|
|
||||||
extern Oid CitusTableVisibleFuncId(void);
|
|
||||||
extern Oid RelationIsAKnownShardFuncId(void);
|
extern Oid RelationIsAKnownShardFuncId(void);
|
||||||
extern Oid JsonbExtractPathFuncId(void);
|
extern Oid JsonbExtractPathFuncId(void);
|
||||||
extern Oid JsonbExtractPathTextFuncId(void);
|
extern Oid JsonbExtractPathTextFuncId(void);
|
||||||
|
|
|
@ -276,7 +276,6 @@ extern Oid TableOwnerOid(Oid relationId);
|
||||||
extern char * TableOwner(Oid relationId);
|
extern char * TableOwner(Oid relationId);
|
||||||
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
|
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
|
||||||
extern void EnsureTableOwner(Oid relationId);
|
extern void EnsureTableOwner(Oid relationId);
|
||||||
extern void EnsureSchemaOwner(Oid schemaId);
|
|
||||||
extern void EnsureHashDistributedTable(Oid relationId);
|
extern void EnsureHashDistributedTable(Oid relationId);
|
||||||
extern void EnsureFunctionOwner(Oid functionId);
|
extern void EnsureFunctionOwner(Oid functionId);
|
||||||
extern void EnsureSuperUser(void);
|
extern void EnsureSuperUser(void);
|
||||||
|
|
|
@ -551,7 +551,6 @@ extern CollateExpr * RelabelTypeToCollateExpr(RelabelType *relabelType);
|
||||||
extern Node * BuildBaseConstraint(Var *column);
|
extern Node * BuildBaseConstraint(Var *column);
|
||||||
extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval);
|
extern void UpdateConstraint(Node *baseConstraint, ShardInterval *shardInterval);
|
||||||
extern bool BinaryOpExpression(Expr *clause, Node **leftOperand, Node **rightOperand);
|
extern bool BinaryOpExpression(Expr *clause, Node **leftOperand, Node **rightOperand);
|
||||||
extern bool SimpleOpExpression(Expr *clause);
|
|
||||||
|
|
||||||
/* helper functions */
|
/* helper functions */
|
||||||
extern Var * MakeInt4Column(void);
|
extern Var * MakeInt4Column(void);
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
#define PG_JOB_CACHE_DIR "pgsql_job_cache"
|
#define PG_JOB_CACHE_DIR "pgsql_job_cache"
|
||||||
|
|
||||||
|
|
||||||
extern bool CacheDirectoryElement(const char *filename);
|
|
||||||
extern void CleanupJobCacheDirectory(void);
|
extern void CleanupJobCacheDirectory(void);
|
||||||
extern void CitusCreateDirectory(StringInfo directoryName);
|
extern void CitusCreateDirectory(StringInfo directoryName);
|
||||||
extern void CitusRemoveDirectory(const char *filename);
|
extern void CitusRemoveDirectory(const char *filename);
|
||||||
|
|
|
@ -79,11 +79,6 @@ extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
|
||||||
const char *
|
const char *
|
||||||
nodeUser,
|
nodeUser,
|
||||||
List *commandList);
|
List *commandList);
|
||||||
extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet,
|
|
||||||
const char *command,
|
|
||||||
const char *user);
|
|
||||||
void SendCommandToWorkersInParallel(TargetWorkerSet targetWorkerSet,
|
|
||||||
const char *command, const char *user);
|
|
||||||
extern void RemoveWorkerTransaction(const char *nodeName, int32 nodePort);
|
extern void RemoveWorkerTransaction(const char *nodeName, int32 nodePort);
|
||||||
|
|
||||||
/* helper functions for worker transactions */
|
/* helper functions for worker transactions */
|
||||||
|
|
Loading…
Reference in New Issue