Compare commits


34 Commits
main ... v6.2.5

Author SHA1 Message Date
velioglu 6001f753ed Bump Citus version to 6.2.5 2018-01-11 13:01:30 +03:00
velioglu bf5543f92d Add changelog entry for 6.2.5 2018-01-11 12:56:20 +03:00
Onder Kalaci 67c4dac89b Properly copy and trim the error messages that come from pg_conn
When a NULL connection is provided to PQerrorMessage(), the
returned error message is static text. Modifying that static
text, which is not necessarily in writable memory, is
dangerous and might cause a segfault.
2018-01-11 12:52:55 +03:00
Burak Yucesoy 0ed33515a1 Bump Citus version to 6.2.4 2017-09-28 10:35:27 -07:00
Burak Yucesoy cab7185f23 Add CHANGELOG entry for 6.2.4 2017-09-28 10:32:28 -07:00
Marco Slot ea4e22473b Add multi-user re-partitioning regression tests 2017-09-28 15:38:05 +02:00
Marco Slot e4579f02c8 Execute transmit commands as superuser during task-tracker queries 2017-09-28 15:38:05 +02:00
Marco Slot 81687585d2 Check for absolute paths in COPY with format transmit 2017-09-28 15:29:50 +02:00
Marco Slot 14799ef2a0 Allow read-only users to run task-tracker queries 2017-09-28 15:29:50 +02:00
Marco Slot 96eca92fc7 Consider dropped columns that precede the partition column in COPY 2017-08-23 13:55:14 +02:00
Eren Başak 44eacf14fc Fix pg_worker_list use-after-free bug
This change fixes a use-after-free bug while renaming the obsolete
`pg_worker_list.conf` file, which causes Citus to crash during upgrade
(or even extension creation) if `pg_worker_list.conf` exists.
2017-08-15 11:03:51 +03:00
Burak Yucesoy 124b0f9020 Bump configure PACKAGE_VERSION to 6.2.3 2017-07-13 00:06:46 -06:00
Burak Yucesoy 0849da2708 Add CHANGELOG entry for 6.2.3 2017-07-13 00:06:46 -06:00
Burak Yucesoy dd6a456bed Add tests for concurrent INSERT and VACUUM behaviour 2017-07-13 00:06:46 -06:00
Burak Yucesoy 0f01dc3bb8 Remove LockRelationDistributionMetadata function 2017-07-13 00:06:46 -06:00
Burak Yucesoy ab509db0d8 Use ShareUpdateExclusiveLock instead of ShareLock in VACUUM
Before this change, we used ShareLock to acquire a lock on distributed tables while
running VACUUM, which made VACUUM and INSERT block each other. With this change we
switched the lock mode from ShareLock to ShareUpdateExclusiveLock, which does not
conflict with the locks INSERT acquires.
2017-07-13 00:06:46 -06:00
Marco Slot e735655d82 Add weird column name to create_distributed_table test 2017-07-06 23:00:56 +02:00
Marco Slot 1678deeecd Handle implicit casts in prepared INSERTs
Backported from citusdata/citus#1487
2017-07-06 22:11:16 +02:00
Marco Slot 43f9758787 Support quoted column-names in COPY logic 2017-06-30 17:07:06 +02:00
Jason Petersen 5bd4583935
Don't call PostProcessUtility for local commands
Backported from citusdata/citus#1457
cherry-pick of citusdata/citus@294aeff

It is intended only to aid in processing of distributed DDL commands,
but as written could execute during local CREATE INDEX CONCURRENTLY
commands.
2017-06-19 16:00:27 -06:00
Jason Petersen b839af761f
Bump configure PACKAGE_VERSION
Forgot this for the OSS 6.2.2 release (which will display 6.2.1 for
SHOW citus.version), but we can still get it into Enterprise.
2017-06-12 16:57:55 -06:00
Jason Petersen 16798e38f9 Add 6.2.2 CHANGELOG entry 2017-06-06 14:29:24 +03:00
Marco Slot fd05849eff Don't take a table lock in ForeignConstraintGetReferencedTableId 2017-05-31 15:27:18 -07:00
Jason Petersen aa2d2f46f4
Bump configure PACKAGE_VERSION
Prepping for release.
2017-05-24 13:56:28 -06:00
Jason Petersen b0aa0479cf
Bump CHANGELOG for 6.2.1 2017-05-24 13:56:28 -06:00
Burak Yucesoy 0c9901f042
Add tests for version check 2017-05-24 13:16:04 -06:00
Burak Yucesoy 52b056c301
Register cache invalidation callback before version checks
With this commit we register the InvalidateDistRelationCacheCallback
function as a cache invalidation callback before running version checks,
because the version checks use the cache to look up the relation ids of
relations such as pg_dist_relation or pg_dist_partition_logical_relid_index,
and we want to hear about cache invalidations before accessing them.
2017-05-24 13:16:04 -06:00
Burak Yucesoy 5b300a7aa9
Fix incorrect call to CheckInstalledVersion
During a version update, we indirectly called CheckInstalledVersion via
CheckCitusVersions. This obviously fails, because during a version update a
mismatch between the installed version and the binary version is expected.
Thus, we remove that CheckCitusVersions call and now only call CheckAvailableVersion.
2017-05-24 13:16:03 -06:00
Burak Yucesoy 0530974d3d
Add tests for version checks 2017-05-24 13:14:08 -06:00
Burak Yucesoy c1ca6e6819 Add version checks to necessary UDFs 2017-05-22 10:43:40 +03:00
Burak Yucesoy c59481141f Only error out on distributed queries when there is version mismatch
Before this commit, we were erroring out on almost all queries if there was a
version mismatch. With this commit, we error out only when the requested
operation touches distributed tables.

Normally we would use the distributed cache to determine whether a table is
distributed. However, it is not safe to read our metadata tables when there is
a version mismatch, so it is not safe to build the distributed cache either.
Therefore, for this specific case, we read directly from the pg_dist_partition
table. Reading from the catalog is costly, though, so this method should be
avoided elsewhere as much as possible.
2017-05-22 10:43:40 +03:00
Burak Yucesoy cf4592ce28 Fix crash during upgrade from 5.2 to 6.2
This commit fixes the problem where we incorrectly tried to reach the
distributed table cache when the extension was not completely loaded. We
reached for the cache to get reference table information while activating
nodes. However, it is not actually necessary to explicitly activate the nodes
coming from master_initialize_node_metadata, because that function only runs
during extension creation, at which point there are no reference tables and
all nodes are considered active.
2017-05-22 10:43:40 +03:00
Jason Petersen 062734a6f5
Bump configure PACKAGE_VERSION
Prepping for release.
2017-05-16 22:40:56 -06:00
Burak Yucesoy ac5fcbe998
Update CHANGELOG for v6.2.0
CHANGELOG changes for 6.2 release
2017-05-16 22:33:29 -06:00
59 changed files with 1113 additions and 381 deletions


@ -1,3 +1,89 @@
### citus v6.2.5 (January 11, 2018) ###

* Fixes a bug that could crash the coordinator while reporting a remote error

### citus v6.2.4 (September 28, 2017) ###

* Updates task-tracker to limit file access

### citus v6.2.3 (July 13, 2017) ###

* Fixes a crash during execution of local CREATE INDEX CONCURRENTLY
* Fixes a bug preventing usage of quoted column names in COPY
* Fixes a bug in prepared INSERTs with implicit cast in partition column
* Relaxes locks in VACUUM to ensure concurrent execution with INSERT

### citus v6.2.2 (May 31, 2017) ###

* Fixes a common cause of deadlocks when repairing tables with foreign keys

### citus v6.2.1 (May 24, 2017) ###

* Relaxes version-check logic to avoid breaking non-distributed commands

### citus v6.2.0 (May 16, 2017) ###

* Increases SQL subquery coverage by pushing down more kinds of queries
* Adds CustomScan API support to allow read-only transactions
* Adds support for `CREATE/DROP INDEX CONCURRENTLY`
* Adds support for `ALTER TABLE ... ADD CONSTRAINT`
* Adds support for `ALTER TABLE ... RENAME COLUMN`
* Adds support for `DISABLE/ENABLE TRIGGER ALL`
* Adds support for expressions in the partition column in INSERTs
* Adds support for query parameters in combination with function evaluation
* Adds support for creating distributed tables from non-empty local tables
* Adds UDFs to get size of distributed tables
* Adds UDFs to add a new node without replicating reference tables
* Adds checks to prevent running Citus binaries with wrong metadata tables
* Improves shard pruning performance for range queries
* Improves planner performance for joins involving co-located tables
* Improves shard copy performance by creating indexes after copy
* Improves task-tracker performance by batching several status checks
* Enables router planner for queries on range partitioned tables
* Changes `TRUNCATE` to drop local data only if `enable_ddl_propagation` is off
* Starts to execute DDL on coordinator before workers
* Fixes a bug causing incorrect reads from an invalidated cache
* Fixes a bug related to creation of schemas in workers with incorrect owner
* Fixes a bug related to concurrent runs of shard drop functions
* Fixes a bug related to `EXPLAIN ANALYZE` with DML queries
* Fixes a bug related to SQL functions in FROM clause
* Adds a GUC variable to report cross shard queries
* Fixes a bug related to partition columns without native hash function
* Adds internal infrastructure and tests to improve development process
* Addresses various race conditions and deadlocks
* Improves and standardizes error messages

### citus v6.1.1 (May 5, 2017) ###

* Fixes a crash caused by router executor use after connection timeouts

configure

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 6.2devel.
# Generated by GNU Autoconf 2.69 for Citus 6.2.5.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='6.2devel'
PACKAGE_STRING='Citus 6.2devel'
PACKAGE_VERSION='6.2.5'
PACKAGE_STRING='Citus 6.2.5'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -1195,7 +1195,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 6.2devel to adapt to many kinds of systems.
\`configure' configures Citus 6.2.5 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1256,7 +1256,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 6.2devel:";;
short | recursive ) echo "Configuration of Citus 6.2.5:";;
esac
cat <<\_ACEOF
@ -1344,7 +1344,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 6.2devel
Citus configure 6.2.5
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1401,7 +1401,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 6.2devel, which was
It was created by Citus $as_me 6.2.5, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -3561,7 +3561,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 6.2devel, which was
This file was extended by Citus $as_me 6.2.5, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -3623,7 +3623,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 6.2devel
Citus config.status 6.2.5
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"


@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [6.2devel])
AC_INIT([Citus], [6.2.5])
AC_COPYRIGHT([Copyright (c) 2012-2017, Citus Data, Inc.])
# we'll need sed and awk for some of the version commands


@ -110,6 +110,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
bool requireEmpty = true;
EnsureCoordinator();
CheckCitusVersion(ERROR);
if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
{
@ -147,6 +148,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
char *colocateWithTableName = NULL;
EnsureCoordinator();
CheckCitusVersion(ERROR);
/* guard against a binary update without a function update */
if (PG_NARGS() >= 4)
@ -233,13 +235,17 @@ static void
CreateReferenceTable(Oid relationId)
{
uint32 colocationId = INVALID_COLOCATION_ID;
List *workerNodeList = ActiveWorkerNodeList();
int replicationFactor = list_length(workerNodeList);
List *workerNodeList = NIL;
int replicationFactor = 0;
char *distributionColumnName = NULL;
bool requireEmpty = true;
char relationKind = 0;
EnsureCoordinator();
CheckCitusVersion(ERROR);
workerNodeList = ActiveWorkerNodeList();
replicationFactor = list_length(workerNodeList);
/* if there are no workers, error out */
if (replicationFactor == 0)
@ -749,6 +755,8 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
List *columnNameList = NIL;
Relation distributedRelation = NULL;
TupleDesc tupleDescriptor = NULL;
Var *partitionColumn = NULL;
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
bool stopOnFailure = true;
EState *estate = NULL;
@ -778,6 +786,13 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
slot = MakeSingleTupleTableSlot(tupleDescriptor);
columnNameList = TupleDescColumnNameList(tupleDescriptor);
/* determine the partition column in the tuple descriptor */
partitionColumn = PartitionColumn(distributedRelationId, 0);
if (partitionColumn != NULL)
{
partitionColumnIndex = partitionColumn->varattno - 1;
}
/* initialise per-tuple memory context */
estate = CreateExecutorState();
econtext = GetPerTupleExprContext(estate);
@ -785,8 +800,9 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
columnNameList, estate,
stopOnFailure);
columnNameList,
partitionColumnIndex,
estate, stopOnFailure);
/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor);


@ -39,6 +39,7 @@ master_drop_distributed_table_metadata(PG_FUNCTION_ARGS)
char *tableName = text_to_cstring(tableNameText);
EnsureCoordinator();
CheckCitusVersion(ERROR);
CheckTableSchemaNameForDrop(relationId, &schemaName, &tableName);


@ -299,6 +299,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
bool *columnNulls = NULL;
int columnIndex = 0;
List *columnNameList = NIL;
Var *partitionColumn = NULL;
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
TupleTableSlot *tupleTableSlot = NULL;
EState *executorState = NULL;
@ -326,6 +328,14 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
tupleTableSlot->tts_values = columnValues;
tupleTableSlot->tts_isnull = columnNulls;
/* determine the partition column index in the tuple descriptor */
partitionColumn = PartitionColumn(tableId, 0);
if (partitionColumn != NULL)
{
partitionColumnIndex = partitionColumn->varattno - 1;
}
/* build the list of column names for remote COPY statements */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
@ -350,8 +360,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
}
/* set up the destination for the COPY */
copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState,
stopOnFailure);
copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex,
executorState, stopOnFailure);
dest = (DestReceiver *) copyDest;
dest->rStartup(dest, 0, tupleDescriptor);
@ -1133,17 +1143,8 @@ ReportCopyError(MultiConnection *connection, PGresult *result)
}
else
{
/* probably a connection problem, get the message from the connection */
char *lastNewlineIndex = NULL;
remoteMessage = PQerrorMessage(connection->pgConn);
lastNewlineIndex = strrchr(remoteMessage, '\n');
/* trim trailing newline, if any */
if (lastNewlineIndex != NULL)
{
*lastNewlineIndex = '\0';
}
/* trim the trailing characters */
remoteMessage = pchomp(PQerrorMessage(connection->pgConn));
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to complete COPY on %s:%d", connection->hostname,
@ -1638,10 +1639,14 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
/*
* CreateCitusCopyDestReceiver creates a DestReceiver that copies into
* a distributed table.
*
* The caller should provide the list of column names to use in the
* remote COPY statement, and the partition column index in the tuple
* descriptor (*not* the column name list).
*/
CitusCopyDestReceiver *
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorState,
bool stopOnFailure)
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
EState *executorState, bool stopOnFailure)
{
CitusCopyDestReceiver *copyDest = NULL;
@ -1657,6 +1662,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorS
/* set up output parameters */
copyDest->distributedRelationId = tableId;
copyDest->columnNameList = columnNameList;
copyDest->partitionColumnIndex = partitionColumnIndex;
copyDest->executorState = executorState;
copyDest->stopOnFailure = stopOnFailure;
copyDest->memoryContext = CurrentMemoryContext;
@ -1682,14 +1688,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
char *schemaName = get_namespace_name(schemaOid);
Relation distributedRelation = NULL;
int columnIndex = 0;
List *columnNameList = copyDest->columnNameList;
List *quotedColumnNameList = NIL;
ListCell *columnNameCell = NULL;
char partitionMethod = '\0';
Var *partitionColumn = PartitionColumn(tableId, 0);
int partitionColumnIndex = -1;
DistTableCacheEntry *cacheEntry = NULL;
CopyStmt *copyStatement = NULL;
@ -1773,39 +1777,28 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyDest->columnOutputFunctions =
ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary);
/* find the partition column index in the column list */
/* ensure the column names are properly quoted in the COPY statement */
foreach(columnNameCell, columnNameList)
{
char *columnName = (char *) lfirst(columnNameCell);
char *quotedColumnName = (char *) quote_identifier(columnName);
/* load the column information from pg_attribute */
AttrNumber attrNumber = get_attnum(tableId, columnName);
/* check whether this is the partition column */
if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
{
Assert(partitionColumnIndex == -1);
partitionColumnIndex = columnIndex;
quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName);
}
columnIndex++;
}
if (partitionMethod != DISTRIBUTE_BY_NONE && partitionColumnIndex == -1)
if (partitionMethod != DISTRIBUTE_BY_NONE &&
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
{
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("the partition column of table %s should have a value",
quote_qualified_identifier(schemaName, relationName))));
}
copyDest->partitionColumnIndex = partitionColumnIndex;
/* define the template for the COPY statement that is sent to workers */
copyStatement = makeNode(CopyStmt);
copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
copyStatement->query = NULL;
copyStatement->attlist = columnNameList;
copyStatement->attlist = quotedColumnNameList;
copyStatement->is_from = true;
copyStatement->is_program = false;
copyStatement->filename = NULL;
@ -1866,7 +1859,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
* tables. Note that, reference tables has NULL partition column values so
* skip the check.
*/
if (partitionColumnIndex >= 0)
if (partitionColumnIndex != INVALID_PARTITION_COLUMN_INDEX)
{
if (columnNulls[partitionColumnIndex])
{


@ -17,6 +17,7 @@
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/palloc.h"
/* GUC, determining whether statements sent to remote nodes are logged */
@ -116,7 +117,7 @@ ReportConnectionError(MultiConnection *connection, int elevel)
int nodePort = connection->port;
ereport(elevel, (errmsg("connection error: %s:%d", nodeName, nodePort),
errdetail("%s", PQerrorMessage(connection->pgConn))));
errdetail("%s", pchomp(PQerrorMessage(connection->pgConn)))));
}
@ -154,16 +155,7 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
*/
if (messagePrimary == NULL)
{
char *lastNewlineIndex = NULL;
messagePrimary = PQerrorMessage(connection->pgConn);
lastNewlineIndex = strrchr(messagePrimary, '\n');
/* trim trailing newline, if any */
if (lastNewlineIndex != NULL)
{
*lastNewlineIndex = '\0';
}
messagePrimary = pchomp(PQerrorMessage(connection->pgConn));
}
ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
@ -182,6 +174,28 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
}
/* *INDENT-OFF* */
#if (PG_VERSION_NUM < 100000)
/*
* Make copy of string with all trailing newline characters removed.
*/
char *
pchomp(const char *in)
{
size_t n;
n = strlen(in);
while (n > 0 && in[n - 1] == '\n')
n--;
return pnstrdup(in, n);
}
#endif
/* *INDENT-ON* */
/*
* LogRemoteCommand logs commands send to remote nodes if
* citus.log_remote_commands wants us to do so.


@ -133,7 +133,8 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
* error and returns INVALID_CONNECTION_ID.
*/
int32
MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase)
MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase,
const char *userName)
{
MultiConnection *connection = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
@ -154,7 +155,8 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
}
/* prepare asynchronous request for worker node connection */
connection = StartNodeConnection(connectionFlags, nodeName, nodePort);
connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
userName, nodeDatabase);
connStatusType = PQstatus(connection->pgConn);
/*
@ -311,7 +313,7 @@ MultiClientSendQuery(int32 connectionId, const char *query)
querySent = PQsendQuery(connection->pgConn, query);
if (querySent == 0)
{
char *errorMessage = PQerrorMessage(connection->pgConn);
char *errorMessage = pchomp(PQerrorMessage(connection->pgConn));
ereport(WARNING, (errmsg("could not send remote query \"%s\"", query),
errdetail("Client error: %s", errorMessage)));


@ -272,7 +272,8 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
/* we use the same database name on the master and worker nodes */
nodeDatabase = get_database_name(MyDatabaseId);
connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase);
connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase,
NULL);
connectionIdArray[currentIndex] = connectionId;
/* if valid, poll the connection until the connection is initiated */


@ -27,6 +27,7 @@
#include "commands/dbcommands.h"
#include "distributed/citus_nodes.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
@ -70,7 +71,8 @@ static Task * TaskHashLookup(HTAB *trackerHash, TaskType taskType, uint64 jobId,
uint32 taskId);
static bool TopLevelTask(Task *task);
static bool TransmitExecutionCompleted(TaskExecution *taskExecution);
static HTAB * TrackerHash(const char *taskTrackerHashName, List *workerNodeList);
static HTAB * TrackerHash(const char *taskTrackerHashName, List *workerNodeList,
char *userName);
static HTAB * TrackerHashCreate(const char *taskTrackerHashName,
uint32 taskTrackerHashSize);
static TaskTracker * TrackerHashEnter(HTAB *taskTrackerHash, char *nodeName,
@ -159,6 +161,7 @@ MultiTaskTrackerExecute(Job *job)
List *workerNodeList = NIL;
HTAB *taskTrackerHash = NULL;
HTAB *transmitTrackerHash = NULL;
char *extensionOwner = CitusExtensionOwnerName();
const char *taskTrackerHashName = "Task Tracker Hash";
const char *transmitTrackerHashName = "Transmit Tracker Hash";
List *jobIdList = NIL;
@ -193,8 +196,12 @@ MultiTaskTrackerExecute(Job *job)
workerNodeList = ActiveWorkerNodeList();
taskTrackerCount = (uint32) list_length(workerNodeList);
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList);
transmitTrackerHash = TrackerHash(transmitTrackerHashName, workerNodeList);
/* connect as the current user for running queries */
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList, NULL);
/* connect as the superuser for fetching result files */
transmitTrackerHash = TrackerHash(transmitTrackerHashName, workerNodeList,
extensionOwner);
TrackerHashConnect(taskTrackerHash);
TrackerHashConnect(transmitTrackerHash);
@ -658,10 +665,11 @@ TransmitExecutionCompleted(TaskExecution *taskExecution)
/*
* TrackerHash creates a task tracker hash with the given name. The function
* then inserts one task tracker entry for each node in the given worker node
* list, and initializes state for each task tracker.
* list, and initializes state for each task tracker. The userName argument
* indicates which user to connect as.
*/
static HTAB *
TrackerHash(const char *taskTrackerHashName, List *workerNodeList)
TrackerHash(const char *taskTrackerHashName, List *workerNodeList, char *userName)
{
/* create task tracker hash */
uint32 taskTrackerHashSize = list_length(workerNodeList);
@ -703,6 +711,7 @@ TrackerHash(const char *taskTrackerHashName, List *workerNodeList)
}
taskTracker->taskStateHash = taskStateHash;
taskTracker->userName = userName;
}
return taskTrackerHash;
@ -836,9 +845,10 @@ TrackerConnectPoll(TaskTracker *taskTracker)
char *nodeName = taskTracker->workerName;
uint32 nodePort = taskTracker->workerPort;
char *nodeDatabase = get_database_name(MyDatabaseId);
char *nodeUser = taskTracker->userName;
int32 connectionId = MultiClientConnectStart(nodeName, nodePort,
nodeDatabase);
nodeDatabase, nodeUser);
if (connectionId != INVALID_CONNECTION_ID)
{
taskTracker->connectionId = connectionId;


@ -99,7 +99,6 @@ struct DropRelationCallbackState
/* Local functions forward declarations for deciding when to perform processing/checks */
static bool SkipCitusProcessingForUtility(Node *parsetree);
static bool IsCitusExtensionStmt(Node *parsetree);
/* Local functions forward declarations for Transmit statement */
@ -185,22 +184,28 @@ multi_ProcessUtility(Node *parsetree,
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
List *ddlJobs = NIL;
bool skipCitusProcessing = SkipCitusProcessingForUtility(parsetree);
bool checkExtensionVersion = false;
if (skipCitusProcessing)
if (IsA(parsetree, TransactionStmt))
{
bool checkExtensionVersion = IsCitusExtensionStmt(parsetree);
/*
* Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
* transactions in which case a lot of checks cannot be done safely in
* that state. Since we never need to intercept transaction statements,
* skip our checks and immediately fall into standard_ProcessUtility.
*/
standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag);
return;
}
checkExtensionVersion = IsCitusExtensionStmt(parsetree);
if (EnableVersionChecks && checkExtensionVersion)
{
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
}
return;
}
if (!CitusHasBeenLoaded())
{
@ -390,7 +395,11 @@ multi_ProcessUtility(Node *parsetree,
standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag);
/* don't run post-process code for local commands */
if (ddlJobs != NIL)
{
PostProcessUtility(parsetree);
}
if (commandMustRunAsOwner)
{
@ -447,63 +456,6 @@ multi_ProcessUtility(Node *parsetree,
}
/*
* SkipCitusProcessingForUtility simply returns whether a given utility should
* bypass Citus processing and checks and be handled exclusively by standard
* PostgreSQL utility processing. At present, CREATE/ALTER/DROP EXTENSION,
* ABORT, COMMIT, ROLLBACK, and SET (GUC) statements are exempt from Citus.
*/
static bool
SkipCitusProcessingForUtility(Node *parsetree)
{
switch (parsetree->type)
{
/*
* In the CitusHasBeenLoaded check, we compare versions of loaded code,
* the installed extension, and available extension. If they differ, we
* force user to execute ALTER EXTENSION citus UPDATE. To allow this,
* CREATE/DROP/ALTER extension must be omitted from Citus processing.
*/
case T_DropStmt:
{
DropStmt *dropStatement = (DropStmt *) parsetree;
if (dropStatement->removeType != OBJECT_EXTENSION)
{
return false;
}
}
/* no break, fall through */
case T_CreateExtensionStmt:
case T_AlterExtensionStmt:
/*
* Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
* transactions in which case a lot of checks cannot be done safely in
* that state. Since we never need to intercept transaction statements,
* skip our checks and immediately fall into standard_ProcessUtility.
*/
case T_TransactionStmt:
/*
* Skip processing of variable set statements, to allow changing the
* enable_version_checks GUC during testing.
*/
case T_VariableSetStmt:
{
return true;
}
default:
{
return false;
}
}
}
/*
* IsCitusExtensionStmt returns whether a given utility is a CREATE or ALTER
* EXTENSION statement which references the citus extension. This function
@ -563,6 +515,10 @@ IsTransmitStmt(Node *parsetree)
static void
VerifyTransmitStmt(CopyStmt *copyStatement)
{
char *fileName = NULL;
EnsureSuperUser();
/* do some minimal option verification */
if (copyStatement->relation == NULL ||
copyStatement->relation->relname == NULL)
@ -571,6 +527,20 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
errmsg("FORMAT 'transmit' requires a target file")));
}
fileName = copyStatement->relation->relname;
if (is_absolute_path(fileName))
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("absolute path not allowed"))));
}
else if (!path_is_relative_and_below_cwd(fileName))
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("path must be in or below the current directory"))));
}
if (copyStatement->filename != NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@ -1296,8 +1266,13 @@ VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
char *schemaName = get_namespace_name(schemaId);
char *tableName = get_rel_name(relationId);
/* lock relation metadata before getting shard list */
LockRelationDistributionMetadata(relationId, ShareLock);
/*
* We obtain ShareUpdateExclusiveLock here to not conflict with INSERT's
* RowExclusiveLock. However if VACUUM FULL is used, we already obtain
* AccessExclusiveLock before reaching to that point and INSERT's will be
* blocked anyway. This is inline with PostgreSQL's own behaviour.
*/
LockRelationOid(relationId, ShareUpdateExclusiveLock);
shardIntervalList = LoadShardIntervalList(relationId);
@ -1471,11 +1446,9 @@ ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree)
{
/*
* No version was specified, so PostgreSQL will use the default_version
* from the citus.control file. In case a new default is available, we
* will force a compatibility check of the latest available version.
* from the citus.control file.
*/
availableExtensionVersion = NULL;
ErrorIfAvailableVersionMismatch();
CheckAvailableVersion(ERROR);
}
}


@ -83,6 +83,8 @@ master_run_on_worker(PG_FUNCTION_ARGS)
int commandIndex = 0;
int commandCount = 0;
CheckCitusVersion(ERROR);
/* check to see if caller supports us returning a tuplestore */
if (!rsinfo || !(rsinfo->allowedModes & SFRM_Materialize))
{
@ -437,7 +439,12 @@ StoreErrorMessage(PGconn *connection, StringInfo queryResultString)
char *errorMessage = PQerrorMessage(connection);
if (errorMessage != NULL)
{
char *firstNewlineIndex = strchr(errorMessage, '\n');
char *firstNewlineIndex = NULL;
/* copy the error message to a writable memory */
errorMessage = pnstrdup(errorMessage, strlen(errorMessage));
firstNewlineIndex = strchr(errorMessage, '\n');
/* trim the error message at the line break */
if (firstNewlineIndex != NULL)


@ -69,6 +69,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
Oid distributedTableId = ResolveRelationId(tableNameText);
EnsureCoordinator();
CheckCitusVersion(ERROR);
CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor);
@ -112,8 +113,8 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
*/
EnsureTableOwner(distributedTableId);
/* we plan to add shards: get an exclusive metadata lock */
LockRelationDistributionMetadata(distributedTableId, ExclusiveLock);
/* we plan to add shards: get an exclusive lock on relation oid */
LockRelationOid(distributedTableId, ExclusiveLock);
relationOwner = TableOwner(distributedTableId);
@ -263,8 +264,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
*/
EnsureTableOwner(targetRelationId);
/* we plan to add shards: get an exclusive metadata lock on the target relation */
LockRelationDistributionMetadata(targetRelationId, ExclusiveLock);
/* we plan to add shards: get an exclusive lock on target relation oid */
LockRelationOid(targetRelationId, ExclusiveLock);
/* we don't want source table to get dropped before we colocate with it */
LockRelationOid(sourceRelationId, AccessShareLock);
@ -368,8 +369,8 @@ CreateReferenceTableShard(Oid distributedTableId)
*/
EnsureTableOwner(distributedTableId);
/* we plan to add shards: get an exclusive metadata lock */
LockRelationDistributionMetadata(distributedTableId, ExclusiveLock);
/* we plan to add shards: get an exclusive lock on relation oid */
LockRelationOid(distributedTableId, ExclusiveLock);
relationOwner = TableOwner(distributedTableId);

View File

@ -111,6 +111,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
bool failOK = false;
EnsureCoordinator();
CheckCitusVersion(ERROR);
queryTreeNode = ParseTreeNode(queryString);
if (!IsA(queryTreeNode, DeleteStmt))
@ -214,6 +215,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
char *relationName = text_to_cstring(relationNameText);
EnsureCoordinator();
CheckCitusVersion(ERROR);
CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName);
@ -250,6 +252,8 @@ master_drop_sequences(PG_FUNCTION_ARGS)
StringInfo dropSeqCommand = makeStringInfo();
bool coordinator = IsCoordinator();
CheckCitusVersion(ERROR);
/* do nothing if DDL propagation is switched off or this is not the coordinator */
if (!EnableDDLPropagation || !coordinator)
{

View File

@ -46,14 +46,21 @@ Datum
master_expire_table_cache(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
List *workerNodeList = ActiveWorkerNodeList();
DistTableCacheEntry *cacheEntry = NULL;
List *workerNodeList = NIL;
ListCell *workerNodeCell = NULL;
int shardCount = cacheEntry->shardIntervalArrayLength;
ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray;
int shardCount = 0;
ShardInterval **shardIntervalArray = NULL;
List **placementListArray = NULL;
int shardIndex = 0;
CheckCitusVersion(ERROR);
cacheEntry = DistributedTableCacheEntry(relationId);
workerNodeList = ActiveWorkerNodeList();
shardCount = cacheEntry->shardIntervalArrayLength;
shardIntervalArray = cacheEntry->sortedShardIntervalArray;
if (shardCount == 0)
{
ereport(WARNING, (errmsg("Table has no shards, no action is taken")));

View File

@ -87,6 +87,8 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
uint64 totalRelationSize = 0;
CheckCitusVersion(ERROR);
totalRelationSize = DistributedTableSize(relationId,
PG_TOTAL_RELATION_SIZE_FUNCTION);
@ -104,6 +106,8 @@ citus_table_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
uint64 tableSize = 0;
CheckCitusVersion(ERROR);
tableSize = DistributedTableSize(relationId, PG_TABLE_SIZE_FUNCTION);
PG_RETURN_INT64(tableSize);
@ -120,6 +124,8 @@ citus_relation_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
uint64 relationSize = 0;
CheckCitusVersion(ERROR);
relationSize = DistributedTableSize(relationId, PG_RELATION_SIZE_FUNCTION);
PG_RETURN_INT64(relationSize);

View File

@ -87,6 +87,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
int32 affectedTupleCount = 0;
EnsureCoordinator();
CheckCitusVersion(ERROR);
queryTreeNode = ParseTreeNode(queryString);
if (IsA(queryTreeNode, DeleteStmt))

View File

@ -105,6 +105,8 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
Datum values[TABLE_METADATA_FIELDS];
bool isNulls[TABLE_METADATA_FIELDS];
CheckCitusVersion(ERROR);
/* find partition tuple for partitioned relation */
partitionEntry = DistributedTableCacheEntry(relationId);
@ -194,6 +196,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
FuncCallContext *functionContext = NULL;
ListCell *tableDDLEventCell = NULL;
CheckCitusVersion(ERROR);
/*
* On the very first call to this function, we first use the given relation
* name to get to the relation. We then recreate the list of DDL statements
@ -264,6 +268,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
Datum shardIdDatum = 0;
EnsureCoordinator();
CheckCitusVersion(ERROR);
shardId = GetNextShardId();
shardIdDatum = Int64GetDatum(shardId);
@ -321,6 +326,7 @@ master_get_new_placementid(PG_FUNCTION_ARGS)
Datum placementIdDatum = 0;
EnsureCoordinator();
CheckCitusVersion(ERROR);
placementId = GetNextPlacementId();
placementIdDatum = Int64GetDatum(placementId);
@ -376,6 +382,8 @@ master_get_active_worker_nodes(PG_FUNCTION_ARGS)
uint32 workerNodeIndex = 0;
uint32 workerNodeCount = 0;
CheckCitusVersion(ERROR);
if (SRF_IS_FIRSTCALL())
{
MemoryContext oldContext = NULL;

View File

@ -85,6 +85,7 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
}
EnsureCoordinator();
CheckCitusVersion(ERROR);
/* RepairShardPlacement function repairs only given shard */
RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,

View File

@ -67,6 +67,8 @@ worker_hash(PG_FUNCTION_ARGS)
FmgrInfo *hashFunction = NULL;
Oid valueDataType = InvalidOid;
CheckCitusVersion(ERROR);
/* figure out hash function from the data type */
valueDataType = get_fn_expr_argtype(fcinfo->flinfo, 0);
typeEntry = lookup_type_cache(valueDataType, TYPECACHE_HASH_PROC_FINFO);

View File

@ -71,7 +71,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
{
text *relationNameText = PG_GETARG_TEXT_P(0);
char *relationName = text_to_cstring(relationNameText);
List *workerNodeList = ActiveWorkerNodeList();
List *workerNodeList = NIL;
uint64 shardId = INVALID_SHARD_ID;
List *ddlEventList = NULL;
uint32 attemptableNodeCount = 0;
@ -90,6 +90,10 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
char replicationModel = REPLICATION_MODEL_INVALID;
bool includeSequenceDefaults = false;
CheckCitusVersion(ERROR);
workerNodeList = ActiveWorkerNodeList();
EnsureTablePermissions(relationId, ACL_INSERT);
CheckDistributedTable(relationId);
@ -219,11 +223,18 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
float4 shardFillLevel = 0.0;
char partitionMethod = 0;
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid relationId = shardInterval->relationId;
bool cstoreTable = CStoreTable(relationId);
ShardInterval *shardInterval = NULL;
Oid relationId = InvalidOid;
bool cstoreTable = false;
char storageType = shardInterval->storageType;
char storageType = 0;
CheckCitusVersion(ERROR);
shardInterval = LoadShardInterval(shardId);
relationId = shardInterval->relationId;
cstoreTable = CStoreTable(relationId);
storageType = shardInterval->storageType;
EnsureTablePermissions(relationId, ACL_INSERT);
@ -318,6 +329,8 @@ master_update_shard_statistics(PG_FUNCTION_ARGS)
int64 shardId = PG_GETARG_INT64(0);
uint64 shardSize = 0;
CheckCitusVersion(ERROR);
shardSize = UpdateShardStatistics(shardId);
PG_RETURN_INT64(shardSize);
@ -750,9 +763,8 @@ ForeignConstraintGetReferencedTableId(char *queryString)
if (constraint->contype == CONSTR_FOREIGN)
{
RangeVar *referencedTable = constraint->pktable;
LOCKMODE lockmode = AlterTableGetLockLevel(foreignConstraintStmt->cmds);
return RangeVarGetRelid(referencedTable, lockmode,
return RangeVarGetRelid(referencedTable, NoLock,
foreignConstraintStmt->missing_ok);
}
}

View File

@ -86,6 +86,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
EnsureCoordinator();
EnsureSuperUser();
CheckCitusVersion(ERROR);
PreventTransactionChain(true, "start_metadata_sync_to_node");
@ -154,6 +155,7 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
EnsureCoordinator();
EnsureSuperUser();
CheckCitusVersion(ERROR);
workerNode = FindWorkerNode(nodeNameString, nodePort);
if (workerNode == NULL)

View File

@ -28,6 +28,7 @@
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_constraint.h"
#include "distributed/metadata_cache.h"
#include "distributed/relay_utility.h"
#include "lib/stringinfo.h"
#include "mb/pg_wchar.h"
@ -673,10 +674,11 @@ shard_name(PG_FUNCTION_ARGS)
errmsg("shard_id cannot be null")));
}
relationId = PG_GETARG_OID(0);
shardId = PG_GETARG_INT64(1);
CheckCitusVersion(ERROR);
if (shardId <= 0)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),

View File

@ -65,6 +65,8 @@ recover_prepared_transactions(PG_FUNCTION_ARGS)
{
int recoveredTransactionCount = 0;
CheckCitusVersion(ERROR);
recoveredTransactionCount = RecoverPreparedTransactions();
PG_RETURN_INT32(recoveredTransactionCount);

View File

@ -236,76 +236,36 @@ PartiallyEvaluateExpressionMutator(Node *expression, FunctionEvaluationContext *
static Node *
EvaluateNodeIfReferencesFunction(Node *expression, PlanState *planState)
{
if (IsA(expression, FuncExpr))
if (expression == NULL || IsA(expression, Const))
{
FuncExpr *expr = (FuncExpr *) expression;
return expression;
}
return (Node *) citus_evaluate_expr((Expr *) expr,
expr->funcresulttype,
exprTypmod((Node *) expr),
expr->funccollid,
switch (nodeTag(expression))
{
case T_FuncExpr:
case T_OpExpr:
case T_DistinctExpr:
case T_NullIfExpr:
case T_CoerceViaIO:
case T_ArrayCoerceExpr:
case T_ScalarArrayOpExpr:
case T_RowCompareExpr:
case T_Param:
case T_RelabelType:
case T_CoerceToDomain:
{
return (Node *) citus_evaluate_expr((Expr *) expression,
exprType(expression),
exprTypmod(expression),
exprCollation(expression),
planState);
}
if (IsA(expression, OpExpr) ||
IsA(expression, DistinctExpr) ||
IsA(expression, NullIfExpr))
default:
{
/* structural equivalence */
OpExpr *expr = (OpExpr *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr,
expr->opresulttype, -1,
expr->opcollid,
planState);
break;
}
if (IsA(expression, CoerceViaIO))
{
CoerceViaIO *expr = (CoerceViaIO *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr,
expr->resulttype, -1,
expr->resultcollid,
planState);
}
if (IsA(expression, ArrayCoerceExpr))
{
ArrayCoerceExpr *expr = (ArrayCoerceExpr *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr,
expr->resulttype,
expr->resulttypmod,
expr->resultcollid,
planState);
}
if (IsA(expression, ScalarArrayOpExpr))
{
ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid,
planState);
}
if (IsA(expression, RowCompareExpr))
{
RowCompareExpr *expr = (RowCompareExpr *) expression;
return (Node *) citus_evaluate_expr((Expr *) expr, BOOLOID, -1, InvalidOid,
planState);
}
if (IsA(expression, Param))
{
Param *param = (Param *) expression;
return (Node *) citus_evaluate_expr((Expr *) param,
param->paramtype,
param->paramtypmod,
param->paramcollid,
planState);
}
return expression;

View File

@ -75,6 +75,7 @@ mark_tables_colocated(PG_FUNCTION_ARGS)
}
EnsureCoordinator();
CheckCitusVersion(ERROR);
relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject);

View File

@ -17,6 +17,7 @@
#include "access/heapam.h"
#include "access/htup_details.h"
#include "distributed/distribution_column.h"
#include "distributed/metadata_cache.h"
#include "nodes/makefuncs.h"
#include "nodes/nodes.h"
#include "nodes/primnodes.h"
@ -55,6 +56,8 @@ column_name_to_column(PG_FUNCTION_ARGS)
char *columnNodeString = NULL;
text *columnNodeText = NULL;
CheckCitusVersion(ERROR);
relation = relation_open(relationId, AccessShareLock);
column = BuildDistributionKeyFromColumnName(relation, columnName);
@ -107,6 +110,8 @@ column_to_column_name(PG_FUNCTION_ARGS)
char *columnName = NULL;
text *columnText = NULL;
CheckCitusVersion(ERROR);
columnName = ColumnNameToColumn(relationId, columnNodeString);
columnText = cstring_to_text(columnName);

View File

@ -106,8 +106,7 @@ static Oid workerHashFunctionId = InvalidOid;
/* Citus extension version variables */
bool EnableVersionChecks = true; /* version checks are enabled */
char *availableExtensionVersion = NULL;
static char *installedExtensionVersion = NULL;
static bool citusVersionKnownCompatible = false;
/* Hash table for information about each partition */
static HTAB *DistTableCacheHash = NULL;
@ -130,6 +129,7 @@ static ScanKeyData DistShardScanKey[1];
/* local function forward declarations */
static bool IsDistributedTableViaCatalog(Oid relationId);
static ShardCacheEntry * LookupShardCacheEntry(int64 shardId);
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
@ -142,7 +142,7 @@ static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
static void ErrorIfInstalledVersionMismatch(void);
static bool CheckInstalledVersion(int elevel);
static char * AvailableExtensionVersion(void);
static char * InstalledExtensionVersion(void);
static bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
@ -184,22 +184,59 @@ IsDistributedTable(Oid relationId)
{
DistTableCacheEntry *cacheEntry = NULL;
cacheEntry = LookupDistTableCacheEntry(relationId);
/*
* Can't be a distributed relation if the extension hasn't been loaded
* yet. As we can't do lookups in nonexistent tables, directly return
* false.
* If the extension hasn't been created, or has the wrong version and the
* table isn't distributed, LookupDistTableCacheEntry() will return NULL.
*/
if (!CitusHasBeenLoaded())
if (!cacheEntry)
{
return false;
}
cacheEntry = LookupDistTableCacheEntry(relationId);
return cacheEntry->isDistributedTable;
}
/*
* IsDistributedTableViaCatalog returns whether the given relation is a
* distributed table or not.
*
* It does so by searching pg_dist_partition, explicitly bypassing caches,
* because this function is designed to be used in cases where accessing
* metadata tables is not safe.
*
* NB: Currently this still hardcodes pg_dist_partition logicalrelid column
* offset and the corresponding index. If we ever come close to changing
* that, we'll have to work a bit harder.
*/
static bool
IsDistributedTableViaCatalog(Oid relationId)
{
HeapTuple partitionTuple = NULL;
SysScanDesc scanDescriptor = NULL;
const int scanKeyCount = 1;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = true;
Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionLogicalRelidIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
partitionTuple = systable_getnext(scanDescriptor);
systable_endscan(scanDescriptor);
heap_close(pgDistPartition, AccessShareLock);
return HeapTupleIsValid(partitionTuple);
}
/*
* DistributedTableList returns a list that includes all the valid distributed table
* cache entries.
@ -211,6 +248,8 @@ DistributedTableList(void)
List *distributedTableList = NIL;
ListCell *distTableOidCell = NULL;
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
/* first, we need to iterate over pg_dist_partition */
distTableOidList = DistTableOidList();
@ -360,6 +399,8 @@ LookupShardCacheEntry(int64 shardId)
bool foundInCache = false;
bool recheck = false;
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
/* probably not reachable */
if (DistShardCacheHash == NULL)
{
@ -435,27 +476,16 @@ DistributedTableCacheEntry(Oid distributedRelationId)
{
DistTableCacheEntry *cacheEntry = NULL;
/*
* Can't be a distributed relation if the extension hasn't been loaded
* yet. As we can't do lookups in nonexistent tables, directly return NULL
* here.
*/
if (!CitusHasBeenLoaded())
{
return NULL;
}
cacheEntry = LookupDistTableCacheEntry(distributedRelationId);
if (cacheEntry->isDistributedTable)
if (cacheEntry && cacheEntry->isDistributedTable)
{
return cacheEntry;
}
else
{
char *relationName = get_rel_name(distributedRelationId);
ereport(ERROR, (errmsg("relation %s is not distributed",
relationName)));
ereport(ERROR, (errmsg("relation %s is not distributed", relationName)));
}
}
@ -471,11 +501,48 @@ LookupDistTableCacheEntry(Oid relationId)
bool foundInCache = false;
void *hashKey = (void *) &relationId;
/*
* Can't be a distributed relation if the extension hasn't been loaded
* yet. As we can't do lookups in nonexistent tables, directly return NULL
* here.
*/
if (!CitusHasBeenLoaded())
{
return NULL;
}
if (DistTableCacheHash == NULL)
{
InitializeDistTableCache();
}
/*
* If the version is not known to be compatible, perform thorough check,
* unless such checks are disabled.
*/
if (!citusVersionKnownCompatible && EnableVersionChecks)
{
bool isDistributed = IsDistributedTableViaCatalog(relationId);
int reportLevel = DEBUG1;
/*
* If there's a version mismatch and we're dealing with a distributed
* table, we have to error out as we can't return a valid entry. We
* want to check compatibility in the non-distributed case as well, so
* future lookups can use the cache if compatible.
*/
if (isDistributed)
{
reportLevel = ERROR;
}
if (!CheckCitusVersion(reportLevel))
{
/* incompatible, can't access cache, so return before doing so */
return NULL;
}
}
cacheEntry = hash_search(DistTableCacheHash, hashKey, HASH_ENTER, &foundInCache);
/* return valid matches */
@ -1066,86 +1133,116 @@ CitusHasBeenLoaded(void)
DistPartitionRelationId();
/*
* We also set installedExtensionVersion to NULL so that it will be re-read
* in case of extension update.
* We also reset citusVersionKnownCompatible, so it will be re-read in
* case of extension update.
*/
installedExtensionVersion = NULL;
citusVersionKnownCompatible = false;
}
}
if (extensionLoaded)
{
ErrorIfAvailableVersionMismatch();
ErrorIfInstalledVersionMismatch();
}
return extensionLoaded;
}
/*
* ErrorIfAvailableVersionMismatch compares CITUS_EXTENSIONVERSION and the
* currently available version from the citus.control file. If they are not
* the same in major or minor version numbers, this function errors out. It
* ignores the schema version.
* CheckCitusVersion checks whether there is a version mismatch between the
* available version and the loaded version or between the installed version
* and the loaded version. Returns true if compatible, false otherwise.
*
* As a side effect, this function also sets citusVersionKnownCompatible global
* variable to true which reduces version check cost of next calls.
*/
void
ErrorIfAvailableVersionMismatch(void)
bool
CheckCitusVersion(int elevel)
{
if (citusVersionKnownCompatible ||
!CitusHasBeenLoaded() ||
!EnableVersionChecks)
{
return true;
}
if (CheckAvailableVersion(elevel) && CheckInstalledVersion(elevel))
{
citusVersionKnownCompatible = true;
return true;
}
else
{
return false;
}
}
/*
* CheckAvailableVersion compares CITUS_EXTENSIONVERSION and the currently
* available version from the citus.control file. If they are not compatible,
* this function logs an error with the specified elevel and returns false,
* otherwise it returns true.
*/
bool
CheckAvailableVersion(int elevel)
{
char *availableVersion = NULL;
if (!EnableVersionChecks)
{
return;
return true;
}
availableVersion = AvailableExtensionVersion();
if (!MajorVersionsCompatible(availableVersion, CITUS_EXTENSIONVERSION))
{
ereport(ERROR, (errmsg("loaded Citus library version differs from latest "
ereport(elevel, (errmsg("loaded Citus library version differs from latest "
"available extension version"),
errdetail("Loaded library requires %s, but the latest control "
"file specifies %s.", CITUS_MAJORVERSION,
availableVersion),
errhint("Restart the database to load the latest Citus "
"library.")));
return false;
}
return true;
}
/*
* ErrorIfInstalledVersionMismatch compares CITUS_EXTENSIONVERSION and the
* current catalog version from the pg_extension catalog table. If they are
* not the same in major or minor version numbers, this function errors out.
* It ignores the schema version.
* CheckInstalledVersion compares CITUS_EXTENSIONVERSION and the
* extension's current version from the pg_extension catalog table. If they
* are not compatible, this function logs an error with the specified elevel
* and returns false, otherwise it returns true.
*/
static void
ErrorIfInstalledVersionMismatch(void)
static bool
CheckInstalledVersion(int elevel)
{
char *installedVersion = NULL;
if (!EnableVersionChecks)
{
return;
}
Assert(CitusHasBeenLoaded());
Assert(EnableVersionChecks);
installedVersion = InstalledExtensionVersion();
if (!MajorVersionsCompatible(installedVersion, CITUS_EXTENSIONVERSION))
{
ereport(ERROR, (errmsg("loaded Citus library version differs from installed "
ereport(elevel, (errmsg("loaded Citus library version differs from installed "
"extension version"),
errdetail("Loaded library requires %s, but the installed "
"extension version is %s.", CITUS_MAJORVERSION,
installedVersion),
errhint("Run ALTER EXTENSION citus UPDATE and try again.")));
return false;
}
return true;
}
/*
* MajorVersionsCompatible compares the given two versions. If they are the
* same in major and minor version numbers, this function returns true. It
* ignores the schema version.
* MajorVersionsCompatible checks whether both versions are compatible. They
* are if the major and minor version numbers match; the schema version is
* ignored. Returns true if compatible, false otherwise.
*/
bool
MajorVersionsCompatible(char *leftVersion, char *rightVersion)
@ -1203,12 +1300,7 @@ AvailableExtensionVersion(void)
bool hasTuple = false;
bool goForward = true;
bool doCopy = false;
/* if we cached the result before, return it */
if (availableExtensionVersion != NULL)
{
return availableExtensionVersion;
}
char *availableExtensionVersion;
estate = CreateExecutorState();
extensionsResultSet = makeNode(ReturnSetInfo);
@ -1274,8 +1366,6 @@ AvailableExtensionVersion(void)
/*
* InstalledExtensionVersion returns the Citus version in the PostgreSQL pg_extension table.
* It also saves the result, thus consecutive calls to CitusExtensionCatalogVersion
* will not read the catalog tables again.
*/
static char *
InstalledExtensionVersion(void)
@ -1284,12 +1374,7 @@ InstalledExtensionVersion(void)
SysScanDesc scandesc;
ScanKeyData entry[1];
HeapTuple extensionTuple = NULL;
/* if we cached the result before, return it */
if (installedExtensionVersion != NULL)
{
return installedExtensionVersion;
}
char *installedExtensionVersion = NULL;
relation = heap_open(ExtensionRelationId, AccessShareLock);
@ -1670,6 +1755,8 @@ master_dist_partition_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
newTuple = triggerData->tg_newtuple;
oldTuple = triggerData->tg_trigtuple;
@ -1731,6 +1818,8 @@ master_dist_shard_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
newTuple = triggerData->tg_newtuple;
oldTuple = triggerData->tg_trigtuple;
@ -1792,6 +1881,8 @@ master_dist_placement_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
newTuple = triggerData->tg_newtuple;
oldTuple = triggerData->tg_trigtuple;
@ -1848,6 +1939,8 @@ master_dist_node_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));
@ -1871,6 +1964,8 @@ master_dist_local_group_cache_invalidate(PG_FUNCTION_ARGS)
errmsg("must be called as trigger")));
}
CheckCitusVersion(ERROR);
CitusInvalidateRelcacheByRelid(DistLocalGroupIdRelationId());
PG_RETURN_DATUM(PointerGetDatum(NULL));

View File

@ -95,8 +95,11 @@ master_add_node(PG_FUNCTION_ARGS)
bool hasMetadata = false;
bool isActive = false;
bool nodeAlreadyExists = false;
Datum nodeRecord;
Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
CheckCitusVersion(ERROR);
nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
hasMetadata, isActive, &nodeAlreadyExists);
/*
@ -129,8 +132,11 @@ master_add_inactive_node(PG_FUNCTION_ARGS)
bool hasMetadata = false;
bool isActive = false;
bool nodeAlreadyExists = false;
Datum nodeRecord;
Datum nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
CheckCitusVersion(ERROR);
nodeRecord = AddNodeMetadata(nodeNameString, nodePort, groupId, nodeRack,
hasMetadata, isActive, &nodeAlreadyExists);
PG_RETURN_CSTRING(nodeRecord);
@ -153,6 +159,8 @@ master_remove_node(PG_FUNCTION_ARGS)
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
CheckCitusVersion(ERROR);
RemoveNodeFromCluster(nodeNameString, nodePort);
PG_RETURN_VOID();
@ -179,6 +187,8 @@ master_disable_node(PG_FUNCTION_ARGS)
bool hasShardPlacements = false;
bool isActive = false;
CheckCitusVersion(ERROR);
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
hasShardPlacements = NodeHasActiveShardPlacements(nodeName, nodePort);
@ -210,6 +220,8 @@ master_activate_node(PG_FUNCTION_ARGS)
char *nodeNameString = text_to_cstring(nodeName);
Datum nodeRecord = 0;
CheckCitusVersion(ERROR);
nodeRecord = ActivateNode(nodeNameString, nodePort);
PG_RETURN_CSTRING(nodeRecord);
@ -263,9 +275,12 @@ Datum
master_initialize_node_metadata(PG_FUNCTION_ARGS)
{
ListCell *workerNodeCell = NULL;
List *workerNodes = ParseWorkerNodeFileAndRename();
List *workerNodes = NULL;
bool nodeAlreadyExists = false;
CheckCitusVersion(ERROR);
workerNodes = ParseWorkerNodeFileAndRename();
foreach(workerNodeCell, workerNodes)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
@ -273,8 +288,6 @@ master_initialize_node_metadata(PG_FUNCTION_ARGS)
AddNodeMetadata(workerNode->workerName, workerNode->workerPort, 0,
workerNode->workerRack, false, workerNode->isActive,
&nodeAlreadyExists);
ActivateNode(workerNode->workerName, workerNode->workerPort);
}
PG_RETURN_BOOL(true);
@ -293,6 +306,8 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
char distributionMethod = 0;
Oid relationId = InvalidOid;
CheckCitusVersion(ERROR);
/*
* To allow an optional parameter to be NULL, we defined this UDF as not
* strict; therefore we need to check all parameters for NULL values.
@ -1045,19 +1060,19 @@ ParseWorkerNodeFileAndRename()
strlcpy(workerNode->workerRack, nodeRack, WORKER_LENGTH);
workerNode->workerPort = nodePort;
workerNode->hasMetadata = false;
workerNode->isActive = false;
workerNode->isActive = true;
workerNodeList = lappend(workerNodeList, workerNode);
}
FreeFile(workerFileStream);
free(workerFilePath);
/* rename the file, marking that it is not used anymore */
appendStringInfo(renamedWorkerFilePath, "%s", workerFilePath);
appendStringInfo(renamedWorkerFilePath, ".obsolete");
rename(workerFilePath, renamedWorkerFilePath->data);
FreeFile(workerFileStream);
free(workerFilePath);
return workerNodeList;
}

View File

@ -60,6 +60,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
DistTableCacheEntry *tableEntry = NULL;
EnsureCoordinator();
CheckCitusVersion(ERROR);
if (!IsDistributedTable(relationId))
{

View File

@ -54,6 +54,8 @@ lock_shard_metadata(PG_FUNCTION_ARGS)
int shardIdCount = 0;
int shardIdIndex = 0;
CheckCitusVersion(ERROR);
if (ARR_NDIM(shardIdArrayObject) == 0)
{
ereport(ERROR, (errmsg("no locks specified")));
@ -92,6 +94,8 @@ lock_shard_resources(PG_FUNCTION_ARGS)
int shardIdCount = 0;
int shardIdIndex = 0;
CheckCitusVersion(ERROR);
if (ARR_NDIM(shardIdArrayObject) == 0)
{
ereport(ERROR, (errmsg("no locks specified")));
@ -180,21 +184,6 @@ TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode)
}
/*
* LockRelationDistributionMetadata returns after getting a the lock used for a
* relation's distribution metadata, blocking if required. Only ExclusiveLock
* and ShareLock modes are supported. Any locks acquired using this method are
* released at transaction end.
*/
void
LockRelationDistributionMetadata(Oid relationId, LOCKMODE lockMode)
{
Assert(lockMode == ExclusiveLock || lockMode == ShareLock);
(void) LockRelationOid(relationId, lockMode);
}
/*
* LockShardResource acquires a lock needed to modify data on a remote shard.
* This task may be assigned to multiple backends at the same time, so the lock

View File

@ -67,8 +67,12 @@ task_tracker_assign_task(PG_FUNCTION_ARGS)
char *taskCallString = text_to_cstring(taskCallStringText);
uint32 taskCallStringLength = strlen(taskCallString);
bool taskTrackerRunning = false;
CheckCitusVersion(ERROR);
/* check that we have a running task tracker on this host */
bool taskTrackerRunning = TaskTrackerRunning();
taskTrackerRunning = TaskTrackerRunning();
if (!taskTrackerRunning)
{
ereport(ERROR, (errcode(ERRCODE_CANNOT_CONNECT_NOW),
@ -129,7 +133,12 @@ task_tracker_task_status(PG_FUNCTION_ARGS)
WorkerTask *workerTask = NULL;
uint32 taskStatus = 0;
bool taskTrackerRunning = TaskTrackerRunning();
bool taskTrackerRunning = false;
CheckCitusVersion(ERROR);
taskTrackerRunning = TaskTrackerRunning();
if (taskTrackerRunning)
{
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED);
@ -170,6 +179,8 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
StringInfo jobDirectoryName = NULL;
StringInfo jobSchemaName = NULL;
CheckCitusVersion(ERROR);
/*
* We first clean up any open connections, and remove tasks belonging to
* this job from the shared hash.

View File

@ -48,10 +48,12 @@ bool ExpireCachedShards = false;
/* Local functions forward declarations */
static void FetchRegularFile(const char *nodeName, uint32 nodePort,
StringInfo remoteFilename, StringInfo localFilename);
static void FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort,
StringInfo remoteFilename,
StringInfo localFilename);
static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort,
StringInfo transmitCommand, StringInfo filePath);
const char *nodeUser, StringInfo transmitCommand,
StringInfo filePath);
static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
int32 fileDescriptor);
static void DeleteFile(const char *filename);
@ -114,13 +116,18 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS)
* task directory does not exist. We then lock and create the directory.
*/
bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
CheckCitusVersion(ERROR);
if (!taskDirectoryExists)
{
InitTaskDirectory(jobId, upstreamTaskId);
}
nodeName = text_to_cstring(nodeNameText);
FetchRegularFile(nodeName, nodePort, remoteFilename, taskFilename);
/* we've made sure the file names are sanitized, safe to fetch as superuser */
FetchRegularFileAsSuperUser(nodeName, nodePort, remoteFilename, taskFilename);
PG_RETURN_VOID();
}
@ -155,13 +162,18 @@ worker_fetch_query_results_file(PG_FUNCTION_ARGS)
* task directory does not exist. We then lock and create the directory.
*/
bool taskDirectoryExists = DirectoryExists(taskDirectoryName);
CheckCitusVersion(ERROR);
if (!taskDirectoryExists)
{
InitTaskDirectory(jobId, upstreamTaskId);
}
nodeName = text_to_cstring(nodeNameText);
FetchRegularFile(nodeName, nodePort, remoteFilename, taskFilename);
/* we've made sure the file names are sanitized, safe to fetch as superuser */
FetchRegularFileAsSuperUser(nodeName, nodePort, remoteFilename, taskFilename);
PG_RETURN_VOID();
}
@ -180,11 +192,16 @@ TaskFilename(StringInfo directoryName, uint32 taskId)
}
/* Helper function to transfer the remote file in an idempotent manner. */
/*
* FetchRegularFileAsSuperUser copies a file from a remote node in an idempotent
* manner. It connects to the remote node as superuser to give file access.
* Callers must make sure that the file names are sanitized.
*/
static void
FetchRegularFile(const char *nodeName, uint32 nodePort,
FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort,
StringInfo remoteFilename, StringInfo localFilename)
{
char *nodeUser = NULL;
StringInfo attemptFilename = NULL;
StringInfo transmitCommand = NULL;
uint32 randomId = (uint32) random();
@@ -203,7 +220,11 @@ FetchRegularFile(const char *nodeName, uint32 nodePort,
transmitCommand = makeStringInfo();
appendStringInfo(transmitCommand, TRANSMIT_REGULAR_COMMAND, remoteFilename->data);
received = ReceiveRegularFile(nodeName, nodePort, transmitCommand, attemptFilename);
/* connect as superuser to give file access */
nodeUser = CitusExtensionOwnerName();
received = ReceiveRegularFile(nodeName, nodePort, nodeUser, transmitCommand,
attemptFilename);
if (!received)
{
ereport(ERROR, (errmsg("could not receive file \"%s\" from %s:%u",
@@ -230,7 +251,7 @@ FetchRegularFile(const char *nodeName, uint32 nodePort,
* and returns false.
*/
static bool
ReceiveRegularFile(const char *nodeName, uint32 nodePort,
ReceiveRegularFile(const char *nodeName, uint32 nodePort, const char *nodeUser,
StringInfo transmitCommand, StringInfo filePath)
{
int32 fileDescriptor = -1;
@@ -262,7 +283,7 @@ ReceiveRegularFile(const char *nodeName, uint32 nodePort,
nodeDatabase = get_database_name(MyDatabaseId);
/* connect to remote node */
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL);
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, nodeUser);
if (connectionId == INVALID_CONNECTION_ID)
{
ReceiveResourceCleanup(connectionId, filename, fileDescriptor);
@@ -415,6 +436,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
const char *ddlCommand = text_to_cstring(ddlCommandText);
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
CheckCitusVersion(ERROR);
/* extend names in ddl command and apply extended command */
RelayEventExtendNames(ddlCommandNode, schemaName, shardId);
ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL,
@@ -443,6 +466,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
const char *ddlCommand = text_to_cstring(ddlCommandText);
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
CheckCitusVersion(ERROR);
/* extend names in ddl command and apply extended command */
RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId,
leftShardSchemaName, rightShardId,
@@ -472,6 +497,9 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
Oid sequenceRelationId = InvalidOid;
NodeTag nodeType = nodeTag(commandNode);
CheckCitusVersion(ERROR);
if (nodeType != T_CreateSeqStmt)
{
ereport(ERROR,
@@ -512,6 +540,8 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS)
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
CheckCitusVersion(ERROR);
/*
* Run common logic to fetch the remote table, and use the provided function
* pointer to perform the actual table fetching.
@@ -536,6 +566,8 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS)
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
CheckCitusVersion(ERROR);
/*
* Run common logic to fetch the remote table, and use the provided function
* pointer to perform the actual table fetching.
@@ -795,7 +827,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
remoteCopyCommand = makeStringInfo();
appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, tableName);
received = ReceiveRegularFile(nodeName, nodePort, remoteCopyCommand, localFilePath);
received = ReceiveRegularFile(nodeName, nodePort, NULL, remoteCopyCommand,
localFilePath);
if (!received)
{
return false;
@@ -835,8 +868,6 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
CommandCounterIncrement();
}
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
/*
* Copy local file into the relation. We call ProcessUtility() instead of
* directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook
@@ -855,6 +886,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
/* finally delete the temporary file we created */
DeleteFile(localFilePath->data);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
return true;
}
@@ -870,6 +903,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
static bool
FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
{
const char *nodeUser = NULL;
StringInfo localFilePath = NULL;
StringInfo remoteFilePath = NULL;
StringInfo transmitCommand = NULL;
@@ -896,7 +930,16 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
transmitCommand = makeStringInfo();
appendStringInfo(transmitCommand, TRANSMIT_REGULAR_COMMAND, remoteFilePath->data);
received = ReceiveRegularFile(nodeName, nodePort, transmitCommand, localFilePath);
/*
* We allow some arbitrary input in the file name and connect to the remote
* node as superuser to transmit. Therefore, we only allow calling this
* function when already running as superuser.
*/
EnsureSuperUser();
nodeUser = CitusExtensionOwnerName();
received = ReceiveRegularFile(nodeName, nodePort, nodeUser, transmitCommand,
localFilePath);
if (!received)
{
return false;
@@ -1210,6 +1253,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
bool received = false;
StringInfo queryString = NULL;
CheckCitusVersion(ERROR);
/* We extract schema names and table names from qualified names */
DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName);
@@ -1233,7 +1278,7 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
sourceCopyCommand = makeStringInfo();
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, sourceCopyCommand,
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL, sourceCopyCommand,
localFilePath);
if (!received)
{


@@ -50,12 +50,15 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
Relation distributedRelation = NULL;
List *shardList = LoadShardList(relationId);
List *shardList = NULL;
ListCell *shardCell = NULL;
char relationKind = '\0';
CheckCitusVersion(ERROR);
EnsureSuperUser();
shardList = LoadShardList(relationId);
/* first check the relation type */
distributedRelation = relation_open(relationId, AccessShareLock);
relationKind = distributedRelation->rd_rel->relkind;


@@ -40,6 +40,9 @@ worker_foreign_file_path(PG_FUNCTION_ARGS)
ForeignTable *foreignTable = GetForeignTable(relationId);
ListCell *optionCell = NULL;
CheckCitusVersion(ERROR);
foreach(optionCell, foreignTable->options)
{
DefElem *option = (DefElem *) lfirst(optionCell);
@@ -80,6 +83,8 @@ worker_find_block_local_path(PG_FUNCTION_ARGS)
(void) blockId;
(void) dataDirectoryObject;
CheckCitusVersion(ERROR);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("called function is currently unsupported")));


@@ -81,6 +81,9 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
/* we should have the same number of column names and types */
int32 columnNameCount = ArrayObjectCount(columnNameObject);
int32 columnTypeCount = ArrayObjectCount(columnTypeObject);
CheckCitusVersion(ERROR);
if (columnNameCount != columnTypeCount)
{
ereport(ERROR, (errmsg("column name array size: %d and type array size: %d"
@@ -152,6 +155,8 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)
int createIntermediateTableResult = 0;
int finished = 0;
CheckCitusVersion(ERROR);
/*
* If the schema for the job isn't already created by the task tracker
* protocol, we fall back to using the default 'public' schema.
@@ -226,6 +231,8 @@ worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS)
int scanKeyCount = 0;
HeapTuple heapTuple = NULL;
CheckCitusVersion(ERROR);
pgNamespace = heap_open(NamespaceRelationId, AccessExclusiveLock);
scanDescriptor = heap_beginscan_catalog(pgNamespace, scanKeyCount, scanKey);


@@ -111,6 +111,9 @@ worker_range_partition_table(PG_FUNCTION_ARGS)
/* first check that array element's and partition column's types match */
Oid splitPointType = ARR_ELEMTYPE(splitPointObject);
CheckCitusVersion(ERROR);
if (splitPointType != partitionColumnType)
{
ereport(ERROR, (errmsg("partition column type %u and split point type %u "
@@ -183,6 +186,8 @@ worker_hash_partition_table(PG_FUNCTION_ARGS)
FileOutputStream *partitionFileArray = NULL;
uint32 fileCount = partitionCount;
CheckCitusVersion(ERROR);
/* use column's type information to get the hashing function */
hashFunction = GetFunctionInfo(partitionColumnType, HASH_AM_OID, HASHPROC);


@@ -38,6 +38,7 @@ worker_create_truncate_trigger(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
EnsureSuperUser();
CheckCitusVersion(ERROR);
/* Create the truncate trigger */
CreateTruncateTrigger(relationId);


@@ -18,7 +18,6 @@
#include "utils/hsearch.h"
extern bool EnableVersionChecks;
extern char *availableExtensionVersion;
/*
* Representation of a table's metadata that is frequently used for
@@ -79,7 +78,8 @@ extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern bool CitusHasBeenLoaded(void);
void ErrorIfAvailableVersionMismatch(void);
extern bool CheckCitusVersion(int elevel);
extern bool CheckAvailableVersion(int elevel);
bool MajorVersionsCompatible(char *leftVersion, char *rightVersion);
/* access WorkerNodeHash */


@@ -98,7 +98,7 @@ typedef struct WaitInfo
extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
const char *nodeDatabase, const char *nodeUser);
extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
const char *nodeDatabase);
const char *nodeDatabase, const char *nodeUser);
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
extern void MultiClientDisconnect(int32 connectionId);
extern bool MultiClientConnectionUp(int32 connectionId);


@@ -20,6 +20,9 @@
#include "tcop/dest.h"
#define INVALID_PARTITION_COLUMN_INDEX -1
/*
* A smaller version of copy.c's CopyStateData, trimmed to the elements
* necessary to copy out results. While it'd be a bit nicer to share code,
@@ -90,6 +93,7 @@ typedef struct CitusCopyDestReceiver
/* function declarations for copying into a distributed table */
extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
List *columnNameList,
int partitionColumnIndex,
EState *executorState,
bool stopOnFailure);
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);


@@ -154,6 +154,7 @@ typedef struct TaskTracker
{
uint32 workerPort; /* node's port; part of hash table key */
char workerName[WORKER_LENGTH]; /* node's name; part of hash table key */
char *userName; /* which user to connect as */
TrackerStatus trackerStatus;
int32 connectionId;
uint32 connectPollCount;


@@ -32,6 +32,7 @@ extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
extern void ReportConnectionError(MultiConnection *connection, int elevel);
extern void ReportResultError(MultiConnection *connection, struct pg_result *result,
int elevel);
extern char * pchomp(const char *in);
extern void LogRemoteCommand(MultiConnection *connection, const char *command);
/* wrappers around libpq functions, with command logging support */


@@ -66,7 +66,6 @@ typedef enum AdvisoryLocktagClass
/* Lock shard/relation metadata for safe modifications */
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
extern bool TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
extern void LockRelationDistributionMetadata(Oid relationId, LOCKMODE lockMode);
/* Lock shard data, for DML commands or remote fetches */
extern void LockShardResource(uint64 shardId, LOCKMODE lockmode);


@@ -0,0 +1,36 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-insert s2-vacuum-analyze s1-commit
create_distributed_table
step s1-begin:
BEGIN;
step s1-insert:
INSERT INTO test_insert_vacuum VALUES(1, 1);
step s2-vacuum-analyze:
VACUUM ANALYZE test_insert_vacuum;
step s1-commit:
COMMIT;
starting permutation: s1-begin s1-insert s2-vacuum-full s1-commit
create_distributed_table
step s1-begin:
BEGIN;
step s1-insert:
INSERT INTO test_insert_vacuum VALUES(1, 1);
step s2-vacuum-full:
VACUUM FULL test_insert_vacuum;
<waiting ...>
step s1-commit:
COMMIT;
step s2-vacuum-full: <... completed>


@@ -483,24 +483,31 @@ CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema
PL/pgSQL function citus_drop_trigger() line 21 at PERFORM
END;
-- Test data loading after dropping a column
CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col2;
SELECT create_distributed_table('data_load_test', 'col1');
ALTER TABLE data_load_test DROP COLUMN col1;
SELECT create_distributed_table('data_load_test', 'col3');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
SELECT * FROM data_load_test;
col1 | col3
------+-------
132 | world
243 | hello
SELECT * FROM data_load_test ORDER BY col2;
col2 | col3 | CoL4")
-------+-------+--------
hello | world |
world | hello |
(2 rows)
-- make sure the tuple went to the right shard
SELECT * FROM data_load_test WHERE col3 = 'world';
col2 | col3 | CoL4")
-------+-------+--------
hello | world |
(1 row)
DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;


@@ -84,7 +84,7 @@ ALTER EXTENSION citus UPDATE TO '6.2-4';
SHOW citus.version;
citus.version
---------------
6.2devel
6.2.5
(1 row)
-- ensure no objects were created outside pg_catalog
@@ -108,6 +108,91 @@ CREATE EXTENSION citus VERSION '5.0';
ERROR: specified version incompatible with loaded Citus library
DETAIL: Loaded library requires 6.2, but 5.0 was specified.
HINT: If a newer library is present, restart the database and try the command again.
-- Test non-distributed queries work even in version mismatch
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '6.1-17';
SET citus.enable_version_checks TO 'true';
-- Test CREATE TABLE
CREATE TABLE version_mismatch_table(column1 int);
-- Test COPY
\copy version_mismatch_table FROM STDIN;
-- Test INSERT
INSERT INTO version_mismatch_table(column1) VALUES(5);
-- Test SELECT
SELECT * FROM version_mismatch_table ORDER BY column1;
column1
---------
0
1
2
3
4
5
(6 rows)
-- Test SELECT from pg_catalog
SELECT d.datname as "Name",
pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges"
FROM pg_catalog.pg_database d
ORDER BY 1;
Name | Owner | Access privileges
------------+----------+-----------------------
postgres | postgres |
regression | postgres |
template0 | postgres | =c/postgres +
| | postgres=CTc/postgres
template1 | postgres | =c/postgres +
| | postgres=CTc/postgres
(4 rows)
-- We should not distribute tables in a version mismatch
SELECT create_distributed_table('version_mismatch_table', 'column1');
ERROR: loaded Citus library version differs from installed extension version
DETAIL: Loaded library requires 6.2, but the installed extension version is 6.1-17.
HINT: Run ALTER EXTENSION citus UPDATE and try again.
-- This function will cause the next ALTER EXTENSION to fail
CREATE OR REPLACE FUNCTION pg_catalog.citus_table_size(table_name regclass)
RETURNS bigint LANGUAGE plpgsql
AS $function$
BEGIN
END;
$function$;
SET citus.enable_version_checks TO 'false';
-- This will fail because of previous function declaration
ALTER EXTENSION citus UPDATE TO '6.2-2';
ERROR: function "citus_table_size" already exists with same argument types
-- We can DROP problematic function and continue ALTER EXTENSION even when version checks are on
SET citus.enable_version_checks TO 'true';
DROP FUNCTION citus_table_size(regclass);
SET citus.enable_version_checks TO 'false';
ALTER EXTENSION citus UPDATE TO '6.2-2';
-- Test updating to the latest version without specifying the version number
ALTER EXTENSION citus UPDATE;
-- re-create in newest version
DROP EXTENSION citus;
\c
CREATE EXTENSION citus;
-- test cache invalidation in workers
\c - - - :worker_1_port
-- this will initialize the cache
\d
List of relations
Schema | Name | Type | Owner
--------+------+------+-------
(0 rows)
DROP EXTENSION citus;
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '5.2-4';
SET citus.enable_version_checks TO 'true';
-- during ALTER EXTENSION, we should invalidate the cache
ALTER EXTENSION citus UPDATE;
-- if the cache is invalidated successfully, this \d should work without any problem
\d
List of relations
Schema | Name | Type | Owner
--------+------+------+-------
(0 rows)


@@ -92,6 +92,10 @@ CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a);
NOTICE: relation "lineitem_orderkey_index" already exists, skipping
-- Verify that we can create indexes concurrently
CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey);
-- Verify that no-name local CREATE INDEX CONCURRENTLY works
CREATE TABLE local_table (id integer, name text);
CREATE INDEX CONCURRENTLY ON local_table(id);
DROP TABLE local_table;
-- Verify that all indexes got created on the master node and one of the workers
SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
schemaname | tablename | indexname | tablespace | indexdef


@@ -7,7 +7,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1420000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1420000;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 2;
CREATE TABLE test (id integer);
CREATE TABLE test (id integer, val integer);
SELECT create_distributed_table('test', 'id');
create_distributed_table
--------------------------
@@ -56,6 +56,9 @@ GRANT SELECT ON TABLE test_1420001 TO read_access;
-- create prepare tests
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
PREPARE prepare_select AS SELECT count(*) FROM test;
-- not allowed to read absolute paths, even as superuser
COPY "/etc/passwd" TO STDOUT WITH (format transmit);
ERROR: absolute path not allowed
-- check full permission
SET ROLE full_access;
EXECUTE prepare_insert(1);
@@ -85,7 +88,22 @@ SELECT count(*) FROM test;
2
(1 row)
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
count
-------
0
(1 row)
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
SET citus.task_executor_type TO 'real-time';
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
-- check read permission
SET ROLE read_access;
EXECUTE prepare_insert(1);
@@ -117,6 +135,17 @@ SELECT count(*) FROM test;
2
(1 row)
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
count
-------
0
(1 row)
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
SET citus.task_executor_type TO 'real-time';
-- check no permission
SET ROLE no_access;
@@ -133,6 +162,13 @@ ERROR: permission denied for relation test
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
ERROR: permission denied for relation test
-- test re-partition query
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
ERROR: permission denied for relation test
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
SET citus.task_executor_type TO 'real-time';
RESET ROLE;
DROP TABLE test;


@@ -900,6 +900,78 @@ SELECT key, value2 FROM prepare_func_table;
(6 rows)
DROP TABLE prepare_func_table;
-- Text columns can give issues when there is an implicit cast from varchar
CREATE TABLE text_partition_column_table (
key text NOT NULL,
value int
);
SELECT create_distributed_table('text_partition_column_table', 'key');
create_distributed_table
--------------------------
(1 row)
PREPARE prepared_relabel_insert(varchar) AS
INSERT INTO text_partition_column_table VALUES ($1, 1);
EXECUTE prepared_relabel_insert('test');
EXECUTE prepared_relabel_insert('test');
EXECUTE prepared_relabel_insert('test');
EXECUTE prepared_relabel_insert('test');
EXECUTE prepared_relabel_insert('test');
EXECUTE prepared_relabel_insert('test');
SELECT key, value FROM text_partition_column_table ORDER BY key;
key | value
------+-------
test | 1
test | 1
test | 1
test | 1
test | 1
test | 1
(6 rows)
DROP TABLE text_partition_column_table;
-- Domain type columns can give issues
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$');
SELECT run_command_on_workers($$
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$')
$$);
run_command_on_workers
-------------------------------------
(localhost,57637,t,"CREATE DOMAIN")
(localhost,57638,t,"CREATE DOMAIN")
(2 rows)
CREATE TABLE domain_partition_column_table (
key test_key NOT NULL,
value int
);
SELECT create_distributed_table('domain_partition_column_table', 'key');
create_distributed_table
--------------------------
(1 row)
PREPARE prepared_coercion_to_domain_insert(text) AS
INSERT INTO domain_partition_column_table VALUES ($1, 1);
EXECUTE prepared_coercion_to_domain_insert('test-1');
EXECUTE prepared_coercion_to_domain_insert('test-2');
EXECUTE prepared_coercion_to_domain_insert('test-3');
EXECUTE prepared_coercion_to_domain_insert('test-4');
EXECUTE prepared_coercion_to_domain_insert('test-5');
EXECUTE prepared_coercion_to_domain_insert('test-6');
SELECT key, value FROM domain_partition_column_table ORDER BY key;
key | value
--------+-------
test-1 | 1
test-2 | 1
test-3 | 1
test-4 | 1
test-5 | 1
test-6 | 1
(6 rows)
DROP TABLE domain_partition_column_table;
-- verify placement state updates invalidate shard state
--
-- We use an immutable function to check for that. The planner will


@@ -2163,7 +2163,6 @@ BEGIN;
INSERT INTO failure_test VALUES (1, 1);
WARNING: connection error: localhost:57638
DETAIL: no connection to the server
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard
@@ -2182,7 +2181,6 @@ ROLLBACK;
INSERT INTO failure_test VALUES (2, 1);
WARNING: connection error: localhost:57638
DETAIL: no connection to the server
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard


@@ -8,3 +8,5 @@ test: isolation_cluster_management
test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement
test: isolation_concurrent_dml isolation_data_migration
test: isolation_drop_shards isolation_copy_placement_vs_modification
test: isolation_insert_vs_vacuum


@@ -878,19 +878,15 @@ ALTER USER test_user WITH nologin;
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 1: "1,1"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 2: "2,2"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 3: "3,3"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 6: "6,6"
-- verify shards in the first worker are marked invalid
SELECT shardid, shardstate, nodename, nodeport
@@ -912,7 +908,6 @@ SELECT shardid, shardstate, nodename, nodeport
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
ERROR: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_reference, line 1: "3,1"
-- verify shards for reference table are still valid
SELECT shardid, shardstate, nodename, nodeport
@@ -930,11 +925,9 @@ SELECT shardid, shardstate, nodename, nodeport
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
ERROR: could not connect to any active placements
CONTEXT: COPY numbers_hash_other, line 1: "1,1"


@@ -0,0 +1,47 @@
setup
{
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_insert_vacuum(column1 int, column2 int);
SELECT create_distributed_table('test_insert_vacuum', 'column1');
}
teardown
{
DROP TABLE test_insert_vacuum;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-insert"
{
INSERT INTO test_insert_vacuum VALUES(1, 1);
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-vacuum-analyze"
{
VACUUM ANALYZE test_insert_vacuum;
}
step "s2-vacuum-full"
{
VACUUM FULL test_insert_vacuum;
}
# INSERT and VACUUM ANALYZE should not block each other.
permutation "s1-begin" "s1-insert" "s2-vacuum-analyze" "s1-commit"
# INSERT and VACUUM FULL should block each other.
permutation "s1-begin" "s1-insert" "s2-vacuum-full" "s1-commit"


@@ -260,12 +260,14 @@ DROP TABLE data_load_test;
END;
-- Test data loading after dropping a column
CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col2;
SELECT create_distributed_table('data_load_test', 'col1');
SELECT * FROM data_load_test;
ALTER TABLE data_load_test DROP COLUMN col1;
SELECT create_distributed_table('data_load_test', 'col3');
SELECT * FROM data_load_test ORDER BY col2;
-- make sure the tuple went to the right shard
SELECT * FROM data_load_test WHERE col3 = 'world';
DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default;


@@ -100,6 +100,77 @@ RESET citus.enable_version_checks;
DROP EXTENSION citus;
CREATE EXTENSION citus VERSION '5.0';
-- Test non-distributed queries work even in version mismatch
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '6.1-17';
SET citus.enable_version_checks TO 'true';
-- Test CREATE TABLE
CREATE TABLE version_mismatch_table(column1 int);
-- Test COPY
\copy version_mismatch_table FROM STDIN;
0
1
2
3
4
\.
-- Test INSERT
INSERT INTO version_mismatch_table(column1) VALUES(5);
-- Test SELECT
SELECT * FROM version_mismatch_table ORDER BY column1;
-- Test SELECT from pg_catalog
SELECT d.datname as "Name",
pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
pg_catalog.array_to_string(d.datacl, E'\n') AS "Access privileges"
FROM pg_catalog.pg_database d
ORDER BY 1;
-- We should not distribute tables in a version mismatch
SELECT create_distributed_table('version_mismatch_table', 'column1');
-- This function will cause the next ALTER EXTENSION to fail
CREATE OR REPLACE FUNCTION pg_catalog.citus_table_size(table_name regclass)
RETURNS bigint LANGUAGE plpgsql
AS $function$
BEGIN
END;
$function$;
SET citus.enable_version_checks TO 'false';
-- This will fail because of previous function declaration
ALTER EXTENSION citus UPDATE TO '6.2-2';
-- We can DROP problematic function and continue ALTER EXTENSION even when version checks are on
SET citus.enable_version_checks TO 'true';
DROP FUNCTION citus_table_size(regclass);
SET citus.enable_version_checks TO 'false';
ALTER EXTENSION citus UPDATE TO '6.2-2';
-- Test updating to the latest version without specifying the version number
ALTER EXTENSION citus UPDATE;
-- re-create in newest version
DROP EXTENSION citus;
\c
CREATE EXTENSION citus;
-- test cache invalidation in workers
\c - - - :worker_1_port
-- this will initialize the cache
\d
DROP EXTENSION citus;
SET citus.enable_version_checks TO 'false';
CREATE EXTENSION citus VERSION '5.2-4';
SET citus.enable_version_checks TO 'true';
-- during ALTER EXTENSION, we should invalidate the cache
ALTER EXTENSION citus UPDATE;
-- if the cache is invalidated successfully, this \d should work without any problem
\d


@@ -66,6 +66,11 @@ CREATE INDEX IF NOT EXISTS lineitem_orderkey_index on index_test_hash(a);
-- Verify that we can create indexes concurrently
CREATE INDEX CONCURRENTLY lineitem_concurrently_index ON lineitem (l_orderkey);
-- Verify that no-name local CREATE INDEX CONCURRENTLY works
CREATE TABLE local_table (id integer, name text);
CREATE INDEX CONCURRENTLY ON local_table(id);
DROP TABLE local_table;
-- Verify that all indexes got created on the master node and one of the workers
SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
\c - - - :worker_1_port


@@ -10,7 +10,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1420000;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 2;
CREATE TABLE test (id integer);
CREATE TABLE test (id integer, val integer);
SELECT create_distributed_table('test', 'id');
-- turn off propagation to avoid Enterprise processing the following section
@@ -47,6 +47,9 @@ GRANT SELECT ON TABLE test_1420001 TO read_access;
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
PREPARE prepare_select AS SELECT count(*) FROM test;
-- not allowed to read absolute paths, even as superuser
COPY "/etc/passwd" TO STDOUT WITH (format transmit);
-- check full permission
SET ROLE full_access;
@@ -59,8 +62,18 @@ SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
SET citus.task_executor_type TO 'real-time';
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
-- check read permission
SET ROLE read_access;
@@ -73,6 +86,13 @@ SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
SET citus.task_executor_type TO 'real-time';
-- check no permission
@@ -87,6 +107,13 @@ SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
-- test re-partition query
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
SET citus.task_executor_type TO 'real-time';
RESET ROLE;


@@ -476,6 +476,53 @@ SELECT key, value2 FROM prepare_func_table;
DROP TABLE prepare_func_table;
-- Text columns can give issues when there is an implicit cast from varchar
CREATE TABLE text_partition_column_table (
key text NOT NULL,
value int
);
SELECT create_distributed_table('text_partition_column_table', 'key');
PREPARE prepared_relabel_insert(varchar) AS
INSERT INTO text_partition_column_table VALUES ($1, 1);
EXECUTE prepared_relabel_insert('test');
EXECUTE prepared_relabel_insert('test');
EXECUTE prepared_relabel_insert('test');
EXECUTE prepared_relabel_insert('test');
EXECUTE prepared_relabel_insert('test');
EXECUTE prepared_relabel_insert('test');
SELECT key, value FROM text_partition_column_table ORDER BY key;
DROP TABLE text_partition_column_table;
-- Domain type columns can give issues
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$');
SELECT run_command_on_workers($$
CREATE DOMAIN test_key AS text CHECK(VALUE ~ '^test-\d$')
$$);
CREATE TABLE domain_partition_column_table (
key test_key NOT NULL,
value int
);
SELECT create_distributed_table('domain_partition_column_table', 'key');
PREPARE prepared_coercion_to_domain_insert(text) AS
INSERT INTO domain_partition_column_table VALUES ($1, 1);
EXECUTE prepared_coercion_to_domain_insert('test-1');
EXECUTE prepared_coercion_to_domain_insert('test-2');
EXECUTE prepared_coercion_to_domain_insert('test-3');
EXECUTE prepared_coercion_to_domain_insert('test-4');
EXECUTE prepared_coercion_to_domain_insert('test-5');
EXECUTE prepared_coercion_to_domain_insert('test-6');
SELECT key, value FROM domain_partition_column_table ORDER BY key;
DROP TABLE domain_partition_column_table;
-- verify placement state updates invalidate shard state
--
-- We use an immutable function to check for that. The planner will