Merge pull request #653 from citusdata/fix/fix_worker_fetch_regular_table_with_schema

Fix worker_fetch_regular_table with schema

cr: @jasonmp85
pull/660/head
Jason Petersen 2016-07-22 00:56:07 -06:00 committed by GitHub
commit a53a2fe8b0
14 changed files with 512 additions and 74 deletions

View File

@ -6,7 +6,7 @@ citus_top_builddir = ../../..
MODULE_big = citus MODULE_big = citus
EXTENSION = citus EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -41,6 +41,8 @@ $(EXTENSION)--5.1-3.sql: $(EXTENSION)--5.1-2.sql $(EXTENSION)--5.1-2--5.1-3.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--5.1-4.sql: $(EXTENSION)--5.1-3.sql $(EXTENSION)--5.1-3--5.1-4.sql $(EXTENSION)--5.1-4.sql: $(EXTENSION)--5.1-3.sql $(EXTENSION)--5.1-3--5.1-4.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--5.1-5.sql: $(EXTENSION)--5.1-4.sql $(EXTENSION)--5.1-4--5.1-5.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -0,0 +1,17 @@
DROP FUNCTION IF EXISTS pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[]);
CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_foreign_file(text, text, bigint, text[], integer[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_fetch_foreign_file$$;
COMMENT ON FUNCTION pg_catalog.worker_fetch_foreign_file(text, text, bigint, text[], integer[])
IS 'fetch foreign file from remote node and apply file';
DROP FUNCTION IF EXISTS pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[]);
CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_regular_table(text, text, bigint, text[], integer[])
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_fetch_regular_table$$;
COMMENT ON FUNCTION pg_catalog.worker_fetch_regular_table(text, text, bigint, text[], integer[])
IS 'fetch PostgreSQL table from remote node';

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '5.1-4' default_version = '5.1-5'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -896,18 +896,13 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
char *relationName = relation->relname; char *relationName = relation->relname;
char *schemaName = relation->schemaname; char *schemaName = relation->schemaname;
char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
/* fetch the ddl commands needed to create the table */
StringInfo tableNameStringInfo = makeStringInfo();
appendStringInfoString(tableNameStringInfo, qualifiedName);
/* /*
* The warning message created in TableDDLCommandList() is descriptive * The warning message created in TableDDLCommandList() is descriptive
* enough; therefore, we just throw an error which says that we could not * enough; therefore, we just throw an error which says that we could not
* run the copy operation. * run the copy operation.
*/ */
ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableNameStringInfo); ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, relationName);
if (ddlCommandList == NIL) if (ddlCommandList == NIL)
{ {
ereport(ERROR, (errmsg("could not run copy from the worker node"))); ereport(ERROR, (errmsg("could not run copy from the worker node")));

View File

@ -198,7 +198,12 @@ SearchShardPlacementInList(List *shardPlacementList, text *nodeNameText, uint32
static List * static List *
RecreateTableDDLCommandList(Oid relationId) RecreateTableDDLCommandList(Oid relationId)
{ {
char *relationName = get_rel_name(relationId); const char *relationName = get_rel_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);
const char *relationSchemaName = get_namespace_name(relationSchemaId);
const char *qualifiedRelationName = quote_qualified_identifier(relationSchemaName,
relationName);
StringInfo dropCommand = makeStringInfo(); StringInfo dropCommand = makeStringInfo();
List *createCommandList = NIL; List *createCommandList = NIL;
List *dropCommandList = NIL; List *dropCommandList = NIL;
@ -209,12 +214,12 @@ RecreateTableDDLCommandList(Oid relationId)
if (relationKind == RELKIND_RELATION) if (relationKind == RELKIND_RELATION)
{ {
appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND, appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND,
quote_identifier(relationName)); qualifiedRelationName);
} }
else if (relationKind == RELKIND_FOREIGN_TABLE) else if (relationKind == RELKIND_FOREIGN_TABLE)
{ {
appendStringInfo(dropCommand, DROP_FOREIGN_TABLE_COMMAND, appendStringInfo(dropCommand, DROP_FOREIGN_TABLE_COMMAND,
quote_identifier(relationName)); qualifiedRelationName);
} }
else else
{ {

View File

@ -3750,6 +3750,7 @@ ShardFetchQueryString(uint64 shardId)
/* check storage type to create the correct query string */ /* check storage type to create the correct query string */
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
char storageType = shardInterval->storageType; char storageType = shardInterval->storageType;
char *shardSchemaName = NULL;
char *shardTableName = NULL; char *shardTableName = NULL;
/* /*
@ -3766,7 +3767,10 @@ ShardFetchQueryString(uint64 shardId)
else else
{ {
/* construct the shard name */ /* construct the shard name */
Oid shardSchemaId = get_rel_namespace(shardInterval->relationId);
char *tableName = get_rel_name(shardInterval->relationId); char *tableName = get_rel_name(shardInterval->relationId);
shardSchemaName = get_namespace_name(shardSchemaId);
shardTableName = pstrdup(tableName); shardTableName = pstrdup(tableName);
AppendShardIdToName(&shardTableName, shardId); AppendShardIdToName(&shardTableName, shardId);
} }
@ -3776,13 +3780,13 @@ ShardFetchQueryString(uint64 shardId)
storageType == SHARD_STORAGE_COLUMNAR) storageType == SHARD_STORAGE_COLUMNAR)
{ {
appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND,
shardTableName, shardLength, shardSchemaName, shardTableName, shardLength,
nodeNameArrayString->data, nodePortArrayString->data); nodeNameArrayString->data, nodePortArrayString->data);
} }
else if (storageType == SHARD_STORAGE_FOREIGN) else if (storageType == SHARD_STORAGE_FOREIGN)
{ {
appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND,
shardTableName, shardLength, shardSchemaName, shardTableName, shardLength,
nodeNameArrayString->data, nodePortArrayString->data); nodeNameArrayString->data, nodePortArrayString->data);
} }

View File

@ -53,20 +53,21 @@ static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort,
static void ReceiveResourceCleanup(int32 connectionId, const char *filename, static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
int32 fileDescriptor); int32 fileDescriptor);
static void DeleteFile(const char *filename); static void DeleteFile(const char *filename);
static void FetchTableCommon(text *tableName, uint64 remoteTableSize, static void FetchTableCommon(text *tableSchemaNameText, text *tableName,
ArrayType *nodeNameObject, ArrayType *nodePortObject, uint64 remoteTableSize, ArrayType *nodeNameObject,
ArrayType *nodePortObject,
bool (*FetchTableFunction)(const char *, uint32, bool (*FetchTableFunction)(const char *, uint32,
StringInfo)); const char *, const char *));
static uint64 LocalTableSize(Oid relationId); static uint64 LocalTableSize(Oid relationId);
static uint64 ExtractShardId(StringInfo tableName); static uint64 ExtractShardId(const char *tableName);
static bool FetchRegularTable(const char *nodeName, uint32 nodePort, static bool FetchRegularTable(const char *nodeName, uint32 nodePort,
StringInfo tableName); const char *schemaName, const char *tableName);
static bool FetchForeignTable(const char *nodeName, uint32 nodePort, static bool FetchForeignTable(const char *nodeName, uint32 nodePort,
StringInfo tableName); const char *schemaName, const char *tableName);
static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort, static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort,
StringInfo tableName); const char *schemaName, const char *tableName);
static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort, static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort,
StringInfo tableName); const char *schemaName, const char *tableName);
static bool check_log_statement(List *stmt_list); static bool check_log_statement(List *stmt_list);
@ -426,16 +427,17 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
Datum Datum
worker_fetch_regular_table(PG_FUNCTION_ARGS) worker_fetch_regular_table(PG_FUNCTION_ARGS)
{ {
text *regularTableName = PG_GETARG_TEXT_P(0); text *regularSchemaName = PG_GETARG_TEXT_P(0);
uint64 generationStamp = PG_GETARG_INT64(1); text *regularTableName = PG_GETARG_TEXT_P(1);
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2); uint64 generationStamp = PG_GETARG_INT64(2);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3); ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(3);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(4);
/* /*
* Run common logic to fetch the remote table, and use the provided function * Run common logic to fetch the remote table, and use the provided function
* pointer to perform the actual table fetching. * pointer to perform the actual table fetching.
*/ */
FetchTableCommon(regularTableName, generationStamp, FetchTableCommon(regularSchemaName, regularTableName, generationStamp,
nodeNameObject, nodePortObject, &FetchRegularTable); nodeNameObject, nodePortObject, &FetchRegularTable);
PG_RETURN_VOID(); PG_RETURN_VOID();
@ -450,16 +452,17 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS)
Datum Datum
worker_fetch_foreign_file(PG_FUNCTION_ARGS) worker_fetch_foreign_file(PG_FUNCTION_ARGS)
{ {
text *foreignTableName = PG_GETARG_TEXT_P(0); text *foreignSchemaName = PG_GETARG_TEXT_P(0);
uint64 foreignFileSize = PG_GETARG_INT64(1); text *foreignTableName = PG_GETARG_TEXT_P(1);
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2); uint64 foreignFileSize = PG_GETARG_INT64(2);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3); ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(3);
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(4);
/* /*
* Run common logic to fetch the remote table, and use the provided function * Run common logic to fetch the remote table, and use the provided function
* pointer to perform the actual table fetching. * pointer to perform the actual table fetching.
*/ */
FetchTableCommon(foreignTableName, foreignFileSize, FetchTableCommon(foreignSchemaName, foreignTableName, foreignFileSize,
nodeNameObject, nodePortObject, &FetchForeignTable); nodeNameObject, nodePortObject, &FetchForeignTable);
PG_RETURN_VOID(); PG_RETURN_VOID();
@ -473,12 +476,14 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS)
* are retried in case of node failures. * are retried in case of node failures.
*/ */
static void static void
FetchTableCommon(text *tableNameText, uint64 remoteTableSize, FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTableSize,
ArrayType *nodeNameObject, ArrayType *nodePortObject, ArrayType *nodeNameObject, ArrayType *nodePortObject,
bool (*FetchTableFunction)(const char *, uint32, StringInfo)) bool (*FetchTableFunction)(const char *, uint32, const char *,
const char *))
{ {
StringInfo tableName = NULL; char *schemaName = NULL;
char *tableNameCString = NULL; char *tableName = NULL;
Oid schemaId = InvalidOid;
uint64 shardId = INVALID_SHARD_ID; uint64 shardId = INVALID_SHARD_ID;
Oid relationId = InvalidOid; Oid relationId = InvalidOid;
uint32 nodeIndex = 0; uint32 nodeIndex = 0;
@ -496,9 +501,8 @@ FetchTableCommon(text *tableNameText, uint64 remoteTableSize,
" do not match", nodeNameCount, nodePortCount))); " do not match", nodeNameCount, nodePortCount)));
} }
tableName = makeStringInfo(); schemaName = text_to_cstring(tableSchemaNameText);
tableNameCString = text_to_cstring(tableNameText); tableName = text_to_cstring(tableNameText);
appendStringInfoString(tableName, tableNameCString);
/* /*
* We lock on the shardId, but do not unlock. When the function returns, and * We lock on the shardId, but do not unlock. When the function returns, and
@ -510,7 +514,8 @@ FetchTableCommon(text *tableNameText, uint64 remoteTableSize,
LockShardResource(shardId, AccessExclusiveLock); LockShardResource(shardId, AccessExclusiveLock);
/* check if we already fetched the table */ /* check if we already fetched the table */
relationId = RelnameGetRelid(tableName->data); schemaId = get_namespace_oid(schemaName, true);
relationId = get_relname_relid(tableName, schemaId);
if (relationId != InvalidOid) if (relationId != InvalidOid)
{ {
uint64 localTableSize = 0; uint64 localTableSize = 0;
@ -561,7 +566,7 @@ FetchTableCommon(text *tableNameText, uint64 remoteTableSize,
char *nodeName = TextDatumGetCString(nodeNameDatum); char *nodeName = TextDatumGetCString(nodeNameDatum);
uint32 nodePort = DatumGetUInt32(nodePortDatum); uint32 nodePort = DatumGetUInt32(nodePortDatum);
tableFetched = (*FetchTableFunction)(nodeName, nodePort, tableName); tableFetched = (*FetchTableFunction)(nodeName, nodePort, schemaName, tableName);
nodeIndex++; nodeIndex++;
} }
@ -569,7 +574,7 @@ FetchTableCommon(text *tableNameText, uint64 remoteTableSize,
/* error out if we tried all nodes and could not fetch the table */ /* error out if we tried all nodes and could not fetch the table */
if (!tableFetched) if (!tableFetched)
{ {
ereport(ERROR, (errmsg("could not fetch relation: \"%s\"", tableName->data))); ereport(ERROR, (errmsg("could not fetch relation: \"%s\"", tableName)));
} }
} }
@ -646,18 +651,18 @@ LocalTableSize(Oid relationId)
/* Extracts shard id from the given table name, and returns it. */ /* Extracts shard id from the given table name, and returns it. */
static uint64 static uint64
ExtractShardId(StringInfo tableName) ExtractShardId(const char *tableName)
{ {
uint64 shardId = 0; uint64 shardId = 0;
char *shardIdString = NULL; char *shardIdString = NULL;
char *shardIdStringEnd = NULL; char *shardIdStringEnd = NULL;
/* find the last underscore and increment for shardId string */ /* find the last underscore and increment for shardId string */
shardIdString = strrchr(tableName->data, SHARD_NAME_SEPARATOR); shardIdString = strrchr(tableName, SHARD_NAME_SEPARATOR);
if (shardIdString == NULL) if (shardIdString == NULL)
{ {
ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"", ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"",
tableName->data))); tableName)));
} }
shardIdString++; shardIdString++;
@ -668,7 +673,7 @@ ExtractShardId(StringInfo tableName)
if (errno != 0 || (*shardIdStringEnd != '\0')) if (errno != 0 || (*shardIdStringEnd != '\0'))
{ {
ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"", ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"",
tableName->data))); tableName)));
} }
#else #else
ereport(ERROR, (errmsg("could not extract shardId from table name"), ereport(ERROR, (errmsg("could not extract shardId from table name"),
@ -688,7 +693,8 @@ ExtractShardId(StringInfo tableName)
* false. On other types of failures, the function errors out. * false. On other types of failures, the function errors out.
*/ */
static bool static bool
FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName) FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
const char *tableName)
{ {
StringInfo localFilePath = NULL; StringInfo localFilePath = NULL;
StringInfo remoteCopyCommand = NULL; StringInfo remoteCopyCommand = NULL;
@ -700,7 +706,6 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName)
bool received = false; bool received = false;
char *quotedTableName = NULL; char *quotedTableName = NULL;
StringInfo queryString = NULL; StringInfo queryString = NULL;
const char *schemaName = NULL;
const char *tableOwner = NULL; const char *tableOwner = NULL;
Oid tableOwnerId = InvalidOid; Oid tableOwnerId = InvalidOid;
Oid savedUserId = InvalidOid; Oid savedUserId = InvalidOid;
@ -712,7 +717,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName)
appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT, appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT,
PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId); PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId);
quotedTableName = quote_qualified_identifier(schemaName, tableName->data); quotedTableName = quote_qualified_identifier(schemaName, tableName);
remoteCopyCommand = makeStringInfo(); remoteCopyCommand = makeStringInfo();
appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, quotedTableName); appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, quotedTableName);
@ -723,7 +728,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName)
} }
/* fetch the ddl commands needed to create the table */ /* fetch the ddl commands needed to create the table */
tableOwner = RemoteTableOwner(nodeName, nodePort, tableName); tableOwner = RemoteTableOwner(nodeName, nodePort, schemaName, tableName);
if (tableOwner == NULL) if (tableOwner == NULL)
{ {
return false; return false;
@ -731,7 +736,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName)
tableOwnerId = get_role_oid(tableOwner, false); tableOwnerId = get_role_oid(tableOwner, false);
/* fetch the ddl commands needed to create the table */ /* fetch the ddl commands needed to create the table */
ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableName); ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, tableName);
if (ddlCommandList == NIL) if (ddlCommandList == NIL)
{ {
return false; return false;
@ -763,7 +768,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName)
* directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook * directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook
* into process utility to provide their custom COPY behavior. * into process utility to provide their custom COPY behavior.
*/ */
localTable = makeRangeVar((char *) schemaName, tableName->data, -1); localTable = makeRangeVar((char *) schemaName, (char *) tableName, -1);
localCopyCommand = CopyStatement(localTable, localFilePath->data); localCopyCommand = CopyStatement(localTable, localFilePath->data);
queryString = makeStringInfo(); queryString = makeStringInfo();
@ -788,8 +793,10 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName)
* commands against the local database, the function errors out. * commands against the local database, the function errors out.
*/ */
static bool static bool
FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName) FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName,
const char *tableName)
{ {
char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
StringInfo localFilePath = NULL; StringInfo localFilePath = NULL;
StringInfo remoteFilePath = NULL; StringInfo remoteFilePath = NULL;
StringInfo transmitCommand = NULL; StringInfo transmitCommand = NULL;
@ -798,11 +805,16 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName)
List *ddlCommandList = NIL; List *ddlCommandList = NIL;
ListCell *ddlCommandCell = NULL; ListCell *ddlCommandCell = NULL;
/* fetch foreign file to this node in an idempotent manner */ /*
* Fetch a foreign file to this node in an idempotent manner. It's OK that
* this file name lacks the schema, as the table name will have a shard id
* attached to it, which is unique (so conflicts are avoided even if two
* tables in different schemas have the same name).
*/
localFilePath = makeStringInfo(); localFilePath = makeStringInfo();
appendStringInfo(localFilePath, FOREIGN_CACHED_FILE_PATH, tableName->data); appendStringInfo(localFilePath, FOREIGN_CACHED_FILE_PATH, tableName);
remoteFilePath = ForeignFilePath(nodeName, nodePort, tableName); remoteFilePath = ForeignFilePath(nodeName, nodePort, schemaName, tableName);
if (remoteFilePath == NULL) if (remoteFilePath == NULL)
{ {
return false; return false;
@ -818,7 +830,7 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName)
} }
/* fetch the ddl commands needed to create the table */ /* fetch the ddl commands needed to create the table */
ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableName); ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, tableName);
if (ddlCommandList == NIL) if (ddlCommandList == NIL)
{ {
return false; return false;
@ -826,7 +838,7 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName)
alterTableCommand = makeStringInfo(); alterTableCommand = makeStringInfo();
appendStringInfo(alterTableCommand, SET_FOREIGN_TABLE_FILENAME, appendStringInfo(alterTableCommand, SET_FOREIGN_TABLE_FILENAME,
tableName->data, localFilePath->data); qualifiedTableName, localFilePath->data);
ddlCommandList = lappend(ddlCommandList, alterTableCommand); ddlCommandList = lappend(ddlCommandList, alterTableCommand);
@ -853,15 +865,16 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName)
* the table. If an error occurs during fetching, return NULL. * the table. If an error occurs during fetching, return NULL.
*/ */
static const char * static const char *
RemoteTableOwner(const char *nodeName, uint32 nodePort, StringInfo tableName) RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *schemaName,
const char *tableName)
{ {
List *ownerList = NIL; List *ownerList = NIL;
StringInfo queryString = NULL; StringInfo queryString = NULL;
const char *escapedTableName = quote_literal_cstr(tableName->data); const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
StringInfo relationOwner; StringInfo relationOwner;
queryString = makeStringInfo(); queryString = makeStringInfo();
appendStringInfo(queryString, GET_TABLE_OWNER, escapedTableName); appendStringInfo(queryString, GET_TABLE_OWNER, qualifiedTableName);
ownerList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString); ownerList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString);
if (list_length(ownerList) != 1) if (list_length(ownerList) != 1)
@ -881,13 +894,15 @@ RemoteTableOwner(const char *nodeName, uint32 nodePort, StringInfo tableName)
* the function returns an empty list. * the function returns an empty list.
*/ */
List * List *
TableDDLCommandList(const char *nodeName, uint32 nodePort, StringInfo tableName) TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *schemaName,
const char *tableName)
{ {
const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
List *ddlCommandList = NIL; List *ddlCommandList = NIL;
StringInfo queryString = NULL; StringInfo queryString = NULL;
queryString = makeStringInfo(); queryString = makeStringInfo();
appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, tableName->data); appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, qualifiedTableName);
ddlCommandList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString); ddlCommandList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString);
return ddlCommandList; return ddlCommandList;
@ -900,14 +915,16 @@ TableDDLCommandList(const char *nodeName, uint32 nodePort, StringInfo tableName)
* null. * null.
*/ */
static StringInfo static StringInfo
ForeignFilePath(const char *nodeName, uint32 nodePort, StringInfo tableName) ForeignFilePath(const char *nodeName, uint32 nodePort, const char *schemaName,
const char *tableName)
{ {
const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
List *foreignPathList = NIL; List *foreignPathList = NIL;
StringInfo foreignPathCommand = NULL; StringInfo foreignPathCommand = NULL;
StringInfo foreignPath = NULL; StringInfo foreignPath = NULL;
foreignPathCommand = makeStringInfo(); foreignPathCommand = makeStringInfo();
appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, tableName->data); appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, qualifiedTableName);
foreignPathList = ExecuteRemoteQuery(nodeName, nodePort, NULL, foreignPathCommand); foreignPathList = ExecuteRemoteQuery(nodeName, nodePort, NULL, foreignPathCommand);
if (foreignPathList != NIL) if (foreignPathList != NIL)
@ -1059,7 +1076,6 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
char *sourceTableName = NULL; char *sourceTableName = NULL;
char *sourceQualifiedName = NULL; char *sourceQualifiedName = NULL;
StringInfo shardNameString = NULL;
StringInfo localFilePath = NULL; StringInfo localFilePath = NULL;
StringInfo sourceCopyCommand = NULL; StringInfo sourceCopyCommand = NULL;
CopyStmt *localCopyCommand = NULL; CopyStmt *localCopyCommand = NULL;
@ -1079,10 +1095,7 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
* the transaction for this function commits, this lock will automatically * the transaction for this function commits, this lock will automatically
* be released. This ensures appends to a shard happen in a serial manner. * be released. This ensures appends to a shard happen in a serial manner.
*/ */
shardNameString = makeStringInfo(); shardId = ExtractShardId(shardTableName);
appendStringInfoString(shardNameString, shardTableName);
shardId = ExtractShardId(shardNameString);
LockShardResource(shardId, AccessExclusiveLock); LockShardResource(shardId, AccessExclusiveLock);
/* copy remote table's data to this node */ /* copy remote table's data to this node */

View File

@ -33,9 +33,9 @@
#define RESERVED_HASHED_COLUMN_ID MaxAttrNumber #define RESERVED_HASHED_COLUMN_ID MaxAttrNumber
#define MERGE_COLUMN_FORMAT "merge_column_%u" #define MERGE_COLUMN_FORMAT "merge_column_%u"
#define TABLE_FETCH_COMMAND "SELECT worker_fetch_regular_table \ #define TABLE_FETCH_COMMAND "SELECT worker_fetch_regular_table \
('%s', " UINT64_FORMAT ", '%s', '%s')" ('%s', '%s', " UINT64_FORMAT ", '%s', '%s')"
#define FOREIGN_FETCH_COMMAND "SELECT worker_fetch_foreign_file \ #define FOREIGN_FETCH_COMMAND "SELECT worker_fetch_foreign_file \
('%s', " UINT64_FORMAT ", '%s', '%s')" ('%s', '%s', " UINT64_FORMAT ", '%s', '%s')"
#define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \ #define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \
(" UINT64_FORMAT ", %u, %u, %u, '%s', %u)" (" UINT64_FORMAT ", %u, %u, %u, '%s', %u)"
#define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \ #define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \

View File

@ -50,7 +50,7 @@
#define FOREIGN_CACHED_FILE_PATH "pg_foreign_file/cached/%s" #define FOREIGN_CACHED_FILE_PATH "pg_foreign_file/cached/%s"
#define GET_TABLE_OWNER \ #define GET_TABLE_OWNER \
"SELECT rolname FROM pg_class JOIN pg_roles ON (pg_roles.oid = pg_class.relowner) " \ "SELECT rolname FROM pg_class JOIN pg_roles ON (pg_roles.oid = pg_class.relowner) " \
"WHERE pg_class.oid = %s::regclass" "WHERE pg_class.oid = '%s'::regclass"
#define GET_TABLE_DDL_EVENTS "SELECT master_get_table_ddl_events('%s')" #define GET_TABLE_DDL_EVENTS "SELECT master_get_table_ddl_events('%s')"
#define SET_FOREIGN_TABLE_FILENAME "ALTER FOREIGN TABLE %s OPTIONS (SET filename '%s')" #define SET_FOREIGN_TABLE_FILENAME "ALTER FOREIGN TABLE %s OPTIONS (SET filename '%s')"
#define FOREIGN_FILE_PATH_COMMAND "SELECT worker_foreign_file_path('%s')" #define FOREIGN_FILE_PATH_COMMAND "SELECT worker_foreign_file_path('%s')"
@ -120,7 +120,7 @@ extern Datum * DeconstructArrayObject(ArrayType *arrayObject);
extern int32 ArrayObjectCount(ArrayType *arrayObject); extern int32 ArrayObjectCount(ArrayType *arrayObject);
extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId); extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId);
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort, extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
StringInfo tableName); const char *schemaName, const char *tableName);
/* Function declarations shared with the master planner */ /* Function declarations shared with the master planner */
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId); extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);

View File

@ -390,7 +390,7 @@ ORDER BY
customer_keys.o_custkey DESC customer_keys.o_custkey DESC
LIMIT 10 OFFSET 20; LIMIT 10 OFFSET 20;
DEBUG: push down of limit count: 30 DEBUG: push down of limit count: 30
DEBUG: building index "pg_toast_16958_index" on table "pg_toast_16958" DEBUG: building index "pg_toast_16966_index" on table "pg_toast_16966"
o_custkey | total_order_count o_custkey | total_order_count
-----------+------------------- -----------+-------------------
1466 | 1 1466 | 1

View File

@ -18,6 +18,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-1';
ALTER EXTENSION citus UPDATE TO '5.1-2'; ALTER EXTENSION citus UPDATE TO '5.1-2';
ALTER EXTENSION citus UPDATE TO '5.1-3'; ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4'; ALTER EXTENSION citus UPDATE TO '5.1-4';
ALTER EXTENSION citus UPDATE TO '5.1-5';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;
\c \c

View File

@ -826,3 +826,222 @@ SELECT master_apply_delete_command('DELETE FROM nation_append') ;
\c - - - :worker_1_port \c - - - :worker_1_port
\d test_schema_support.nation_append_119* \d test_schema_support.nation_append_119*
\c - - - :master_port \c - - - :master_port
-- check joins of tables which are in schemas other than public
-- we create new tables with replication factor of 1
-- so that we guarantee to have repartitions when necessary
-- create necessary objects and load data to them
CREATE SCHEMA test_schema_support_join_1;
NOTICE: Citus partially supports CREATE SCHEMA for distributed databases
DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet
CREATE SCHEMA test_schema_support_join_2;
NOTICE: Citus partially supports CREATE SCHEMA for distributed databases
DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet
CREATE TABLE test_schema_support_join_1.nation_hash (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152));
CREATE TABLE test_schema_support_join_1.nation_hash_2 (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152));
CREATE TABLE test_schema_support_join_2.nation_hash (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152));
SELECT master_create_distributed_table('test_schema_support_join_1.nation_hash', 'n_nationkey', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('test_schema_support_join_1.nation_hash', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
\COPY test_schema_support_join_1.nation_hash FROM STDIN with delimiter '|';
SELECT master_create_distributed_table('test_schema_support_join_1.nation_hash_2', 'n_nationkey', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('test_schema_support_join_1.nation_hash_2', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
\COPY test_schema_support_join_1.nation_hash_2 FROM STDIN with delimiter '|';
SELECT master_create_distributed_table('test_schema_support_join_2.nation_hash', 'n_nationkey', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('test_schema_support_join_2.nation_hash', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
\COPY test_schema_support_join_2.nation_hash FROM STDIN with delimiter '|';
-- check when search_path is public,
-- join of two tables which are in different schemas,
-- join on partition column
SET search_path TO public;
SELECT
count (*)
FROM
test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_nationkey = n2.n_nationkey;
count
-------
6
(1 row)
-- check when search_path is different than public,
-- join of two tables which are in different schemas,
-- join on partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_nationkey = n2.n_nationkey;
count
-------
6
(1 row)
-- check when search_path is public,
-- join of two tables which are in same schemas,
-- join on partition column
SET search_path TO public;
SELECT
count (*)
FROM
test_schema_support_join_1.nation_hash n1, test_schema_support_join_1.nation_hash_2 n2
WHERE
n1.n_nationkey = n2.n_nationkey;
count
-------
6
(1 row)
-- check when search_path is different than public,
-- join of two tables which are in same schemas,
-- join on partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, nation_hash_2 n2
WHERE
n1.n_nationkey = n2.n_nationkey;
count
-------
6
(1 row)
-- single repartition joins
SET citus.task_executor_type TO "task-tracker";
-- check when search_path is public,
-- join of two tables which are in different schemas,
-- join on partition column and non-partition column
SET search_path TO public;
SELECT
count (*)
FROM
test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_nationkey = n2.n_regionkey;
count
-------
6
(1 row)
-- check when search_path is different than public,
-- join of two tables which are in different schemas,
-- join on partition column and non-partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_nationkey = n2.n_regionkey;
count
-------
6
(1 row)
-- check when search_path is different than public,
-- join of two tables which are in same schemas,
-- join on partition column and non-partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, nation_hash_2 n2
WHERE
n1.n_nationkey = n2.n_regionkey;
count
-------
6
(1 row)
-- hash repartition joins
-- check when search_path is public,
-- join of two tables which are in different schemas,
-- join on non-partition column
SET search_path TO public;
SELECT
count (*)
FROM
test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_regionkey = n2.n_regionkey;
count
-------
14
(1 row)
-- check when search_path is different than public,
-- join of two tables which are in different schemas,
-- join on non-partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_regionkey = n2.n_regionkey;
count
-------
14
(1 row)
-- check when search_path is different than public,
-- join of two tables which are in same schemas,
-- join on non-partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, nation_hash_2 n2
WHERE
n1.n_regionkey = n2.n_regionkey;
count
-------
14
(1 row)
-- set task_executor back to real-time
SET citus.task_executor_type TO "real-time";

View File

@ -23,6 +23,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-1';
ALTER EXTENSION citus UPDATE TO '5.1-2'; ALTER EXTENSION citus UPDATE TO '5.1-2';
ALTER EXTENSION citus UPDATE TO '5.1-3'; ALTER EXTENSION citus UPDATE TO '5.1-3';
ALTER EXTENSION citus UPDATE TO '5.1-4'; ALTER EXTENSION citus UPDATE TO '5.1-4';
ALTER EXTENSION citus UPDATE TO '5.1-5';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;

View File

@ -547,3 +547,184 @@ SELECT master_apply_delete_command('DELETE FROM nation_append') ;
\d test_schema_support.nation_append_119* \d test_schema_support.nation_append_119*
\c - - - :master_port \c - - - :master_port
-- check joins of tables which are in schemas other than public
-- we create new tables with replication factor of 1
-- so that we guarantee to have repartitions when necessary
-- create necessary objects and load data to them
CREATE SCHEMA test_schema_support_join_1;
CREATE SCHEMA test_schema_support_join_2;
CREATE TABLE test_schema_support_join_1.nation_hash (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152));
CREATE TABLE test_schema_support_join_1.nation_hash_2 (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152));
CREATE TABLE test_schema_support_join_2.nation_hash (
n_nationkey integer not null,
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152));
SELECT master_create_distributed_table('test_schema_support_join_1.nation_hash', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('test_schema_support_join_1.nation_hash', 4, 1);
\COPY test_schema_support_join_1.nation_hash FROM STDIN with delimiter '|';
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon
2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
5|ETHIOPIA|0|ven packages wake quickly. regu
\.
SELECT master_create_distributed_table('test_schema_support_join_1.nation_hash_2', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('test_schema_support_join_1.nation_hash_2', 4, 1);
\COPY test_schema_support_join_1.nation_hash_2 FROM STDIN with delimiter '|';
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon
2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
5|ETHIOPIA|0|ven packages wake quickly. regu
\.
SELECT master_create_distributed_table('test_schema_support_join_2.nation_hash', 'n_nationkey', 'hash');
SELECT master_create_worker_shards('test_schema_support_join_2.nation_hash', 4, 1);
\COPY test_schema_support_join_2.nation_hash FROM STDIN with delimiter '|';
0|ALGERIA|0|haggle. carefully final deposits detect slyly agai
1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon
2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special
3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
5|ETHIOPIA|0|ven packages wake quickly. regu
\.
-- check when search_path is public,
-- join of two tables which are in different schemas,
-- join on partition column
SET search_path TO public;
SELECT
count (*)
FROM
test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_nationkey = n2.n_nationkey;
-- check when search_path is different than public,
-- join of two tables which are in different schemas,
-- join on partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_nationkey = n2.n_nationkey;
-- check when search_path is public,
-- join of two tables which are in same schemas,
-- join on partition column
SET search_path TO public;
SELECT
count (*)
FROM
test_schema_support_join_1.nation_hash n1, test_schema_support_join_1.nation_hash_2 n2
WHERE
n1.n_nationkey = n2.n_nationkey;
-- check when search_path is different than public,
-- join of two tables which are in same schemas,
-- join on partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, nation_hash_2 n2
WHERE
n1.n_nationkey = n2.n_nationkey;
-- single repartition joins
SET citus.task_executor_type TO "task-tracker";
-- check when search_path is public,
-- join of two tables which are in different schemas,
-- join on partition column and non-partition column
SET search_path TO public;
SELECT
count (*)
FROM
test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_nationkey = n2.n_regionkey;
-- check when search_path is different than public,
-- join of two tables which are in different schemas,
-- join on partition column and non-partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_nationkey = n2.n_regionkey;
-- check when search_path is different than public,
-- join of two tables which are in same schemas,
-- join on partition column and non-partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, nation_hash_2 n2
WHERE
n1.n_nationkey = n2.n_regionkey;
-- hash repartition joins
-- check when search_path is public,
-- join of two tables which are in different schemas,
-- join on non-partition column
SET search_path TO public;
SELECT
count (*)
FROM
test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_regionkey = n2.n_regionkey;
-- check when search_path is different than public,
-- join of two tables which are in different schemas,
-- join on non-partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, test_schema_support_join_2.nation_hash n2
WHERE
n1.n_regionkey = n2.n_regionkey;
-- check when search_path is different than public,
-- join of two tables which are in same schemas,
-- join on non-partition column
SET search_path TO test_schema_support_join_1;
SELECT
count (*)
FROM
nation_hash n1, nation_hash_2 n2
WHERE
n1.n_regionkey = n2.n_regionkey;
-- set task_executor back to real-time
SET citus.task_executor_type TO "real-time";