Merge pull request #643 from citusdata/add_schema_name_to_worker_apply_shard_ddl_command

Add schema name to worker apply shard ddl command
pull/644/head
Burak Yücesoy 2016-07-21 14:23:29 +03:00 committed by GitHub
commit 85bfad781b
18 changed files with 519 additions and 67 deletions

View File

@ -6,7 +6,7 @@ citus_top_builddir = ../../..
MODULE_big = citus
EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3
5.1-1 5.1-2 5.1-3 5.1-4
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -39,6 +39,8 @@ $(EXTENSION)--5.1-2.sql: $(EXTENSION)--5.1-1.sql $(EXTENSION)--5.1-1--5.1-2.sql
cat $^ > $@
$(EXTENSION)--5.1-3.sql: $(EXTENSION)--5.1-2.sql $(EXTENSION)--5.1-2--5.1-3.sql
cat $^ > $@
$(EXTENSION)--5.1-4.sql: $(EXTENSION)--5.1-3.sql $(EXTENSION)--5.1-3--5.1-4.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,8 @@
DROP FUNCTION IF EXISTS pg_catalog.worker_apply_shard_ddl_command(bigint, text);
CREATE OR REPLACE FUNCTION pg_catalog.worker_apply_shard_ddl_command(bigint, text, text)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_apply_shard_ddl_command$$;
COMMENT ON FUNCTION worker_apply_shard_ddl_command(bigint, text, text)
IS 'extend ddl command with shardId and apply on database';

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '5.1-3'
default_version = '5.1-4'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -1106,6 +1106,8 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
char *tableOwner = TableOwner(relationId);
HTAB *shardConnectionHash = NULL;
ListCell *shardIntervalCell = NULL;
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
@ -1113,7 +1115,6 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList,
tableOwner);
MemoryContextSwitchTo(oldContext);
foreach(shardIntervalCell, shardIntervalList)
@ -1122,7 +1123,8 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
uint64 shardId = shardInterval->shardId;
ShardConnections *shardConnections = NULL;
bool shardConnectionsFound = false;
char *escapedCommandString = NULL;
char *escapedSchemaName = quote_literal_cstr(schemaName);
char *escapedCommandString = quote_literal_cstr(commandString);
StringInfo applyCommand = makeStringInfo();
shardConnections = GetShardConnections(shardConnectionHash,
@ -1131,9 +1133,8 @@ ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
Assert(shardConnectionsFound);
/* build the shard ddl command */
escapedCommandString = quote_literal_cstr(commandString);
appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
escapedCommandString);
escapedSchemaName, escapedCommandString);
ExecuteCommandOnShardPlacements(applyCommand, shardId, shardConnections);

View File

@ -203,8 +203,8 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
*/
LockShardDistributionMetadata(shardId, ExclusiveLock);
CreateShardPlacements(shardId, ddlCommandList, relationOwner, workerNodeList,
roundRobinNodeIndex, replicationFactor);
CreateShardPlacements(distributedTableId, shardId, ddlCommandList, relationOwner,
workerNodeList, roundRobinNodeIndex, replicationFactor);
InsertShardRow(distributedTableId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText);

View File

@ -134,7 +134,7 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
targetPlacement->nodePort);
/* finally, drop/recreate remote table and add back row (in healthy state) */
CreateShardPlacements(shardId, ddlCommandList, relationOwner,
CreateShardPlacements(distributedTableId, shardId, ddlCommandList, relationOwner,
list_make1(targetNode), 0, 1);
HOLD_INTERRUPTS();
@ -244,18 +244,23 @@ CopyDataFromFinalizedPlacement(Oid distributedTableId, int64 shardId,
ShardPlacement *healthyPlacement,
ShardPlacement *placementToRepair)
{
char *relationName = get_rel_name(distributedTableId);
const char *shardName = NULL;
const char *shardTableName = NULL;
const char *shardQualifiedName = NULL;
StringInfo copyRelationQuery = makeStringInfo();
List *queryResultList = NIL;
bool copySuccessful = false;
char *relationName = get_rel_name(distributedTableId);
Oid shardSchemaOid = get_rel_namespace(distributedTableId);
const char *shardSchemaName = get_namespace_name(shardSchemaOid);
AppendShardIdToName(&relationName, shardId);
shardName = quote_identifier(relationName);
shardTableName = quote_identifier(relationName);
shardQualifiedName = quote_qualified_identifier(shardSchemaName, shardTableName);
appendStringInfo(copyRelationQuery, WORKER_APPEND_TABLE_TO_SHARD,
quote_literal_cstr(shardName), /* table to append */
quote_literal_cstr(shardName), /* remote table name */
quote_literal_cstr(shardQualifiedName), /* table to append */
quote_literal_cstr(shardQualifiedName), /* remote table name */
quote_literal_cstr(healthyPlacement->nodeName), /* remote host */
healthyPlacement->nodePort); /* remote port */

View File

@ -40,8 +40,8 @@
/* Local functions forward declarations */
static bool WorkerCreateShard(char *nodeName, uint32 nodePort, uint64 shardId,
char *newShardOwner, List *ddlCommandList);
static bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList);
static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId,
char *shardName, uint64 *shardSize,
text **shardMinValue, text **shardMaxValue);
@ -167,7 +167,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
candidateNodeIndex++;
}
CreateShardPlacements(shardId, ddlEventList, relationOwner,
CreateShardPlacements(relationId, shardId, ddlEventList, relationOwner,
candidateNodeList, 0, ShardReplicationFactor);
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);
@ -377,8 +377,9 @@ CheckDistributedTable(Oid relationId)
* nodes if some DDL commands had been successful).
*/
void
CreateShardPlacements(int64 shardId, List *ddlEventList, char *newPlacementOwner,
List *workerNodeList, int workerStartIndex, int replicationFactor)
CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
char *newPlacementOwner, List *workerNodeList,
int workerStartIndex, int replicationFactor)
{
int attemptCount = replicationFactor;
int workerNodeCount = list_length(workerNodeList);
@ -398,8 +399,8 @@ CreateShardPlacements(int64 shardId, List *ddlEventList, char *newPlacementOwner
char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
bool created = WorkerCreateShard(nodeName, nodePort, shardId, newPlacementOwner,
ddlEventList);
bool created = WorkerCreateShard(relationId, nodeName, nodePort, shardId,
newPlacementOwner, ddlEventList);
if (created)
{
const RelayFileState shardState = FILE_FINALIZED;
@ -435,9 +436,12 @@ CreateShardPlacements(int64 shardId, List *ddlEventList, char *newPlacementOwner
* each DDL command, and could leave the shard in an half-initialized state.
*/
static bool
WorkerCreateShard(char *nodeName, uint32 nodePort, uint64 shardId,
char *newShardOwner, List *ddlCommandList)
WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList)
{
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);
bool shardCreated = true;
ListCell *ddlCommandCell = NULL;
@ -446,10 +450,10 @@ WorkerCreateShard(char *nodeName, uint32 nodePort, uint64 shardId,
char *ddlCommand = (char *) lfirst(ddlCommandCell);
char *escapedDDLCommand = quote_literal_cstr(ddlCommand);
List *queryResultList = NIL;
StringInfo applyDDLCommand = makeStringInfo();
appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND,
shardId, escapedDDLCommand);
appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
escapedSchemaName, escapedDDLCommand);
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner,
applyDDLCommand);

View File

@ -38,6 +38,7 @@ static bool TypeAddIndexConstraint(const AlterTableCmd *command);
static bool TypeDropIndexConstraint(const AlterTableCmd *command,
const RangeVar *relation, uint64 shardId);
static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId);
static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName);
/*
@ -48,7 +49,7 @@ static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId
* function has the side effect of extending relation names in the parse tree.
*/
void
RelayEventExtendNames(Node *parseTree, uint64 shardId)
RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
{
/* we don't extend names in extension or schema commands */
NodeTag nodeType = nodeTag(parseTree);
@ -63,6 +64,10 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
AlterSeqStmt *alterSeqStmt = (AlterSeqStmt *) parseTree;
char **sequenceName = &(alterSeqStmt->sequence->relname);
char **sequenceSchemaName = &(alterSeqStmt->sequence->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(sequenceSchemaName, schemaName);
AppendShardIdToName(sequenceName, shardId);
break;
@ -79,12 +84,16 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parseTree;
char **relationName = &(alterTableStmt->relation->relname);
char **relationSchemaName = &(alterTableStmt->relation->schemaname);
RangeVar *relation = alterTableStmt->relation; /* for constraints */
List *commandList = alterTableStmt->cmds;
ListCell *commandCell = NULL;
/* first append shardId to base relation name */
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
/* append shardId to base relation name */
AppendShardIdToName(relationName, shardId);
foreach(commandCell, commandList)
@ -110,6 +119,7 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
ClusterStmt *clusterStmt = (ClusterStmt *) parseTree;
char **relationName = NULL;
char **relationSchemaName = NULL;
/* we do not support clustering the entire database */
if (clusterStmt->relation == NULL)
@ -118,6 +128,11 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
}
relationName = &(clusterStmt->relation->relname);
relationSchemaName = &(clusterStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
if (clusterStmt->indexname != NULL)
@ -133,6 +148,10 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
CreateSeqStmt *createSeqStmt = (CreateSeqStmt *) parseTree;
char **sequenceName = &(createSeqStmt->sequence->relname);
char **sequenceSchemaName = &(createSeqStmt->sequence->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(sequenceSchemaName, schemaName);
AppendShardIdToName(sequenceName, shardId);
break;
@ -165,6 +184,10 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
CreateStmt *createStmt = (CreateStmt *) parseTree;
char **relationName = &(createStmt->relation->relname);
char **relationSchemaName = &(createStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
break;
@ -181,6 +204,7 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
List *relationNameList = NULL;
int relationNameListLength = 0;
Value *relationSchemaNameValue = NULL;
Value *relationNameValue = NULL;
char **relationName = NULL;
@ -212,12 +236,14 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
case 2:
{
relationSchemaNameValue = linitial(relationNameList);
relationNameValue = lsecond(relationNameList);
break;
}
case 3:
{
relationSchemaNameValue = lsecond(relationNameList);
relationNameValue = lthird(relationNameList);
break;
}
@ -231,6 +257,13 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
}
}
/* prefix with schema name if it is not added already */
if (relationSchemaNameValue == NULL)
{
Value *schemaNameValue = makeString(pstrdup(schemaName));
relationNameList = lcons(schemaNameValue, relationNameList);
}
relationName = &(relationNameValue->val.str);
AppendShardIdToName(relationName, shardId);
}
@ -257,6 +290,11 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
RangeVar *relation = (RangeVar *) lfirst(lc);
char **relationName = &(relation->relname);
char **relationSchemaName = &(relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
}
}
@ -268,6 +306,7 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
IndexStmt *indexStmt = (IndexStmt *) parseTree;
char **relationName = &(indexStmt->relation->relname);
char **indexName = &(indexStmt->idxname);
char **relationSchemaName = &(indexStmt->relation->schemaname);
/*
* Concurrent index statements cannot run within a transaction block.
@ -290,6 +329,9 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
ereport(ERROR, (errmsg("cannot extend name for null index name")));
}
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(relationSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
AppendShardIdToName(indexName, shardId);
break;
@ -304,6 +346,11 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
if (objectType == REINDEX_OBJECT_TABLE || objectType == REINDEX_OBJECT_INDEX)
{
char **objectName = &(reindexStmt->relation->relname);
char **objectSchemaName = &(reindexStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(objectSchemaName, schemaName);
AppendShardIdToName(objectName, shardId);
}
else if (objectType == REINDEX_OBJECT_DATABASE)
@ -315,6 +362,11 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX)
{
char **objectName = &(reindexStmt->relation->relname);
char **objectSchemaName = &(reindexStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(objectSchemaName, schemaName);
AppendShardIdToName(objectName, shardId);
}
else if (objectType == OBJECT_DATABASE)
@ -341,6 +393,10 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
{
char **oldRelationName = &(renameStmt->relation->relname);
char **newRelationName = &(renameStmt->newname);
char **objectSchemaName = &(renameStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(objectSchemaName, schemaName);
AppendShardIdToName(oldRelationName, shardId);
AppendShardIdToName(newRelationName, shardId);
@ -348,6 +404,10 @@ RelayEventExtendNames(Node *parseTree, uint64 shardId)
else if (objectType == OBJECT_COLUMN || objectType == OBJECT_TRIGGER)
{
char **relationName = &(renameStmt->relation->relname);
char **objectSchemaName = &(renameStmt->relation->schemaname);
/* prefix with schema name if it is not added already */
SetSchemaNameIfNotExist(objectSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
}
@ -512,6 +572,20 @@ AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId)
}
/*
* SetSchemaNameIfNotExist function checks whether schemaName is set and if it is not set
* it sets its value to given newSchemaName.
*/
static void
SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName)
{
if ((*schemaName) == NULL)
{
*schemaName = pstrdup(newSchemaName);
}
}
/*
* AppendShardIdToName appends shardId to the given name. The function takes in
* the name's address in order to reallocate memory for the name in the same

View File

@ -400,13 +400,15 @@ Datum
worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
{
uint64 shardId = PG_GETARG_INT64(0);
text *ddlCommandText = PG_GETARG_TEXT_P(1);
text *schemaNameText = PG_GETARG_TEXT_P(1);
text *ddlCommandText = PG_GETARG_TEXT_P(2);
char *schemaName = text_to_cstring(schemaNameText);
const char *ddlCommand = text_to_cstring(ddlCommandText);
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
/* extend names in ddl command and apply extended command */
RelayEventExtendNames(ddlCommandNode, shardId);
RelayEventExtendNames(ddlCommandNode, schemaName, shardId);
ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);

View File

@ -1387,15 +1387,23 @@ static bool
ApplyShardDDLCommand(PGconn *workerNode, uint64 shardId, const char *ddlCommand)
{
const char *remoteCommand = APPLY_SHARD_DDL_COMMAND;
const char *parameterValue[2];
const int parameterCount = 2;
const char *parameterValue[3];
const int parameterCount = 3;
PGresult *ddlResult = NULL;
char shardIdString[NAMEDATALEN];
snprintf(shardIdString, NAMEDATALEN, UINT64_FORMAT, shardId);
/*
* We changed worker_apply_shard_ddl_command and now it requires schema name. Since
* \STAGE will be deprecated anyway, we use public schema for everything to make it
* work with worker_apply_shard_ddl_command. Please note that if user specifies
* schema name, this will not override it, because we prioritize schema names given
* in the query in worker_apply_shard_ddl_command.
*/
parameterValue[0] = shardIdString;
parameterValue[1] = ddlCommand;
parameterValue[1] = "public";
parameterValue[2] = ddlCommand;
ddlResult = ExecuteRemoteCommand(workerNode, remoteCommand,
parameterValue, parameterCount);

View File

@ -65,7 +65,7 @@
"SELECT * FROM (SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table " \
"WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';"
#define APPLY_SHARD_DDL_COMMAND \
"SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text)"
"SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text, $3::text)"
#define REMOTE_FILE_SIZE_COMMAND "SELECT size FROM pg_stat_file('%s')"
#define SHARD_COLUMNAR_TABLE_SIZE_COMMAND "SELECT cstore_table_size('%s')"

View File

@ -50,7 +50,7 @@
/* Remote call definitions to help with data staging and deletion */
#define WORKER_APPLY_SHARD_DDL_COMMAND \
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)"
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s, %s)"
#define WORKER_APPEND_TABLE_TO_SHARD \
"SELECT worker_append_table_to_shard (%s, %s, %s, %u)"
#define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s"
@ -88,10 +88,9 @@ extern bool CStoreTable(Oid relationId);
extern Oid ResolveRelationId(text *relationName);
extern List * GetTableDDLEvents(Oid relationId);
extern void CheckDistributedTable(Oid relationId);
extern void CreateShardPlacements(int64 shardId, List *ddlEventList,
char *newPlacementOwner,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
char *newPlacementOwner, List *workerNodeList,
int workerStartIndex, int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
/* Function declarations for generating metadata for shard creation */

View File

@ -40,7 +40,7 @@ typedef enum
/* Function declarations to extend names in DDL commands */
extern void RelayEventExtendNames(Node *parseTree, uint64 shardId);
extern void RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId);
extern void AppendShardIdToName(char **name, uint64 shardId);
extern void AppendShardIdToStringInfo(StringInfo name, uint64 shardId);

View File

@ -390,7 +390,7 @@ ORDER BY
customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20;
DEBUG: push down of limit count: 30
DEBUG: building index "pg_toast_16953_index" on table "pg_toast_16953"
DEBUG: building index "pg_toast_16958_index" on table "pg_toast_16958"
o_custkey | total_order_count
-----------+-------------------
1466 | 1

View File

@ -17,6 +17,7 @@ ALTER EXTENSION citus UPDATE TO '5.0-2';
ALTER EXTENSION citus UPDATE TO '5.1-1';
ALTER EXTENSION citus UPDATE TO '5.1-2';
ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c

View File

@ -138,7 +138,7 @@ SELECT master_create_distributed_table('test_schema_support.nation_hash', 'n_nat
(1 row)
SELECT master_create_worker_shards('test_schema_support.nation_hash', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash', 4, 2);
master_create_worker_shards
-----------------------------
@ -442,7 +442,7 @@ SELECT master_create_distributed_table('test_schema_support.nation_hash_collatio
(1 row)
SELECT master_create_worker_shards('test_schema_support.nation_hash_collation', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash_collation', 4, 2);
master_create_worker_shards
-----------------------------
@ -485,7 +485,7 @@ SELECT master_create_distributed_table('nation_hash_collation_search_path', 'n_n
(1 row)
SELECT master_create_worker_shards('nation_hash_collation_search_path', 4, 1);
SELECT master_create_worker_shards('nation_hash_collation_search_path', 4, 2);
master_create_worker_shards
-----------------------------
@ -537,7 +537,7 @@ SELECT master_create_distributed_table('test_schema_support.nation_hash_composit
(1 row)
SELECT master_create_worker_shards('test_schema_support.nation_hash_composite_types', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash_composite_types', 4, 2);
master_create_worker_shards
-----------------------------
@ -547,7 +547,7 @@ SELECT master_create_worker_shards('test_schema_support.nation_hash_composite_ty
\COPY test_schema_support.nation_hash_composite_types FROM STDIN with delimiter '|';
SELECT * FROM test_schema_support.nation_hash_composite_types WHERE test_col = '(a,a)'::test_schema_support.new_composite_type;
n_nationkey | n_name | n_regionkey | n_comment | test_col
-------------+---------------------------+-------------+-----------------------------------------------------+----------
-------------+---------------------------+-------------+----------------------------------------------------+----------
0 | ALGERIA | 0 | haggle. carefully final deposits detect slyly agai | (a,a)
(1 row)
@ -555,7 +555,249 @@ SELECT * FROM test_schema_support.nation_hash_composite_types WHERE test_col = '
SET search_path TO test_schema_support;
SELECT * FROM nation_hash_composite_types WHERE test_col = '(a,a)'::new_composite_type;
n_nationkey | n_name | n_regionkey | n_comment | test_col
-------------+---------------------------+-------------+-----------------------------------------------------+----------
-------------+---------------------------+-------------+----------------------------------------------------+----------
0 | ALGERIA | 0 | haggle. carefully final deposits detect slyly agai | (a,a)
(1 row)
-- test ALTER TABLE ADD/DROP queries with schemas
SET search_path TO public;
ALTER TABLE test_schema_support.nation_hash ADD COLUMN new_col INT;
-- verify column is added
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
new_col | integer |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
new_col | integer |
\c - - - :master_port
ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS non_existent_column;
NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping
ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS new_col;
-- verify column is dropped
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :master_port
--test with search_path is set
SET search_path TO test_schema_support;
ALTER TABLE nation_hash ADD COLUMN new_col INT;
-- verify column is added
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
new_col | integer |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
new_col | integer |
\c - - - :master_port
SET search_path TO test_schema_support;
ALTER TABLE nation_hash DROP COLUMN IF EXISTS non_existent_column;
NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping
ALTER TABLE nation_hash DROP COLUMN IF EXISTS new_col;
-- verify column is dropped
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :master_port
-- test CREATE/DROP INDEX with schemas
SET search_path TO public;
-- CREATE index
CREATE INDEX index1 ON test_schema_support.nation_hash(n_name);
--verify INDEX is created
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
Indexes:
"index1" btree (n_name)
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
Indexes:
"index1_1190003" btree (n_name)
\c - - - :master_port
-- DROP index
DROP INDEX test_schema_support.index1;
--verify INDEX is dropped
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :master_port
--test with search_path is set
SET search_path TO test_schema_support;
-- CREATE index
CREATE INDEX index1 ON nation_hash(n_name);
--verify INDEX is created
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
Indexes:
"index1" btree (n_name)
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
Indexes:
"index1_1190003" btree (n_name)
\c - - - :master_port
-- DROP index
SET search_path TO test_schema_support;
DROP INDEX index1;
--verify INDEX is dropped
\d test_schema_support.nation_hash;
Table "test_schema_support.nation_hash"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
Table "test_schema_support.nation_hash_1190003"
Column | Type | Modifiers
-------------+------------------------+-----------
n_nationkey | integer | not null
n_name | character(25) | not null
n_regionkey | integer | not null
n_comment | character varying(152) |
\c - - - :master_port
-- test master_copy_shard_placement with schemas
SET search_path TO public;
-- mark shard as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and nodeport = :worker_1_port;
SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
master_copy_shard_placement
-----------------------------
(1 row)
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1190000 | 1 | 8192 | localhost | 57638
1190000 | 1 | 0 | localhost | 57637
(2 rows)
--test with search_path is set
SET search_path TO test_schema_support;
-- mark shard as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and nodeport = :worker_1_port;
SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
master_copy_shard_placement
-----------------------------
(1 row)
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1190000 | 1 | 8192 | localhost | 57638
1190000 | 1 | 0 | localhost | 57637
(2 rows)

View File

@ -22,6 +22,7 @@ ALTER EXTENSION citus UPDATE TO '5.0-2';
ALTER EXTENSION citus UPDATE TO '5.1-1';
ALTER EXTENSION citus UPDATE TO '5.1-2';
ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4';
-- drop extension an re-create in newest version
DROP EXTENSION citus;

View File

@ -102,7 +102,7 @@ CREATE TABLE test_schema_support.nation_hash(
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_schema_support.nation_hash', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('test_schema_support.nation_hash', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash', 4, 2);
-- test cursors
@ -335,7 +335,7 @@ CREATE TABLE test_schema_support.nation_hash_collation(
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_schema_support.nation_hash_collation', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('test_schema_support.nation_hash_collation', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash_collation', 4, 2);
\COPY test_schema_support.nation_hash_collation FROM STDIN with delimiter '|';
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
@ -358,7 +358,7 @@ CREATE TABLE nation_hash_collation_search_path(
n_comment varchar(152)
);
SELECT master_create_distributed_table('nation_hash_collation_search_path', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('nation_hash_collation_search_path', 4, 1);
SELECT master_create_worker_shards('nation_hash_collation_search_path', 4, 2);
\COPY nation_hash_collation_search_path FROM STDIN with delimiter '|';
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
@ -393,7 +393,7 @@ CREATE TABLE test_schema_support.nation_hash_composite_types(
test_col test_schema_support.new_composite_type
);
SELECT master_create_distributed_table('test_schema_support.nation_hash_composite_types', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('test_schema_support.nation_hash_composite_types', 4, 1);
SELECT master_create_worker_shards('test_schema_support.nation_hash_composite_types', 4, 2);
-- insert some data to verify composite type queries
\COPY test_schema_support.nation_hash_composite_types FROM STDIN with delimiter '|';
@ -411,3 +411,108 @@ SELECT * FROM test_schema_support.nation_hash_composite_types WHERE test_col = '
SET search_path TO test_schema_support;
SELECT * FROM nation_hash_composite_types WHERE test_col = '(a,a)'::new_composite_type;
-- test ALTER TABLE ADD/DROP queries with schemas
SET search_path TO public;
ALTER TABLE test_schema_support.nation_hash ADD COLUMN new_col INT;
-- verify column is added
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS non_existent_column;
ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS new_col;
-- verify column is dropped
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
--test with search_path is set
SET search_path TO test_schema_support;
ALTER TABLE nation_hash ADD COLUMN new_col INT;
-- verify column is added
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
SET search_path TO test_schema_support;
ALTER TABLE nation_hash DROP COLUMN IF EXISTS non_existent_column;
ALTER TABLE nation_hash DROP COLUMN IF EXISTS new_col;
-- verify column is dropped
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
-- test CREATE/DROP INDEX with schemas
SET search_path TO public;
-- CREATE index
CREATE INDEX index1 ON test_schema_support.nation_hash(n_name);
--verify INDEX is created
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
-- DROP index
DROP INDEX test_schema_support.index1;
--verify INDEX is dropped
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
--test with search_path is set
SET search_path TO test_schema_support;
-- CREATE index
CREATE INDEX index1 ON nation_hash(n_name);
--verify INDEX is created
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
-- DROP index
SET search_path TO test_schema_support;
DROP INDEX index1;
--verify INDEX is dropped
\d test_schema_support.nation_hash;
\c - - - :worker_1_port
\d test_schema_support.nation_hash_1190003;
\c - - - :master_port
-- test master_copy_shard_placement with schemas
SET search_path TO public;
-- mark shard as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and nodeport = :worker_1_port;
SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;
--test with search_path is set
SET search_path TO test_schema_support;
-- mark shard as inactive
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and nodeport = :worker_1_port;
SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port);
-- verify shardstate
SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000;