Change worker_apply_shard_ddl_command to accept schema name as parameter

Fixes #565
Fixes #626

To add schema support to citus, we need to schema-prefix all table names, object names etc.
in the queries sent to worker nodes. However; query deparsing is not available for most of
DDL commands, therefore it is not easy to generate worker query in the master node.

As a solution we are sending schema names along with shard id and query to run to worker
nodes with worker_apply_shard_ddl_command.

To not break \STAGE command we pass public schema as paramater while calling
worker_apply_shard_ddl_command from there. This will not cause problem if user uses \STAGE
in different schema because passes schema name is used only if there is no schema name is
given in the query.
pull/1938/head
Burak Yucesoy 2016-07-12 16:53:53 +03:00
parent 10923f5409
commit d0beacc4e1
18 changed files with 519 additions and 67 deletions

View File

@ -6,7 +6,7 @@ citus_top_builddir = ../../..
MODULE_big = citus
EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3
5.1-1 5.1-2 5.1-3 5.1-4
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -39,6 +39,8 @@ $(EXTENSION)--5.1-2.sql: $(EXTENSION)--5.1-1.sql $(EXTENSION)--5.1-1--5.1-2.sql
cat $^ > $@
$(EXTENSION)--5.1-3.sql: $(EXTENSION)--5.1-2.sql $(EXTENSION)--5.1-2--5.1-3.sql
cat $^ > $@
$(EXTENSION)--5.1-4.sql: $(EXTENSION)--5.1-3.sql $(EXTENSION)--5.1-3--5.1-4.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,8 @@
DROP FUNCTION IF EXISTS pg_catalog.worker_apply_shard_ddl_command(bigint, text);
CREATE OR REPLACE FUNCTION pg_catalog.worker_apply_shard_ddl_command(bigint, text, text)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_apply_shard_ddl_command$$;
COMMENT ON FUNCTION worker_apply_shard_ddl_command(bigint, text, text)
IS 'extend ddl command with shardId and apply on database';

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '5.1-3'
default_version = '5.1-4'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -1106,6 +1106,8 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
char *tableOwner = TableOwner(relationId);
HTAB *shardConnectionHash = NULL;
ListCell *shardIntervalCell = NULL;
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
@ -1113,7 +1115,6 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList,
tableOwner);
MemoryContextSwitchTo(oldContext);
foreach(shardIntervalCell, shardIntervalList)
@ -1122,7 +1123,8 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
uint64 shardId = shardInterval->shardId;
ShardConnections *shardConnections = NULL;
bool shardConnectionsFound = false;
char *escapedCommandString = NULL;
char *escapedSchemaName = quote_literal_cstr(schemaName);
char *escapedCommandString = quote_literal_cstr(commandString);
StringInfo applyCommand = makeStringInfo();
shardConnections = GetShardConnections(shardConnectionHash,
@ -1131,9 +1133,8 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
Assert(shardConnectionsFound);
/* build the shard ddl command */
escapedCommandString = quote_literal_cstr(commandString);
appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
escapedCommandString);
escapedSchemaName, escapedCommandString);
ExecuteCommandOnShardPlacements(applyCommand, shardId, shardConnections);

View File

@ -203,8 +203,8 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
*/
LockShardDistributionMetadata(shardId, ExclusiveLock);
CreateShardPlacements(shardId, ddlCommandList, relationOwner, workerNodeList,
roundRobinNodeIndex, replicationFactor);
CreateShardPlacements(distributedTableId, shardId, ddlCommandList, relationOwner,
workerNodeList, roundRobinNodeIndex, replicationFactor);
InsertShardRow(distributedTableId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText);

View File

@ -134,7 +134,7 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
targetPlacement->nodePort);
/* finally, drop/recreate remote table and add back row (in healthy state) */
CreateShardPlacements(shardId, ddlCommandList, relationOwner,
CreateShardPlacements(distributedTableId, shardId, ddlCommandList, relationOwner,
list_make1(targetNode), 0, 1);
HOLD_INTERRUPTS();
@ -244,18 +244,23 @@ CopyDataFromFinalizedPlacement(Oid distributedTableId, int64 shardId,
ShardPlacement *healthyPlacement,
ShardPlacement *placementToRepair)
{
char *relationName = get_rel_name(distributedTableId);
const char *shardName = NULL;
const char *shardTableName = NULL;
const char *shardQualifiedName = NULL;
StringInfo copyRelationQuery = makeStringInfo();
List *queryResultList = NIL;
bool copySuccessful = false;
char *relationName = get_rel_name(distributedTableId);
Oid shardSchemaOid = get_rel_namespace(distributedTableId);
const char *shardSchemaName = get_namespace_name(shardSchemaOid);
AppendShardIdToName(&relationName, shardId);
shardName = quote_identifier(relationName);
shardTableName = quote_identifier(relationName);
shardQualifiedName = quote_qualified_identifier(shardSchemaName, shardTableName);
appendStringInfo(copyRelationQuery, WORKER_APPEND_TABLE_TO_SHARD,
quote_literal_cstr(shardName), /* table to append */
quote_literal_cstr(shardName), /* remote table name */
quote_literal_cstr(shardQualifiedName), /* table to append */
quote_literal_cstr(shardQualifiedName), /* remote table name */
quote_literal_cstr(healthyPlacement->nodeName), /* remote host */
healthyPlacement->nodePort); /* remote port */

View File

@ -40,8 +40,8 @@
/* Local functions forward declarations */
static bool WorkerCreateShard(char *nodeName, uint32 nodePort, uint64 shardId,
char *newShardOwner, List *ddlCommandList);
static bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList);
static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId,
char *shardName, uint64 *shardSize,
text **shardMinValue, text **shardMaxValue);
@ -167,7 +167,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
candidateNodeIndex++;
}
CreateShardPlacements(shardId, ddlEventList, relationOwner,
CreateShardPlacements(relationId, shardId, ddlEventList, relationOwner,
candidateNodeList, 0, ShardReplicationFactor);
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);
@ -377,8 +377,9 @@ CheckDistributedTable(Oid relationId)
* nodes if some DDL commands had been successful).
*/
void
CreateShardPlacements(int64 shardId, List *ddlEventList, char *newPlacementOwner,
List *workerNodeList, int workerStartIndex, int replicationFactor)
CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
char *newPlacementOwner, List *workerNodeList,
int workerStartIndex, int replicationFactor)
{
int attemptCount = replicationFactor;
int workerNodeCount = list_length(workerNodeList);
@ -398,8 +399,8 @@ CreateShardPlacements(int64 shardId, List *ddlEventList, char *newPlacementOwner
char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
bool created = WorkerCreateShard(nodeName, nodePort, shardId, newPlacementOwner,
ddlEventList);
bool created = WorkerCreateShard(relationId, nodeName, nodePort, shardId,
newPlacementOwner, ddlEventList);
if (created)
{
const RelayFileState shardState = FILE_FINALIZED;
@ -435,9 +436,12 @@ CreateShardPlacements(int64 shardId, List *ddlEventList, char *newPlacementOwner
* each DDL command, and could leave the shard in an half-initialized state.
*/
static bool
WorkerCreateShard(char *nodeName, uint32 nodePort, uint64 shardId,
char *newShardOwner, List *ddlCommandList)
WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList)
{
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);
bool shardCreated = true;
ListCell *ddlCommandCell = NULL;
@ -446,10 +450,10 @@ WorkerCreateShard(char *nodeName, uint32 nodePort, uint64 shardId,
char *ddlCommand = (char *) lfirst(ddlCommandCell);
char *escapedDDLCommand = quote_literal_cstr(ddlCommand);
List *queryResultList = NIL;
StringInfo applyDDLCommand = makeStringInfo();
appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND,
shardId, escapedDDLCommand);
appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
escapedSchemaName, escapedDDLCommand);
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner,
applyDDLCommand);

View File

@ -38,6 +38,7 @@ static bool TypeAddIndexConstraint(const AlterTableCmd *command);
static bool TypeDropIndexConstraint(const AlterTableCmd *command,
const RangeVar *relation, uint64 shardId);
static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId);
static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName);
/*
@ -48,7 +49,7 @@ static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId
* function has the side effect of extending relation names in the parse tree.
*/
void
RelayEventExtendNames(Node *parseTree, uint64 shardId)
RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
{
/* we don't extend names in extension or schema commands */
NodeTag nodeType = nodeTag(parseTree);
@ -63,6 +64,10 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
AlterSeqStmt *alterSeqStmt = (AlterSeqStmt *) parseTree;
char **sequenceName = &(alterSeqStmt->sequence->relname);
char **sequenceSchemaName = &(alterSeqStmt->sequence->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(sequenceSchemaName, schemaName);
AppendShardIdToName(sequenceName, shardId);
break;
@ -79,12 +84,16 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parseTree;
char **relationName = &(alterTableStmt->relation->relname);
char **relationSchemaName = &(alterTableStmt->relation->schemaname);
RangeVar *relation = alterTableStmt->relation; /* for constraints */
List *commandList = alterTableStmt->cmds;
ListCell *commandCell = NULL;
/* first append shardId to base relation name */
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
/* append shardId to base relation name */
AppendShardIdToName(relationName, shardId);
foreach(commandCell, commandList)
@ -110,6 +119,7 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
ClusterStmt *clusterStmt = (ClusterStmt *) parseTree;
char **relationName = NULL;
char **relationSchemaName = NULL;
/* we do not support clustering the entire database */
if (clusterStmt->relation == NULL)
@ -118,6 +128,11 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
}
relationName = &(clusterStmt->relation->relname);
relationSchemaName = &(clusterStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
if (clusterStmt->indexname != NULL)
@ -133,6 +148,10 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
CreateSeqStmt *createSeqStmt = (CreateSeqStmt *) parseTree;
char **sequenceName = &(createSeqStmt->sequence->relname);
char **sequenceSchemaName = &(createSeqStmt->sequence->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(sequenceSchemaName, schemaName);
AppendShardIdToName(sequenceName, shardId);
break;
@ -165,6 +184,10 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
CreateStmt *createStmt = (CreateStmt *) parseTree;
char **relationName = &(createStmt->relation->relname);
char **relationSchemaName = &(createStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
break;
@ -181,6 +204,7 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
List *relationNameList = NULL;
int relationNameListLength = 0;
Value *relationSchemaNameValue = NULL;
Value *relationNameValue = NULL;
char **relationName = NULL;
@ -212,12 +236,14 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
case 2:
{
relationSchemaNameValue = linitial(relationNameList);
relationNameValue = lsecond(relationNameList);
break;
}
case 3:
{
relationSchemaNameValue = lsecond(relationNameList);
relationNameValue = lthird(relationNameList);
break;
}
@ -231,6 +257,13 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
}
}
/* prefix with schema name if it is not added already */
if (relationSchemaNameValue == NULL)
{
Value *schemaNameValue = makeString(pstrdup(schemaName));
relationNameList = lcons(schemaNameValue, relationNameList);
}
relationName = &(relationNameValue->val.str);
AppendShardIdToName(relationName, shardId);
}
@ -257,6 +290,11 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
RangeVar *relation = (RangeVar *) lfirst(lc);
char **relationName = &(relation->relname);
char **relationSchemaName = &(relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
}
}
@ -268,6 +306,7 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
IndexStmt *indexStmt = (IndexStmt *) parseTree;
char **relationName = &(indexStmt->relation->relname);
char **indexName = &(indexStmt->idxname);
char **relationSchemaName = &(indexStmt->relation->schemaname);
/*
* Concurrent index statements cannot run within a transaction block.
@ -290,6 +329,9 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
ereport(ERROR, (errmsg("cannot extend name for null index name")));
}
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
AppendShardIdToName(indexName, shardId);
break;
@ -304,6 +346,11 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
if (objectType == REINDEX_OBJECT_TABLE || objectType == REINDEX_OBJECT_INDEX)
{
char **objectName = &(reindexStmt->relation->relname);
char **objectSchemaName = &(reindexStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(objectSchemaName, schemaName);
AppendShardIdToName(objectName, shardId);
}
else if (objectType == REINDEX_OBJECT_DATABASE)
@ -315,6 +362,11 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX)
{
char **objectName = &(reindexStmt->relation->relname);
char **objectSchemaName = &(reindexStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(objectSchemaName, schemaName);
AppendShardIdToName(objectName, shardId);
}
else if (objectType == OBJECT_DATABASE)
@ -341,6 +393,10 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
char **oldRelationName = &(renameStmt->relation->relname);
char **newRelationName = &(renameStmt->newname);
char **objectSchemaName = &(renameStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(objectSchemaName, schemaName);
AppendShardIdToName(oldRelationName, shardId);
AppendShardIdToName(newRelationName, shardId);
@ -348,6 +404,10 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
else if (objectType == OBJECT_COLUMN || objectType == OBJECT_TRIGGER)
{
char **relationName = &(renameStmt->relation->relname);
char **objectSchemaName = &(renameStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(objectSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
}
@ -512,6 +572,20 @@ AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId)
}
/*
* SetSchemaNameIfNotExist function checks whether schemaName is set and if it is not set
* it sets its value to given newSchemaName.
*/
static void
SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName)
{
if ((*schemaName) == NULL)
{
*schemaName = pstrdup(newSchemaName);
}
}
/*
* AppendShardIdToName appends shardId to the given name. The function takes in
* the name's address in order to reallocate memory for the name in the same

View File

@ -400,13 +400,15 @@ Datum
worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
{
uint64 shardId = PG_GETARG_INT64(0);
text *ddlCommandText = PG_GETARG_TEXT_P(1);
text *schemaNameText = PG_GETARG_TEXT_P(1);
text *ddlCommandText = PG_GETARG_TEXT_P(2);
char *schemaName = text_to_cstring(schemaNameText);
const char *ddlCommand = text_to_cstring(ddlCommandText);
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
/* extend names in ddl command and apply extended command */
RelayEventExtendNames(ddlCommandNode, shardId);
RelayEventExtendNames(ddlCommandNode, schemaName, shardId);
ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);

View File

@ -1387,15 +1387,23 @@ static bool
ApplyShardDDLCommand(PGconn *workerNode, uint64 shardId, const char *ddlCommand)
{
const char *remoteCommand = APPLY_SHARD_DDL_COMMAND;
const char *parameterValue[2];
const int parameterCount = 2;
const char *parameterValue[3];
const int parameterCount = 3;
PGresult *ddlResult = NULL;
char shardIdString[NAMEDATALEN];
snprintf(shardIdString, NAMEDATALEN, UINT64_FORMAT, shardId);
/*
* We changed worker_apply_shard_ddl_command and now it requires schema name. Since
* \STAGE will be deprecated anyway, we use public schema for everything to make it
* work with worker_apply_shard_ddl_command. Please note that if user specifies
* schema name, this will not override it, because we prioritize schema names given
* in the query in worker_apply_shard_ddl_command.
*/
parameterValue[0] = shardIdString;
parameterValue[1] = ddlCommand;
parameterValue[1] = "public";
parameterValue[2] = ddlCommand;
ddlResult = ExecuteRemoteCommand(workerNode, remoteCommand,
parameterValue, parameterCount);

View File

@ -65,7 +65,7 @@
"SELECT * FROM (SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table " \
"WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';"
#define APPLY_SHARD_DDL_COMMAND \
"SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text)"
"SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text, $3::text)"
#define REMOTE_FILE_SIZE_COMMAND "SELECT size FROM pg_stat_file('%s')"
#define SHARD_COLUMNAR_TABLE_SIZE_COMMAND "SELECT cstore_table_size('%s')"

View File

@ -50,7 +50,7 @@
/* Remote call definitions to help with data staging and deletion */
#define WORKER_APPLY_SHARD_DDL_COMMAND \
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)"
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s, %s)"
#define WORKER_APPEND_TABLE_TO_SHARD \
"SELECT worker_append_table_to_shard (%s, %s, %s, %u)"
#define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s"
@ -88,10 +88,9 @@ extern bool CStoreTable(Oid relationId);
extern Oid ResolveRelationId(text *relationName);
extern List * GetTableDDLEvents(Oid relationId);
extern void CheckDistributedTable(Oid relationId);
extern void CreateShardPlacements(int64 shardId, List *ddlEventList,
char *newPlacementOwner,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
char *newPlacementOwner, List *workerNodeList,
int workerStartIndex, int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
/* Function declarations for generating metadata for shard creation */

View File

@ -40,7 +40,7 @@ typedef enum
/* Function declarations to extend names in DDL commands */
extern void RelayEventExtendNames(Node *parseTree, uint64 shardId);
extern void RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId);
extern void AppendShardIdToName(char **name, uint64 shardId);
extern void AppendShardIdToStringInfo(StringInfo name, uint64 shardId);

View File

@ -390,7 +390,7 @@ ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
DEBUG: push down of limit count: 30
DEBUG: building index "pg_toast_16953_index" on table "pg_toast_16953"
DEBUG: building index "pg_toast_16958_index" on table "pg_toast_16958"
o_custkey | total_order_count
-----------+-------------------
1466 | 1

View File

@ -17,6 +17,7 @@ ALTER EXTENSION citus UPDATE TO '5.0-2';
ALTER EXTENSION citus UPDATE TO '5.1-1';
ALTER EXTENSION citus UPDATE TO '5.1-2';
ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c

View File

@ -138,7 +138,7 @@ SELECT master_create_distributed_table('test_schema_support.nation_hash', 'n_nat
(1 row)
SELECT master_create_worker_shards('test_schema_support.nation_hash', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash', 4, 2);
master_create_worker_shards
-----------------------------
@ -442,7 +442,7 @@ SELECT master_create_distributed_table('test_schema_support.nation_hash_collatio
(1 row)
SELECT master_create_worker_shards('test_schema_support.nation_hash_collation', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash_collation', 4, 2);
master_create_worker_shards
-----------------------------
@ -485,7 +485,7 @@ SELECT master_create_distributed_table('nation_hash_collation_search_path', 'n_n
(1 row)
SELECT master_create_worker_shards('nation_hash_collation_search_path', 4, 1);
SELECT master_create_worker_shards('nation_hash_collation_search_path', 4, 2);
master_create_worker_shards
-----------------------------
@ -537,7 +537,7 @@ SELECT master_create_distributed_table('test_schema_support.nation_hash_composit
(1 row)
SELECT master_create_worker_shards('test_schema_support.nation_hash_composite_types', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash_composite_types', 4, 2);
master_create_worker_shards
-----------------------------
@ -547,7 +547,7 @@ SELECT master_create_worker_shards('test_schema_support.nation_hash_composite_ty
\COPY test_schema_support.nation_hash_composite_types FROM STDIN with delimiter '|';
SELECT * FROM test_schema_support.nation_hash_composite_types WHERE test_col = '(a,a)'::test_schema_support.new_composite_type;
n_nationkey | n_name | n_regionkey | n_comment | test_col
-------------+---------------------------+-------------+-----------------------------------------------------+----------
-------------+---------------------------+-------------+----------------------------------------------------+----------
0 | ALGERIA | 0 | haggle. carefully final deposits detect slyly agai | (a,a)
(1 row)
@ -555,7 +555,249 @@ SELECT * FROM test_schema_support.nation_hash_composite_types WHERE test_col = '
SET search_path TO test_schema_support;
SELECT * FROM nation_hash_composite_types WHERE test_col = '(a,a)'::new_composite_type;
n_nationkey | n_name | n_regionkey | n_comment | test_col
-------------+---------------------------+-------------+-----------------------------------------------------+----------
-------------+---------------------------+-------------+----------------------------------------------------+----------
0 | ALGERIA | 0 | haggle. carefully final deposits detect slyly agai | (a,a)
(1 row)
-- test ALTER TABLE ADD/DROP queries with schemas
SET search_path TO public;
ALTER TABLE test_schema_support.nation_hash ADD COLUMN new_col INT;
-- verify column is added
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
new_col | integer |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
new_col | integer |
\c - - - :master_port
ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS non_existent_column;
NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping
ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS new_col;
-- verify column is dropped
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :master_port
--test with search_path is set
SET search_path TO test_schema_support;
ALTER TABLE nation_hash ADD COLUMN new_col INT;
-- verify column is added
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
new_col | integer |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
new_col | integer |
\c - - - :master_port
SET search_path TO test_schema_support;
ALTER TABLE nation_hash DROP COLUMN IF EXISTS non_existent_column;
NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping
ALTER TABLE nation_hash DROP COLUMN IF EXISTS new_col;
-- verify column is dropped
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :master_port
-- test CREATE/DROP INDEX with schemas
SET search_path TO public;
-- CREATE index
CREATE INDEX index1 ON test_schema_support.nation_hash(n_name);
--verify INDEX is created
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
Indexes:
"index1" btree (n_name)
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
Indexes:
"index1_1190003" btree (n_name)
\c - - - :master_port
-- DROP index
DROP INDEX test_schema_support.index1;
--verify INDEX is dropped
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :master_port
--test with search_path is set
SET search_path TO test_schema_support;
-- CREATE index
CREATE INDEX index1 ON nation_hash(n_name);
--verify INDEX is created
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
Indexes:
"index1" btree (n_name)
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
Indexes:
"index1_1190003" btree (n_name)
\c - - - :master_port
-- DROP index
SET search_path TO test_schema_support;
DROP INDEX index1;
--verify INDEX is dropped
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :master_port
-- test master_copy_shard_placement with schemas
SET search_path TO public;
-- mark shard as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and nodeport = :worker_1_port;
SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
master_copy_shard_placement
-----------------------------
(1 row)
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1190000 | 1 | 8192 | localhost | 57638
1190000 | 1 | 0 | localhost | 57637
(2 rows)
--test with search_path is set
SET search_path TO test_schema_support;
-- mark shard as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and nodeport = :worker_1_port;
SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
master_copy_shard_placement
-----------------------------
(1 row)
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1190000 | 1 | 8192 | localhost | 57638
1190000 | 1 | 0 | localhost | 57637
(2 rows)

View File

@ -22,6 +22,7 @@ ALTER EXTENSION citus UPDATE TO '5.0-2';
ALTER EXTENSION citus UPDATE TO '5.1-1';
ALTER EXTENSION citus UPDATE TO '5.1-2';
ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4';
-- drop extension an re-create in newest version
DROP EXTENSION citus;

View File

@ -102,7 +102,7 @@ CREATE TABLE test_schema_support.nation_hash(
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_schema_support.nation_hash', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('test_schema_support.nation_hash', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash', 4, 2);
-- test cursors
@ -335,7 +335,7 @@ CREATE TABLE test_schema_support.nation_hash_collation(
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_schema_support.nation_hash_collation', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('test_schema_support.nation_hash_collation', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash_collation', 4, 2);
\COPY test_schema_support.nation_hash_collation FROM STDIN with delimiter '|';
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
@ -358,7 +358,7 @@ CREATE TABLE nation_hash_collation_search_path(
n_comment varchar(152)
);
SELECT master_create_distributed_table('nation_hash_collation_search_path', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('nation_hash_collation_search_path', 4, 1);
SELECT master_create_worker_shards('nation_hash_collation_search_path', 4, 2);
\COPY nation_hash_collation_search_path FROM STDIN with delimiter '|';
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
@ -393,7 +393,7 @@ CREATE TABLE test_schema_support.nation_hash_composite_types(
test_col test_schema_support.new_composite_type
);
SELECT master_create_distributed_table('test_schema_support.nation_hash_composite_types', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('test_schema_support.nation_hash_composite_types', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash_composite_types', 4, 2);
-- insert some data to verify composite type queries
\COPY test_schema_support.nation_hash_composite_types FROM STDIN with delimiter '|';
@ -411,3 +411,108 @@ SELECT * FROM test_schema_support.nation_hash_composite_types WHERE test_col = '
SET search_path TO test_schema_support;
SELECT * FROM nation_hash_composite_types WHERE test_col = '(a,a)'::new_composite_type;
-- test ALTER TABLE ADD/DROP queries with schemas
SET search_path TO public;
ALTER TABLE test_schema_support.nation_hash ADD COLUMN new_col INT;
-- verify column is added
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS non_existent_column;
ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS new_col;
-- verify column is dropped
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
--test with search_path is set
SET search_path TO test_schema_support;
ALTER TABLE nation_hash ADD COLUMN new_col INT;
-- verify column is added
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
SET search_path TO test_schema_support;
ALTER TABLE nation_hash DROP COLUMN IF EXISTS non_existent_column;
ALTER TABLE nation_hash DROP COLUMN IF EXISTS new_col;
-- verify column is dropped
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
-- test CREATE/DROP INDEX with schemas
SET search_path TO public;
-- CREATE index
CREATE INDEX index1 ON test_schema_support.nation_hash(n_name);
--verify INDEX is created
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
-- DROP index
DROP INDEX test_schema_support.index1;
--verify INDEX is dropped
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
--test with search_path is set
SET search_path TO test_schema_support;
-- CREATE index
CREATE INDEX index1 ON nation_hash(n_name);
--verify INDEX is created
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
-- DROP index
SET search_path TO test_schema_support;
DROP INDEX index1;
--verify INDEX is dropped
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
-- test master_copy_shard_placement with schemas
SET search_path TO public;
-- mark shard as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and nodeport = :worker_1_port;
SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
--test with search_path is set
SET search_path TO test_schema_support;
-- mark shard as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and nodeport = :worker_1_port;
SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;