diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index cc92098a1..52716e05f 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -6,7 +6,7 @@ citus_top_builddir = ../../.. MODULE_big = citus EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ - 5.1-1 5.1-2 5.1-3 5.1-4 + 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -41,6 +41,8 @@ $(EXTENSION)--5.1-3.sql: $(EXTENSION)--5.1-2.sql $(EXTENSION)--5.1-2--5.1-3.sql cat $^ > $@ $(EXTENSION)--5.1-4.sql: $(EXTENSION)--5.1-3.sql $(EXTENSION)--5.1-3--5.1-4.sql cat $^ > $@ +$(EXTENSION)--5.1-5.sql: $(EXTENSION)--5.1-4.sql $(EXTENSION)--5.1-4--5.1-5.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--5.1-4--5.1-5.sql b/src/backend/distributed/citus--5.1-4--5.1-5.sql new file mode 100644 index 000000000..8d0aa5a35 --- /dev/null +++ b/src/backend/distributed/citus--5.1-4--5.1-5.sql @@ -0,0 +1,17 @@ +DROP FUNCTION IF EXISTS pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[]); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_foreign_file(text, text, bigint, text[], integer[]) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_fetch_foreign_file$$; +COMMENT ON FUNCTION pg_catalog.worker_fetch_foreign_file(text, text, bigint, text[], integer[]) + IS 'fetch foreign file from remote node and apply file'; + +DROP FUNCTION IF EXISTS pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[]); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_regular_table(text, text, bigint, text[], integer[]) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_fetch_regular_table$$; +COMMENT ON FUNCTION pg_catalog.worker_fetch_regular_table(text, text, bigint, text[], integer[]) + IS 'fetch PostgreSQL table from remote node'; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 5fa5179c8..297eb777c 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '5.1-4' +default_version = '5.1-5' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 3f1088fab..dd6a79033 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -896,18 +896,13 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort) char *relationName = relation->relname; char *schemaName = relation->schemaname; - char *qualifiedName = quote_qualified_identifier(schemaName, relationName); - - /* fetch the ddl commands needed to create the table */ - StringInfo tableNameStringInfo = makeStringInfo(); - appendStringInfoString(tableNameStringInfo, qualifiedName); /* * The warning message created in TableDDLCommandList() is descriptive * enough; therefore, we just throw an error which says that we could not * run the copy operation. 
*/ - ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableNameStringInfo); + ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, relationName); if (ddlCommandList == NIL) { ereport(ERROR, (errmsg("could not run copy from the worker node"))); diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index b43ee24ab..20429ba49 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -198,7 +198,12 @@ SearchShardPlacementInList(List *shardPlacementList, text *nodeNameText, uint32 static List * RecreateTableDDLCommandList(Oid relationId) { - char *relationName = get_rel_name(relationId); + const char *relationName = get_rel_name(relationId); + Oid relationSchemaId = get_rel_namespace(relationId); + const char *relationSchemaName = get_namespace_name(relationSchemaId); + const char *qualifiedRelationName = quote_qualified_identifier(relationSchemaName, + relationName); + StringInfo dropCommand = makeStringInfo(); List *createCommandList = NIL; List *dropCommandList = NIL; @@ -209,12 +214,12 @@ RecreateTableDDLCommandList(Oid relationId) if (relationKind == RELKIND_RELATION) { appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND, - quote_identifier(relationName)); + qualifiedRelationName); } else if (relationKind == RELKIND_FOREIGN_TABLE) { appendStringInfo(dropCommand, DROP_FOREIGN_TABLE_COMMAND, - quote_identifier(relationName)); + qualifiedRelationName); } else { diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index bb63a0e1c..163853057 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -3750,6 +3750,7 @@ ShardFetchQueryString(uint64 shardId) /* check storage type to create the correct query string */ ShardInterval *shardInterval = LoadShardInterval(shardId); char storageType = shardInterval->storageType; + char *shardSchemaName = NULL; char *shardTableName = NULL; /* @@ -3766,7 +3767,10 @@ ShardFetchQueryString(uint64 shardId) else { /* construct the shard name */ + Oid shardSchemaId = get_rel_namespace(shardInterval->relationId); char *tableName = get_rel_name(shardInterval->relationId); + + shardSchemaName = get_namespace_name(shardSchemaId); shardTableName = pstrdup(tableName); AppendShardIdToName(&shardTableName, shardId); } @@ -3776,13 +3780,13 @@ ShardFetchQueryString(uint64 shardId) storageType == SHARD_STORAGE_COLUMNAR) { appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, - shardTableName, shardLength, + shardSchemaName, shardTableName, shardLength, nodeNameArrayString->data, nodePortArrayString->data); } else if (storageType == SHARD_STORAGE_FOREIGN) { appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, - shardTableName, shardLength, + shardSchemaName, shardTableName, shardLength, nodeNameArrayString->data, nodePortArrayString->data); } diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 72ada2cd2..f55a08919 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -53,20 +53,21 @@ static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort, static void ReceiveResourceCleanup(int32 connectionId, const char *filename, int32 fileDescriptor); static void DeleteFile(const 
char *filename); -static void FetchTableCommon(text *tableName, uint64 remoteTableSize, - ArrayType *nodeNameObject, ArrayType *nodePortObject, +static void FetchTableCommon(text *tableSchemaNameText, text *tableName, + uint64 remoteTableSize, ArrayType *nodeNameObject, + ArrayType *nodePortObject, bool (*FetchTableFunction)(const char *, uint32, - StringInfo)); + const char *, const char *)); static uint64 LocalTableSize(Oid relationId); -static uint64 ExtractShardId(StringInfo tableName); +static uint64 ExtractShardId(const char *tableName); static bool FetchRegularTable(const char *nodeName, uint32 nodePort, - StringInfo tableName); + const char *schemaName, const char *tableName); static bool FetchForeignTable(const char *nodeName, uint32 nodePort, - StringInfo tableName); + const char *schemaName, const char *tableName); static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort, - StringInfo tableName); + const char *schemaName, const char *tableName); static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort, - StringInfo tableName); + const char *schemaName, const char *tableName); static bool check_log_statement(List *stmt_list); @@ -426,16 +427,17 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) Datum worker_fetch_regular_table(PG_FUNCTION_ARGS) { - text *regularTableName = PG_GETARG_TEXT_P(0); - uint64 generationStamp = PG_GETARG_INT64(1); - ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2); - ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3); + text *regularSchemaName = PG_GETARG_TEXT_P(0); + text *regularTableName = PG_GETARG_TEXT_P(1); + uint64 generationStamp = PG_GETARG_INT64(2); + ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(3); + ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(4); /* * Run common logic to fetch the remote table, and use the provided function * pointer to perform the actual table fetching. */ - FetchTableCommon(regularTableName, generationStamp, + FetchTableCommon(regularSchemaName, regularTableName, generationStamp, nodeNameObject, nodePortObject, &FetchRegularTable); PG_RETURN_VOID(); @@ -450,16 +452,17 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS) Datum worker_fetch_foreign_file(PG_FUNCTION_ARGS) { - text *foreignTableName = PG_GETARG_TEXT_P(0); - uint64 foreignFileSize = PG_GETARG_INT64(1); - ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2); - ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3); + text *foreignSchemaName = PG_GETARG_TEXT_P(0); + text *foreignTableName = PG_GETARG_TEXT_P(1); + uint64 foreignFileSize = PG_GETARG_INT64(2); + ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(3); + ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(4); /* * Run common logic to fetch the remote table, and use the provided function * pointer to perform the actual table fetching. */ - FetchTableCommon(foreignTableName, foreignFileSize, + FetchTableCommon(foreignSchemaName, foreignTableName, foreignFileSize, nodeNameObject, nodePortObject, &FetchForeignTable); PG_RETURN_VOID(); @@ -473,12 +476,14 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS) * are retried in case of node failures. 
*/ static void -FetchTableCommon(text *tableNameText, uint64 remoteTableSize, +FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTableSize, ArrayType *nodeNameObject, ArrayType *nodePortObject, - bool (*FetchTableFunction)(const char *, uint32, StringInfo)) + bool (*FetchTableFunction)(const char *, uint32, const char *, + const char *)) { - StringInfo tableName = NULL; - char *tableNameCString = NULL; + char *schemaName = NULL; + char *tableName = NULL; + Oid schemaId = InvalidOid; uint64 shardId = INVALID_SHARD_ID; Oid relationId = InvalidOid; uint32 nodeIndex = 0; @@ -496,9 +501,8 @@ FetchTableCommon(text *tableNameText, uint64 remoteTableSize, " do not match", nodeNameCount, nodePortCount))); } - tableName = makeStringInfo(); - tableNameCString = text_to_cstring(tableNameText); - appendStringInfoString(tableName, tableNameCString); + schemaName = text_to_cstring(tableSchemaNameText); + tableName = text_to_cstring(tableNameText); /* * We lock on the shardId, but do not unlock. When the function returns, and @@ -510,7 +514,8 @@ FetchTableCommon(text *tableNameText, uint64 remoteTableSize, LockShardResource(shardId, AccessExclusiveLock); /* check if we already fetched the table */ - relationId = RelnameGetRelid(tableName->data); + schemaId = get_namespace_oid(schemaName, true); + relationId = get_relname_relid(tableName, schemaId); if (relationId != InvalidOid) { uint64 localTableSize = 0; @@ -561,7 +566,7 @@ FetchTableCommon(text *tableNameText, uint64 remoteTableSize, char *nodeName = TextDatumGetCString(nodeNameDatum); uint32 nodePort = DatumGetUInt32(nodePortDatum); - tableFetched = (*FetchTableFunction)(nodeName, nodePort, tableName); + tableFetched = (*FetchTableFunction)(nodeName, nodePort, schemaName, tableName); nodeIndex++; } @@ -569,7 +574,7 @@ FetchTableCommon(text *tableNameText, uint64 remoteTableSize, /* error out if we tried all nodes and could not fetch the table */ if (!tableFetched) { - ereport(ERROR, (errmsg("could not fetch relation: \"%s\"", tableName->data))); + ereport(ERROR, (errmsg("could not fetch relation: \"%s\"", tableName))); } } @@ -646,18 +651,18 @@ LocalTableSize(Oid relationId) /* Extracts shard id from the given table name, and returns it. */ static uint64 -ExtractShardId(StringInfo tableName) +ExtractShardId(const char *tableName) { uint64 shardId = 0; char *shardIdString = NULL; char *shardIdStringEnd = NULL; /* find the last underscore and increment for shardId string */ - shardIdString = strrchr(tableName->data, SHARD_NAME_SEPARATOR); + shardIdString = strrchr(tableName, SHARD_NAME_SEPARATOR); if (shardIdString == NULL) { ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"", - tableName->data))); + tableName))); } shardIdString++; @@ -668,7 +673,7 @@ ExtractShardId(StringInfo tableName) if (errno != 0 || (*shardIdStringEnd != '\0')) { ereport(ERROR, (errmsg("could not extract shardId from table name \"%s\"", - tableName->data))); + tableName))); } #else ereport(ERROR, (errmsg("could not extract shardId from table name"), @@ -688,7 +693,8 @@ ExtractShardId(StringInfo tableName) * false. On other types of failures, the function errors out. 
*/ static bool -FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName) +FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName, + const char *tableName) { StringInfo localFilePath = NULL; StringInfo remoteCopyCommand = NULL; @@ -700,7 +706,6 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName) bool received = false; char *quotedTableName = NULL; StringInfo queryString = NULL; - const char *schemaName = NULL; const char *tableOwner = NULL; Oid tableOwnerId = InvalidOid; Oid savedUserId = InvalidOid; @@ -712,7 +717,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName) appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT, PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId); - quotedTableName = quote_qualified_identifier(schemaName, tableName->data); + quotedTableName = quote_qualified_identifier(schemaName, tableName); remoteCopyCommand = makeStringInfo(); appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, quotedTableName); @@ -723,7 +728,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName) } /* fetch the ddl commands needed to create the table */ - tableOwner = RemoteTableOwner(nodeName, nodePort, tableName); + tableOwner = RemoteTableOwner(nodeName, nodePort, schemaName, tableName); if (tableOwner == NULL) { return false; @@ -731,7 +736,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName) tableOwnerId = get_role_oid(tableOwner, false); /* fetch the ddl commands needed to create the table */ - ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableName); + ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, tableName); if (ddlCommandList == NIL) { return false; @@ -763,7 +768,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName) * directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook * into process utility to provide their custom COPY behavior. */ - localTable = makeRangeVar((char *) schemaName, tableName->data, -1); + localTable = makeRangeVar((char *) schemaName, (char *) tableName, -1); localCopyCommand = CopyStatement(localTable, localFilePath->data); queryString = makeStringInfo(); @@ -788,8 +793,10 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, StringInfo tableName) * commands against the local database, the function errors out. */ static bool -FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName) +FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName, + const char *tableName) { + char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName); StringInfo localFilePath = NULL; StringInfo remoteFilePath = NULL; StringInfo transmitCommand = NULL; @@ -798,11 +805,16 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName) List *ddlCommandList = NIL; ListCell *ddlCommandCell = NULL; - /* fetch foreign file to this node in an idempotent manner */ + /* + * Fetch a foreign file to this node in an idempotent manner. It's OK that + * this file name lacks the schema, as the table name will have a shard id + * attached to it, which is unique (so conflicts are avoided even if two + * tables in different schemas have the same name). 
+ */ localFilePath = makeStringInfo(); - appendStringInfo(localFilePath, FOREIGN_CACHED_FILE_PATH, tableName->data); + appendStringInfo(localFilePath, FOREIGN_CACHED_FILE_PATH, tableName); - remoteFilePath = ForeignFilePath(nodeName, nodePort, tableName); + remoteFilePath = ForeignFilePath(nodeName, nodePort, schemaName, tableName); if (remoteFilePath == NULL) { return false; @@ -818,7 +830,7 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName) } /* fetch the ddl commands needed to create the table */ - ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableName); + ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, tableName); if (ddlCommandList == NIL) { return false; @@ -826,7 +838,7 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName) alterTableCommand = makeStringInfo(); appendStringInfo(alterTableCommand, SET_FOREIGN_TABLE_FILENAME, - tableName->data, localFilePath->data); + qualifiedTableName, localFilePath->data); ddlCommandList = lappend(ddlCommandList, alterTableCommand); @@ -853,15 +865,16 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName) * the table. If an error occurs during fetching, return NULL. */ static const char * -RemoteTableOwner(const char *nodeName, uint32 nodePort, StringInfo tableName) +RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *schemaName, + const char *tableName) { List *ownerList = NIL; StringInfo queryString = NULL; - const char *escapedTableName = quote_literal_cstr(tableName->data); + const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName); StringInfo relationOwner; queryString = makeStringInfo(); - appendStringInfo(queryString, GET_TABLE_OWNER, escapedTableName); + appendStringInfo(queryString, GET_TABLE_OWNER, qualifiedTableName); ownerList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString); if (list_length(ownerList) != 1) @@ -881,13 +894,15 @@ RemoteTableOwner(const char *nodeName, uint32 nodePort, StringInfo tableName) * the function returns an empty list. */ List * -TableDDLCommandList(const char *nodeName, uint32 nodePort, StringInfo tableName) +TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *schemaName, + const char *tableName) { + const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName); List *ddlCommandList = NIL; StringInfo queryString = NULL; queryString = makeStringInfo(); - appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, tableName->data); + appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, qualifiedTableName); ddlCommandList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString); return ddlCommandList; @@ -900,14 +915,16 @@ TableDDLCommandList(const char *nodeName, uint32 nodePort, StringInfo tableName) * null. 
*/ static StringInfo -ForeignFilePath(const char *nodeName, uint32 nodePort, StringInfo tableName) +ForeignFilePath(const char *nodeName, uint32 nodePort, const char *schemaName, + const char *tableName) { + const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName); List *foreignPathList = NIL; StringInfo foreignPathCommand = NULL; StringInfo foreignPath = NULL; foreignPathCommand = makeStringInfo(); - appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, tableName->data); + appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, qualifiedTableName); foreignPathList = ExecuteRemoteQuery(nodeName, nodePort, NULL, foreignPathCommand); if (foreignPathList != NIL) @@ -1059,7 +1076,6 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) char *sourceTableName = NULL; char *sourceQualifiedName = NULL; - StringInfo shardNameString = NULL; StringInfo localFilePath = NULL; StringInfo sourceCopyCommand = NULL; CopyStmt *localCopyCommand = NULL; @@ -1079,10 +1095,7 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) * the transaction for this function commits, this lock will automatically * be released. This ensures appends to a shard happen in a serial manner. */ - shardNameString = makeStringInfo(); - appendStringInfoString(shardNameString, shardTableName); - - shardId = ExtractShardId(shardNameString); + shardId = ExtractShardId(shardTableName); LockShardResource(shardId, AccessExclusiveLock); /* copy remote table's data to this node */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index a23083590..5261f2d04 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -33,9 +33,9 @@ #define RESERVED_HASHED_COLUMN_ID MaxAttrNumber #define MERGE_COLUMN_FORMAT "merge_column_%u" #define TABLE_FETCH_COMMAND "SELECT worker_fetch_regular_table \ - ('%s', " UINT64_FORMAT ", '%s', '%s')" + ('%s', '%s', " UINT64_FORMAT ", '%s', '%s')" #define FOREIGN_FETCH_COMMAND "SELECT worker_fetch_foreign_file \ - ('%s', " UINT64_FORMAT ", '%s', '%s')" + ('%s', '%s', " UINT64_FORMAT ", '%s', '%s')" #define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \ (" UINT64_FORMAT ", %u, %u, %u, '%s', %u)" #define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \ diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 8b2e69d8b..6a6019d18 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -50,7 +50,7 @@ #define FOREIGN_CACHED_FILE_PATH "pg_foreign_file/cached/%s" #define GET_TABLE_OWNER \ "SELECT rolname FROM pg_class JOIN pg_roles ON (pg_roles.oid = pg_class.relowner) " \ - "WHERE pg_class.oid = %s::regclass" + "WHERE pg_class.oid = '%s'::regclass" #define GET_TABLE_DDL_EVENTS "SELECT master_get_table_ddl_events('%s')" #define SET_FOREIGN_TABLE_FILENAME "ALTER FOREIGN TABLE %s OPTIONS (SET filename '%s')" #define FOREIGN_FILE_PATH_COMMAND "SELECT worker_foreign_file_path('%s')" @@ -120,7 +120,7 @@ extern Datum * DeconstructArrayObject(ArrayType *arrayObject); extern int32 ArrayObjectCount(ArrayType *arrayObject); extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId); extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort, - StringInfo tableName); + const char *schemaName, const char *tableName); /* Function declarations shared with the master planner */ extern StringInfo TaskFilename(StringInfo 
directoryName, uint32 taskId); diff --git a/src/test/regress/expected/multi_complex_expressions.out b/src/test/regress/expected/multi_complex_expressions.out index e2f4eee19..2e6bdc01f 100644 --- a/src/test/regress/expected/multi_complex_expressions.out +++ b/src/test/regress/expected/multi_complex_expressions.out @@ -390,7 +390,7 @@ ORDER BY customer_keys.o_custkey DESC LIMIT 10 OFFSET 20; DEBUG: push down of limit count: 30 -DEBUG: building index "pg_toast_16958_index" on table "pg_toast_16958" +DEBUG: building index "pg_toast_16966_index" on table "pg_toast_16966" o_custkey | total_order_count -----------+------------------- 1466 | 1 diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index e74f1f88f..8c59e720c 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -18,6 +18,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-1'; ALTER EXTENSION citus UPDATE TO '5.1-2'; ALTER EXTENSION citus UPDATE TO '5.1-3'; ALTER EXTENSION citus UPDATE TO '5.1-4'; +ALTER EXTENSION citus UPDATE TO '5.1-5'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index fa2cd9e38..fe97e8414 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -826,3 +826,222 @@ SELECT master_apply_delete_command('DELETE FROM nation_append') ; \c - - - :worker_1_port \d test_schema_support.nation_append_119* \c - - - :master_port +-- check joins of tables which are in schemas other than public +-- we create new tables with replication factor of 1 +-- so that we guarantee to have repartitions when necessary +-- create necessary objects and load data to them +CREATE SCHEMA test_schema_support_join_1; +NOTICE: Citus partially supports CREATE SCHEMA for distributed databases +DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet +CREATE SCHEMA test_schema_support_join_2; +NOTICE: Citus partially supports CREATE SCHEMA for distributed databases +DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet +CREATE TABLE test_schema_support_join_1.nation_hash ( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152)); +CREATE TABLE test_schema_support_join_1.nation_hash_2 ( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152)); +CREATE TABLE test_schema_support_join_2.nation_hash ( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152)); +SELECT master_create_distributed_table('test_schema_support_join_1.nation_hash', 'n_nationkey', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('test_schema_support_join_1.nation_hash', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +\COPY test_schema_support_join_1.nation_hash FROM STDIN with delimiter '|'; +SELECT master_create_distributed_table('test_schema_support_join_1.nation_hash_2', 'n_nationkey', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('test_schema_support_join_1.nation_hash_2', 4, 1); + master_create_worker_shards 
+----------------------------- + +(1 row) + +\COPY test_schema_support_join_1.nation_hash_2 FROM STDIN with delimiter '|'; +SELECT master_create_distributed_table('test_schema_support_join_2.nation_hash', 'n_nationkey', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('test_schema_support_join_2.nation_hash', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +\COPY test_schema_support_join_2.nation_hash FROM STDIN with delimiter '|'; +-- check when search_path is public, +-- join of two tables which are in different schemas, +-- join on partition column +SET search_path TO public; +SELECT + count (*) +FROM + test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_nationkey = n2.n_nationkey; + count +------- + 6 +(1 row) + +-- check when search_path is different than public, +-- join of two tables which are in different schemas, +-- join on partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_nationkey = n2.n_nationkey; + count +------- + 6 +(1 row) + +-- check when search_path is public, +-- join of two tables which are in same schemas, +-- join on partition column +SET search_path TO public; +SELECT + count (*) +FROM + test_schema_support_join_1.nation_hash n1, test_schema_support_join_1.nation_hash_2 n2 +WHERE + n1.n_nationkey = n2.n_nationkey; + count +------- + 6 +(1 row) + +-- check when search_path is different than public, +-- join of two tables which are in same schemas, +-- join on partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, nation_hash_2 n2 +WHERE + n1.n_nationkey = n2.n_nationkey; + count +------- + 6 +(1 row) + +-- single repartition joins +SET citus.task_executor_type TO "task-tracker"; +-- check when search_path is public, +-- join of two tables which are in different schemas, +-- join on partition column and non-partition column +SET search_path TO public; +SELECT + count (*) +FROM + test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_nationkey = n2.n_regionkey; + count +------- + 6 +(1 row) + +-- check when search_path is different than public, +-- join of two tables which are in different schemas, +-- join on partition column and non-partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_nationkey = n2.n_regionkey; + count +------- + 6 +(1 row) + +-- check when search_path is different than public, +-- join of two tables which are in same schemas, +-- join on partition column and non-partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, nation_hash_2 n2 +WHERE + n1.n_nationkey = n2.n_regionkey; + count +------- + 6 +(1 row) + +-- hash repartition joins +-- check when search_path is public, +-- join of two tables which are in different schemas, +-- join on non-partition column +SET search_path TO public; +SELECT + count (*) +FROM + test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_regionkey = n2.n_regionkey; + count +------- + 14 +(1 row) + +-- check when search_path is different than public, +-- join of two tables which are in different schemas, +-- join on non-partition column +SET search_path 
TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_regionkey = n2.n_regionkey; + count +------- + 14 +(1 row) + +-- check when search_path is different than public, +-- join of two tables which are in same schemas, +-- join on non-partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, nation_hash_2 n2 +WHERE + n1.n_regionkey = n2.n_regionkey; + count +------- + 14 +(1 row) + +-- set task_executor back to real-time +SET citus.task_executor_type TO "real-time"; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index b936177b2..1c1fcd376 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -23,6 +23,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-1'; ALTER EXTENSION citus UPDATE TO '5.1-2'; ALTER EXTENSION citus UPDATE TO '5.1-3'; ALTER EXTENSION citus UPDATE TO '5.1-4'; +ALTER EXTENSION citus UPDATE TO '5.1-5'; -- drop extension an re-create in newest version DROP EXTENSION citus; diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index 40770eb23..28a59d4fa 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -547,3 +547,184 @@ SELECT master_apply_delete_command('DELETE FROM nation_append') ; \d test_schema_support.nation_append_119* \c - - - :master_port + + +-- check joins of tables which are in schemas other than public +-- we create new tables with replication factor of 1 +-- so that we guarantee to have repartitions when necessary + +-- create necessary objects and load data to them +CREATE SCHEMA test_schema_support_join_1; +CREATE SCHEMA test_schema_support_join_2; + +CREATE TABLE test_schema_support_join_1.nation_hash ( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152)); + +CREATE TABLE test_schema_support_join_1.nation_hash_2 ( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152)); + +CREATE TABLE test_schema_support_join_2.nation_hash ( + n_nationkey integer not null, + n_name char(25) not null, + n_regionkey integer not null, + n_comment varchar(152)); + +SELECT master_create_distributed_table('test_schema_support_join_1.nation_hash', 'n_nationkey', 'hash'); +SELECT master_create_worker_shards('test_schema_support_join_1.nation_hash', 4, 1); + +\COPY test_schema_support_join_1.nation_hash FROM STDIN with delimiter '|'; +0|ALGERIA|0|haggle. carefully final deposits detect slyly agai +1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon +2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special +3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold +4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d +5|ETHIOPIA|0|ven packages wake quickly. regu +\. + +SELECT master_create_distributed_table('test_schema_support_join_1.nation_hash_2', 'n_nationkey', 'hash'); +SELECT master_create_worker_shards('test_schema_support_join_1.nation_hash_2', 4, 1); + +\COPY test_schema_support_join_1.nation_hash_2 FROM STDIN with delimiter '|'; +0|ALGERIA|0|haggle. 
carefully final deposits detect slyly agai +1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon +2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special +3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold +4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d +5|ETHIOPIA|0|ven packages wake quickly. regu +\. + +SELECT master_create_distributed_table('test_schema_support_join_2.nation_hash', 'n_nationkey', 'hash'); +SELECT master_create_worker_shards('test_schema_support_join_2.nation_hash', 4, 1); + +\COPY test_schema_support_join_2.nation_hash FROM STDIN with delimiter '|'; +0|ALGERIA|0|haggle. carefully final deposits detect slyly agai +1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon +2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special +3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold +4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d +5|ETHIOPIA|0|ven packages wake quickly. regu +\. + +-- check when search_path is public, +-- join of two tables which are in different schemas, +-- join on partition column +SET search_path TO public; +SELECT + count (*) +FROM + test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_nationkey = n2.n_nationkey; + +-- check when search_path is different than public, +-- join of two tables which are in different schemas, +-- join on partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_nationkey = n2.n_nationkey; + +-- check when search_path is public, +-- join of two tables which are in same schemas, +-- join on partition column +SET search_path TO public; +SELECT + count (*) +FROM + test_schema_support_join_1.nation_hash n1, test_schema_support_join_1.nation_hash_2 n2 +WHERE + n1.n_nationkey = n2.n_nationkey; + +-- check when search_path is different than public, +-- join of two tables which are in same schemas, +-- join on partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, nation_hash_2 n2 +WHERE + n1.n_nationkey = n2.n_nationkey; + +-- single repartition joins +SET citus.task_executor_type TO "task-tracker"; + +-- check when search_path is public, +-- join of two tables which are in different schemas, +-- join on partition column and non-partition column +SET search_path TO public; +SELECT + count (*) +FROM + test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_nationkey = n2.n_regionkey; + +-- check when search_path is different than public, +-- join of two tables which are in different schemas, +-- join on partition column and non-partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_nationkey = n2.n_regionkey; + +-- check when search_path is different than public, +-- join of two tables which are in same schemas, +-- join on partition column and non-partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + 
nation_hash n1, nation_hash_2 n2 +WHERE + n1.n_nationkey = n2.n_regionkey; + +-- hash repartition joins + +-- check when search_path is public, +-- join of two tables which are in different schemas, +-- join on non-partition column +SET search_path TO public; +SELECT + count (*) +FROM + test_schema_support_join_1.nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_regionkey = n2.n_regionkey; + +-- check when search_path is different than public, +-- join of two tables which are in different schemas, +-- join on non-partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, test_schema_support_join_2.nation_hash n2 +WHERE + n1.n_regionkey = n2.n_regionkey; + +-- check when search_path is different than public, +-- join of two tables which are in same schemas, +-- join on non-partition column +SET search_path TO test_schema_support_join_1; +SELECT + count (*) +FROM + nation_hash n1, nation_hash_2 n2 +WHERE + n1.n_regionkey = n2.n_regionkey; + +-- set task_executor back to real-time +SET citus.task_executor_type TO "real-time";
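
Usage sketch (not part of the patch): after ALTER EXTENSION citus UPDATE TO '5.1-5', the reworked fetch UDFs take the shard's schema as a separate leading text argument, mirroring the updated TABLE_FETCH_COMMAND / FOREIGN_FETCH_COMMAND formats. The schema, shard name, size, and node list below are hypothetical placeholders chosen only to illustrate the new call shape, not values produced by this patch.

-- hypothetical invocation of the new five-argument signature
SELECT worker_fetch_regular_table(
    'test_schema_support_join_1',    -- schema of the shard (new first argument)
    'nation_hash_1190001',           -- shard table name (schema passed separately)
    8192::bigint,                    -- expected remote table size in bytes
    '{worker-node-1}'::text[],       -- candidate source node names
    '{5432}'::integer[]              -- matching source node ports
);

Passing the schema separately lets FetchTableCommon() resolve the relation via get_namespace_oid()/get_relname_relid() instead of the search_path-dependent RelnameGetRelid(), and lets the worker build properly quoted, schema-qualified names for the remote DDL, COPY, and owner-lookup commands.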