diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 52716e05f..9005365cb 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -6,7 +6,7 @@ citus_top_builddir = ../../.. MODULE_big = citus EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ - 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 + 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -43,6 +43,10 @@ $(EXTENSION)--5.1-4.sql: $(EXTENSION)--5.1-3.sql $(EXTENSION)--5.1-3--5.1-4.sql cat $^ > $@ $(EXTENSION)--5.1-5.sql: $(EXTENSION)--5.1-4.sql $(EXTENSION)--5.1-4--5.1-5.sql cat $^ > $@ +$(EXTENSION)--5.1-6.sql: $(EXTENSION)--5.1-5.sql $(EXTENSION)--5.1-5--5.1-6.sql + cat $^ > $@ +$(EXTENSION)--5.1-7.sql: $(EXTENSION)--5.1-6.sql $(EXTENSION)--5.1-6--5.1-7.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--5.1-5--5.1-6.sql b/src/backend/distributed/citus--5.1-5--5.1-6.sql new file mode 100644 index 000000000..cf1b32321 --- /dev/null +++ b/src/backend/distributed/citus--5.1-5--5.1-6.sql @@ -0,0 +1,26 @@ +CREATE OR REPLACE FUNCTION pg_catalog.worker_apply_shard_ddl_command(bigint, text) + RETURNS void + LANGUAGE sql +AS $worker_apply_shard_ddl_command$ + SELECT pg_catalog.worker_apply_shard_ddl_command($1, 'public', $2); +$worker_apply_shard_ddl_command$; +COMMENT ON FUNCTION worker_apply_shard_ddl_command(bigint, text) + IS 'extend ddl command with shardId and apply on database'; + +CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[]) + RETURNS void + LANGUAGE sql +AS $worker_fetch_foreign_file$ + SELECT pg_catalog.worker_fetch_foreign_file('public', $1, $2, $3, $4); +$worker_fetch_foreign_file$; +COMMENT ON FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[]) + IS 'fetch foreign file from remote node and apply file'; + +CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[]) + RETURNS void + LANGUAGE sql +AS $worker_fetch_regular_table$ + SELECT pg_catalog.worker_fetch_regular_table('public', $1, $2, $3, $4); +$worker_fetch_regular_table$; +COMMENT ON FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[]) + IS 'fetch PostgreSQL table from remote node'; diff --git a/src/backend/distributed/citus--5.1-6--5.1-7.sql b/src/backend/distributed/citus--5.1-6--5.1-7.sql new file mode 100644 index 000000000..a6752d7b2 --- /dev/null +++ b/src/backend/distributed/citus--5.1-6--5.1-7.sql @@ -0,0 +1,17 @@ +DROP FUNCTION IF EXISTS pg_catalog.worker_fetch_foreign_file(text, text, bigint, text[], integer[]); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[]) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_fetch_foreign_file$$; +COMMENT ON FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[]) + IS 'fetch foreign file from remote node and apply file'; + +DROP FUNCTION IF EXISTS pg_catalog.worker_fetch_regular_table(text, text, bigint, text[], integer[]); + +CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[]) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_fetch_regular_table$$; +COMMENT ON FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[]) + IS 'fetch PostgreSQL table from remote node'; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 297eb777c..f34725785 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '5.1-5' +default_version = '5.1-7' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 97b6dca1e..2d32f464e 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -954,13 +954,14 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort) char *relationName = relation->relname; char *schemaName = relation->schemaname; + char *qualifiedRelationName = quote_qualified_identifier(schemaName, relationName); /* * The warning message created in TableDDLCommandList() is descriptive * enough; therefore, we just throw an error which says that we could not * run the copy operation. */ - ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, relationName); + ddlCommandList = TableDDLCommandList(nodeName, nodePort, qualifiedRelationName); if (ddlCommandList == NIL) { ereport(ERROR, (errmsg("could not run copy from the worker node"))); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index b2892f4b8..e4ae01c73 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -441,7 +441,6 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, { Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); - char *escapedSchemaName = quote_literal_cstr(schemaName); bool shardCreated = true; ListCell *ddlCommandCell = NULL; @@ -452,8 +451,19 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, List *queryResultList = NIL; StringInfo applyDDLCommand = makeStringInfo(); - appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId, - escapedSchemaName, escapedDDLCommand); + if (strcmp(schemaName, "public") != 0) + { + char *escapedSchemaName = quote_literal_cstr(schemaName); + + appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId, + escapedSchemaName, escapedDDLCommand); + } + else + { + appendStringInfo(applyDDLCommand, + WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA, shardId, + escapedDDLCommand); + } queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner, applyDDLCommand); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 163853057..071b2d2d8 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -3779,15 +3779,39 @@ ShardFetchQueryString(uint64 shardId) if (storageType == SHARD_STORAGE_TABLE || storageType == SHARD_STORAGE_RELAY || storageType == SHARD_STORAGE_COLUMNAR) { - appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, - shardSchemaName, shardTableName, shardLength, - nodeNameArrayString->data, nodePortArrayString->data); + if (strcmp(shardSchemaName, "public") != 0) + { + char *qualifiedTableName = quote_qualified_identifier(shardSchemaName, + shardTableName); + + appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, qualifiedTableName, + shardLength, nodeNameArrayString->data, + nodePortArrayString->data); + } + else + { + appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, shardTableName, + shardLength, nodeNameArrayString->data, + nodePortArrayString->data); + } } else if (storageType == SHARD_STORAGE_FOREIGN) { - appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, - shardSchemaName, shardTableName, shardLength, - nodeNameArrayString->data, nodePortArrayString->data); + if (strcmp(shardSchemaName, "public") != 0) + { + char *qualifiedTableName = quote_qualified_identifier(shardSchemaName, + shardTableName); + + appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, qualifiedTableName, + shardLength, nodeNameArrayString->data, + nodePortArrayString->data); + } + else + { + appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, shardTableName, + shardLength, nodeNameArrayString->data, + nodePortArrayString->data); + } } return shardFetchQuery; diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index f55a08919..f39eeb4da 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -53,21 +53,20 @@ static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort, static void ReceiveResourceCleanup(int32 connectionId, const char *filename, int32 fileDescriptor); static void DeleteFile(const char *filename); -static void FetchTableCommon(text *tableSchemaNameText, text *tableName, - uint64 remoteTableSize, ArrayType *nodeNameObject, - ArrayType *nodePortObject, +static void FetchTableCommon(text *tableName, uint64 remoteTableSize, + ArrayType *nodeNameObject, ArrayType *nodePortObject, bool (*FetchTableFunction)(const char *, uint32, - const char *, const char *)); + const char *)); static uint64 LocalTableSize(Oid relationId); static uint64 ExtractShardId(const char *tableName); static bool FetchRegularTable(const char *nodeName, uint32 nodePort, - const char *schemaName, const char *tableName); + const char *tableName); static bool FetchForeignTable(const char *nodeName, uint32 nodePort, - const char *schemaName, const char *tableName); + const char *tableName); static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort, - const char *schemaName, const char *tableName); + const char *tableName); static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort, - const char *schemaName, const char *tableName); + const char *tableName); static bool check_log_statement(List *stmt_list); @@ -427,18 +426,17 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) Datum worker_fetch_regular_table(PG_FUNCTION_ARGS) { - text *regularSchemaName = PG_GETARG_TEXT_P(0); - text *regularTableName = PG_GETARG_TEXT_P(1); - uint64 generationStamp = PG_GETARG_INT64(2); - ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(3); - ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(4); + text *regularTableName = PG_GETARG_TEXT_P(0); + uint64 generationStamp = PG_GETARG_INT64(1); + ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2); + ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3); /* * Run common logic to fetch the remote table, and use the provided function * pointer to perform the actual table fetching. */ - FetchTableCommon(regularSchemaName, regularTableName, generationStamp, - nodeNameObject, nodePortObject, &FetchRegularTable); + FetchTableCommon(regularTableName, generationStamp, nodeNameObject, nodePortObject, + &FetchRegularTable); PG_RETURN_VOID(); } @@ -452,18 +450,17 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS) Datum worker_fetch_foreign_file(PG_FUNCTION_ARGS) { - text *foreignSchemaName = PG_GETARG_TEXT_P(0); - text *foreignTableName = PG_GETARG_TEXT_P(1); - uint64 foreignFileSize = PG_GETARG_INT64(2); - ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(3); - ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(4); + text *foreignTableName = PG_GETARG_TEXT_P(0); + uint64 foreignFileSize = PG_GETARG_INT64(1); + ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2); + ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3); /* * Run common logic to fetch the remote table, and use the provided function * pointer to perform the actual table fetching. */ - FetchTableCommon(foreignSchemaName, foreignTableName, foreignFileSize, - nodeNameObject, nodePortObject, &FetchForeignTable); + FetchTableCommon(foreignTableName, foreignFileSize, nodeNameObject, nodePortObject, + &FetchForeignTable); PG_RETURN_VOID(); } @@ -476,18 +473,17 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS) * are retried in case of node failures. */ static void -FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTableSize, +FetchTableCommon(text *tableNameText, uint64 remoteTableSize, ArrayType *nodeNameObject, ArrayType *nodePortObject, - bool (*FetchTableFunction)(const char *, uint32, const char *, - const char *)) + bool (*FetchTableFunction)(const char *, uint32, const char *)) { - char *schemaName = NULL; - char *tableName = NULL; - Oid schemaId = InvalidOid; uint64 shardId = INVALID_SHARD_ID; Oid relationId = InvalidOid; + List *relationNameList = NIL; + RangeVar *relation = NULL; uint32 nodeIndex = 0; bool tableFetched = false; + char *tableName = text_to_cstring(tableNameText); Datum *nodeNameArray = DeconstructArrayObject(nodeNameObject); Datum *nodePortArray = DeconstructArrayObject(nodePortObject); @@ -501,9 +497,6 @@ FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTa " do not match", nodeNameCount, nodePortCount))); } - schemaName = text_to_cstring(tableSchemaNameText); - tableName = text_to_cstring(tableNameText); - /* * We lock on the shardId, but do not unlock. When the function returns, and * the transaction for this function commits, this lock will automatically @@ -513,9 +506,11 @@ FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTa shardId = ExtractShardId(tableName); LockShardResource(shardId, AccessExclusiveLock); + relationNameList = textToQualifiedNameList(tableNameText); + relation = makeRangeVarFromNameList(relationNameList); + relationId = RangeVarGetRelid(relation, NoLock, true); + /* check if we already fetched the table */ - schemaId = get_namespace_oid(schemaName, true); - relationId = get_relname_relid(tableName, schemaId); if (relationId != InvalidOid) { uint64 localTableSize = 0; @@ -566,7 +561,7 @@ FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTa char *nodeName = TextDatumGetCString(nodeNameDatum); uint32 nodePort = DatumGetUInt32(nodePortDatum); - tableFetched = (*FetchTableFunction)(nodeName, nodePort, schemaName, tableName); + tableFetched = (*FetchTableFunction)(nodeName, nodePort, tableName); nodeIndex++; } @@ -693,8 +688,7 @@ ExtractShardId(const char *tableName) * false. On other types of failures, the function errors out. */ static bool -FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName, - const char *tableName) +FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName) { StringInfo localFilePath = NULL; StringInfo remoteCopyCommand = NULL; @@ -704,12 +698,12 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName, RangeVar *localTable = NULL; uint64 shardId = 0; bool received = false; - char *quotedTableName = NULL; StringInfo queryString = NULL; const char *tableOwner = NULL; Oid tableOwnerId = InvalidOid; Oid savedUserId = InvalidOid; int savedSecurityContext = 0; + List *tableNameList = NIL; /* copy remote table's data to this node in an idempotent manner */ shardId = ExtractShardId(tableName); @@ -717,9 +711,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName, appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT, PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId); - quotedTableName = quote_qualified_identifier(schemaName, tableName); remoteCopyCommand = makeStringInfo(); - appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, quotedTableName); + appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, tableName); received = ReceiveRegularFile(nodeName, nodePort, remoteCopyCommand, localFilePath); if (!received) @@ -728,7 +721,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName, } /* fetch the ddl commands needed to create the table */ - tableOwner = RemoteTableOwner(nodeName, nodePort, schemaName, tableName); + tableOwner = RemoteTableOwner(nodeName, nodePort, tableName); if (tableOwner == NULL) { return false; @@ -736,7 +729,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName, tableOwnerId = get_role_oid(tableOwner, false); /* fetch the ddl commands needed to create the table */ - ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, tableName); + ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableName); if (ddlCommandList == NIL) { return false; @@ -768,11 +761,12 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName, * directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook * into process utility to provide their custom COPY behavior. */ - localTable = makeRangeVar((char *) schemaName, (char *) tableName, -1); + tableNameList = stringToQualifiedNameList(tableName); + localTable = makeRangeVarFromNameList(tableNameList); localCopyCommand = CopyStatement(localTable, localFilePath->data); queryString = makeStringInfo(); - appendStringInfo(queryString, COPY_IN_COMMAND, quotedTableName, localFilePath->data); + appendStringInfo(queryString, COPY_IN_COMMAND, tableName, localFilePath->data); ProcessUtility((Node *) localCopyCommand, queryString->data, PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); @@ -793,10 +787,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName, * commands against the local database, the function errors out. */ static bool -FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName, - const char *tableName) +FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName) { - char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName); StringInfo localFilePath = NULL; StringInfo remoteFilePath = NULL; StringInfo transmitCommand = NULL; @@ -814,7 +806,7 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName, localFilePath = makeStringInfo(); appendStringInfo(localFilePath, FOREIGN_CACHED_FILE_PATH, tableName); - remoteFilePath = ForeignFilePath(nodeName, nodePort, schemaName, tableName); + remoteFilePath = ForeignFilePath(nodeName, nodePort, tableName); if (remoteFilePath == NULL) { return false; @@ -830,15 +822,15 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName, } /* fetch the ddl commands needed to create the table */ - ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, tableName); + ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableName); if (ddlCommandList == NIL) { return false; } alterTableCommand = makeStringInfo(); - appendStringInfo(alterTableCommand, SET_FOREIGN_TABLE_FILENAME, - qualifiedTableName, localFilePath->data); + appendStringInfo(alterTableCommand, SET_FOREIGN_TABLE_FILENAME, tableName, + localFilePath->data); ddlCommandList = lappend(ddlCommandList, alterTableCommand); @@ -865,16 +857,14 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName, * the table. If an error occurs during fetching, return NULL. */ static const char * -RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *schemaName, - const char *tableName) +RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *tableName) { List *ownerList = NIL; StringInfo queryString = NULL; - const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName); StringInfo relationOwner; queryString = makeStringInfo(); - appendStringInfo(queryString, GET_TABLE_OWNER, qualifiedTableName); + appendStringInfo(queryString, GET_TABLE_OWNER, tableName); ownerList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString); if (list_length(ownerList) != 1) @@ -894,15 +884,13 @@ RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *schemaName, * the function returns an empty list. */ List * -TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *schemaName, - const char *tableName) +TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *tableName) { - const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName); List *ddlCommandList = NIL; StringInfo queryString = NULL; queryString = makeStringInfo(); - appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, qualifiedTableName); + appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, tableName); ddlCommandList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString); return ddlCommandList; @@ -915,16 +903,14 @@ TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *schemaNam * null. */ static StringInfo -ForeignFilePath(const char *nodeName, uint32 nodePort, const char *schemaName, - const char *tableName) +ForeignFilePath(const char *nodeName, uint32 nodePort, const char *tableName) { - const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName); List *foreignPathList = NIL; StringInfo foreignPathCommand = NULL; StringInfo foreignPath = NULL; foreignPathCommand = makeStringInfo(); - appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, qualifiedTableName); + appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, tableName); foreignPathList = ExecuteRemoteQuery(nodeName, nodePort, NULL, foreignPathCommand); if (foreignPathList != NIL) diff --git a/src/bin/csql/stage.c b/src/bin/csql/stage.c index 74af60de9..eed017978 100644 --- a/src/bin/csql/stage.c +++ b/src/bin/csql/stage.c @@ -1387,23 +1387,15 @@ static bool ApplyShardDDLCommand(PGconn *workerNode, uint64 shardId, const char *ddlCommand) { const char *remoteCommand = APPLY_SHARD_DDL_COMMAND; - const char *parameterValue[3]; - const int parameterCount = 3; + const char *parameterValue[2]; + const int parameterCount = 2; PGresult *ddlResult = NULL; char shardIdString[NAMEDATALEN]; snprintf(shardIdString, NAMEDATALEN, UINT64_FORMAT, shardId); - /* - * We changed worker_apply_shard_ddl_command and now it requires schema name. Since - * \STAGE will be deprecated anyway, we use public schema for everything to make it - * work with worker_apply_shard_ddl_command. Please note that if user specifies - * schema name, this will not override it, because we prioritize schema names given - * in the query in worker_apply_shard_ddl_command. - */ parameterValue[0] = shardIdString; - parameterValue[1] = "public"; - parameterValue[2] = ddlCommand; + parameterValue[1] = ddlCommand; ddlResult = ExecuteRemoteCommand(workerNode, remoteCommand, parameterValue, parameterCount); diff --git a/src/bin/csql/stage.h b/src/bin/csql/stage.h index 818b6bdbb..821d62ccd 100644 --- a/src/bin/csql/stage.h +++ b/src/bin/csql/stage.h @@ -65,7 +65,7 @@ "SELECT * FROM (SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table " \ "WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';" #define APPLY_SHARD_DDL_COMMAND \ - "SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text, $3::text)" + "SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text)" #define REMOTE_FILE_SIZE_COMMAND "SELECT size FROM pg_stat_file('%s')" #define SHARD_COLUMNAR_TABLE_SIZE_COMMAND "SELECT cstore_table_size('%s')" diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index f1a0a8a42..c0ee27ee1 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -51,6 +51,8 @@ /* Remote call definitions to help with data staging and deletion */ #define WORKER_APPLY_SHARD_DDL_COMMAND \ "SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s, %s)" +#define WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA \ + "SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)" #define WORKER_APPEND_TABLE_TO_SHARD \ "SELECT worker_append_table_to_shard (%s, %s, %s, %u)" #define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s" diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 5261f2d04..a23083590 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -33,9 +33,9 @@ #define RESERVED_HASHED_COLUMN_ID MaxAttrNumber #define MERGE_COLUMN_FORMAT "merge_column_%u" #define TABLE_FETCH_COMMAND "SELECT worker_fetch_regular_table \ - ('%s', '%s', " UINT64_FORMAT ", '%s', '%s')" + ('%s', " UINT64_FORMAT ", '%s', '%s')" #define FOREIGN_FETCH_COMMAND "SELECT worker_fetch_foreign_file \ - ('%s', '%s', " UINT64_FORMAT ", '%s', '%s')" + ('%s', " UINT64_FORMAT ", '%s', '%s')" #define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \ (" UINT64_FORMAT ", %u, %u, %u, '%s', %u)" #define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \ diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index 6a6019d18..49cd1ec48 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -120,7 +120,7 @@ extern Datum * DeconstructArrayObject(ArrayType *arrayObject); extern int32 ArrayObjectCount(ArrayType *arrayObject); extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId); extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort, - const char *schemaName, const char *tableName); + const char *tableName); /* Function declarations shared with the master planner */ extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId); diff --git a/src/test/regress/expected/multi_complex_expressions.out b/src/test/regress/expected/multi_complex_expressions.out index 2e6bdc01f..1354f4c00 100644 --- a/src/test/regress/expected/multi_complex_expressions.out +++ b/src/test/regress/expected/multi_complex_expressions.out @@ -390,7 +390,7 @@ ORDER BY customer_keys.o_custkey DESC LIMIT 10 OFFSET 20; DEBUG: push down of limit count: 30 -DEBUG: building index "pg_toast_16966_index" on table "pg_toast_16966" +DEBUG: building index "pg_toast_16977_index" on table "pg_toast_16977" o_custkey | total_order_count -----------+------------------- 1466 | 1 diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 8c59e720c..d4d8b1b27 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -19,6 +19,8 @@ ALTER EXTENSION citus UPDATE TO '5.1-2'; ALTER EXTENSION citus UPDATE TO '5.1-3'; ALTER EXTENSION citus UPDATE TO '5.1-4'; ALTER EXTENSION citus UPDATE TO '5.1-5'; +ALTER EXTENSION citus UPDATE TO '5.1-6'; +ALTER EXTENSION citus UPDATE TO '5.1-7'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 1c1fcd376..14cafa7d8 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -24,6 +24,8 @@ ALTER EXTENSION citus UPDATE TO '5.1-2'; ALTER EXTENSION citus UPDATE TO '5.1-3'; ALTER EXTENSION citus UPDATE TO '5.1-4'; ALTER EXTENSION citus UPDATE TO '5.1-5'; +ALTER EXTENSION citus UPDATE TO '5.1-6'; +ALTER EXTENSION citus UPDATE TO '5.1-7'; -- drop extension an re-create in newest version DROP EXTENSION citus;