Add old version(without schema name parameter) of api functions back

Fixes #676

We added old versions (i.e. without schema name) of worker_apply_shard_ddl_command,
worker_fetch_foreign_file and worker_fetch_regular_table back. During function call
of one of these functions, we set schema name as  public schema and call the newer
version of the functions.
pull/678/head
Burak Yucesoy 2016-07-27 14:46:01 +03:00 committed by Metin Doslu
parent afb829b102
commit a649b47bac
12 changed files with 80 additions and 24 deletions

View File

@ -6,7 +6,7 @@ citus_top_builddir = ../../..
MODULE_big = citus MODULE_big = citus
EXTENSION = citus EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -43,6 +43,8 @@ $(EXTENSION)--5.1-4.sql: $(EXTENSION)--5.1-3.sql $(EXTENSION)--5.1-3--5.1-4.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--5.1-5.sql: $(EXTENSION)--5.1-4.sql $(EXTENSION)--5.1-4--5.1-5.sql $(EXTENSION)--5.1-5.sql: $(EXTENSION)--5.1-4.sql $(EXTENSION)--5.1-4--5.1-5.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--5.1-6.sql: $(EXTENSION)--5.1-5.sql $(EXTENSION)--5.1-5--5.1-6.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -0,0 +1,26 @@
CREATE OR REPLACE FUNCTION pg_catalog.worker_apply_shard_ddl_command(bigint, text)
RETURNS void
LANGUAGE sql
AS $worker_apply_shard_ddl_command$
SELECT pg_catalog.worker_apply_shard_ddl_command($1, 'public', $2);
$worker_apply_shard_ddl_command$;
COMMENT ON FUNCTION worker_apply_shard_ddl_command(bigint, text)
IS 'extend ddl command with shardId and apply on database';
CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[])
RETURNS void
LANGUAGE sql
AS $worker_fetch_foreign_file$
SELECT pg_catalog.worker_fetch_foreign_file('public', $1, $2, $3, $4);
$worker_fetch_foreign_file$;
COMMENT ON FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[])
IS 'fetch foreign file from remote node and apply file';
CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[])
RETURNS void
LANGUAGE sql
AS $worker_fetch_regular_table$
SELECT pg_catalog.worker_fetch_regular_table('public', $1, $2, $3, $4);
$worker_fetch_regular_table$;
COMMENT ON FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[])
IS 'fetch PostgreSQL table from remote node';

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '5.1-5' default_version = '5.1-6'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -441,7 +441,6 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
{ {
Oid schemaId = get_rel_namespace(relationId); Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId); char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);
bool shardCreated = true; bool shardCreated = true;
ListCell *ddlCommandCell = NULL; ListCell *ddlCommandCell = NULL;
@ -452,8 +451,19 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
List *queryResultList = NIL; List *queryResultList = NIL;
StringInfo applyDDLCommand = makeStringInfo(); StringInfo applyDDLCommand = makeStringInfo();
if (strcmp(schemaName, "public") != 0)
{
char *escapedSchemaName = quote_literal_cstr(schemaName);
appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId, appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
escapedSchemaName, escapedDDLCommand); escapedSchemaName, escapedDDLCommand);
}
else
{
appendStringInfo(applyDDLCommand,
WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA, shardId,
escapedDDLCommand);
}
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner, queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner,
applyDDLCommand); applyDDLCommand);

View File

@ -3779,15 +3779,33 @@ ShardFetchQueryString(uint64 shardId)
if (storageType == SHARD_STORAGE_TABLE || storageType == SHARD_STORAGE_RELAY || if (storageType == SHARD_STORAGE_TABLE || storageType == SHARD_STORAGE_RELAY ||
storageType == SHARD_STORAGE_COLUMNAR) storageType == SHARD_STORAGE_COLUMNAR)
{ {
appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, if (strcmp(shardSchemaName, "public") != 0)
shardSchemaName, shardTableName, shardLength, {
nodeNameArrayString->data, nodePortArrayString->data); appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, shardSchemaName,
shardTableName, shardLength, nodeNameArrayString->data,
nodePortArrayString->data);
}
else
{
appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND_WITHOUT_SCHEMA,
shardTableName, shardLength, nodeNameArrayString->data,
nodePortArrayString->data);
}
} }
else if (storageType == SHARD_STORAGE_FOREIGN) else if (storageType == SHARD_STORAGE_FOREIGN)
{ {
appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, if (strcmp(shardSchemaName, "public") != 0)
shardSchemaName, shardTableName, shardLength, {
nodeNameArrayString->data, nodePortArrayString->data); appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, shardSchemaName,
shardTableName, shardLength, nodeNameArrayString->data,
nodePortArrayString->data);
}
else
{
appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND_WITHOUT_SCHEMA,
shardTableName, shardLength, nodeNameArrayString->data,
nodePortArrayString->data);
}
} }
return shardFetchQuery; return shardFetchQuery;

View File

@ -1387,23 +1387,15 @@ static bool
ApplyShardDDLCommand(PGconn *workerNode, uint64 shardId, const char *ddlCommand) ApplyShardDDLCommand(PGconn *workerNode, uint64 shardId, const char *ddlCommand)
{ {
const char *remoteCommand = APPLY_SHARD_DDL_COMMAND; const char *remoteCommand = APPLY_SHARD_DDL_COMMAND;
const char *parameterValue[3]; const char *parameterValue[2];
const int parameterCount = 3; const int parameterCount = 2;
PGresult *ddlResult = NULL; PGresult *ddlResult = NULL;
char shardIdString[NAMEDATALEN]; char shardIdString[NAMEDATALEN];
snprintf(shardIdString, NAMEDATALEN, UINT64_FORMAT, shardId); snprintf(shardIdString, NAMEDATALEN, UINT64_FORMAT, shardId);
/*
* We changed worker_apply_shard_ddl_command and now it requires schema name. Since
* \STAGE will be deprecated anyway, we use public schema for everything to make it
* work with worker_apply_shard_ddl_command. Please note that if user specifies
* schema name, this will not override it, because we prioritize schema names given
* in the query in worker_apply_shard_ddl_command.
*/
parameterValue[0] = shardIdString; parameterValue[0] = shardIdString;
parameterValue[1] = "public"; parameterValue[1] = ddlCommand;
parameterValue[2] = ddlCommand;
ddlResult = ExecuteRemoteCommand(workerNode, remoteCommand, ddlResult = ExecuteRemoteCommand(workerNode, remoteCommand,
parameterValue, parameterCount); parameterValue, parameterCount);

View File

@ -65,7 +65,7 @@
"SELECT * FROM (SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table " \ "SELECT * FROM (SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table " \
"WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';" "WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';"
#define APPLY_SHARD_DDL_COMMAND \ #define APPLY_SHARD_DDL_COMMAND \
"SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text, $3::text)" "SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text)"
#define REMOTE_FILE_SIZE_COMMAND "SELECT size FROM pg_stat_file('%s')" #define REMOTE_FILE_SIZE_COMMAND "SELECT size FROM pg_stat_file('%s')"
#define SHARD_COLUMNAR_TABLE_SIZE_COMMAND "SELECT cstore_table_size('%s')" #define SHARD_COLUMNAR_TABLE_SIZE_COMMAND "SELECT cstore_table_size('%s')"

View File

@ -51,6 +51,8 @@
/* Remote call definitions to help with data staging and deletion */ /* Remote call definitions to help with data staging and deletion */
#define WORKER_APPLY_SHARD_DDL_COMMAND \ #define WORKER_APPLY_SHARD_DDL_COMMAND \
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s, %s)" "SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s, %s)"
#define WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA \
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)"
#define WORKER_APPEND_TABLE_TO_SHARD \ #define WORKER_APPEND_TABLE_TO_SHARD \
"SELECT worker_append_table_to_shard (%s, %s, %s, %u)" "SELECT worker_append_table_to_shard (%s, %s, %s, %u)"
#define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s" #define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s"

View File

@ -34,8 +34,12 @@
#define MERGE_COLUMN_FORMAT "merge_column_%u" #define MERGE_COLUMN_FORMAT "merge_column_%u"
#define TABLE_FETCH_COMMAND "SELECT worker_fetch_regular_table \ #define TABLE_FETCH_COMMAND "SELECT worker_fetch_regular_table \
('%s', '%s', " UINT64_FORMAT ", '%s', '%s')" ('%s', '%s', " UINT64_FORMAT ", '%s', '%s')"
#define TABLE_FETCH_COMMAND_WITHOUT_SCHEMA "SELECT worker_fetch_regular_table \
('%s', " UINT64_FORMAT ", '%s', '%s')"
#define FOREIGN_FETCH_COMMAND "SELECT worker_fetch_foreign_file \ #define FOREIGN_FETCH_COMMAND "SELECT worker_fetch_foreign_file \
('%s', '%s', " UINT64_FORMAT ", '%s', '%s')" ('%s', '%s', " UINT64_FORMAT ", '%s', '%s')"
#define FOREIGN_FETCH_COMMAND_WITHOUT_SCHEMA "SELECT worker_fetch_foreign_file \
('%s', " UINT64_FORMAT ", '%s', '%s')"
#define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \ #define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \
(" UINT64_FORMAT ", %u, %u, %u, '%s', %u)" (" UINT64_FORMAT ", %u, %u, %u, '%s', %u)"
#define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \ #define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \

View File

@ -390,7 +390,7 @@ ORDER BY
customer_keys.o_custkey DESC customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20; LIMIT 10 OFFSET 20;
DEBUG: push down of limit count: 30 DEBUG: push down of limit count: 30
DEBUG: building index "pg_toast_16966_index" on table "pg_toast_16966" DEBUG: building index "pg_toast_16977_index" on table "pg_toast_16977"
o_custkey | total_order_count o_custkey | total_order_count
-----------+------------------- -----------+-------------------
1466 | 1 1466 | 1

View File

@ -19,6 +19,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-2';
ALTER EXTENSION citus UPDATE TO '5.1-3'; ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4'; ALTER EXTENSION citus UPDATE TO '5.1-4';
ALTER EXTENSION citus UPDATE TO '5.1-5'; ALTER EXTENSION citus UPDATE TO '5.1-5';
ALTER EXTENSION citus UPDATE TO '5.1-6';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;
\c \c

View File

@ -24,6 +24,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-2';
ALTER EXTENSION citus UPDATE TO '5.1-3'; ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4'; ALTER EXTENSION citus UPDATE TO '5.1-4';
ALTER EXTENSION citus UPDATE TO '5.1-5'; ALTER EXTENSION citus UPDATE TO '5.1-5';
ALTER EXTENSION citus UPDATE TO '5.1-6';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;