mirror of https://github.com/citusdata/citus.git
Compare commits
34 Commits
Author | SHA1 | Date |
---|---|---|
|
6001f753ed | |
|
bf5543f92d | |
|
67c4dac89b | |
|
0ed33515a1 | |
|
cab7185f23 | |
|
ea4e22473b | |
|
e4579f02c8 | |
|
81687585d2 | |
|
14799ef2a0 | |
|
96eca92fc7 | |
|
44eacf14fc | |
|
124b0f9020 | |
|
0849da2708 | |
|
dd6a456bed | |
|
0f01dc3bb8 | |
|
ab509db0d8 | |
|
e735655d82 | |
|
1678deeecd | |
|
43f9758787 | |
|
5bd4583935 | |
|
b839af761f | |
|
16798e38f9 | |
|
fd05849eff | |
|
aa2d2f46f4 | |
|
b0aa0479cf | |
|
0c9901f042 | |
|
52b056c301 | |
|
5b300a7aa9 | |
|
0530974d3d | |
|
c1ca6e6819 | |
|
c59481141f | |
|
cf4592ce28 | |
|
062734a6f5 | |
|
ac5fcbe998 |
86
CHANGELOG.md
86
CHANGELOG.md
|
@ -1,3 +1,89 @@
|
|||
### citus v6.2.5 (January 11, 2018) ###
|
||||
|
||||
* Fixes a bug that could crash the coordinator while reporting a remote error
|
||||
|
||||
### citus v6.2.4 (September 28, 2017) ###
|
||||
|
||||
* Updates task-tracker to limit file access
|
||||
|
||||
### citus v6.2.3 (July 13, 2017) ###
|
||||
|
||||
* Fixes a crash during execution of local CREATE INDEX CONCURRENTLY
|
||||
|
||||
* Fixes a bug preventing usage of quoted column names in COPY
|
||||
|
||||
* Fixes a bug in prepared INSERTs with implicit cast in partition column
|
||||
|
||||
* Relaxes locks in VACUUM to ensure concurrent execution with INSERT
|
||||
|
||||
### citus v6.2.2 (May 31, 2017) ###
|
||||
|
||||
* Fixes a common cause of deadlocks when repairing tables with foreign keys
|
||||
|
||||
### citus v6.2.1 (May 24, 2017) ###
|
||||
|
||||
* Relaxes version-check logic to avoid breaking non-distributed commands
|
||||
|
||||
### citus v6.2.0 (May 16, 2017) ###
|
||||
|
||||
* Increases SQL subquery coverage by pushing down more kinds of queries
|
||||
|
||||
* Adds CustomScan API support to allow read-only transactions
|
||||
|
||||
* Adds support for `CREATE/DROP INDEX CONCURRENTLY`
|
||||
|
||||
* Adds support for `ALTER TABLE ... ADD CONSTRAINT`
|
||||
|
||||
* Adds support for `ALTER TABLE ... RENAME COLUMN`
|
||||
|
||||
* Adds support for `DISABLE/ENABLE TRIGGER ALL`
|
||||
|
||||
* Adds support for expressions in the partition column in INSERTs
|
||||
|
||||
* Adds support for query parameters in combination with function evaluation
|
||||
|
||||
* Adds support for creating distributed tables from non-empty local tables
|
||||
|
||||
* Adds UDFs to get size of distributed tables
|
||||
|
||||
* Adds UDFs to add a new node without replicating reference tables
|
||||
|
||||
* Adds checks to prevent running Citus binaries with wrong metadata tables
|
||||
|
||||
* Improves shard pruning performance for range queries
|
||||
|
||||
* Improves planner performance for joins involving co-located tables
|
||||
|
||||
* Improves shard copy performance by creating indexes after copy
|
||||
|
||||
* Improves task-tracker performance by batching several status checks
|
||||
|
||||
* Enables router planner for queries on range partitioned table
|
||||
|
||||
* Changes `TRUNCATE` to drop local data only if `enable_ddl_propagation` is off
|
||||
|
||||
* Starts to execute DDL on coordinator before workers
|
||||
|
||||
* Fixes a bug causing incorrectly reading invalidated cache
|
||||
|
||||
* Fixes a bug related to creation of schemas of in workers with incorrect owner
|
||||
|
||||
* Fixes a bug related to concurrent run of shard drop functions
|
||||
|
||||
* Fixes a bug related to `EXPLAIN ANALYZE` with DML queries
|
||||
|
||||
* Fixes a bug related to SQL functions in FROM clause
|
||||
|
||||
* Adds a GUC variable to report cross shard queries
|
||||
|
||||
* Fixes a bug related to partition columns without native hash function
|
||||
|
||||
* Adds internal infrastructures and tests to improve development process
|
||||
|
||||
* Addresses various race conditions and deadlocks
|
||||
|
||||
* Improves and standardizes error messages
|
||||
|
||||
### citus v6.1.1 (May 5, 2017) ###
|
||||
|
||||
* Fixes a crash caused by router executor use after connection timeouts
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
#! /bin/sh
|
||||
# Guess values for system-dependent variables and create Makefiles.
|
||||
# Generated by GNU Autoconf 2.69 for Citus 6.2devel.
|
||||
# Generated by GNU Autoconf 2.69 for Citus 6.2.5.
|
||||
#
|
||||
#
|
||||
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
|
||||
|
@ -579,8 +579,8 @@ MAKEFLAGS=
|
|||
# Identity of this package.
|
||||
PACKAGE_NAME='Citus'
|
||||
PACKAGE_TARNAME='citus'
|
||||
PACKAGE_VERSION='6.2devel'
|
||||
PACKAGE_STRING='Citus 6.2devel'
|
||||
PACKAGE_VERSION='6.2.5'
|
||||
PACKAGE_STRING='Citus 6.2.5'
|
||||
PACKAGE_BUGREPORT=''
|
||||
PACKAGE_URL=''
|
||||
|
||||
|
@ -1195,7 +1195,7 @@ if test "$ac_init_help" = "long"; then
|
|||
# Omit some internal or obsolete options to make the list less imposing.
|
||||
# This message is too long to be a string in the A/UX 3.1 sh.
|
||||
cat <<_ACEOF
|
||||
\`configure' configures Citus 6.2devel to adapt to many kinds of systems.
|
||||
\`configure' configures Citus 6.2.5 to adapt to many kinds of systems.
|
||||
|
||||
Usage: $0 [OPTION]... [VAR=VALUE]...
|
||||
|
||||
|
@ -1256,7 +1256,7 @@ fi
|
|||
|
||||
if test -n "$ac_init_help"; then
|
||||
case $ac_init_help in
|
||||
short | recursive ) echo "Configuration of Citus 6.2devel:";;
|
||||
short | recursive ) echo "Configuration of Citus 6.2.5:";;
|
||||
esac
|
||||
cat <<\_ACEOF
|
||||
|
||||
|
@ -1344,7 +1344,7 @@ fi
|
|||
test -n "$ac_init_help" && exit $ac_status
|
||||
if $ac_init_version; then
|
||||
cat <<\_ACEOF
|
||||
Citus configure 6.2devel
|
||||
Citus configure 6.2.5
|
||||
generated by GNU Autoconf 2.69
|
||||
|
||||
Copyright (C) 2012 Free Software Foundation, Inc.
|
||||
|
@ -1401,7 +1401,7 @@ cat >config.log <<_ACEOF
|
|||
This file contains any messages produced by compilers while
|
||||
running configure, to aid debugging if configure makes a mistake.
|
||||
|
||||
It was created by Citus $as_me 6.2devel, which was
|
||||
It was created by Citus $as_me 6.2.5, which was
|
||||
generated by GNU Autoconf 2.69. Invocation command line was
|
||||
|
||||
$ $0 $@
|
||||
|
@ -3561,7 +3561,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
|
|||
# report actual input values of CONFIG_FILES etc. instead of their
|
||||
# values after options handling.
|
||||
ac_log="
|
||||
This file was extended by Citus $as_me 6.2devel, which was
|
||||
This file was extended by Citus $as_me 6.2.5, which was
|
||||
generated by GNU Autoconf 2.69. Invocation command line was
|
||||
|
||||
CONFIG_FILES = $CONFIG_FILES
|
||||
|
@ -3623,7 +3623,7 @@ _ACEOF
|
|||
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
||||
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
||||
ac_cs_version="\\
|
||||
Citus config.status 6.2devel
|
||||
Citus config.status 6.2.5
|
||||
configured by $0, generated by GNU Autoconf 2.69,
|
||||
with options \\"\$ac_cs_config\\"
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
# everyone needing autoconf installed, the resulting files are checked
|
||||
# into the SCM.
|
||||
|
||||
AC_INIT([Citus], [6.2devel])
|
||||
AC_INIT([Citus], [6.2.5])
|
||||
AC_COPYRIGHT([Copyright (c) 2012-2017, Citus Data, Inc.])
|
||||
|
||||
# we'll need sed and awk for some of the version commands
|
||||
|
|
|
@ -110,6 +110,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
|
|||
bool requireEmpty = true;
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
|
||||
{
|
||||
|
@ -147,6 +148,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
|||
char *colocateWithTableName = NULL;
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* guard against a binary update without a function update */
|
||||
if (PG_NARGS() >= 4)
|
||||
|
@ -233,13 +235,17 @@ static void
|
|||
CreateReferenceTable(Oid relationId)
|
||||
{
|
||||
uint32 colocationId = INVALID_COLOCATION_ID;
|
||||
List *workerNodeList = ActiveWorkerNodeList();
|
||||
int replicationFactor = list_length(workerNodeList);
|
||||
List *workerNodeList = NIL;
|
||||
int replicationFactor = 0;
|
||||
char *distributionColumnName = NULL;
|
||||
bool requireEmpty = true;
|
||||
char relationKind = 0;
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
workerNodeList = ActiveWorkerNodeList();
|
||||
replicationFactor = list_length(workerNodeList);
|
||||
|
||||
/* if there are no workers, error out */
|
||||
if (replicationFactor == 0)
|
||||
|
@ -749,6 +755,8 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
|
|||
List *columnNameList = NIL;
|
||||
Relation distributedRelation = NULL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
Var *partitionColumn = NULL;
|
||||
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
|
||||
bool stopOnFailure = true;
|
||||
|
||||
EState *estate = NULL;
|
||||
|
@ -778,6 +786,13 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
|
|||
slot = MakeSingleTupleTableSlot(tupleDescriptor);
|
||||
columnNameList = TupleDescColumnNameList(tupleDescriptor);
|
||||
|
||||
/* determine the partition column in the tuple descriptor */
|
||||
partitionColumn = PartitionColumn(distributedRelationId, 0);
|
||||
if (partitionColumn != NULL)
|
||||
{
|
||||
partitionColumnIndex = partitionColumn->varattno - 1;
|
||||
}
|
||||
|
||||
/* initialise per-tuple memory context */
|
||||
estate = CreateExecutorState();
|
||||
econtext = GetPerTupleExprContext(estate);
|
||||
|
@ -785,8 +800,9 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
|
|||
|
||||
copyDest =
|
||||
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
|
||||
columnNameList, estate,
|
||||
stopOnFailure);
|
||||
columnNameList,
|
||||
partitionColumnIndex,
|
||||
estate, stopOnFailure);
|
||||
|
||||
/* initialise state for writing to shards, we'll open connections on demand */
|
||||
copyDest->rStartup(copyDest, 0, tupleDescriptor);
|
||||
|
|
|
@ -39,6 +39,7 @@ master_drop_distributed_table_metadata(PG_FUNCTION_ARGS)
|
|||
char *tableName = text_to_cstring(tableNameText);
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName);
|
||||
|
||||
|
|
|
@ -299,6 +299,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
bool *columnNulls = NULL;
|
||||
int columnIndex = 0;
|
||||
List *columnNameList = NIL;
|
||||
Var *partitionColumn = NULL;
|
||||
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
|
||||
TupleTableSlot *tupleTableSlot = NULL;
|
||||
|
||||
EState *executorState = NULL;
|
||||
|
@ -326,6 +328,14 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
tupleTableSlot->tts_values = columnValues;
|
||||
tupleTableSlot->tts_isnull = columnNulls;
|
||||
|
||||
/* determine the partition column index in the tuple descriptor */
|
||||
partitionColumn = PartitionColumn(tableId, 0);
|
||||
if (partitionColumn != NULL)
|
||||
{
|
||||
partitionColumnIndex = partitionColumn->varattno - 1;
|
||||
}
|
||||
|
||||
/* build the list of column names for remote COPY statements */
|
||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||
{
|
||||
Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
|
||||
|
@ -350,8 +360,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
|||
}
|
||||
|
||||
/* set up the destination for the COPY */
|
||||
copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState,
|
||||
stopOnFailure);
|
||||
copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex,
|
||||
executorState, stopOnFailure);
|
||||
dest = (DestReceiver *) copyDest;
|
||||
dest->rStartup(dest, 0, tupleDescriptor);
|
||||
|
||||
|
@ -1133,17 +1143,8 @@ ReportCopyError(MultiConnection *connection, PGresult *result)
|
|||
}
|
||||
else
|
||||
{
|
||||
/* probably a connection problem, get the message from the connection */
|
||||
char *lastNewlineIndex = NULL;
|
||||
|
||||
remoteMessage = PQerrorMessage(connection->pgConn);
|
||||
lastNewlineIndex = strrchr(remoteMessage, '\n');
|
||||
|
||||
/* trim trailing newline, if any */
|
||||
if (lastNewlineIndex != NULL)
|
||||
{
|
||||
*lastNewlineIndex = '\0';
|
||||
}
|
||||
/* trim the trailing characters */
|
||||
remoteMessage = pchomp(PQerrorMessage(connection->pgConn));
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||
errmsg("failed to complete COPY on %s:%d", connection->hostname,
|
||||
|
@ -1638,10 +1639,14 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
|
|||
/*
|
||||
* CreateCitusCopyDestReceiver creates a DestReceiver that copies into
|
||||
* a distributed table.
|
||||
*
|
||||
* The caller should provide the list of column names to use in the
|
||||
* remote COPY statement, and the partition column index in the tuple
|
||||
* descriptor (*not* the column name list).
|
||||
*/
|
||||
CitusCopyDestReceiver *
|
||||
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorState,
|
||||
bool stopOnFailure)
|
||||
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
|
||||
EState *executorState, bool stopOnFailure)
|
||||
{
|
||||
CitusCopyDestReceiver *copyDest = NULL;
|
||||
|
||||
|
@ -1657,6 +1662,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorS
|
|||
/* set up output parameters */
|
||||
copyDest->distributedRelationId = tableId;
|
||||
copyDest->columnNameList = columnNameList;
|
||||
copyDest->partitionColumnIndex = partitionColumnIndex;
|
||||
copyDest->executorState = executorState;
|
||||
copyDest->stopOnFailure = stopOnFailure;
|
||||
copyDest->memoryContext = CurrentMemoryContext;
|
||||
|
@ -1682,14 +1688,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
|||
char *schemaName = get_namespace_name(schemaOid);
|
||||
|
||||
Relation distributedRelation = NULL;
|
||||
int columnIndex = 0;
|
||||
List *columnNameList = copyDest->columnNameList;
|
||||
List *quotedColumnNameList = NIL;
|
||||
|
||||
ListCell *columnNameCell = NULL;
|
||||
|
||||
char partitionMethod = '\0';
|
||||
Var *partitionColumn = PartitionColumn(tableId, 0);
|
||||
int partitionColumnIndex = -1;
|
||||
DistTableCacheEntry *cacheEntry = NULL;
|
||||
|
||||
CopyStmt *copyStatement = NULL;
|
||||
|
@ -1773,39 +1777,28 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
|||
copyDest->columnOutputFunctions =
|
||||
ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary);
|
||||
|
||||
/* find the partition column index in the column list */
|
||||
/* ensure the column names are properly quoted in the COPY statement */
|
||||
foreach(columnNameCell, columnNameList)
|
||||
{
|
||||
char *columnName = (char *) lfirst(columnNameCell);
|
||||
char *quotedColumnName = (char *) quote_identifier(columnName);
|
||||
|
||||
/* load the column information from pg_attribute */
|
||||
AttrNumber attrNumber = get_attnum(tableId, columnName);
|
||||
|
||||
/* check whether this is the partition column */
|
||||
if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
|
||||
{
|
||||
Assert(partitionColumnIndex == -1);
|
||||
|
||||
partitionColumnIndex = columnIndex;
|
||||
quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName);
|
||||
}
|
||||
|
||||
columnIndex++;
|
||||
}
|
||||
|
||||
if (partitionMethod != DISTRIBUTE_BY_NONE && partitionColumnIndex == -1)
|
||||
if (partitionMethod != DISTRIBUTE_BY_NONE &&
|
||||
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
||||
errmsg("the partition column of table %s should have a value",
|
||||
quote_qualified_identifier(schemaName, relationName))));
|
||||
}
|
||||
|
||||
copyDest->partitionColumnIndex = partitionColumnIndex;
|
||||
|
||||
/* define the template for the COPY statement that is sent to workers */
|
||||
copyStatement = makeNode(CopyStmt);
|
||||
copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
|
||||
copyStatement->query = NULL;
|
||||
copyStatement->attlist = columnNameList;
|
||||
copyStatement->attlist = quotedColumnNameList;
|
||||
copyStatement->is_from = true;
|
||||
copyStatement->is_program = false;
|
||||
copyStatement->filename = NULL;
|
||||
|
@ -1866,7 +1859,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
|||
* tables. Note that, reference tables has NULL partition column values so
|
||||
* skip the check.
|
||||
*/
|
||||
if (partitionColumnIndex >= 0)
|
||||
if (partitionColumnIndex != INVALID_PARTITION_COLUMN_INDEX)
|
||||
{
|
||||
if (columnNulls[partitionColumnIndex])
|
||||
{
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "lib/stringinfo.h"
|
||||
#include "miscadmin.h"
|
||||
#include "storage/latch.h"
|
||||
#include "utils/palloc.h"
|
||||
|
||||
|
||||
/* GUC, determining whether statements sent to remote nodes are logged */
|
||||
|
@ -116,7 +117,7 @@ ReportConnectionError(MultiConnection *connection, int elevel)
|
|||
int nodePort = connection->port;
|
||||
|
||||
ereport(elevel, (errmsg("connection error: %s:%d", nodeName, nodePort),
|
||||
errdetail("%s", PQerrorMessage(connection->pgConn))));
|
||||
errdetail("%s", pchomp(PQerrorMessage(connection->pgConn)))));
|
||||
}
|
||||
|
||||
|
||||
|
@ -154,16 +155,7 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
|
|||
*/
|
||||
if (messagePrimary == NULL)
|
||||
{
|
||||
char *lastNewlineIndex = NULL;
|
||||
|
||||
messagePrimary = PQerrorMessage(connection->pgConn);
|
||||
lastNewlineIndex = strrchr(messagePrimary, '\n');
|
||||
|
||||
/* trim trailing newline, if any */
|
||||
if (lastNewlineIndex != NULL)
|
||||
{
|
||||
*lastNewlineIndex = '\0';
|
||||
}
|
||||
messagePrimary = pchomp(PQerrorMessage(connection->pgConn));
|
||||
}
|
||||
|
||||
ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
|
||||
|
@ -182,6 +174,28 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
|
|||
}
|
||||
|
||||
|
||||
/* *INDENT-OFF* */
|
||||
#if (PG_VERSION_NUM < 100000)
|
||||
|
||||
/*
|
||||
* Make copy of string with all trailing newline characters removed.
|
||||
*/
|
||||
char *
|
||||
pchomp(const char *in)
|
||||
{
|
||||
size_t n;
|
||||
|
||||
n = strlen(in);
|
||||
while (n > 0 && in[n - 1] == '\n')
|
||||
n--;
|
||||
return pnstrdup(in, n);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/* *INDENT-ON* */
|
||||
|
||||
|
||||
/*
|
||||
* LogRemoteCommand logs commands send to remote nodes if
|
||||
* citus.log_remote_commands wants us to do so.
|
||||
|
|
|
@ -133,7 +133,8 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
|
|||
* error and returns INVALID_CONNECTION_ID.
|
||||
*/
|
||||
int32
|
||||
MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase)
|
||||
MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase,
|
||||
const char *userName)
|
||||
{
|
||||
MultiConnection *connection = NULL;
|
||||
ConnStatusType connStatusType = CONNECTION_OK;
|
||||
|
@ -154,7 +155,8 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
|
|||
}
|
||||
|
||||
/* prepare asynchronous request for worker node connection */
|
||||
connection = StartNodeConnection(connectionFlags, nodeName, nodePort);
|
||||
connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
|
||||
userName, nodeDatabase);
|
||||
connStatusType = PQstatus(connection->pgConn);
|
||||
|
||||
/*
|
||||
|
@ -311,7 +313,7 @@ MultiClientSendQuery(int32 connectionId, const char *query)
|
|||
querySent = PQsendQuery(connection->pgConn, query);
|
||||
if (querySent == 0)
|
||||
{
|
||||
char *errorMessage = PQerrorMessage(connection->pgConn);
|
||||
char *errorMessage = pchomp(PQerrorMessage(connection->pgConn));
|
||||
ereport(WARNING, (errmsg("could not send remote query \"%s\"", query),
|
||||
errdetail("Client error: %s", errorMessage)));
|
||||
|
||||
|
|
|
@ -272,7 +272,8 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
|||
/* we use the same database name on the master and worker nodes */
|
||||
nodeDatabase = get_database_name(MyDatabaseId);
|
||||
|
||||
connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase);
|
||||
connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase,
|
||||
NULL);
|
||||
connectionIdArray[currentIndex] = connectionId;
|
||||
|
||||
/* if valid, poll the connection until the connection is initiated */
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "commands/dbcommands.h"
|
||||
#include "distributed/citus_nodes.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
|
@ -70,7 +71,8 @@ static Task * TaskHashLookup(HTAB *trackerHash, TaskType taskType, uint64 jobId,
|
|||
uint32 taskId);
|
||||
static bool TopLevelTask(Task *task);
|
||||
static bool TransmitExecutionCompleted(TaskExecution *taskExecution);
|
||||
static HTAB * TrackerHash(const char *taskTrackerHashName, List *workerNodeList);
|
||||
static HTAB * TrackerHash(const char *taskTrackerHashName, List *workerNodeList,
|
||||
char *userName);
|
||||
static HTAB * TrackerHashCreate(const char *taskTrackerHashName,
|
||||
uint32 taskTrackerHashSize);
|
||||
static TaskTracker * TrackerHashEnter(HTAB *taskTrackerHash, char *nodeName,
|
||||
|
@ -159,6 +161,7 @@ MultiTaskTrackerExecute(Job *job)
|
|||
List *workerNodeList = NIL;
|
||||
HTAB *taskTrackerHash = NULL;
|
||||
HTAB *transmitTrackerHash = NULL;
|
||||
char *extensionOwner = CitusExtensionOwnerName();
|
||||
const char *taskTrackerHashName = "Task Tracker Hash";
|
||||
const char *transmitTrackerHashName = "Transmit Tracker Hash";
|
||||
List *jobIdList = NIL;
|
||||
|
@ -193,8 +196,12 @@ MultiTaskTrackerExecute(Job *job)
|
|||
workerNodeList = ActiveWorkerNodeList();
|
||||
taskTrackerCount = (uint32) list_length(workerNodeList);
|
||||
|
||||
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList);
|
||||
transmitTrackerHash = TrackerHash(transmitTrackerHashName, workerNodeList);
|
||||
/* connect as the current user for running queries */
|
||||
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList, NULL);
|
||||
|
||||
/* connect as the superuser for fetching result files */
|
||||
transmitTrackerHash = TrackerHash(transmitTrackerHashName, workerNodeList,
|
||||
extensionOwner);
|
||||
|
||||
TrackerHashConnect(taskTrackerHash);
|
||||
TrackerHashConnect(transmitTrackerHash);
|
||||
|
@ -658,10 +665,11 @@ TransmitExecutionCompleted(TaskExecution *taskExecution)
|
|||
/*
|
||||
* TrackerHash creates a task tracker hash with the given name. The function
|
||||
* then inserts one task tracker entry for each node in the given worker node
|
||||
* list, and initializes state for each task tracker.
|
||||
* list, and initializes state for each task tracker. The userName argument
|
||||
* indicates which user to connect as.
|
||||
*/
|
||||
static HTAB *
|
||||
TrackerHash(const char *taskTrackerHashName, List *workerNodeList)
|
||||
TrackerHash(const char *taskTrackerHashName, List *workerNodeList, char *userName)
|
||||
{
|
||||
/* create task tracker hash */
|
||||
uint32 taskTrackerHashSize = list_length(workerNodeList);
|
||||
|
@ -703,6 +711,7 @@ TrackerHash(const char *taskTrackerHashName, List *workerNodeList)
|
|||
}
|
||||
|
||||
taskTracker->taskStateHash = taskStateHash;
|
||||
taskTracker->userName = userName;
|
||||
}
|
||||
|
||||
return taskTrackerHash;
|
||||
|
@ -836,9 +845,10 @@ TrackerConnectPoll(TaskTracker *taskTracker)
|
|||
char *nodeName = taskTracker->workerName;
|
||||
uint32 nodePort = taskTracker->workerPort;
|
||||
char *nodeDatabase = get_database_name(MyDatabaseId);
|
||||
char *nodeUser = taskTracker->userName;
|
||||
|
||||
int32 connectionId = MultiClientConnectStart(nodeName, nodePort,
|
||||
nodeDatabase);
|
||||
nodeDatabase, nodeUser);
|
||||
if (connectionId != INVALID_CONNECTION_ID)
|
||||
{
|
||||
taskTracker->connectionId = connectionId;
|
||||
|
|
|
@ -99,7 +99,6 @@ struct DropRelationCallbackState
|
|||
|
||||
|
||||
/* Local functions forward declarations for deciding when to perform processing/checks */
|
||||
static bool SkipCitusProcessingForUtility(Node *parsetree);
|
||||
static bool IsCitusExtensionStmt(Node *parsetree);
|
||||
|
||||
/* Local functions forward declarations for Transmit statement */
|
||||
|
@ -185,22 +184,28 @@ multi_ProcessUtility(Node *parsetree,
|
|||
Oid savedUserId = InvalidOid;
|
||||
int savedSecurityContext = 0;
|
||||
List *ddlJobs = NIL;
|
||||
bool skipCitusProcessing = SkipCitusProcessingForUtility(parsetree);
|
||||
bool checkExtensionVersion = false;
|
||||
|
||||
if (skipCitusProcessing)
|
||||
if (IsA(parsetree, TransactionStmt))
|
||||
{
|
||||
bool checkExtensionVersion = IsCitusExtensionStmt(parsetree);
|
||||
|
||||
/*
|
||||
* Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
|
||||
* transactions in which case a lot of checks cannot be done safely in
|
||||
* that state. Since we never need to intercept transaction statements,
|
||||
* skip our checks and immediately fall into standard_ProcessUtility.
|
||||
*/
|
||||
standard_ProcessUtility(parsetree, queryString, context,
|
||||
params, dest, completionTag);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
checkExtensionVersion = IsCitusExtensionStmt(parsetree);
|
||||
if (EnableVersionChecks && checkExtensionVersion)
|
||||
{
|
||||
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (!CitusHasBeenLoaded())
|
||||
{
|
||||
|
@ -390,7 +395,11 @@ multi_ProcessUtility(Node *parsetree,
|
|||
standard_ProcessUtility(parsetree, queryString, context,
|
||||
params, dest, completionTag);
|
||||
|
||||
/* don't run post-process code for local commands */
|
||||
if (ddlJobs != NIL)
|
||||
{
|
||||
PostProcessUtility(parsetree);
|
||||
}
|
||||
|
||||
if (commandMustRunAsOwner)
|
||||
{
|
||||
|
@ -447,63 +456,6 @@ multi_ProcessUtility(Node *parsetree,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* SkipCitusProcessingForUtility simply returns whether a given utility should
|
||||
* bypass Citus processing and checks and be handled exclusively by standard
|
||||
* PostgreSQL utility processing. At present, CREATE/ALTER/DROP EXTENSION,
|
||||
* ABORT, COMMIT, ROLLBACK, and SET (GUC) statements are exempt from Citus.
|
||||
*/
|
||||
static bool
|
||||
SkipCitusProcessingForUtility(Node *parsetree)
|
||||
{
|
||||
switch (parsetree->type)
|
||||
{
|
||||
/*
|
||||
* In the CitusHasBeenLoaded check, we compare versions of loaded code,
|
||||
* the installed extension, and available extension. If they differ, we
|
||||
* force user to execute ALTER EXTENSION citus UPDATE. To allow this,
|
||||
* CREATE/DROP/ALTER extension must be omitted from Citus processing.
|
||||
*/
|
||||
case T_DropStmt:
|
||||
{
|
||||
DropStmt *dropStatement = (DropStmt *) parsetree;
|
||||
|
||||
if (dropStatement->removeType != OBJECT_EXTENSION)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/* no break, fall through */
|
||||
|
||||
case T_CreateExtensionStmt:
|
||||
case T_AlterExtensionStmt:
|
||||
|
||||
/*
|
||||
* Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
|
||||
* transactions in which case a lot of checks cannot be done safely in
|
||||
* that state. Since we never need to intercept transaction statements,
|
||||
* skip our checks and immediately fall into standard_ProcessUtility.
|
||||
*/
|
||||
case T_TransactionStmt:
|
||||
|
||||
/*
|
||||
* Skip processing of variable set statements, to allow changing the
|
||||
* enable_version_checks GUC during testing.
|
||||
*/
|
||||
case T_VariableSetStmt:
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsCitusExtensionStmt returns whether a given utility is a CREATE or ALTER
|
||||
* EXTENSION statement which references the citus extension. This function
|
||||
|
@ -563,6 +515,10 @@ IsTransmitStmt(Node *parsetree)
|
|||
static void
|
||||
VerifyTransmitStmt(CopyStmt *copyStatement)
|
||||
{
|
||||
char *fileName = NULL;
|
||||
|
||||
EnsureSuperUser();
|
||||
|
||||
/* do some minimal option verification */
|
||||
if (copyStatement->relation == NULL ||
|
||||
copyStatement->relation->relname == NULL)
|
||||
|
@ -571,6 +527,20 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
|
|||
errmsg("FORMAT 'transmit' requires a target file")));
|
||||
}
|
||||
|
||||
fileName = copyStatement->relation->relname;
|
||||
|
||||
if (is_absolute_path(fileName))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
(errmsg("absolute path not allowed"))));
|
||||
}
|
||||
else if (!path_is_relative_and_below_cwd(fileName))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
(errmsg("path must be in or below the current directory"))));
|
||||
}
|
||||
|
||||
if (copyStatement->filename != NULL)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
|
@ -1296,8 +1266,13 @@ VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
|
|||
char *schemaName = get_namespace_name(schemaId);
|
||||
char *tableName = get_rel_name(relationId);
|
||||
|
||||
/* lock relation metadata before getting shard list */
|
||||
LockRelationDistributionMetadata(relationId, ShareLock);
|
||||
/*
|
||||
* We obtain ShareUpdateExclusiveLock here to not conflict with INSERT's
|
||||
* RowExclusiveLock. However if VACUUM FULL is used, we already obtain
|
||||
* AccessExclusiveLock before reaching to that point and INSERT's will be
|
||||
* blocked anyway. This is inline with PostgreSQL's own behaviour.
|
||||
*/
|
||||
LockRelationOid(relationId, ShareUpdateExclusiveLock);
|
||||
|
||||
shardIntervalList = LoadShardIntervalList(relationId);
|
||||
|
||||
|
@ -1471,11 +1446,9 @@ ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree)
|
|||
{
|
||||
/*
|
||||
* No version was specified, so PostgreSQL will use the default_version
|
||||
* from the citus.control file. In case a new default is available, we
|
||||
* will force a compatibility check of the latest available version.
|
||||
* from the citus.control file.
|
||||
*/
|
||||
availableExtensionVersion = NULL;
|
||||
ErrorIfAvailableVersionMismatch();
|
||||
CheckAvailableVersion(ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -83,6 +83,8 @@ master_run_on_worker(PG_FUNCTION_ARGS)
|
|||
int commandIndex = 0;
|
||||
int commandCount = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* check to see if caller supports us returning a tuplestore */
|
||||
if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
|
||||
{
|
||||
|
@ -437,7 +439,12 @@ StoreErrorMessage(PGconn *connection, StringInfo queryResultString)
|
|||
char *errorMessage = PQerrorMessage(connection);
|
||||
if (errorMessage != NULL)
|
||||
{
|
||||
char *firstNewlineIndex = strchr(errorMessage, '\n');
|
||||
char *firstNewlineIndex = NULL;
|
||||
|
||||
/* copy the error message to a writable memory */
|
||||
errorMessage = pnstrdup(errorMessage, strlen(errorMessage));
|
||||
|
||||
firstNewlineIndex = strchr(errorMessage, '\n');
|
||||
|
||||
/* trim the error message at the line break */
|
||||
if (firstNewlineIndex != NULL)
|
||||
|
|
|
@ -69,6 +69,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
|
|||
Oid distributedTableId = ResolveRelationId(tableNameText);
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor);
|
||||
|
||||
|
@ -112,8 +113,8 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
|||
*/
|
||||
EnsureTableOwner(distributedTableId);
|
||||
|
||||
/* we plan to add shards: get an exclusive metadata lock */
|
||||
LockRelationDistributionMetadata(distributedTableId, ExclusiveLock);
|
||||
/* we plan to add shards: get an exclusive lock on relation oid */
|
||||
LockRelationOid(distributedTableId, ExclusiveLock);
|
||||
|
||||
relationOwner = TableOwner(distributedTableId);
|
||||
|
||||
|
@ -263,8 +264,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
|
|||
*/
|
||||
EnsureTableOwner(targetRelationId);
|
||||
|
||||
/* we plan to add shards: get an exclusive metadata lock on the target relation */
|
||||
LockRelationDistributionMetadata(targetRelationId, ExclusiveLock);
|
||||
/* we plan to add shards: get an exclusive lock on target relation oid */
|
||||
LockRelationOid(targetRelationId, ExclusiveLock);
|
||||
|
||||
/* we don't want source table to get dropped before we colocate with it */
|
||||
LockRelationOid(sourceRelationId, AccessShareLock);
|
||||
|
@ -368,8 +369,8 @@ CreateReferenceTableShard(Oid distributedTableId)
|
|||
*/
|
||||
EnsureTableOwner(distributedTableId);
|
||||
|
||||
/* we plan to add shards: get an exclusive metadata lock */
|
||||
LockRelationDistributionMetadata(distributedTableId, ExclusiveLock);
|
||||
/* we plan to add shards: get an exclusive lock on relation oid */
|
||||
LockRelationOid(distributedTableId, ExclusiveLock);
|
||||
|
||||
relationOwner = TableOwner(distributedTableId);
|
||||
|
||||
|
|
|
@ -111,6 +111,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
|
|||
bool failOK = false;
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
queryTreeNode = ParseTreeNode(queryString);
|
||||
if (!IsA(queryTreeNode, DeleteStmt))
|
||||
|
@ -214,6 +215,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
|
|||
char *relationName = text_to_cstring(relationNameText);
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName);
|
||||
|
||||
|
@ -250,6 +252,8 @@ master_drop_sequences(PG_FUNCTION_ARGS)
|
|||
StringInfo dropSeqCommand = makeStringInfo();
|
||||
bool coordinator = IsCoordinator();
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* do nothing if DDL propagation is switched off or this is not the coordinator */
|
||||
if (!EnableDDLPropagation || !coordinator)
|
||||
{
|
||||
|
|
|
@ -46,14 +46,21 @@ Datum
|
|||
master_expire_table_cache(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
||||
List *workerNodeList = ActiveWorkerNodeList();
|
||||
DistTableCacheEntry *cacheEntry = NULL;
|
||||
List *workerNodeList = NIL;
|
||||
ListCell *workerNodeCell = NULL;
|
||||
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||
ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray;
|
||||
int shardCount = 0;
|
||||
ShardInterval **shardIntervalArray = NULL;
|
||||
List **placementListArray = NULL;
|
||||
int shardIndex = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
cacheEntry = DistributedTableCacheEntry(relationId);
|
||||
workerNodeList = ActiveWorkerNodeList();
|
||||
shardCount = cacheEntry->shardIntervalArrayLength;
|
||||
shardIntervalArray = cacheEntry->sortedShardIntervalArray;
|
||||
|
||||
if (shardCount == 0)
|
||||
{
|
||||
ereport(WARNING, (errmsg("Table has no shards, no action is taken")));
|
||||
|
|
|
@ -87,6 +87,8 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
|
|||
Oid relationId = PG_GETARG_OID(0);
|
||||
uint64 totalRelationSize = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
totalRelationSize = DistributedTableSize(relationId,
|
||||
PG_TOTAL_RELATION_SIZE_FUNCTION);
|
||||
|
||||
|
@ -104,6 +106,8 @@ citus_table_size(PG_FUNCTION_ARGS)
|
|||
Oid relationId = PG_GETARG_OID(0);
|
||||
uint64 tableSize = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
tableSize = DistributedTableSize(relationId, PG_TABLE_SIZE_FUNCTION);
|
||||
|
||||
PG_RETURN_INT64(tableSize);
|
||||
|
@ -120,6 +124,8 @@ citus_relation_size(PG_FUNCTION_ARGS)
|
|||
Oid relationId = PG_GETARG_OID(0);
|
||||
uint64 relationSize = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
relationSize = DistributedTableSize(relationId, PG_RELATION_SIZE_FUNCTION);
|
||||
|
||||
PG_RETURN_INT64(relationSize);
|
||||
|
|
|
@ -87,6 +87,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
|
|||
int32 affectedTupleCount = 0;
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
|
||||
queryTreeNode = ParseTreeNode(queryString);
|
||||
if (IsA(queryTreeNode, DeleteStmt))
|
||||
|
|
|
@ -105,6 +105,8 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
|
|||
Datum values[TABLE_METADATA_FIELDS];
|
||||
bool isNulls[TABLE_METADATA_FIELDS];
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* find partition tuple for partitioned relation */
|
||||
partitionEntry = DistributedTableCacheEntry(relationId);
|
||||
|
||||
|
@ -194,6 +196,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
|
|||
FuncCallContext *functionContext = NULL;
|
||||
ListCell *tableDDLEventCell = NULL;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/*
|
||||
* On the very first call to this function, we first use the given relation
|
||||
* name to get to the relation. We then recreate the list of DDL statements
|
||||
|
@ -264,6 +268,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
|
|||
Datum shardIdDatum = 0;
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
shardId = GetNextShardId();
|
||||
shardIdDatum = Int64GetDatum(shardId);
|
||||
|
@ -321,6 +326,7 @@ master_get_new_placementid(PG_FUNCTION_ARGS)
|
|||
Datum placementIdDatum = 0;
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
placementId = GetNextPlacementId();
|
||||
placementIdDatum = Int64GetDatum(placementId);
|
||||
|
@ -376,6 +382,8 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
|
|||
uint32 workerNodeIndex = 0;
|
||||
uint32 workerNodeCount = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (SRF_IS_FIRSTCALL())
|
||||
{
|
||||
MemoryContext oldContext = NULL;
|
||||
|
|
|
@ -85,6 +85,7 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* RepairShardPlacement function repairs only given shard */
|
||||
RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
|
||||
|
|
|
@ -67,6 +67,8 @@ worker_hash(PG_FUNCTION_ARGS)
|
|||
FmgrInfo *hashFunction = NULL;
|
||||
Oid valueDataType = InvalidOid;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* figure out hash function from the data type */
|
||||
valueDataType = get_fn_expr_argtype(fcinfo->flinfo, 0);
|
||||
typeEntry = lookup_type_cache(valueDataType, TYPECACHE_HASH_PROC_FINFO);
|
||||
|
|
|
@ -71,7 +71,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
|||
{
|
||||
text *relationNameText = PG_GETARG_TEXT_P(0);
|
||||
char *relationName = text_to_cstring(relationNameText);
|
||||
List *workerNodeList = ActiveWorkerNodeList();
|
||||
List *workerNodeList = NIL;
|
||||
uint64 shardId = INVALID_SHARD_ID;
|
||||
List *ddlEventList = NULL;
|
||||
uint32 attemptableNodeCount = 0;
|
||||
|
@ -90,6 +90,10 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
|||
char replicationModel = REPLICATION_MODEL_INVALID;
|
||||
bool includeSequenceDefaults = false;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
workerNodeList = ActiveWorkerNodeList();
|
||||
|
||||
EnsureTablePermissions(relationId, ACL_INSERT);
|
||||
CheckDistributedTable(relationId);
|
||||
|
||||
|
@ -219,11 +223,18 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
|
|||
float4 shardFillLevel = 0.0;
|
||||
char partitionMethod = 0;
|
||||
|
||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
Oid relationId = shardInterval->relationId;
|
||||
bool cstoreTable = CStoreTable(relationId);
|
||||
ShardInterval *shardInterval = NULL;
|
||||
Oid relationId = InvalidOid;
|
||||
bool cstoreTable = false;
|
||||
|
||||
char storageType = shardInterval->storageType;
|
||||
char storageType = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
shardInterval = LoadShardInterval(shardId);
|
||||
relationId = shardInterval->relationId;
|
||||
cstoreTable = CStoreTable(relationId);
|
||||
storageType = shardInterval->storageType;
|
||||
|
||||
EnsureTablePermissions(relationId, ACL_INSERT);
|
||||
|
||||
|
@ -318,6 +329,8 @@ master_update_shard_statistics(PG_FUNCTION_ARGS)
|
|||
int64 shardId = PG_GETARG_INT64(0);
|
||||
uint64 shardSize = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
shardSize = UpdateShardStatistics(shardId);
|
||||
|
||||
PG_RETURN_INT64(shardSize);
|
||||
|
@ -750,9 +763,8 @@ ForeignConstraintGetReferencedTableId(char *queryString)
|
|||
if (constraint->contype == CONSTR_FOREIGN)
|
||||
{
|
||||
RangeVar *referencedTable = constraint->pktable;
|
||||
LOCKMODE lockmode = AlterTableGetLockLevel(foreignConstraintStmt->cmds);
|
||||
|
||||
return RangeVarGetRelid(referencedTable, lockmode,
|
||||
return RangeVarGetRelid(referencedTable, NoLock,
|
||||
foreignConstraintStmt->missing_ok);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,6 +86,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
|||
|
||||
EnsureCoordinator();
|
||||
EnsureSuperUser();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
PreventTransactionChain(true, "start_metadata_sync_to_node");
|
||||
|
||||
|
@ -154,6 +155,7 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
|
|||
|
||||
EnsureCoordinator();
|
||||
EnsureSuperUser();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
workerNode = FindWorkerNode(nodeNameString, nodePort);
|
||||
if (workerNode == NULL)
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "catalog/indexing.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_constraint.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/relay_utility.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
|
@ -673,10 +674,11 @@ shard_name(PG_FUNCTION_ARGS)
|
|||
errmsg("shard_id cannot be null")));
|
||||
}
|
||||
|
||||
|
||||
relationId = PG_GETARG_OID(0);
|
||||
shardId = PG_GETARG_INT64(1);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (shardId <= 0)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
|
|
|
@ -65,6 +65,8 @@ recover_prepared_transactions(PG_FUNCTION_ARGS)
|
|||
{
|
||||
int recoveredTransactionCount = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
recoveredTransactionCount = RecoverPreparedTransactions();
|
||||
|
||||
PG_RETURN_INT32(recoveredTransactionCount);
|
||||
|
|
|
@ -236,76 +236,36 @@ PartiallyEvaluateExpressionMutator(Node *expression, FunctionEvaluationContext *
|
|||
static Node *
|
||||
EvaluateNodeIfReferencesFunction(Node *expression, PlanState *planState)
|
||||
{
|
||||
if (IsA(expression, FuncExpr))
|
||||
if (expression == NULL || IsA(expression, Const))
|
||||
{
|
||||
FuncExpr *expr = (FuncExpr *) expression;
|
||||
return expression;
|
||||
}
|
||||
|
||||
return (Node *) citus_evaluate_expr((Expr *) expr,
|
||||
expr->funcresulttype,
|
||||
exprTypmod((Node *) expr),
|
||||
expr->funccollid,
|
||||
switch (nodeTag(expression))
|
||||
{
|
||||
case T_FuncExpr:
|
||||
case T_OpExpr:
|
||||
case T_DistinctExpr:
|
||||
case T_NullIfExpr:
|
||||
case T_CoerceViaIO:
|
||||
case T_ArrayCoerceExpr:
|
||||
case T_ScalarArrayOpExpr:
|
||||
case T_RowCompareExpr:
|
||||
case T_Param:
|
||||
case T_RelabelType:
|
||||
case T_CoerceToDomain:
|
||||
{
|
||||
return (Node *) citus_evaluate_expr((Expr *) expression,
|
||||
exprType(expression),
|
||||
exprTypmod(expression),
|
||||
exprCollation(expression),
|
||||
planState);
|
||||
}
|
||||
|
||||
if (IsA(expression, OpExpr) ||
|
||||
IsA(expression, DistinctExpr) ||
|
||||
IsA(expression, NullIfExpr))
|
||||
default:
|
||||
{
|
||||
/* structural equivalence */
|
||||
OpExpr *expr = (OpExpr *) expression;
|
||||
|
||||
return (Node *) citus_evaluate_expr((Expr *) expr,
|
||||
expr->opresulttype, -1,
|
||||
expr->opcollid,
|
||||
planState);
|
||||
break;
|
||||
}
|
||||
|
||||
if (IsA(expression, CoerceViaIO))
|
||||
{
|
||||
CoerceViaIO *expr = (CoerceViaIO *) expression;
|
||||
|
||||
return (Node *) citus_evaluate_expr((Expr *) expr,
|
||||
expr->resulttype, -1,
|
||||
expr->resultcollid,
|
||||
planState);
|
||||
}
|
||||
|
||||
if (IsA(expression, ArrayCoerceExpr))
|
||||
{
|
||||
ArrayCoerceExpr *expr = (ArrayCoerceExpr *) expression;
|
||||
|
||||
return (Node *) citus_evaluate_expr((Expr *) expr,
|
||||
expr->resulttype,
|
||||
expr->resulttypmod,
|
||||
expr->resultcollid,
|
||||
planState);
|
||||
}
|
||||
|
||||
if (IsA(expression, ScalarArrayOpExpr))
|
||||
{
|
||||
ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression;
|
||||
|
||||
return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid,
|
||||
planState);
|
||||
}
|
||||
|
||||
if (IsA(expression, RowCompareExpr))
|
||||
{
|
||||
RowCompareExpr *expr = (RowCompareExpr *) expression;
|
||||
|
||||
return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid,
|
||||
planState);
|
||||
}
|
||||
|
||||
if (IsA(expression, Param))
|
||||
{
|
||||
Param *param = (Param *) expression;
|
||||
|
||||
return (Node *) citus_evaluate_expr((Expr *) param,
|
||||
param->paramtype,
|
||||
param->paramtypmod,
|
||||
param->paramcollid,
|
||||
planState);
|
||||
}
|
||||
|
||||
return expression;
|
||||
|
|
|
@ -75,6 +75,7 @@ mark_tables_colocated(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject);
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "access/heapam.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "nodes/nodes.h"
|
||||
#include "nodes/primnodes.h"
|
||||
|
@ -55,6 +56,8 @@ column_name_to_column(PG_FUNCTION_ARGS)
|
|||
char *columnNodeString = NULL;
|
||||
text *columnNodeText = NULL;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
relation = relation_open(relationId, AccessShareLock);
|
||||
|
||||
column = BuildDistributionKeyFromColumnName(relation, columnName);
|
||||
|
@ -107,6 +110,8 @@ column_to_column_name(PG_FUNCTION_ARGS)
|
|||
char *columnName = NULL;
|
||||
text *columnText = NULL;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
columnName = ColumnNameToColumn(relationId, columnNodeString);
|
||||
|
||||
columnText = cstring_to_text(columnName);
|
||||
|
|
|
@ -106,8 +106,7 @@ static Oid workerHashFunctionId = InvalidOid;
|
|||
/* Citus extension version variables */
|
||||
bool EnableVersionChecks = true; /* version checks are enabled */
|
||||
|
||||
char *availableExtensionVersion = NULL;
|
||||
static char *installedExtensionVersion = NULL;
|
||||
static bool citusVersionKnownCompatible = false;
|
||||
|
||||
/* Hash table for informations about each partition */
|
||||
static HTAB *DistTableCacheHash = NULL;
|
||||
|
@ -130,6 +129,7 @@ static ScanKeyData DistShardScanKey[1];
|
|||
|
||||
|
||||
/* local function forward declarations */
|
||||
static bool IsDistributedTableViaCatalog(Oid relationId);
|
||||
static ShardCacheEntry * LookupShardCacheEntry(int64 shardId);
|
||||
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
|
||||
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
|
||||
|
@ -142,7 +142,7 @@ static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
|
|||
int shardIntervalArrayLength);
|
||||
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
|
||||
int shardCount);
|
||||
static void ErrorIfInstalledVersionMismatch(void);
|
||||
static bool CheckInstalledVersion(int elevel);
|
||||
static char * AvailableExtensionVersion(void);
|
||||
static char * InstalledExtensionVersion(void);
|
||||
static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
|
||||
|
@ -184,22 +184,59 @@ IsDistributedTable(Oid relationId)
|
|||
{
|
||||
DistTableCacheEntry *cacheEntry = NULL;
|
||||
|
||||
cacheEntry = LookupDistTableCacheEntry(relationId);
|
||||
|
||||
/*
|
||||
* Can't be a distributed relation if the extension hasn't been loaded
|
||||
* yet. As we can't do lookups in nonexistent tables, directly return
|
||||
* false.
|
||||
* If extension hasn't been created, or has the wrong version and the
|
||||
* table isn't a distributed one, LookupDistTableCacheEntry() will return NULL.
|
||||
*/
|
||||
if (!CitusHasBeenLoaded())
|
||||
if (!cacheEntry)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
cacheEntry = LookupDistTableCacheEntry(relationId);
|
||||
|
||||
return cacheEntry->isDistributedTable;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsDistributedTableViaCatalog returns whether the given relation is a
|
||||
* distributed table or not.
|
||||
*
|
||||
* It does so by searching pg_dist_partition, explicitly bypassing caches,
|
||||
* because this function is designed to be used in cases where accessing
|
||||
* metadata tables is not safe.
|
||||
*
|
||||
* NB: Currently this still hardcodes pg_dist_partition logicalrelid column
|
||||
* offset and the corresponding index. If we ever come close to changing
|
||||
* that, we'll have to work a bit harder.
|
||||
*/
|
||||
static bool
|
||||
IsDistributedTableViaCatalog(Oid relationId)
|
||||
{
|
||||
HeapTuple partitionTuple = NULL;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
const int scanKeyCount = 1;
|
||||
ScanKeyData scanKey[scanKeyCount];
|
||||
bool indexOK = true;
|
||||
|
||||
Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
|
||||
|
||||
scanDescriptor = systable_beginscan(pgDistPartition,
|
||||
DistPartitionLogicalRelidIndexId(),
|
||||
indexOK, NULL, scanKeyCount, scanKey);
|
||||
|
||||
partitionTuple = systable_getnext(scanDescriptor);
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgDistPartition, AccessShareLock);
|
||||
|
||||
return HeapTupleIsValid(partitionTuple);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DistributedTableList returns a list that includes all the valid distributed table
|
||||
* cache entries.
|
||||
|
@ -211,6 +248,8 @@ DistributedTableList(void)
|
|||
List *distributedTableList = NIL;
|
||||
ListCell *distTableOidCell = NULL;
|
||||
|
||||
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
|
||||
|
||||
/* first, we need to iterate over pg_dist_partition */
|
||||
distTableOidList = DistTableOidList();
|
||||
|
||||
|
@ -360,6 +399,8 @@ LookupShardCacheEntry(int64 shardId)
|
|||
bool foundInCache = false;
|
||||
bool recheck = false;
|
||||
|
||||
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
|
||||
|
||||
/* probably not reachable */
|
||||
if (DistShardCacheHash == NULL)
|
||||
{
|
||||
|
@ -435,27 +476,16 @@ DistributedTableCacheEntry(Oid distributedRelationId)
|
|||
{
|
||||
DistTableCacheEntry *cacheEntry = NULL;
|
||||
|
||||
/*
|
||||
* Can't be a distributed relation if the extension hasn't been loaded
|
||||
* yet. As we can't do lookups in nonexistent tables, directly return NULL
|
||||
* here.
|
||||
*/
|
||||
if (!CitusHasBeenLoaded())
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
cacheEntry = LookupDistTableCacheEntry(distributedRelationId);
|
||||
|
||||
if (cacheEntry->isDistributedTable)
|
||||
if (cacheEntry && cacheEntry->isDistributedTable)
|
||||
{
|
||||
return cacheEntry;
|
||||
}
|
||||
else
|
||||
{
|
||||
char *relationName = get_rel_name(distributedRelationId);
|
||||
ereport(ERROR, (errmsg("relation %s is not distributed",
|
||||
relationName)));
|
||||
ereport(ERROR, (errmsg("relation %s is not distributed", relationName)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -471,11 +501,48 @@ LookupDistTableCacheEntry(Oid relationId)
|
|||
bool foundInCache = false;
|
||||
void *hashKey = (void *) &relationId;
|
||||
|
||||
/*
|
||||
* Can't be a distributed relation if the extension hasn't been loaded
|
||||
* yet. As we can't do lookups in nonexistent tables, directly return NULL
|
||||
* here.
|
||||
*/
|
||||
if (!CitusHasBeenLoaded())
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (DistTableCacheHash == NULL)
|
||||
{
|
||||
InitializeDistTableCache();
|
||||
}
|
||||
|
||||
/*
|
||||
* If the version is not known to be compatible, perform thorough check,
|
||||
* unless such checks are disabled.
|
||||
*/
|
||||
if (!citusVersionKnownCompatible && EnableVersionChecks)
|
||||
{
|
||||
bool isDistributed = IsDistributedTableViaCatalog(relationId);
|
||||
int reportLevel = DEBUG1;
|
||||
|
||||
/*
|
||||
* If there's a version-mismatch, and we're dealing with a distributed
|
||||
* table, we have to error out as we can't return a valid entry. We
|
||||
* want to check compatibility in the non-distributed case as well, so
|
||||
* future lookups can use the cache if compatible.
|
||||
*/
|
||||
if (isDistributed)
|
||||
{
|
||||
reportLevel = ERROR;
|
||||
}
|
||||
|
||||
if (!CheckCitusVersion(reportLevel))
|
||||
{
|
||||
/* incompatible, can't access cache, so return before doing so */
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, &foundInCache);
|
||||
|
||||
/* return valid matches */
|
||||
|
@ -1066,86 +1133,116 @@ CitusHasBeenLoaded(void)
|
|||
DistPartitionRelationId();
|
||||
|
||||
/*
|
||||
* We also set installedExtensionVersion to NULL so that it will be re-read
|
||||
* in case of extension update.
|
||||
* We also reset citusVersionKnownCompatible, so it will be re-read in
|
||||
* case of extension update.
|
||||
*/
|
||||
installedExtensionVersion = NULL;
|
||||
citusVersionKnownCompatible = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (extensionLoaded)
|
||||
{
|
||||
ErrorIfAvailableVersionMismatch();
|
||||
ErrorIfInstalledVersionMismatch();
|
||||
}
|
||||
|
||||
return extensionLoaded;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfAvailableExtensionVersionMismatch compares CITUS_EXTENSIONVERSION and
|
||||
* currently available version from citus.control file. If they are not same in
|
||||
* major or minor version numbers, this function errors out. It ignores the schema
|
||||
* version.
|
||||
* CheckCitusVersion checks whether there is a version mismatch between the
|
||||
* available version and the loaded version or between the installed version
|
||||
* and the loaded version. Returns true if compatible, false otherwise.
|
||||
*
|
||||
* As a side effect, this function also sets citusVersionKnownCompatible global
|
||||
* variable to true which reduces version check cost of next calls.
|
||||
*/
|
||||
void
|
||||
ErrorIfAvailableVersionMismatch(void)
|
||||
bool
|
||||
CheckCitusVersion(int elevel)
|
||||
{
|
||||
if (citusVersionKnownCompatible ||
|
||||
!CitusHasBeenLoaded() ||
|
||||
!EnableVersionChecks)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (CheckAvailableVersion(elevel) && CheckInstalledVersion(elevel))
|
||||
{
|
||||
citusVersionKnownCompatible = true;
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CheckAvailableVersion compares CITUS_EXTENSIONVERSION and the currently
|
||||
* available version from the citus.control file. If they are not compatible,
|
||||
* this function logs an error with the specified elevel and returns false,
|
||||
* otherwise it returns true.
|
||||
*/
|
||||
bool
|
||||
CheckAvailableVersion(int elevel)
|
||||
{
|
||||
char *availableVersion = NULL;
|
||||
|
||||
if (!EnableVersionChecks)
|
||||
{
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
|
||||
availableVersion = AvailableExtensionVersion();
|
||||
|
||||
if (!MajorVersionsCompatible(availableVersion, CITUS_EXTENSIONVERSION))
|
||||
{
|
||||
ereport(ERROR, (errmsg("loaded Citus library version differs from latest "
|
||||
ereport(elevel, (errmsg("loaded Citus library version differs from latest "
|
||||
"available extension version"),
|
||||
errdetail("Loaded library requires %s, but the latest control "
|
||||
"file specifies %s.", CITUS_MAJORVERSION,
|
||||
availableVersion),
|
||||
errhint("Restart the database to load the latest Citus "
|
||||
"library.")));
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfInstalledVersionMismatch compares CITUS_EXTENSIONVERSION and currently
|
||||
* and catalog version from pg_extemsion catalog table. If they are not same in
|
||||
* major or minor version numbers, this function errors out. It ignores the schema
|
||||
* version.
|
||||
* CheckInstalledVersion compares CITUS_EXTENSIONVERSION and the the
|
||||
* extension's current version from the pg_extemsion catalog table. If they
|
||||
* are not compatible, this function logs an error with the specified elevel,
|
||||
* otherwise it returns true.
|
||||
*/
|
||||
static void
|
||||
ErrorIfInstalledVersionMismatch(void)
|
||||
static bool
|
||||
CheckInstalledVersion(int elevel)
|
||||
{
|
||||
char *installedVersion = NULL;
|
||||
|
||||
if (!EnableVersionChecks)
|
||||
{
|
||||
return;
|
||||
}
|
||||
Assert(CitusHasBeenLoaded());
|
||||
Assert(EnableVersionChecks);
|
||||
|
||||
installedVersion = InstalledExtensionVersion();
|
||||
|
||||
if (!MajorVersionsCompatible(installedVersion, CITUS_EXTENSIONVERSION))
|
||||
{
|
||||
ereport(ERROR, (errmsg("loaded Citus library version differs from installed "
|
||||
ereport(elevel, (errmsg("loaded Citus library version differs from installed "
|
||||
"extension version"),
|
||||
errdetail("Loaded library requires %s, but the installed "
|
||||
"extension version is %s.", CITUS_MAJORVERSION,
|
||||
installedVersion),
|
||||
errhint("Run ALTER EXTENSION citus UPDATE and try again.")));
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MajorVersionsCompatible compares given two versions. If they are same in major
|
||||
* and minor version numbers, this function returns true. It ignores the schema
|
||||
* version.
|
||||
* MajorVersionsCompatible checks whether both versions are compatible. They
|
||||
* are if major and minor version numbers match, the schema version is
|
||||
* ignored. Returns true if compatible, false otherwise.
|
||||
*/
|
||||
bool
|
||||
MajorVersionsCompatible(char *leftVersion, char *rightVersion)
|
||||
|
@ -1203,12 +1300,7 @@ AvailableExtensionVersion(void)
|
|||
bool hasTuple = false;
|
||||
bool goForward = true;
|
||||
bool doCopy = false;
|
||||
|
||||
/* if we cached the result before, return it*/
|
||||
if (availableExtensionVersion != NULL)
|
||||
{
|
||||
return availableExtensionVersion;
|
||||
}
|
||||
char *availableExtensionVersion;
|
||||
|
||||
estate = CreateExecutorState();
|
||||
extensionsResultSet = makeNode(ReturnSetInfo);
|
||||
|
@ -1274,8 +1366,6 @@ AvailableExtensionVersion(void)
|
|||
|
||||
/*
|
||||
* InstalledExtensionVersion returns the Citus version in PostgreSQL pg_extension table.
|
||||
* It also saves the result, thus consecutive calls to CitusExtensionCatalogVersion
|
||||
* will not read the catalog tables again.
|
||||
*/
|
||||
static char *
|
||||
InstalledExtensionVersion(void)
|
||||
|
@ -1284,12 +1374,7 @@ InstalledExtensionVersion(void)
|
|||
SysScanDesc scandesc;
|
||||
ScanKeyData entry[1];
|
||||
HeapTuple extensionTuple = NULL;
|
||||
|
||||
/* if we cached the result before, return it*/
|
||||
if (installedExtensionVersion != NULL)
|
||||
{
|
||||
return installedExtensionVersion;
|
||||
}
|
||||
char *installedExtensionVersion = NULL;
|
||||
|
||||
relation = heap_open(ExtensionRelationId, AccessShareLock);
|
||||
|
||||
|
@ -1670,6 +1755,8 @@ master_dist_partition_cache_invalidate(PG_FUNCTION_ARGS)
|
|||
errmsg("must be called as trigger")));
|
||||
}
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
newTuple = triggerData->tg_newtuple;
|
||||
oldTuple = triggerData->tg_trigtuple;
|
||||
|
||||
|
@ -1731,6 +1818,8 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
|
|||
errmsg("must be called as trigger")));
|
||||
}
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
newTuple = triggerData->tg_newtuple;
|
||||
oldTuple = triggerData->tg_trigtuple;
|
||||
|
||||
|
@ -1792,6 +1881,8 @@ master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS)
|
|||
errmsg("must be called as trigger")));
|
||||
}
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
newTuple = triggerData->tg_newtuple;
|
||||
oldTuple = triggerData->tg_trigtuple;
|
||||
|
||||
|
@ -1848,6 +1939,8 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
|
|||
errmsg("must be called as trigger")));
|
||||
}
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
|
||||
|
||||
PG_RETURN_DATUM(PointerGetDatum(NULL));
|
||||
|
@ -1871,6 +1964,8 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
|
|||
errmsg("must be called as trigger")));
|
||||
}
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId());
|
||||
|
||||
PG_RETURN_DATUM(PointerGetDatum(NULL));
|
||||
|
|
|
@ -95,8 +95,11 @@ master_add_node(PG_FUNCTION_ARGS)
|
|||
bool hasMetadata = false;
|
||||
bool isActive = false;
|
||||
bool nodeAlreadyExists = false;
|
||||
Datum nodeRecord;
|
||||
|
||||
Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
|
||||
hasMetadata, isActive, &nodeAlreadyExists);
|
||||
|
||||
/*
|
||||
|
@ -129,8 +132,11 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
|
|||
bool hasMetadata = false;
|
||||
bool isActive = false;
|
||||
bool nodeAlreadyExists = false;
|
||||
Datum nodeRecord;
|
||||
|
||||
Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
|
||||
hasMetadata, isActive, &nodeAlreadyExists);
|
||||
|
||||
PG_RETURN_CSTRING(nodeRecord);
|
||||
|
@ -153,6 +159,8 @@ master_remove_node(PG_FUNCTION_ARGS)
|
|||
int32 nodePort = PG_GETARG_INT32(1);
|
||||
char *nodeNameString = text_to_cstring(nodeName);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
RemoveNodeFromCluster(nodeNameString, nodePort);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
|
@ -179,6 +187,8 @@ master_disable_node(PG_FUNCTION_ARGS)
|
|||
bool hasShardPlacements = false;
|
||||
bool isActive = false;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
|
||||
|
||||
hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort);
|
||||
|
@ -210,6 +220,8 @@ master_activate_node(PG_FUNCTION_ARGS)
|
|||
char *nodeNameString = text_to_cstring(nodeName);
|
||||
Datum nodeRecord = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
nodeRecord = ActivateNode(nodeNameString, nodePort);
|
||||
|
||||
PG_RETURN_CSTRING(nodeRecord);
|
||||
|
@ -263,9 +275,12 @@ Datum
|
|||
master_initialize_node_metadata(PG_FUNCTION_ARGS)
|
||||
{
|
||||
ListCell *workerNodeCell = NULL;
|
||||
List *workerNodes = ParseWorkerNodeFileAndRename();
|
||||
List *workerNodes = NULL;
|
||||
bool nodeAlreadyExists = false;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
workerNodes = ParseWorkerNodeFileAndRename();
|
||||
foreach(workerNodeCell, workerNodes)
|
||||
{
|
||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||
|
@ -273,8 +288,6 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
|
|||
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
|
||||
workerNode->workerRack, false, workerNode->isActive,
|
||||
&nodeAlreadyExists);
|
||||
|
||||
ActivateNode(workerNode->workerName, workerNode->workerPort);
|
||||
}
|
||||
|
||||
PG_RETURN_BOOL(true);
|
||||
|
@ -293,6 +306,8 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
|||
char distributionMethod = 0;
|
||||
Oid relationId = InvalidOid;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/*
|
||||
* To have optional parameter as NULL, we defined this UDF as not strict, therefore
|
||||
* we need to check all parameters for NULL values.
|
||||
|
@ -1045,19 +1060,19 @@ ParseWorkerNodeFileAndRename()
|
|||
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
|
||||
workerNode->workerPort = nodePort;
|
||||
workerNode->hasMetadata = false;
|
||||
workerNode->isActive = false;
|
||||
workerNode->isActive = true;
|
||||
|
||||
workerNodeList = lappend(workerNodeList, workerNode);
|
||||
}
|
||||
|
||||
FreeFile(workerFileStream);
|
||||
free(workerFilePath);
|
||||
|
||||
/* rename the file, marking that it is not used anymore */
|
||||
appendStringInfo(renamedWorkerFilePath, "%s", workerFilePath);
|
||||
appendStringInfo(renamedWorkerFilePath, ".obsolete");
|
||||
rename(workerFilePath, renamedWorkerFilePath->data);
|
||||
|
||||
FreeFile(workerFileStream);
|
||||
free(workerFilePath);
|
||||
|
||||
return workerNodeList;
|
||||
}
|
||||
|
||||
|
|
|
@ -60,6 +60,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
|
|||
DistTableCacheEntry *tableEntry = NULL;
|
||||
|
||||
EnsureCoordinator();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (!IsDistributedTable(relationId))
|
||||
{
|
||||
|
|
|
@ -54,6 +54,8 @@ lock_shard_metadata(PG_FUNCTION_ARGS)
|
|||
int shardIdCount = 0;
|
||||
int shardIdIndex = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (ARR_NDIM(shardIdArrayObject) == 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("no locks specified")));
|
||||
|
@ -92,6 +94,8 @@ lock_shard_resources(PG_FUNCTION_ARGS)
|
|||
int shardIdCount = 0;
|
||||
int shardIdIndex = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (ARR_NDIM(shardIdArrayObject) == 0)
|
||||
{
|
||||
ereport(ERROR, (errmsg("no locks specified")));
|
||||
|
@ -180,21 +184,6 @@ TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockRelationDistributionMetadata returns after getting a the lock used for a
|
||||
* relation's distribution metadata, blocking if required. Only ExclusiveLock
|
||||
* and ShareLock modes are supported. Any locks acquired using this method are
|
||||
* released at transaction end.
|
||||
*/
|
||||
void
|
||||
LockRelationDistributionMetadata(Oid relationId, LOCKMODE lockMode)
|
||||
{
|
||||
Assert(lockMode == ExclusiveLock || lockMode == ShareLock);
|
||||
|
||||
(void) LockRelationOid(relationId, lockMode);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockShardResource acquires a lock needed to modify data on a remote shard.
|
||||
* This task may be assigned to multiple backends at the same time, so the lock
|
||||
|
|
|
@ -67,8 +67,12 @@ task_tracker_assign_task(PG_FUNCTION_ARGS)
|
|||
char *taskCallString = text_to_cstring(taskCallStringText);
|
||||
uint32 taskCallStringLength = strlen(taskCallString);
|
||||
|
||||
bool taskTrackerRunning = false;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* check that we have a running task tracker on this host */
|
||||
bool taskTrackerRunning = TaskTrackerRunning();
|
||||
taskTrackerRunning = TaskTrackerRunning();
|
||||
if (!taskTrackerRunning)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_CANNOT_CONNECT_NOW),
|
||||
|
@ -129,7 +133,12 @@ task_tracker_task_status(PG_FUNCTION_ARGS)
|
|||
WorkerTask *workerTask = NULL;
|
||||
uint32 taskStatus = 0;
|
||||
|
||||
bool taskTrackerRunning = TaskTrackerRunning();
|
||||
bool taskTrackerRunning = false;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
taskTrackerRunning = TaskTrackerRunning();
|
||||
|
||||
if (taskTrackerRunning)
|
||||
{
|
||||
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED);
|
||||
|
@ -170,6 +179,8 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
|
|||
StringInfo jobDirectoryName = NULL;
|
||||
StringInfo jobSchemaName = NULL;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/*
|
||||
* We first clean up any open connections, and remove tasks belonging to
|
||||
* this job from the shared hash.
|
||||
|
|
|
@ -48,10 +48,12 @@ bool ExpireCachedShards = false;
|
|||
|
||||
|
||||
/* Local functions forward declarations */
|
||||
static void FetchRegularFile(const char *nodeName, uint32 nodePort,
|
||||
StringInfo remoteFilename, StringInfo localFilename);
|
||||
static void FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort,
|
||||
StringInfo remoteFilename,
|
||||
StringInfo localFilename);
|
||||
static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort,
|
||||
StringInfo transmitCommand, StringInfo filePath);
|
||||
const char *nodeUser, StringInfo transmitCommand,
|
||||
StringInfo filePath);
|
||||
static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
|
||||
int32 fileDescriptor);
|
||||
static void DeleteFile(const char *filename);
|
||||
|
@ -114,13 +116,18 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS)
|
|||
* task directory does not exist. We then lock and create the directory.
|
||||
*/
|
||||
bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (!taskDirectoryExists)
|
||||
{
|
||||
InitTaskDirectory(jobId, upstreamTaskId);
|
||||
}
|
||||
|
||||
nodeName = text_to_cstring(nodeNameText);
|
||||
FetchRegularFile(nodeName, nodePort, remoteFilename, taskFilename);
|
||||
|
||||
/* we've made sure the file names are sanitized, safe to fetch as superuser */
|
||||
FetchRegularFileAsSuperUser(nodeName, nodePort, remoteFilename, taskFilename);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -155,13 +162,18 @@ worker_fetch_query_results_file(PG_FUNCTION_ARGS)
|
|||
* task directory does not exist. We then lock and create the directory.
|
||||
*/
|
||||
bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (!taskDirectoryExists)
|
||||
{
|
||||
InitTaskDirectory(jobId, upstreamTaskId);
|
||||
}
|
||||
|
||||
nodeName = text_to_cstring(nodeNameText);
|
||||
FetchRegularFile(nodeName, nodePort, remoteFilename, taskFilename);
|
||||
|
||||
/* we've made sure the file names are sanitized, safe to fetch as superuser */
|
||||
FetchRegularFileAsSuperUser(nodeName, nodePort, remoteFilename, taskFilename);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -180,11 +192,16 @@ TaskFilename(StringInfo directoryName, uint32 taskId)
|
|||
}
|
||||
|
||||
|
||||
/* Helper function to transfer the remote file in an idempotent manner. */
|
||||
/*
|
||||
* FetchRegularFileAsSuperUser copies a file from a remote node in an idempotent
|
||||
* manner. It connects to the remote node as superuser to give file access.
|
||||
* Callers must make sure that the file names are sanitized.
|
||||
*/
|
||||
static void
|
||||
FetchRegularFile(const char *nodeName, uint32 nodePort,
|
||||
FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort,
|
||||
StringInfo remoteFilename, StringInfo localFilename)
|
||||
{
|
||||
char *nodeUser = NULL;
|
||||
StringInfo attemptFilename = NULL;
|
||||
StringInfo transmitCommand = NULL;
|
||||
uint32 randomId = (uint32) random();
|
||||
|
@ -203,7 +220,11 @@ FetchRegularFile(const char *nodeName, uint32 nodePort,
|
|||
transmitCommand = makeStringInfo();
|
||||
appendStringInfo(transmitCommand, TRANSMIT_REGULAR_COMMAND, remoteFilename->data);
|
||||
|
||||
received = ReceiveRegularFile(nodeName, nodePort, transmitCommand, attemptFilename);
|
||||
/* connect as superuser to give file access */
|
||||
nodeUser = CitusExtensionOwnerName();
|
||||
|
||||
received = ReceiveRegularFile(nodeName, nodePort, nodeUser, transmitCommand,
|
||||
attemptFilename);
|
||||
if (!received)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not receive file \"%s\" from %s:%u",
|
||||
|
@ -230,7 +251,7 @@ FetchRegularFile(const char *nodeName, uint32 nodePort,
|
|||
* and returns false.
|
||||
*/
|
||||
static bool
|
||||
ReceiveRegularFile(const char *nodeName, uint32 nodePort,
|
||||
ReceiveRegularFile(const char *nodeName, uint32 nodePort, const char *nodeUser,
|
||||
StringInfo transmitCommand, StringInfo filePath)
|
||||
{
|
||||
int32 fileDescriptor = -1;
|
||||
|
@ -262,7 +283,7 @@ ReceiveRegularFile(const char *nodeName, uint32 nodePort,
|
|||
nodeDatabase = get_database_name(MyDatabaseId);
|
||||
|
||||
/* connect to remote node */
|
||||
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL);
|
||||
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, nodeUser);
|
||||
if (connectionId == INVALID_CONNECTION_ID)
|
||||
{
|
||||
ReceiveResourceCleanup(connectionId, filename, fileDescriptor);
|
||||
|
@ -415,6 +436,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
|
|||
const char *ddlCommand = text_to_cstring(ddlCommandText);
|
||||
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* extend names in ddl command and apply extended command */
|
||||
RelayEventExtendNames(ddlCommandNode, schemaName, shardId);
|
||||
ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL,
|
||||
|
@ -443,6 +466,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
|
|||
const char *ddlCommand = text_to_cstring(ddlCommandText);
|
||||
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* extend names in ddl command and apply extended command */
|
||||
RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId,
|
||||
leftShardSchemaName, rightShardId,
|
||||
|
@ -472,6 +497,9 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
|
|||
Oid sequenceRelationId = InvalidOid;
|
||||
|
||||
NodeTag nodeType = nodeTag(commandNode);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (nodeType != T_CreateSeqStmt)
|
||||
{
|
||||
ereport(ERROR,
|
||||
|
@ -512,6 +540,8 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS)
|
|||
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
|
||||
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/*
|
||||
* Run common logic to fetch the remote table, and use the provided function
|
||||
* pointer to perform the actual table fetching.
|
||||
|
@ -536,6 +566,8 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS)
|
|||
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
|
||||
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/*
|
||||
* Run common logic to fetch the remote table, and use the provided function
|
||||
* pointer to perform the actual table fetching.
|
||||
|
@ -795,7 +827,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
|
|||
remoteCopyCommand = makeStringInfo();
|
||||
appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, tableName);
|
||||
|
||||
received = ReceiveRegularFile(nodeName, nodePort, remoteCopyCommand, localFilePath);
|
||||
received = ReceiveRegularFile(nodeName, nodePort, NULL, remoteCopyCommand,
|
||||
localFilePath);
|
||||
if (!received)
|
||||
{
|
||||
return false;
|
||||
|
@ -835,8 +868,6 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
|
|||
CommandCounterIncrement();
|
||||
}
|
||||
|
||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||
|
||||
/*
|
||||
* Copy local file into the relation. We call ProcessUtility() instead of
|
||||
* directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook
|
||||
|
@ -855,6 +886,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
|
|||
/* finally delete the temporary file we created */
|
||||
DeleteFile(localFilePath->data);
|
||||
|
||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -870,6 +903,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
|
|||
static bool
|
||||
FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
|
||||
{
|
||||
const char *nodeUser = NULL;
|
||||
StringInfo localFilePath = NULL;
|
||||
StringInfo remoteFilePath = NULL;
|
||||
StringInfo transmitCommand = NULL;
|
||||
|
@ -896,7 +930,16 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
|
|||
transmitCommand = makeStringInfo();
|
||||
appendStringInfo(transmitCommand, TRANSMIT_REGULAR_COMMAND, remoteFilePath->data);
|
||||
|
||||
received = ReceiveRegularFile(nodeName, nodePort, transmitCommand, localFilePath);
|
||||
/*
|
||||
* We allow some arbitrary input in the file name and connect to the remote
|
||||
* node as superuser to transmit. Therefore, we only allow calling this
|
||||
* function when already running as superuser.
|
||||
*/
|
||||
EnsureSuperUser();
|
||||
nodeUser = CitusExtensionOwnerName();
|
||||
|
||||
received = ReceiveRegularFile(nodeName, nodePort, nodeUser, transmitCommand,
|
||||
localFilePath);
|
||||
if (!received)
|
||||
{
|
||||
return false;
|
||||
|
@ -1210,6 +1253,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
|||
bool received = false;
|
||||
StringInfo queryString = NULL;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* We extract schema names and table names from qualified names */
|
||||
DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName);
|
||||
|
||||
|
@ -1233,7 +1278,7 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
|||
sourceCopyCommand = makeStringInfo();
|
||||
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
|
||||
|
||||
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, sourceCopyCommand,
|
||||
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL, sourceCopyCommand,
|
||||
localFilePath);
|
||||
if (!received)
|
||||
{
|
||||
|
|
|
@ -50,12 +50,15 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
|
|||
|
||||
ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
|
||||
Relation distributedRelation = NULL;
|
||||
List *shardList = LoadShardList(relationId);
|
||||
List *shardList = NULL;
|
||||
ListCell *shardCell = NULL;
|
||||
char relationKind = '\0';
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
EnsureSuperUser();
|
||||
|
||||
shardList = LoadShardList(relationId);
|
||||
|
||||
/* first check the relation type */
|
||||
distributedRelation = relation_open(relationId, AccessShareLock);
|
||||
relationKind = distributedRelation->rd_rel->relkind;
|
||||
|
|
|
@ -40,6 +40,9 @@ worker_foreign_file_path(PG_FUNCTION_ARGS)
|
|||
ForeignTable *foreignTable = GetForeignTable(relationId);
|
||||
|
||||
ListCell *optionCell = NULL;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
foreach(optionCell, foreignTable->options)
|
||||
{
|
||||
DefElem *option = (DefElem *) lfirst(optionCell);
|
||||
|
@ -80,6 +83,8 @@ worker_find_block_local_path(PG_FUNCTION_ARGS)
|
|||
(void) blockId;
|
||||
(void) dataDirectoryObject;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("called function is currently unsupported")));
|
||||
|
||||
|
|
|
@ -81,6 +81,9 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
|
|||
/* we should have the same number of column names and types */
|
||||
int32 columnNameCount = ArrayObjectCount(columnNameObject);
|
||||
int32 columnTypeCount = ArrayObjectCount(columnTypeObject);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (columnNameCount != columnTypeCount)
|
||||
{
|
||||
ereport(ERROR, (errmsg("column name array size: %d and type array size: %d"
|
||||
|
@ -152,6 +155,8 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)
|
|||
int createIntermediateTableResult = 0;
|
||||
int finished = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/*
|
||||
* If the schema for the job isn't already created by the task tracker
|
||||
* protocol, we fall to using the default 'public' schema.
|
||||
|
@ -226,6 +231,8 @@ worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS)
|
|||
int scanKeyCount = 0;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
pgNamespace = heap_open(NamespaceRelationId, AccessExclusiveLock);
|
||||
scanDescriptor = heap_beginscan_catalog(pgNamespace, scanKeyCount, scanKey);
|
||||
|
||||
|
|
|
@ -111,6 +111,9 @@ worker_range_partition_table(PG_FUNCTION_ARGS)
|
|||
|
||||
/* first check that array element's and partition column's types match */
|
||||
Oid splitPointType = ARR_ELEMTYPE(splitPointObject);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (splitPointType != partitionColumnType)
|
||||
{
|
||||
ereport(ERROR, (errmsg("partition column type %u and split point type %u "
|
||||
|
@ -183,6 +186,8 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
|
|||
FileOutputStream *partitionFileArray = NULL;
|
||||
uint32 fileCount = partitionCount;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* use column's type information to get the hashing function */
|
||||
hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHPROC);
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ worker_create_truncate_trigger(PG_FUNCTION_ARGS)
|
|||
Oid relationId = PG_GETARG_OID(0);
|
||||
|
||||
EnsureSuperUser();
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* Create the truncate trigger */
|
||||
CreateTruncateTrigger(relationId);
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
#include "utils/hsearch.h"
|
||||
|
||||
extern bool EnableVersionChecks;
|
||||
extern char *availableExtensionVersion;
|
||||
|
||||
/*
|
||||
* Representation of a table's metadata that is frequently used for
|
||||
|
@ -79,7 +78,8 @@ extern void CitusInvalidateRelcacheByRelid(Oid relationId);
|
|||
extern void CitusInvalidateRelcacheByShardId(int64 shardId);
|
||||
|
||||
extern bool CitusHasBeenLoaded(void);
|
||||
void ErrorIfAvailableVersionMismatch(void);
|
||||
extern bool CheckCitusVersion(int elevel);
|
||||
extern bool CheckAvailableVersion(int elevel);
|
||||
bool MajorVersionsCompatible(char *leftVersion, char *rightVersion);
|
||||
|
||||
/* access WorkerNodeHash */
|
||||
|
|
|
@ -98,7 +98,7 @@ typedef struct WaitInfo
|
|||
extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
|
||||
const char *nodeDatabase, const char *nodeUser);
|
||||
extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
|
||||
const char *nodeDatabase);
|
||||
const char *nodeDatabase, const char *nodeUser);
|
||||
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
|
||||
extern void MultiClientDisconnect(int32 connectionId);
|
||||
extern bool MultiClientConnectionUp(int32 connectionId);
|
||||
|
|
|
@ -20,6 +20,9 @@
|
|||
#include "tcop/dest.h"
|
||||
|
||||
|
||||
#define INVALID_PARTITION_COLUMN_INDEX -1
|
||||
|
||||
|
||||
/*
|
||||
* A smaller version of copy.c's CopyStateData, trimmed to the elements
|
||||
* necessary to copy out results. While it'd be a bit nicer to share code,
|
||||
|
@ -90,6 +93,7 @@ typedef struct CitusCopyDestReceiver
|
|||
/* function declarations for copying into a distributed table */
|
||||
extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
|
||||
List *columnNameList,
|
||||
int partitionColumnIndex,
|
||||
EState *executorState,
|
||||
bool stopOnFailure);
|
||||
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
||||
|
|
|
@ -154,6 +154,7 @@ typedef struct TaskTracker
|
|||
{
|
||||
uint32 workerPort; /* node's port; part of hash table key */
|
||||
char workerName[WORKER_LENGTH]; /* node's name; part of hash table key */
|
||||
char *userName; /* which user to connect as */
|
||||
TrackerStatus trackerStatus;
|
||||
int32 connectionId;
|
||||
uint32 connectPollCount;
|
||||
|
|
|
@ -32,6 +32,7 @@ extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
|
|||
extern void ReportConnectionError(MultiConnection *connection, int elevel);
|
||||
extern void ReportResultError(MultiConnection *connection, struct pg_result *result,
|
||||
int elevel);
|
||||
extern char * pchomp(const char *in);
|
||||
extern void LogRemoteCommand(MultiConnection *connection, const char *command);
|
||||
|
||||
/* wrappers around libpq functions, with command logging support */
|
||||
|
|
|
@ -66,7 +66,6 @@ typedef enum AdvisoryLocktagClass
|
|||
/* Lock shard/relation metadata for safe modifications */
|
||||
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
||||
extern bool TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
||||
extern void LockRelationDistributionMetadata(Oid relationId, LOCKMODE lockMode);
|
||||
|
||||
/* Lock shard data, for DML commands or remote fetches */
|
||||
extern void LockShardResource(uint64 shardId, LOCKMODE lockmode);
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-insert s2-vacuum-analyze s1-commit
|
||||
create_distributed_table
|
||||
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert:
|
||||
INSERT INTO test_insert_vacuum VALUES(1, 1);
|
||||
|
||||
step s2-vacuum-analyze:
|
||||
VACUUM ANALYZE test_insert_vacuum;
|
||||
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-insert s2-vacuum-full s1-commit
|
||||
create_distributed_table
|
||||
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-insert:
|
||||
INSERT INTO test_insert_vacuum VALUES(1, 1);
|
||||
|
||||
step s2-vacuum-full:
|
||||
VACUUM FULL test_insert_vacuum;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-vacuum-full: <... completed>
|
|
@ -483,24 +483,31 @@ CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema
|
|||
PL/pgSQL function citus_drop_trigger() line 21 at PERFORM
|
||||
END;
|
||||
-- Test data loading after dropping a column
|
||||
CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
|
||||
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
|
||||
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
|
||||
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
|
||||
ALTER TABLE data_load_test DROP COLUMN col2;
|
||||
SELECT create_distributed_table('data_load_test', 'col1');
|
||||
ALTER TABLE data_load_test DROP COLUMN col1;
|
||||
SELECT create_distributed_table('data_load_test', 'col3');
|
||||
NOTICE: Copying data from local table...
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM data_load_test;
|
||||
col1 | col3
|
||||
------+-------
|
||||
132 | world
|
||||
243 | hello
|
||||
SELECT * FROM data_load_test ORDER BY col2;
|
||||
col2 | col3 | CoL4")
|
||||
-------+-------+--------
|
||||
hello | world |
|
||||
world | hello |
|
||||
(2 rows)
|
||||
|
||||
-- make sure the tuple went to the right shard
|
||||
SELECT * FROM data_load_test WHERE col3 = 'world';
|
||||
col2 | col3 | CoL4")
|
||||
-------+-------+--------
|
||||
hello | world |
|
||||
(1 row)
|
||||
|
||||
DROP TABLE data_load_test;
|
||||
SET citus.shard_replication_factor TO default;
|
||||
SET citus.shard_count to 4;
|
||||
|
|
|
@ -84,7 +84,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-4';
|
|||
SHOW citus.version;
|
||||
citus.version
|
||||
---------------
|
||||
6.2devel
|
||||
6.2.5
|
||||
(1 row)
|
||||
|
||||
-- ensure no objects were created outside pg_catalog
|
||||
|
@ -108,6 +108,91 @@ CREATE EXTENSION citus VERSION '5.0';
|
|||
ERROR: specified version incompatible with loaded Citus library
|
||||
DETAIL: Loaded library requires 6.2, but 5.0 was specified.
|
||||
HINT: If a newer library is present, restart the database and try the command again.
|
||||
-- Test non-distributed queries work even in version mismatch
|
||||
SET citus.enable_version_checks TO 'false';
|
||||
CREATE EXTENSION citus VERSION '6.1-17';
|
||||
SET citus.enable_version_checks TO 'true';
|
||||
-- Test CREATE TABLE
|
||||
CREATE TABLE version_mismatch_table(column1 int);
|
||||
-- Test COPY
|
||||
\copy version_mismatch_table FROM STDIN;
|
||||
-- Test INSERT
|
||||
INSERT INTO version_mismatch_table(column1) VALUES(5);
|
||||
|
||||
-- Test SELECT
|
||||
SELECT * FROM version_mismatch_table ORDER BY column1;
|
||||
column1
|
||||
---------
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
(6 rows)
|
||||
|
||||
-- Test SELECT from pg_catalog
|
||||
SELECT d.datname as "Name",
|
||||
pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
|
||||
pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges"
|
||||
FROM pg_catalog.pg_database d
|
||||
ORDER BY 1;
|
||||
Name | Owner | Access privileges
|
||||
------------+----------+-----------------------
|
||||
postgres | postgres |
|
||||
regression | postgres |
|
||||
template0 | postgres | =c/postgres +
|
||||
| | postgres=CTc/postgres
|
||||
template1 | postgres | =c/postgres +
|
||||
| | postgres=CTc/postgres
|
||||
(4 rows)
|
||||
|
||||
-- We should not distribute table in version mistmatch
|
||||
SELECT create_distributed_table('version_mismatch_table', 'column1');
|
||||
ERROR: loaded Citus library version differs from installed extension version
|
||||
DETAIL: Loaded library requires 6.2, but the installed extension version is 6.1-17.
|
||||
HINT: Run ALTER EXTENSION citus UPDATE and try again.
|
||||
-- This function will cause fail in next ALTER EXTENSION
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_table_size(table_name regclass)
|
||||
RETURNS bigint LANGUAGE plpgsql
|
||||
AS $function$
|
||||
BEGIN
|
||||
END;
|
||||
$function$;
|
||||
SET citus.enable_version_checks TO 'false';
|
||||
-- This will fail because of previous function declaration
|
||||
ALTER EXTENSION citus UPDATE TO '6.2-2';
|
||||
ERROR: function "citus_table_size" already exists with same argument types
|
||||
-- We can DROP problematic function and continue ALTER EXTENSION even when version checks are on
|
||||
SET citus.enable_version_checks TO 'true';
|
||||
DROP FUNCTION citus_table_size(regclass);
|
||||
SET citus.enable_version_checks TO 'false';
|
||||
ALTER EXTENSION citus UPDATE TO '6.2-2';
|
||||
-- Test updating to the latest version without specifying the version number
|
||||
ALTER EXTENSION citus UPDATE;
|
||||
-- re-create in newest version
|
||||
DROP EXTENSION citus;
|
||||
\c
|
||||
CREATE EXTENSION citus;
|
||||
-- test cache invalidation in workers
|
||||
\c - - - :worker_1_port
|
||||
-- this will initialize the cache
|
||||
\d
|
||||
List of relations
|
||||
Schema | Name | Type | Owner
|
||||
--------+------+------+-------
|
||||
(0 rows)
|
||||
|
||||
DROP EXTENSION citus;
|
||||
SET citus.enable_version_checks TO 'false';
|
||||
CREATE EXTENSION citus VERSION '5.2-4';
|
||||
SET citus.enable_version_checks TO 'true';
|
||||
-- during ALTER EXTENSION, we should invalidate the cache
|
||||
ALTER EXTENSION citus UPDATE;
|
||||
-- if cache is invalidated succesfull, this \d should work without any problem
|
||||
\d
|
||||
List of relations
|
||||
Schema | Name | Type | Owner
|
||||
--------+------+------+-------
|
||||
(0 rows)
|
||||
|
||||
|
|
|
@ -92,6 +92,10 @@ CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a);
|
|||
NOTICE: relation "lineitem_orderkey_index" already exists, skipping
|
||||
-- Verify that we can create indexes concurrently
|
||||
CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey);
|
||||
-- Verify that no-name local CREATE INDEX CONCURRENTLY works
|
||||
CREATE TABLE local_table (id integer, name text);
|
||||
CREATE INDEX CONCURRENTLY ON local_table(id);
|
||||
DROP TABLE local_table;
|
||||
-- Verify that all indexes got created on the master node and one of the workers
|
||||
SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
|
||||
schemaname | tablename | indexname | tablespace | indexdef
|
||||
|
|
|
@ -7,7 +7,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1420000;
|
|||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1420000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 2;
|
||||
CREATE TABLE test (id integer);
|
||||
CREATE TABLE test (id integer, val integer);
|
||||
SELECT create_distributed_table('test', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
@ -56,6 +56,9 @@ GRANT SELECT ON TABLE test_1420001 TO read_access;
|
|||
-- create prepare tests
|
||||
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
|
||||
PREPARE prepare_select AS SELECT count(*) FROM test;
|
||||
-- not allowed to read absolute paths, even as superuser
|
||||
COPY "/etc/passwd" TO STDOUT WITH (format transmit);
|
||||
ERROR: absolute path not allowed
|
||||
-- check full permission
|
||||
SET ROLE full_access;
|
||||
EXECUTE prepare_insert(1);
|
||||
|
@ -85,7 +88,22 @@ SELECT count(*) FROM test;
|
|||
2
|
||||
(1 row)
|
||||
|
||||
-- test re-partition query (needs to transmit intermediate results)
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
ERROR: operation is not allowed
|
||||
HINT: Run the command with a superuser.
|
||||
SET citus.task_executor_type TO 'real-time';
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
ERROR: operation is not allowed
|
||||
HINT: Run the command with a superuser.
|
||||
-- check read permission
|
||||
SET ROLE read_access;
|
||||
EXECUTE prepare_insert(1);
|
||||
|
@ -117,6 +135,17 @@ SELECT count(*) FROM test;
|
|||
2
|
||||
(1 row)
|
||||
|
||||
-- test re-partition query (needs to transmit intermediate results)
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
ERROR: operation is not allowed
|
||||
HINT: Run the command with a superuser.
|
||||
SET citus.task_executor_type TO 'real-time';
|
||||
-- check no permission
|
||||
SET ROLE no_access;
|
||||
|
@ -133,6 +162,13 @@ ERROR: permission denied for relation test
|
|||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
ERROR: permission denied for relation test
|
||||
-- test re-partition query
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
ERROR: permission denied for relation test
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
ERROR: operation is not allowed
|
||||
HINT: Run the command with a superuser.
|
||||
SET citus.task_executor_type TO 'real-time';
|
||||
RESET ROLE;
|
||||
DROP TABLE test;
|
||||
|
|
|
@ -900,6 +900,78 @@ SELECT key, value2 FROM prepare_func_table;
|
|||
(6 rows)
|
||||
|
||||
DROP TABLE prepare_func_table;
|
||||
-- Text columns can give issues when there is an implicit cast from varchar
|
||||
CREATE TABLE text_partition_column_table (
|
||||
key text NOT NULL,
|
||||
value int
|
||||
);
|
||||
SELECT create_distributed_table('text_partition_column_table', 'key');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
PREPARE prepared_relabel_insert(varchar) AS
|
||||
INSERT INTO text_partition_column_table VALUES ($1, 1);
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
SELECT key, value FROM text_partition_column_table ORDER BY key;
|
||||
key | value
|
||||
------+-------
|
||||
test | 1
|
||||
test | 1
|
||||
test | 1
|
||||
test | 1
|
||||
test | 1
|
||||
test | 1
|
||||
(6 rows)
|
||||
|
||||
DROP TABLE text_partition_column_table;
|
||||
-- Domain type columns can give issues
|
||||
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$');
|
||||
SELECT run_command_on_workers($$
|
||||
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$')
|
||||
$$);
|
||||
run_command_on_workers
|
||||
-------------------------------------
|
||||
(localhost,57637,t,"CREATE DOMAIN")
|
||||
(localhost,57638,t,"CREATE DOMAIN")
|
||||
(2 rows)
|
||||
|
||||
CREATE TABLE domain_partition_column_table (
|
||||
key test_key NOT NULL,
|
||||
value int
|
||||
);
|
||||
SELECT create_distributed_table('domain_partition_column_table', 'key');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
PREPARE prepared_coercion_to_domain_insert(text) AS
|
||||
INSERT INTO domain_partition_column_table VALUES ($1, 1);
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-1');
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-2');
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-3');
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-4');
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-5');
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-6');
|
||||
SELECT key, value FROM domain_partition_column_table ORDER BY key;
|
||||
key | value
|
||||
--------+-------
|
||||
test-1 | 1
|
||||
test-2 | 1
|
||||
test-3 | 1
|
||||
test-4 | 1
|
||||
test-5 | 1
|
||||
test-6 | 1
|
||||
(6 rows)
|
||||
|
||||
DROP TABLE domain_partition_column_table;
|
||||
-- verify placement state updates invalidate shard state
|
||||
--
|
||||
-- We use a immutable function to check for that. The planner will
|
||||
|
|
|
@ -2163,7 +2163,6 @@ BEGIN;
|
|||
INSERT INTO failure_test VALUES (1, 1);
|
||||
WARNING: connection error: localhost:57638
|
||||
DETAIL: no connection to the server
|
||||
|
||||
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
|
||||
WHERE shardid IN (
|
||||
SELECT shardid FROM pg_dist_shard
|
||||
|
@ -2182,7 +2181,6 @@ ROLLBACK;
|
|||
INSERT INTO failure_test VALUES (2, 1);
|
||||
WARNING: connection error: localhost:57638
|
||||
DETAIL: no connection to the server
|
||||
|
||||
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
|
||||
WHERE shardid IN (
|
||||
SELECT shardid FROM pg_dist_shard
|
||||
|
|
|
@ -8,3 +8,5 @@ test: isolation_cluster_management
|
|||
test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement
|
||||
test: isolation_concurrent_dml isolation_data_migration
|
||||
test: isolation_drop_shards isolation_copy_placement_vs_modification
|
||||
|
||||
test: isolation_insert_vs_vacuum
|
||||
|
|
|
@ -878,19 +878,15 @@ ALTER USER test_user WITH nologin;
|
|||
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
|
||||
WARNING: connection error: localhost:57637
|
||||
DETAIL: FATAL: role "test_user" is not permitted to log in
|
||||
|
||||
CONTEXT: COPY numbers_hash, line 1: "1,1"
|
||||
WARNING: connection error: localhost:57637
|
||||
DETAIL: FATAL: role "test_user" is not permitted to log in
|
||||
|
||||
CONTEXT: COPY numbers_hash, line 2: "2,2"
|
||||
WARNING: connection error: localhost:57637
|
||||
DETAIL: FATAL: role "test_user" is not permitted to log in
|
||||
|
||||
CONTEXT: COPY numbers_hash, line 3: "3,3"
|
||||
WARNING: connection error: localhost:57637
|
||||
DETAIL: FATAL: role "test_user" is not permitted to log in
|
||||
|
||||
CONTEXT: COPY numbers_hash, line 6: "6,6"
|
||||
-- verify shards in the first worker as marked invalid
|
||||
SELECT shardid, shardstate, nodename, nodeport
|
||||
|
@ -912,7 +908,6 @@ SELECT shardid, shardstate, nodename, nodeport
|
|||
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
|
||||
ERROR: connection error: localhost:57637
|
||||
DETAIL: FATAL: role "test_user" is not permitted to log in
|
||||
|
||||
CONTEXT: COPY numbers_reference, line 1: "3,1"
|
||||
-- verify shards for reference table are still valid
|
||||
SELECT shardid, shardstate, nodename, nodeport
|
||||
|
@ -930,11 +925,9 @@ SELECT shardid, shardstate, nodename, nodeport
|
|||
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
|
||||
WARNING: connection error: localhost:57637
|
||||
DETAIL: FATAL: role "test_user" is not permitted to log in
|
||||
|
||||
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
|
||||
WARNING: connection error: localhost:57637
|
||||
DETAIL: FATAL: role "test_user" is not permitted to log in
|
||||
|
||||
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
|
||||
ERROR: could not connect to any active placements
|
||||
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
setup
|
||||
{
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE test_insert_vacuum(column1 int, column2 int);
|
||||
SELECT create_distributed_table('test_insert_vacuum', 'column1');
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE test_insert_vacuum;
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s1-insert"
|
||||
{
|
||||
INSERT INTO test_insert_vacuum VALUES(1, 1);
|
||||
}
|
||||
|
||||
step "s1-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-vacuum-analyze"
|
||||
{
|
||||
VACUUM ANALYZE test_insert_vacuum;
|
||||
}
|
||||
|
||||
step "s2-vacuum-full"
|
||||
{
|
||||
VACUUM FULL test_insert_vacuum;
|
||||
}
|
||||
|
||||
# INSERT and VACUUM ANALYZE should not block each other.
|
||||
permutation "s1-begin" "s1-insert" "s2-vacuum-analyze" "s1-commit"
|
||||
|
||||
# INSERT and VACUUM FULL should block each other.
|
||||
permutation "s1-begin" "s1-insert" "s2-vacuum-full" "s1-commit"
|
||||
|
|
@ -260,12 +260,14 @@ DROP TABLE data_load_test;
|
|||
END;
|
||||
|
||||
-- Test data loading after dropping a column
|
||||
CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
|
||||
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
|
||||
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
|
||||
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
|
||||
ALTER TABLE data_load_test DROP COLUMN col2;
|
||||
SELECT create_distributed_table('data_load_test', 'col1');
|
||||
SELECT * FROM data_load_test;
|
||||
ALTER TABLE data_load_test DROP COLUMN col1;
|
||||
SELECT create_distributed_table('data_load_test', 'col3');
|
||||
SELECT * FROM data_load_test ORDER BY col2;
|
||||
-- make sure the tuple went to the right shard
|
||||
SELECT * FROM data_load_test WHERE col3 = 'world';
|
||||
DROP TABLE data_load_test;
|
||||
|
||||
SET citus.shard_replication_factor TO default;
|
||||
|
|
|
@ -100,6 +100,77 @@ RESET citus.enable_version_checks;
|
|||
DROP EXTENSION citus;
|
||||
CREATE EXTENSION citus VERSION '5.0';
|
||||
|
||||
-- Test non-distributed queries work even in version mismatch
|
||||
SET citus.enable_version_checks TO 'false';
|
||||
CREATE EXTENSION citus VERSION '6.1-17';
|
||||
SET citus.enable_version_checks TO 'true';
|
||||
|
||||
-- Test CREATE TABLE
|
||||
CREATE TABLE version_mismatch_table(column1 int);
|
||||
|
||||
-- Test COPY
|
||||
\copy version_mismatch_table FROM STDIN;
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
\.
|
||||
|
||||
-- Test INSERT
|
||||
INSERT INTO version_mismatch_table(column1) VALUES(5);
|
||||
|
||||
-- Test SELECT
|
||||
SELECT * FROM version_mismatch_table ORDER BY column1;
|
||||
|
||||
-- Test SELECT from pg_catalog
|
||||
SELECT d.datname as "Name",
|
||||
pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
|
||||
pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges"
|
||||
FROM pg_catalog.pg_database d
|
||||
ORDER BY 1;
|
||||
|
||||
-- We should not distribute table in version mistmatch
|
||||
SELECT create_distributed_table('version_mismatch_table', 'column1');
|
||||
|
||||
-- This function will cause fail in next ALTER EXTENSION
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_table_size(table_name regclass)
|
||||
RETURNS bigint LANGUAGE plpgsql
|
||||
AS $function$
|
||||
BEGIN
|
||||
END;
|
||||
$function$;
|
||||
|
||||
SET citus.enable_version_checks TO 'false';
|
||||
-- This will fail because of previous function declaration
|
||||
ALTER EXTENSION citus UPDATE TO '6.2-2';
|
||||
|
||||
-- We can DROP problematic function and continue ALTER EXTENSION even when version checks are on
|
||||
SET citus.enable_version_checks TO 'true';
|
||||
DROP FUNCTION citus_table_size(regclass);
|
||||
|
||||
SET citus.enable_version_checks TO 'false';
|
||||
ALTER EXTENSION citus UPDATE TO '6.2-2';
|
||||
|
||||
-- Test updating to the latest version without specifying the version number
|
||||
ALTER EXTENSION citus UPDATE;
|
||||
|
||||
-- re-create in newest version
|
||||
DROP EXTENSION citus;
|
||||
\c
|
||||
CREATE EXTENSION citus;
|
||||
|
||||
-- test cache invalidation in workers
|
||||
\c - - - :worker_1_port
|
||||
|
||||
-- this will initialize the cache
|
||||
\d
|
||||
DROP EXTENSION citus;
|
||||
SET citus.enable_version_checks TO 'false';
|
||||
CREATE EXTENSION citus VERSION '5.2-4';
|
||||
SET citus.enable_version_checks TO 'true';
|
||||
-- during ALTER EXTENSION, we should invalidate the cache
|
||||
ALTER EXTENSION citus UPDATE;
|
||||
|
||||
-- if cache is invalidated succesfull, this \d should work without any problem
|
||||
\d
|
||||
|
|
|
@ -66,6 +66,11 @@ CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a);
|
|||
-- Verify that we can create indexes concurrently
|
||||
CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey);
|
||||
|
||||
-- Verify that no-name local CREATE INDEX CONCURRENTLY works
|
||||
CREATE TABLE local_table (id integer, name text);
|
||||
CREATE INDEX CONCURRENTLY ON local_table(id);
|
||||
DROP TABLE local_table;
|
||||
|
||||
-- Verify that all indexes got created on the master node and one of the workers
|
||||
SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
|
||||
\c - - - :worker_1_port
|
||||
|
|
|
@ -10,7 +10,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1420000;
|
|||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 2;
|
||||
|
||||
CREATE TABLE test (id integer);
|
||||
CREATE TABLE test (id integer, val integer);
|
||||
SELECT create_distributed_table('test', 'id');
|
||||
|
||||
-- turn off propagation to avoid Enterprise processing the following section
|
||||
|
@ -47,6 +47,9 @@ GRANT SELECT ON TABLE test_1420001 TO read_access;
|
|||
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
|
||||
PREPARE prepare_select AS SELECT count(*) FROM test;
|
||||
|
||||
-- not allowed to read absolute paths, even as superuser
|
||||
COPY "/etc/passwd" TO STDOUT WITH (format transmit);
|
||||
|
||||
-- check full permission
|
||||
SET ROLE full_access;
|
||||
|
||||
|
@ -59,8 +62,18 @@ SELECT count(*) FROM test WHERE id = 1;
|
|||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
|
||||
-- test re-partition query (needs to transmit intermediate results)
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
|
||||
SET citus.task_executor_type TO 'real-time';
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
|
||||
-- check read permission
|
||||
SET ROLE read_access;
|
||||
|
||||
|
@ -73,6 +86,13 @@ SELECT count(*) FROM test WHERE id = 1;
|
|||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
|
||||
-- test re-partition query (needs to transmit intermediate results)
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
|
||||
SET citus.task_executor_type TO 'real-time';
|
||||
|
||||
-- check no permission
|
||||
|
@ -87,6 +107,13 @@ SELECT count(*) FROM test WHERE id = 1;
|
|||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
|
||||
-- test re-partition query
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
||||
-- should not be able to transmit directly
|
||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||
|
||||
SET citus.task_executor_type TO 'real-time';
|
||||
|
||||
RESET ROLE;
|
||||
|
|
|
@ -476,6 +476,53 @@ SELECT key, value2 FROM prepare_func_table;
|
|||
|
||||
DROP TABLE prepare_func_table;
|
||||
|
||||
-- Text columns can give issues when there is an implicit cast from varchar
|
||||
CREATE TABLE text_partition_column_table (
|
||||
key text NOT NULL,
|
||||
value int
|
||||
);
|
||||
SELECT create_distributed_table('text_partition_column_table', 'key');
|
||||
|
||||
PREPARE prepared_relabel_insert(varchar) AS
|
||||
INSERT INTO text_partition_column_table VALUES ($1, 1);
|
||||
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
EXECUTE prepared_relabel_insert('test');
|
||||
|
||||
SELECT key, value FROM text_partition_column_table ORDER BY key;
|
||||
|
||||
DROP TABLE text_partition_column_table;
|
||||
|
||||
-- Domain type columns can give issues
|
||||
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$');
|
||||
SELECT run_command_on_workers($$
|
||||
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$')
|
||||
$$);
|
||||
|
||||
CREATE TABLE domain_partition_column_table (
|
||||
key test_key NOT NULL,
|
||||
value int
|
||||
);
|
||||
SELECT create_distributed_table('domain_partition_column_table', 'key');
|
||||
|
||||
PREPARE prepared_coercion_to_domain_insert(text) AS
|
||||
INSERT INTO domain_partition_column_table VALUES ($1, 1);
|
||||
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-1');
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-2');
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-3');
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-4');
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-5');
|
||||
EXECUTE prepared_coercion_to_domain_insert('test-6');
|
||||
|
||||
SELECT key, value FROM domain_partition_column_table ORDER BY key;
|
||||
|
||||
DROP TABLE domain_partition_column_table;
|
||||
|
||||
-- verify placement state updates invalidate shard state
|
||||
--
|
||||
-- We use a immutable function to check for that. The planner will
|
||||
|
|
Loading…
Reference in New Issue