From 78aaad2738fc286c069360d8a7e97b4f7035d6c8 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Tue, 14 Jun 2016 11:18:06 +0300 Subject: [PATCH] Fix master_append_table_to_shard to work with schemas Fixes #78 With this change, it is possible to append a table in any schema to shard. The function master_append_table_to_shard now supports schema names. --- .../master/master_stage_protocol.c | 33 ++++-- .../worker/worker_data_fetch_protocol.c | 57 +++++---- src/include/distributed/master_protocol.h | 4 +- .../regress/expected/multi_schema_support.out | 109 ++++++++++++++++++ src/test/regress/multi_schedule | 5 + .../regress/multi_task_tracker_extra_schedule | 5 + src/test/regress/sql/multi_schema_support.sql | 71 ++++++++++++ 7 files changed, 251 insertions(+), 33 deletions(-) create mode 100644 src/test/regress/expected/multi_schema_support.out create mode 100644 src/test/regress/sql/multi_schema_support.sql diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 31ce0b17a..1b3cdacce 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -191,10 +191,14 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) text *sourceTableNameText = PG_GETARG_TEXT_P(1); text *sourceNodeNameText = PG_GETARG_TEXT_P(2); uint32 sourceNodePort = PG_GETARG_UINT32(3); + char *sourceTableName = text_to_cstring(sourceTableNameText); char *sourceNodeName = text_to_cstring(sourceNodeNameText); - char *shardName = NULL; + Oid shardSchemaOid = 0; + char *shardSchemaName = NULL; + char *shardTableName = NULL; + char *shardQualifiedName = NULL; List *shardPlacementList = NIL; List *succeededPlacementList = NIL; List *failedPlacementList = NIL; @@ -234,14 +238,20 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) */ LockShardResource(shardId, AccessExclusiveLock); + /* get schame name of the target shard */ + shardSchemaOid = get_rel_namespace(relationId); + shardSchemaName = get_namespace_name(shardSchemaOid); + /* if shard doesn't have an alias, extend regular table name */ - shardName = LoadShardAlias(relationId, shardId); - if (shardName == NULL) + shardTableName = LoadShardAlias(relationId, shardId); + if (shardTableName == NULL) { - shardName = get_rel_name(relationId); - AppendShardIdToName(&shardName, shardId); + shardTableName = get_rel_name(relationId); + AppendShardIdToName(&shardTableName, shardId); } + shardQualifiedName = quote_qualified_identifier(shardSchemaName, shardTableName); + shardPlacementList = FinalizedShardPlacementList(shardId); if (shardPlacementList == NIL) { @@ -260,7 +270,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) StringInfo workerAppendQuery = makeStringInfo(); appendStringInfo(workerAppendQuery, WORKER_APPEND_TABLE_TO_SHARD, - quote_literal_cstr(shardName), + quote_literal_cstr(shardQualifiedName), quote_literal_cstr(sourceTableName), quote_literal_cstr(sourceNodeName), sourceNodePort); @@ -299,7 +309,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) workerName, workerPort); ereport(WARNING, (errmsg("could not append table to shard \"%s\" on node " - "\"%s:%u\"", shardName, workerName, workerPort), + "\"%s:%u\"", shardQualifiedName, workerName, + workerPort), errdetail("Marking this shard placement as inactive"))); } @@ -596,16 +607,18 @@ WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId, char *tableName List *queryResultList = NIL; StringInfo tableSizeString = NULL; char *tableSizeStringEnd = NULL; + char 
*quotedTableName = quote_literal_cstr(tableName); bool cstoreTable = CStoreTable(relationId); StringInfo tableSizeQuery = makeStringInfo(); + if (cstoreTable) { - appendStringInfo(tableSizeQuery, SHARD_CSTORE_TABLE_SIZE_QUERY, tableName); + appendStringInfo(tableSizeQuery, SHARD_CSTORE_TABLE_SIZE_QUERY, quotedTableName); } else { - appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, tableName); + appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedTableName); } queryResultList = ExecuteRemoteQuery(nodeName, nodePort, NULL, tableSizeQuery); @@ -622,7 +635,7 @@ WorkerTableSize(char *nodeName, uint32 nodePort, Oid relationId, char *tableName if (errno != 0 || (*tableSizeStringEnd) != '\0') { ereport(ERROR, (errmsg("could not extract table size for table \"%s\"", - tableName))); + quotedTableName))); } return tableSize; diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 2fe1ca67f..e7224f0f1 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -602,6 +602,7 @@ LocalTableSize(Oid relationId) { char *relationName = get_rel_name(relationId); struct stat fileStat; + int statOK = 0; StringInfo localFilePath = makeStringInfo(); @@ -1031,61 +1032,75 @@ ParseTreeNode(const char *ddlCommand) Datum worker_append_table_to_shard(PG_FUNCTION_ARGS) { - text *shardNameText = PG_GETARG_TEXT_P(0); - text *remoteTableNameText = PG_GETARG_TEXT_P(1); - text *nodeNameText = PG_GETARG_TEXT_P(2); - uint32 nodePort = PG_GETARG_UINT32(3); + text *shardQualifiedNameText = PG_GETARG_TEXT_P(0); + text *sourceQualifiedNameText = PG_GETARG_TEXT_P(1); + text *sourceNodeNameText = PG_GETARG_TEXT_P(2); + uint32 sourceNodePort = PG_GETARG_UINT32(3); - char *shardName = text_to_cstring(shardNameText); - char *remoteTableName = text_to_cstring(remoteTableNameText); - char *nodeName = text_to_cstring(nodeNameText); + List *shardQualifiedNameList = textToQualifiedNameList(shardQualifiedNameText); + List *sourceQualifiedNameList = textToQualifiedNameList(sourceQualifiedNameText); + char *sourceNodeName = text_to_cstring(sourceNodeNameText); + + char *shardTableName = NULL; + char *shardSchemaName = NULL; + char *shardQualifiedName = NULL; + char *sourceSchemaName = NULL; + char *sourceTableName = NULL; + char *sourceQualifiedName = NULL; StringInfo shardNameString = NULL; StringInfo localFilePath = NULL; - StringInfo remoteCopyCommand = NULL; + StringInfo sourceCopyCommand = NULL; CopyStmt *localCopyCommand = NULL; RangeVar *localTable = NULL; uint64 shardId = INVALID_SHARD_ID; bool received = false; - char *quotedTableName = NULL; StringInfo queryString = NULL; - const char *schemaName = NULL; - /* copy remote table's data to this node */ - shardNameString = makeStringInfo(); - appendStringInfoString(shardNameString, shardName); + /* We extract schema names and table names from qualified names */ + DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName); + + DeconstructQualifiedName(sourceQualifiedNameList, &sourceSchemaName, + &sourceTableName); /* * We lock on the shardId, but do not unlock. When the function returns, and * the transaction for this function commits, this lock will automatically * be released. This ensures appends to a shard happen in a serial manner. 
*/ + shardNameString = makeStringInfo(); + appendStringInfoString(shardNameString, shardTableName); + shardId = ExtractShardId(shardNameString); LockShardResource(shardId, AccessExclusiveLock); + /* copy remote table's data to this node */ localFilePath = makeStringInfo(); appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT, PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId); - quotedTableName = quote_qualified_identifier(NULL, remoteTableName); - remoteCopyCommand = makeStringInfo(); - appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, quotedTableName); + sourceQualifiedName = quote_qualified_identifier(sourceSchemaName, sourceTableName); + sourceCopyCommand = makeStringInfo(); + appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName); - received = ReceiveRegularFile(nodeName, nodePort, remoteCopyCommand, localFilePath); + received = ReceiveRegularFile(sourceNodeName, sourceNodePort, sourceCopyCommand, + localFilePath); if (!received) { ereport(ERROR, (errmsg("could not copy table \"%s\" from \"%s:%u\"", - remoteTableName, nodeName, nodePort))); + sourceTableName, sourceNodeName, sourceNodePort))); } /* copy local file into the given shard */ - localTable = makeRangeVar((char *) schemaName, shardNameString->data, -1); + localTable = makeRangeVar(shardSchemaName, shardTableName, -1); localCopyCommand = CopyStatement(localTable, localFilePath->data); - quotedTableName = quote_qualified_identifier(schemaName, shardNameString->data); + shardQualifiedName = quote_qualified_identifier(shardSchemaName, + shardTableName); queryString = makeStringInfo(); - appendStringInfo(queryString, COPY_IN_COMMAND, quotedTableName, localFilePath->data); + appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName, + localFilePath->data); ProcessUtility((Node *) localCopyCommand, queryString->data, PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 0a45c8b1e..d3f2da184 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -55,8 +55,8 @@ "SELECT worker_append_table_to_shard (%s, %s, %s, %u)" #define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s" #define SHARD_MAX_VALUE_QUERY "SELECT max(%s) FROM %s" -#define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size('%s')" -#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size('%s')" +#define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size(%s)" +#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)" #define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s" #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s" #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s" diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out new file mode 100644 index 000000000..0c106e613 --- /dev/null +++ b/src/test/regress/expected/multi_schema_support.out @@ -0,0 +1,109 @@ +-- +-- MULTI_SCHEMA_SUPPORT +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1190000; +-- test master_append_table_to_shard with schema +CREATE SCHEMA test_schema_support; +NOTICE: Citus partially supports CREATE SCHEMA for distributed databases +DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet +CREATE TABLE test_schema_support.nation_append( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer 
not null, + n_comment varchar(152) +); +SELECT master_create_distributed_table('test_schema_support.nation_append', 'n_nationkey', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_empty_shard('test_schema_support.nation_append'); + master_create_empty_shard +--------------------------- + 1190000 +(1 row) + +-- create table to append +CREATE TABLE public.nation_local( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152) +); +\COPY public.nation_local FROM STDIN with delimiter '|'; +-- append table to shard +SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port); + master_append_table_to_shard +------------------------------ + 0.0266667 +(1 row) + +-- verify table actually appended to shard +SELECT COUNT(*) FROM test_schema_support.nation_append; + count +------- + 6 +(1 row) + +-- test with shard name contains special characters +CREATE TABLE test_schema_support."nation._'append" ( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152)); +SELECT master_create_distributed_table('test_schema_support."nation._''append"', 'n_nationkey', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_empty_shard('test_schema_support."nation._''append"'); + master_create_empty_shard +--------------------------- + 1190001 +(1 row) + +SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port); + master_append_table_to_shard +------------------------------ + 0.0266667 +(1 row) + +-- verify table actually appended to shard +SELECT COUNT(*) FROM test_schema_support."nation._'append"; + count +------- + 6 +(1 row) + +-- test with search_path is set +SET search_path TO test_schema_support; +SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port); + master_append_table_to_shard +------------------------------ + 0.0266667 +(1 row) + +-- verify table actually appended to shard +SELECT COUNT(*) FROM nation_append; + count +------- + 12 +(1 row) + +-- test with search_path is set and shard name contains special characters +SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port); + master_append_table_to_shard +------------------------------ + 0.0266667 +(1 row) + +-- verify table actually appended to shard +SELECT COUNT(*) FROM "nation._'append"; + count +------- + 12 +(1 row) + diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 5b3912c63..ed60ff120 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -149,3 +149,8 @@ test: multi_large_shardid # multi_drop_extension makes sure we can safely drop and recreate the extension # ---------- test: multi_drop_extension + +# ---------- +# multi_schema_support makes sure we can work with tables in schemas other than public with no problem +# ---------- +test: multi_schema_support diff --git a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index 9932b153c..27276f83c 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -109,3 +109,8 @@ test: multi_large_shardid # ---------- test: multi_drop_extension +# ---------- +# multi_schema_support makes sure we can work with tables in schemas other than public 
with no problem +# ---------- +test: multi_schema_support + diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql new file mode 100644 index 000000000..1e20b38e4 --- /dev/null +++ b/src/test/regress/sql/multi_schema_support.sql @@ -0,0 +1,71 @@ +-- +-- MULTI_SCHEMA_SUPPORT +-- + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1190000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1190000; + +-- test master_append_table_to_shard with schema +CREATE SCHEMA test_schema_support; + +CREATE TABLE test_schema_support.nation_append( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152) +); +SELECT master_create_distributed_table('test_schema_support.nation_append', 'n_nationkey', 'append'); + +SELECT master_create_empty_shard('test_schema_support.nation_append'); + +-- create table to append +CREATE TABLE public.nation_local( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152) +); + +\COPY public.nation_local FROM STDIN with delimiter '|'; +0|ALGERIA|0| haggle. carefully final deposits detect slyly agai +1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon +2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special +3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold +4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d +5|ETHIOPIA|0|ven packages wake quickly. regu +\. + +-- append table to shard +SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port); + +-- verify table actually appended to shard +SELECT COUNT(*) FROM test_schema_support.nation_append; + +-- test with shard name contains special characters +CREATE TABLE test_schema_support."nation._'append" ( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152)); + +SELECT master_create_distributed_table('test_schema_support."nation._''append"', 'n_nationkey', 'append'); +SELECT master_create_empty_shard('test_schema_support."nation._''append"'); + +SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port); + +-- verify table actually appended to shard +SELECT COUNT(*) FROM test_schema_support."nation._'append"; + +-- test with search_path is set +SET search_path TO test_schema_support; + +SELECT master_append_table_to_shard(1190000, 'public.nation_local', 'localhost', :master_port); + +-- verify table actually appended to shard +SELECT COUNT(*) FROM nation_append; + +-- test with search_path is set and shard name contains special characters +SELECT master_append_table_to_shard(1190001, 'nation_local', 'localhost', :master_port); + +-- verify table actually appended to shard +SELECT COUNT(*) FROM "nation._'append";