Remove schema name parameter from API functions

We remove schema name parameter from worker_fetch_foreign_file and
worker_fetch_regular_table functions. We now send schema name
concatanated with table name.
pull/678/head
Burak Yucesoy 2016-07-28 17:25:46 +03:00 committed by Metin Doslu
parent a649b47bac
commit 6f20af9e38
10 changed files with 88 additions and 78 deletions

View File

@ -6,7 +6,7 @@ citus_top_builddir = ../../..
MODULE_big = citus
EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -45,6 +45,8 @@ $(EXTENSION)--5.1-5.sql: $(EXTENSION)--5.1-4.sql $(EXTENSION)--5.1-4--5.1-5.sql
cat $^ > $@
$(EXTENSION)--5.1-6.sql: $(EXTENSION)--5.1-5.sql $(EXTENSION)--5.1-5--5.1-6.sql
cat $^ > $@
$(EXTENSION)--5.1-7.sql: $(EXTENSION)--5.1-6.sql $(EXTENSION)--5.1-6--5.1-7.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,17 @@
DROP FUNCTION IF EXISTS pg_catalog.worker_fetch_foreign_file(text, text, bigint, text[], integer[]);
CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_fetch_foreign_file$$;
COMMENT ON FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[])
IS 'fetch foreign file from remote node and apply file';
DROP FUNCTION IF EXISTS pg_catalog.worker_fetch_regular_table(text, text, bigint, text[], integer[]);
CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_fetch_regular_table$$;
COMMENT ON FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[])
IS 'fetch PostgreSQL table from remote node';

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '5.1-6'
default_version = '5.1-7'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -954,13 +954,14 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
char *relationName = relation->relname;
char *schemaName = relation->schemaname;
char *qualifiedRelationName = quote_qualified_identifier(schemaName, relationName);
/*
* The warning message created in TableDDLCommandList() is descriptive
* enough; therefore, we just throw an error which says that we could not
* run the copy operation.
*/
ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, relationName);
ddlCommandList = TableDDLCommandList(nodeName, nodePort, qualifiedRelationName);
if (ddlCommandList == NIL)
{
ereport(ERROR, (errmsg("could not run copy from the worker node")));

View File

@ -3781,14 +3781,17 @@ ShardFetchQueryString(uint64 shardId)
{
if (strcmp(shardSchemaName, "public") != 0)
{
appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, shardSchemaName,
shardTableName, shardLength, nodeNameArrayString->data,
char *qualifiedTableName = quote_qualified_identifier(shardSchemaName,
shardTableName);
appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, qualifiedTableName,
shardLength, nodeNameArrayString->data,
nodePortArrayString->data);
}
else
{
appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND_WITHOUT_SCHEMA,
shardTableName, shardLength, nodeNameArrayString->data,
appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, shardTableName,
shardLength, nodeNameArrayString->data,
nodePortArrayString->data);
}
}
@ -3796,14 +3799,17 @@ ShardFetchQueryString(uint64 shardId)
{
if (strcmp(shardSchemaName, "public") != 0)
{
appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, shardSchemaName,
shardTableName, shardLength, nodeNameArrayString->data,
char *qualifiedTableName = quote_qualified_identifier(shardSchemaName,
shardTableName);
appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, qualifiedTableName,
shardLength, nodeNameArrayString->data,
nodePortArrayString->data);
}
else
{
appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND_WITHOUT_SCHEMA,
shardTableName, shardLength, nodeNameArrayString->data,
appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, shardTableName,
shardLength, nodeNameArrayString->data,
nodePortArrayString->data);
}
}

View File

@ -53,21 +53,20 @@ static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort,
static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
int32 fileDescriptor);
static void DeleteFile(const char *filename);
static void FetchTableCommon(text *tableSchemaNameText, text *tableName,
uint64 remoteTableSize, ArrayType *nodeNameObject,
ArrayType *nodePortObject,
static void FetchTableCommon(text *tableName, uint64 remoteTableSize,
ArrayType *nodeNameObject, ArrayType *nodePortObject,
bool (*FetchTableFunction)(const char *, uint32,
const char *, const char *));
const char *));
static uint64 LocalTableSize(Oid relationId);
static uint64 ExtractShardId(const char *tableName);
static bool FetchRegularTable(const char *nodeName, uint32 nodePort,
const char *schemaName, const char *tableName);
const char *tableName);
static bool FetchForeignTable(const char *nodeName, uint32 nodePort,
const char *schemaName, const char *tableName);
const char *tableName);
static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort,
const char *schemaName, const char *tableName);
const char *tableName);
static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort,
const char *schemaName, const char *tableName);
const char *tableName);
static bool check_log_statement(List *stmt_list);
@ -427,18 +426,17 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
Datum
worker_fetch_regular_table(PG_FUNCTION_ARGS)
{
text *regularSchemaName = PG_GETARG_TEXT_P(0);
text *regularTableName = PG_GETARG_TEXT_P(1);
uint64 generationStamp = PG_GETARG_INT64(2);
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(3);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(4);
text *regularTableName = PG_GETARG_TEXT_P(0);
uint64 generationStamp = PG_GETARG_INT64(1);
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
/*
* Run common logic to fetch the remote table, and use the provided function
* pointer to perform the actual table fetching.
*/
FetchTableCommon(regularSchemaName, regularTableName, generationStamp,
nodeNameObject, nodePortObject, &FetchRegularTable);
FetchTableCommon(regularTableName, generationStamp, nodeNameObject, nodePortObject,
&FetchRegularTable);
PG_RETURN_VOID();
}
@ -452,18 +450,17 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS)
Datum
worker_fetch_foreign_file(PG_FUNCTION_ARGS)
{
text *foreignSchemaName = PG_GETARG_TEXT_P(0);
text *foreignTableName = PG_GETARG_TEXT_P(1);
uint64 foreignFileSize = PG_GETARG_INT64(2);
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(3);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(4);
text *foreignTableName = PG_GETARG_TEXT_P(0);
uint64 foreignFileSize = PG_GETARG_INT64(1);
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
/*
* Run common logic to fetch the remote table, and use the provided function
* pointer to perform the actual table fetching.
*/
FetchTableCommon(foreignSchemaName, foreignTableName, foreignFileSize,
nodeNameObject, nodePortObject, &FetchForeignTable);
FetchTableCommon(foreignTableName, foreignFileSize, nodeNameObject, nodePortObject,
&FetchForeignTable);
PG_RETURN_VOID();
}
@ -476,18 +473,17 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS)
* are retried in case of node failures.
*/
static void
FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTableSize,
FetchTableCommon(text *tableNameText, uint64 remoteTableSize,
ArrayType *nodeNameObject, ArrayType *nodePortObject,
bool (*FetchTableFunction)(const char *, uint32, const char *,
const char *))
bool (*FetchTableFunction)(const char *, uint32, const char *))
{
char *schemaName = NULL;
char *tableName = NULL;
Oid schemaId = InvalidOid;
uint64 shardId = INVALID_SHARD_ID;
Oid relationId = InvalidOid;
List *relationNameList = NIL;
RangeVar *relation = NULL;
uint32 nodeIndex = 0;
bool tableFetched = false;
char *tableName = text_to_cstring(tableNameText);
Datum *nodeNameArray = DeconstructArrayObject(nodeNameObject);
Datum *nodePortArray = DeconstructArrayObject(nodePortObject);
@ -501,9 +497,6 @@ FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTa
" do not match", nodeNameCount, nodePortCount)));
}
schemaName = text_to_cstring(tableSchemaNameText);
tableName = text_to_cstring(tableNameText);
/*
* We lock on the shardId, but do not unlock. When the function returns, and
* the transaction for this function commits, this lock will automatically
@ -513,9 +506,11 @@ FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTa
shardId = ExtractShardId(tableName);
LockShardResource(shardId, AccessExclusiveLock);
relationNameList = textToQualifiedNameList(tableNameText);
relation = makeRangeVarFromNameList(relationNameList);
relationId = RangeVarGetRelid(relation, NoLock, true);
/* check if we already fetched the table */
schemaId = get_namespace_oid(schemaName, true);
relationId = get_relname_relid(tableName, schemaId);
if (relationId != InvalidOid)
{
uint64 localTableSize = 0;
@ -566,7 +561,7 @@ FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTa
char *nodeName = TextDatumGetCString(nodeNameDatum);
uint32 nodePort = DatumGetUInt32(nodePortDatum);
tableFetched = (*FetchTableFunction)(nodeName, nodePort, schemaName, tableName);
tableFetched = (*FetchTableFunction)(nodeName, nodePort, tableName);
nodeIndex++;
}
@ -693,8 +688,7 @@ ExtractShardId(const char *tableName)
* false. On other types of failures, the function errors out.
*/
static bool
FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
const char *tableName)
FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
{
StringInfo localFilePath = NULL;
StringInfo remoteCopyCommand = NULL;
@ -704,12 +698,12 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
RangeVar *localTable = NULL;
uint64 shardId = 0;
bool received = false;
char *quotedTableName = NULL;
StringInfo queryString = NULL;
const char *tableOwner = NULL;
Oid tableOwnerId = InvalidOid;
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
List *tableNameList = NIL;
/* copy remote table's data to this node in an idempotent manner */
shardId = ExtractShardId(tableName);
@ -717,9 +711,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT,
PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId);
quotedTableName = quote_qualified_identifier(schemaName, tableName);
remoteCopyCommand = makeStringInfo();
appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, quotedTableName);
appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, tableName);
received = ReceiveRegularFile(nodeName, nodePort, remoteCopyCommand, localFilePath);
if (!received)
@ -728,7 +721,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
}
/* fetch the ddl commands needed to create the table */
tableOwner = RemoteTableOwner(nodeName, nodePort, schemaName, tableName);
tableOwner = RemoteTableOwner(nodeName, nodePort, tableName);
if (tableOwner == NULL)
{
return false;
@ -736,7 +729,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
tableOwnerId = get_role_oid(tableOwner, false);
/* fetch the ddl commands needed to create the table */
ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, tableName);
ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableName);
if (ddlCommandList == NIL)
{
return false;
@ -768,11 +761,12 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
* directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook
* into process utility to provide their custom COPY behavior.
*/
localTable = makeRangeVar((char *) schemaName, (char *) tableName, -1);
tableNameList = stringToQualifiedNameList(tableName);
localTable = makeRangeVarFromNameList(tableNameList);
localCopyCommand = CopyStatement(localTable, localFilePath->data);
queryString = makeStringInfo();
appendStringInfo(queryString, COPY_IN_COMMAND, quotedTableName, localFilePath->data);
appendStringInfo(queryString, COPY_IN_COMMAND, tableName, localFilePath->data);
ProcessUtility((Node *) localCopyCommand, queryString->data,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
@ -793,10 +787,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
* commands against the local database, the function errors out.
*/
static bool
FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName,
const char *tableName)
FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
{
char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
StringInfo localFilePath = NULL;
StringInfo remoteFilePath = NULL;
StringInfo transmitCommand = NULL;
@ -814,7 +806,7 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName,
localFilePath = makeStringInfo();
appendStringInfo(localFilePath, FOREIGN_CACHED_FILE_PATH, tableName);
remoteFilePath = ForeignFilePath(nodeName, nodePort, schemaName, tableName);
remoteFilePath = ForeignFilePath(nodeName, nodePort, tableName);
if (remoteFilePath == NULL)
{
return false;
@ -830,15 +822,15 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName,
}
/* fetch the ddl commands needed to create the table */
ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, tableName);
ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableName);
if (ddlCommandList == NIL)
{
return false;
}
alterTableCommand = makeStringInfo();
appendStringInfo(alterTableCommand, SET_FOREIGN_TABLE_FILENAME,
qualifiedTableName, localFilePath->data);
appendStringInfo(alterTableCommand, SET_FOREIGN_TABLE_FILENAME, tableName,
localFilePath->data);
ddlCommandList = lappend(ddlCommandList, alterTableCommand);
@ -865,16 +857,14 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName,
* the table. If an error occurs during fetching, return NULL.
*/
static const char *
RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *schemaName,
const char *tableName)
RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *tableName)
{
List *ownerList = NIL;
StringInfo queryString = NULL;
const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
StringInfo relationOwner;
queryString = makeStringInfo();
appendStringInfo(queryString, GET_TABLE_OWNER, qualifiedTableName);
appendStringInfo(queryString, GET_TABLE_OWNER, tableName);
ownerList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString);
if (list_length(ownerList) != 1)
@ -894,15 +884,13 @@ RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *schemaName,
* the function returns an empty list.
*/
List *
TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *schemaName,
const char *tableName)
TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *tableName)
{
const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
List *ddlCommandList = NIL;
StringInfo queryString = NULL;
queryString = makeStringInfo();
appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, qualifiedTableName);
appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, tableName);
ddlCommandList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString);
return ddlCommandList;
@ -915,16 +903,14 @@ TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *schemaNam
* null.
*/
static StringInfo
ForeignFilePath(const char *nodeName, uint32 nodePort, const char *schemaName,
const char *tableName)
ForeignFilePath(const char *nodeName, uint32 nodePort, const char *tableName)
{
const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
List *foreignPathList = NIL;
StringInfo foreignPathCommand = NULL;
StringInfo foreignPath = NULL;
foreignPathCommand = makeStringInfo();
appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, qualifiedTableName);
appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, tableName);
foreignPathList = ExecuteRemoteQuery(nodeName, nodePort, NULL, foreignPathCommand);
if (foreignPathList != NIL)

View File

@ -33,12 +33,8 @@
#define RESERVED_HASHED_COLUMN_ID MaxAttrNumber
#define MERGE_COLUMN_FORMAT "merge_column_%u"
#define TABLE_FETCH_COMMAND "SELECT worker_fetch_regular_table \
('%s', '%s', " UINT64_FORMAT ", '%s', '%s')"
#define TABLE_FETCH_COMMAND_WITHOUT_SCHEMA "SELECT worker_fetch_regular_table \
('%s', " UINT64_FORMAT ", '%s', '%s')"
#define FOREIGN_FETCH_COMMAND "SELECT worker_fetch_foreign_file \
('%s', '%s', " UINT64_FORMAT ", '%s', '%s')"
#define FOREIGN_FETCH_COMMAND_WITHOUT_SCHEMA "SELECT worker_fetch_foreign_file \
('%s', " UINT64_FORMAT ", '%s', '%s')"
#define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \
(" UINT64_FORMAT ", %u, %u, %u, '%s', %u)"

View File

@ -120,7 +120,7 @@ extern Datum * DeconstructArrayObject(ArrayType *arrayObject);
extern int32 ArrayObjectCount(ArrayType *arrayObject);
extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId);
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
const char *schemaName, const char *tableName);
const char *tableName);
/* Function declarations shared with the master planner */
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);

View File

@ -20,6 +20,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4';
ALTER EXTENSION citus UPDATE TO '5.1-5';
ALTER EXTENSION citus UPDATE TO '5.1-6';
ALTER EXTENSION citus UPDATE TO '5.1-7';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c

View File

@ -25,6 +25,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4';
ALTER EXTENSION citus UPDATE TO '5.1-5';
ALTER EXTENSION citus UPDATE TO '5.1-6';
ALTER EXTENSION citus UPDATE TO '5.1-7';
-- drop extension an re-create in newest version
DROP EXTENSION citus;