Merge pull request #602 from citusdata/fix/fix_78_append_table_to_shard_with_schema

Fix master_append_table_to_shard to work with schemas
pull/610/head
Burak Yücesoy 2016-06-17 04:46:20 +03:00 committed by GitHub
commit 65609c93ec
7 changed files with 251 additions and 33 deletions

View File

@ -191,10 +191,14 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
text *sourceTableNameText = PG_GETARG_TEXT_P(1);
text *sourceNodeNameText = PG_GETARG_TEXT_P(2);
uint32 sourceNodePort = PG_GETARG_UINT32(3);
char *sourceTableName = text_to_cstring(sourceTableNameText);
char *sourceNodeName = text_to_cstring(sourceNodeNameText);
char *shardName = NULL;
Oid shardSchemaOid = 0;
char *shardSchemaName = NULL;
char *shardTableName = NULL;
char *shardQualifiedName = NULL;
List *shardPlacementList = NIL;
List *succeededPlacementList = NIL;
List *failedPlacementList = NIL;
@ -234,14 +238,20 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
*/
LockShardResource(shardId, AccessExclusiveLock);
/* get schame name of the target shard */
shardSchemaOid = get_rel_namespace(relationId);
shardSchemaName = get_namespace_name(shardSchemaOid);
/* if shard doesn't have an alias, extend regular table name */
shardName = LoadShardAlias(relationId, shardId);
if (shardName == NULL)
shardTableName = LoadShardAlias(relationId, shardId);
if (shardTableName == NULL)
{
shardName = get_rel_name(relationId);
AppendShardIdToName(&shardName, shardId);
shardTableName = get_rel_name(relationId);
AppendShardIdToName(&shardTableName, shardId);
}
shardQualifiedName = quote_qualified_identifier(shardSchemaName, shardTableName);
shardPlacementList = FinalizedShardPlacementList(shardId);
if (shardPlacementList == NIL)
{
@ -260,7 +270,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
StringInfo workerAppendQuery = makeStringInfo();
appendStringInfo(workerAppendQuery, WORKER_APPEND_TABLE_TO_SHARD,
quote_literal_cstr(shardName),
quote_literal_cstr(shardQualifiedName),
quote_literal_cstr(sourceTableName),
quote_literal_cstr(sourceNodeName), sourceNodePort);
@ -299,7 +309,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
workerName, workerPort);
ereport(WARNING, (errmsg("could not append table to shard \"%s\" on node "
"\"%s:%u\"", shardName, workerName, workerPort),
"\"%s:%u\"", shardQualifiedName, workerName,
workerPort),
errdetail("Marking this shard placement as inactive")));
}
@ -596,16 +607,18 @@ WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId, char *tableName
List *queryResultList = NIL;
StringInfo tableSizeString = NULL;
char *tableSizeStringEnd = NULL;
char *quotedTableName = quote_literal_cstr(tableName);
bool cstoreTable = CStoreTable(relationId);
StringInfo tableSizeQuery = makeStringInfo();
if (cstoreTable)
{
appendStringInfo(tableSizeQuery, SHARD_CSTORE_TABLE_SIZE_QUERY, tableName);
appendStringInfo(tableSizeQuery, SHARD_CSTORE_TABLE_SIZE_QUERY, quotedTableName);
}
else
{
appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, tableName);
appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedTableName);
}
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, NULL, tableSizeQuery);
@ -622,7 +635,7 @@ WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId, char *tableName
if (errno != 0 || (*tableSizeStringEnd) != '\0')
{
ereport(ERROR, (errmsg("could not extract table size for table \"%s\"",
tableName)));
quotedTableName)));
}
return tableSize;

View File

@ -602,6 +602,7 @@ LocalTableSize(Oid relationId)
{
char *relationName = get_rel_name(relationId);
struct stat fileStat;
int statOK = 0;
StringInfo localFilePath = makeStringInfo();
@ -1031,61 +1032,75 @@ ParseTreeNode(const char *ddlCommand)
Datum
worker_append_table_to_shard(PG_FUNCTION_ARGS)
{
text *shardNameText = PG_GETARG_TEXT_P(0);
text *remoteTableNameText = PG_GETARG_TEXT_P(1);
text *nodeNameText = PG_GETARG_TEXT_P(2);
uint32 nodePort = PG_GETARG_UINT32(3);
text *shardQualifiedNameText = PG_GETARG_TEXT_P(0);
text *sourceQualifiedNameText = PG_GETARG_TEXT_P(1);
text *sourceNodeNameText = PG_GETARG_TEXT_P(2);
uint32 sourceNodePort = PG_GETARG_UINT32(3);
char *shardName = text_to_cstring(shardNameText);
char *remoteTableName = text_to_cstring(remoteTableNameText);
char *nodeName = text_to_cstring(nodeNameText);
List *shardQualifiedNameList = textToQualifiedNameList(shardQualifiedNameText);
List *sourceQualifiedNameList = textToQualifiedNameList(sourceQualifiedNameText);
char *sourceNodeName = text_to_cstring(sourceNodeNameText);
char *shardTableName = NULL;
char *shardSchemaName = NULL;
char *shardQualifiedName = NULL;
char *sourceSchemaName = NULL;
char *sourceTableName = NULL;
char *sourceQualifiedName = NULL;
StringInfo shardNameString = NULL;
StringInfo localFilePath = NULL;
StringInfo remoteCopyCommand = NULL;
StringInfo sourceCopyCommand = NULL;
CopyStmt *localCopyCommand = NULL;
RangeVar *localTable = NULL;
uint64 shardId = INVALID_SHARD_ID;
bool received = false;
char *quotedTableName = NULL;
StringInfo queryString = NULL;
const char *schemaName = NULL;
/* copy remote table's data to this node */
shardNameString = makeStringInfo();
appendStringInfoString(shardNameString, shardName);
/* We extract schema names and table names from qualified names */
DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName);
DeconstructQualifiedName(sourceQualifiedNameList, &sourceSchemaName,
&sourceTableName);
/*
* We lock on the shardId, but do not unlock. When the function returns, and
* the transaction for this function commits, this lock will automatically
* be released. This ensures appends to a shard happen in a serial manner.
*/
shardNameString = makeStringInfo();
appendStringInfoString(shardNameString, shardTableName);
shardId = ExtractShardId(shardNameString);
LockShardResource(shardId, AccessExclusiveLock);
/* copy remote table's data to this node */
localFilePath = makeStringInfo();
appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT,
PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId);
quotedTableName = quote_qualified_identifier(NULL, remoteTableName);
remoteCopyCommand = makeStringInfo();
appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, quotedTableName);
sourceQualifiedName = quote_qualified_identifier(sourceSchemaName, sourceTableName);
sourceCopyCommand = makeStringInfo();
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
received = ReceiveRegularFile(nodeName, nodePort, remoteCopyCommand, localFilePath);
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, sourceCopyCommand,
localFilePath);
if (!received)
{
ereport(ERROR, (errmsg("could not copy table \"%s\" from \"%s:%u\"",
remoteTableName, nodeName, nodePort)));
sourceTableName, sourceNodeName, sourceNodePort)));
}
/* copy local file into the given shard */
localTable = makeRangeVar((char *) schemaName, shardNameString->data, -1);
localTable = makeRangeVar(shardSchemaName, shardTableName, -1);
localCopyCommand = CopyStatement(localTable, localFilePath->data);
quotedTableName = quote_qualified_identifier(schemaName, shardNameString->data);
shardQualifiedName = quote_qualified_identifier(shardSchemaName,
shardTableName);
queryString = makeStringInfo();
appendStringInfo(queryString, COPY_IN_COMMAND, quotedTableName, localFilePath->data);
appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName,
localFilePath->data);
ProcessUtility((Node *) localCopyCommand, queryString->data,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);

View File

@ -55,8 +55,8 @@
"SELECT worker_append_table_to_shard (%s, %s, %s, %u)"
#define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s"
#define SHARD_MAX_VALUE_QUERY "SELECT max(%s) FROM %s"
#define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size('%s')"
#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size('%s')"
#define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size(%s)"
#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)"
#define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s"
#define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s"
#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s"

View File

@ -0,0 +1,109 @@
--
-- MULTI_SCHEMA_SUPPORT
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1190000;
-- test master_append_table_to_shard with schema
CREATE SCHEMA test_schema_support;
NOTICE: Citus partially supports CREATE SCHEMA for distributed databases
DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet
CREATE TABLE test_schema_support.nation_append(
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_schema_support.nation_append', 'n_nationkey', 'append');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('test_schema_support.nation_append');
master_create_empty_shard
---------------------------
1190000
(1 row)
-- create table to append
CREATE TABLE public.nation_local(
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152)
);
\COPY public.nation_local FROM STDIN with delimiter '|';
-- append table to shard
SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port);
master_append_table_to_shard
------------------------------
0.0266667
(1 row)
-- verify table actually appended to shard
SELECT COUNT(*) FROM test_schema_support.nation_append;
count
-------
6
(1 row)
-- test with shard name contains special characters
CREATE TABLE test_schema_support."nation._'append" (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152));
SELECT master_create_distributed_table('test_schema_support."nation._''append"', 'n_nationkey', 'append');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('test_schema_support."nation._''append"');
master_create_empty_shard
---------------------------
1190001
(1 row)
SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port);
master_append_table_to_shard
------------------------------
0.0266667
(1 row)
-- verify table actually appended to shard
SELECT COUNT(*) FROM test_schema_support."nation._'append";
count
-------
6
(1 row)
-- test with search_path is set
SET search_path TO test_schema_support;
SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port);
master_append_table_to_shard
------------------------------
0.0266667
(1 row)
-- verify table actually appended to shard
SELECT COUNT(*) FROM nation_append;
count
-------
12
(1 row)
-- test with search_path is set and shard name contains special characters
SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port);
master_append_table_to_shard
------------------------------
0.0266667
(1 row)
-- verify table actually appended to shard
SELECT COUNT(*) FROM "nation._'append";
count
-------
12
(1 row)

View File

@ -149,3 +149,8 @@ test: multi_large_shardid
# multi_drop_extension makes sure we can safely drop and recreate the extension
# ----------
test: multi_drop_extension
# ----------
# multi_schema_support makes sure we can work with tables in schemas other than public with no problem
# ----------
test: multi_schema_support

View File

@ -109,3 +109,8 @@ test: multi_large_shardid
# ----------
test: multi_drop_extension
# ----------
# multi_schema_support makes sure we can work with tables in schemas other than public with no problem
# ----------
test: multi_schema_support

View File

@ -0,0 +1,71 @@
--
-- MULTI_SCHEMA_SUPPORT
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1190000;
-- test master_append_table_to_shard with schema
CREATE SCHEMA test_schema_support;
CREATE TABLE test_schema_support.nation_append(
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152)
);
SELECT master_create_distributed_table('test_schema_support.nation_append', 'n_nationkey', 'append');
SELECT master_create_empty_shard('test_schema_support.nation_append');
-- create table to append
CREATE TABLE public.nation_local(
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152)
);
\COPY public.nation_local FROM STDIN with delimiter '|';
0|ALGERIA|0| haggle. carefully final deposits detect slyly agai
1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon
2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
5|ETHIOPIA|0|ven packages wake quickly. regu
\.
-- append table to shard
SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port);
-- verify table actually appended to shard
SELECT COUNT(*) FROM test_schema_support.nation_append;
-- test with shard name contains special characters
CREATE TABLE test_schema_support."nation._'append" (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152));
SELECT master_create_distributed_table('test_schema_support."nation._''append"', 'n_nationkey', 'append');
SELECT master_create_empty_shard('test_schema_support."nation._''append"');
SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port);
-- verify table actually appended to shard
SELECT COUNT(*) FROM test_schema_support."nation._'append";
-- test with search_path is set
SET search_path TO test_schema_support;
SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port);
-- verify table actually appended to shard
SELECT COUNT(*) FROM nation_append;
-- test with search_path is set and shard name contains special characters
SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port);
-- verify table actually appended to shard
SELECT COUNT(*) FROM "nation._'append";