Address reviews

velioglu/wo_seq_test_1
Burak Velioglu 2022-01-21 17:47:39 +03:00
parent 8a5c8c449f
commit ccea041d6a
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
16 changed files with 127 additions and 92 deletions

View File

@ -304,6 +304,26 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
}
}
ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList,
&dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
attnumList);
/*
* Ensure dependencies exist as we will create shell table on the other nodes
* in the MX case.
*/
EnsureDependenciesExistOnAllNodes(&tableAddress);
/*
* Make sure that existing reference tables have been replicated to all
* the nodes such that we can create foreign keys and joins work
@ -346,26 +366,6 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
InsertMetadataForCitusLocalTable(shellRelationId, shardId, autoConverted);
ObjectAddress shellTableAddress = { 0 };
ObjectAddressSet(shellTableAddress, RelationRelationId, shellRelationId);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(shellRelationId, &attnumList,
&dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(shellRelationId, dependentSequenceList,
attnumList);
/*
* Ensure dependencies exist as we will create shell table on the other nodes
* in the MX case.
*/
EnsureDependenciesExistOnAllNodes(&shellTableAddress);
FinalizeCitusLocalTableCreation(shellRelationId, dependentSequenceList);
}

View File

@ -432,6 +432,26 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES);
}
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
attnumList);
/*
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created
* via their own connection and committed immediately so they become visible to all
* sessions creating shards.
*/
ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistOnAllNodes(&tableAddress);
char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName,
viaDeprecatedAPI);
@ -483,26 +503,6 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
colocationId, replicationModel, autoConverted);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
attnumList);
/*
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created
* via their own connection and committed immediately so they become visible to all
* sessions creating shards.
*/
ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistOnAllNodes(&tableAddress);
/* foreign tables do not support TRUNCATE trigger */
if (RegularTable(relationId))
{
@ -594,11 +594,15 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
* If any other distributed table uses the input sequence, it checks whether
* the types of the columns using the sequence match. If they don't, it errors out.
* Otherwise, the condition is ensured.
* Since the owner of the sequence may not distributed yet, it should be added
* explicitly.
*/
void
EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId)
EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId, Oid ownerRelationId)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
citusTableIdList = list_append_unique_oid(citusTableIdList, ownerRelationId);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
{
@ -742,7 +746,7 @@ EnsureDistributedSequencesHaveOneType(Oid relationId, List *dependentSequenceLis
* that sequence is supported
*/
Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
EnsureSequenceTypeSupported(sequenceOid, seqTypId);
EnsureSequenceTypeSupported(sequenceOid, seqTypId, relationId);
/*
* Alter the sequence's data type in the coordinator if needed.

View File

@ -108,6 +108,7 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SyncObjectDependenciesToNode(WorkerNode *workerNode);
static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode);
static void SyncPgDistTableMetadataToNode(WorkerNode *workerNode);
static List * InterTableRelationshipCommandList();
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@ -758,7 +759,7 @@ SyncObjectDependenciesCommandList(WorkerNode *workerNode)
* Detach partitions and remove shell tables first.
*/
commandList = list_concat(commandList, DetachPartitionCommandList());
commandList = lappend(commandList, REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND);
commandList = lappend(commandList, REMOVE_ALL_SHELL_TABLES_COMMAND);
/*
* Propagate node wide objects. It includes only roles for now.
@ -766,11 +767,8 @@ SyncObjectDependenciesCommandList(WorkerNode *workerNode)
commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList());
/*
* Replicate all objects of the pg_dist_object to the remote node. We need to
* update local group id first, as sequence replication logic depends on it.
* Replicate all objects of the pg_dist_object to the remote node.
*/
commandList = list_concat(commandList, list_make1(LocalGroupIdUpdateCommand(
workerNode->groupId)));
commandList = list_concat(commandList, ReplicateAllObjectsToNodeCommandList(
workerNode->workerName, workerNode->workerPort));
@ -817,6 +815,27 @@ SyncObjectDependenciesToNode(WorkerNode *workerNode)
}
/*
* UpdateLocalGroupIdOnNode updates local group id on node.
*/
static void
UpdateLocalGroupIdOnNode(WorkerNode *workerNode)
{
if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode))
{
List *commandList = list_make1(LocalGroupIdUpdateCommand(workerNode->groupId));
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(
workerNode->workerName,
workerNode->workerPort,
CurrentUserName(),
commandList);
}
}
/*
* SyncPgDistTableMetadataToNode syncs the pg_dist_partition, pg_dist_shard
* pg_dist_placement and pg_dist_object metadata entries.
@ -1099,6 +1118,12 @@ ActivateNode(char *nodeName, int nodePort)
SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
/*
* Update local group id first, as object dependency logic requires to have
* updated local group id.
*/
UpdateLocalGroupIdOnNode(workerNode);
/*
* Sync object dependencies first. We must sync object dependencies before
* replicating reference tables to the remote node, as reference tables may

View File

@ -12,7 +12,7 @@
#include "udfs/citus_internal_add_object_metadata/11.0-1.sql"
#include "udfs/citus_run_local_command/11.0-1.sql"
#include "udfs/worker_drop_sequence_dependency/11.0-1.sql"
#include "udfs/worker_drop_distributed_table_only/11.0-1.sql"
#include "udfs/worker_drop_shell_table/11.0-1.sql"
DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text);

View File

@ -47,7 +47,7 @@ DROP FUNCTION pg_catalog.citus_check_cluster_node_health ();
DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer, boolean);
DROP FUNCTION pg_catalog.citus_run_local_command(text);
DROP FUNCTION pg_catalog.worker_drop_sequence_dependency(text);
DROP FUNCTION pg_catalog.worker_drop_distributed_table_only(table_name text);
DROP FUNCTION pg_catalog.worker_drop_shell_table(table_name text);
CREATE OR REPLACE VIEW pg_catalog.citus_shards_on_worker AS
SELECT n.nspname as "Schema",

View File

@ -1,7 +0,0 @@
CREATE FUNCTION pg_catalog.worker_drop_distributed_table_only(table_name text)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_drop_distributed_table_only$$;
COMMENT ON FUNCTION worker_drop_distributed_table_only(table_name text)
IS 'drop the distributed table only without the metadata';

View File

@ -1,7 +0,0 @@
CREATE FUNCTION pg_catalog.worker_drop_distributed_table_only(table_name text)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_drop_distributed_table_only$$;
COMMENT ON FUNCTION worker_drop_distributed_table_only(table_name text)
IS 'drop the distributed table only without the metadata';

View File

@ -0,0 +1,7 @@
CREATE FUNCTION pg_catalog.worker_drop_shell_table(table_name text)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_drop_shell_table$$;
COMMENT ON FUNCTION worker_drop_shell_table(table_name text)
IS 'drop the distributed table only without the metadata';

View File

@ -0,0 +1,7 @@
CREATE FUNCTION pg_catalog.worker_drop_shell_table(table_name text)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_drop_shell_table$$;
COMMENT ON FUNCTION worker_drop_shell_table(table_name text)
IS 'drop the distributed table only without the metadata';

View File

@ -48,6 +48,8 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
*/
WorkerNode *dummyWorkerNode = GetFirstPrimaryWorkerNode();
List *updateLocalGroupCommand =
list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId));
List *syncObjectDepCommands = SyncObjectDependenciesCommandList(dummyWorkerNode);
List *dropSnapshotCommands = NodeMetadataDropCommands();
List *createSnapshotCommands = NodeMetadataCreateCommands();
@ -57,6 +59,7 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
int activateNodeCommandIndex = 0;
Oid ddlCommandTypeId = TEXTOID;
activateNodeCommandList = list_concat(activateNodeCommandList, updateLocalGroupCommand);
activateNodeCommandList = list_concat(activateNodeCommandList, syncObjectDepCommands);
activateNodeCommandList = list_concat(activateNodeCommandList, dropSnapshotCommands);
activateNodeCommandList = list_concat(activateNodeCommandList,

View File

@ -34,7 +34,7 @@
#include "utils/fmgroids.h"
PG_FUNCTION_INFO_V1(worker_drop_distributed_table);
PG_FUNCTION_INFO_V1(worker_drop_distributed_table_only);
PG_FUNCTION_INFO_V1(worker_drop_shell_table);
PG_FUNCTION_INFO_V1(worker_drop_sequence_dependency);
@ -105,8 +105,6 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
UnmarkObjectDistributed(&ownedSequenceAddress);
}
UnmarkObjectDistributed(&distributedTableObject);
if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
{
/*
@ -146,10 +144,13 @@ worker_drop_distributed_table(PG_FUNCTION_ARGS)
/*
* worker_drop_distributed_table_only drops the distributed table with the given oid.
* worker_drop_shell_table drops the shell table of with the given distributed
* table without deleting related entries on pg_dist_placement, pg_dist_shard
* and pg_dist_placement. We've separated that logic since we handle object
* dependencies and table metadata separately while activating nodes.
*/
Datum
worker_drop_distributed_table_only(PG_FUNCTION_ARGS)
worker_drop_shell_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
@ -178,6 +179,11 @@ worker_drop_distributed_table_only(PG_FUNCTION_ARGS)
distributedTableObject.objectId = relationId;
distributedTableObject.objectSubId = 0;
if (IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
{
PG_RETURN_VOID();;
}
/* Drop dependent sequences from pg_dist_object */
#if PG_VERSION_NUM >= PG_VERSION_13
List *ownedSequences = getOwnedSequences(relationId);
@ -193,18 +199,15 @@ worker_drop_distributed_table_only(PG_FUNCTION_ARGS)
UnmarkObjectDistributed(&ownedSequenceAddress);
}
if (!IsObjectAddressOwnedByExtension(&distributedTableObject, NULL))
{
/*
* If the table is owned by an extension, we cannot drop it, nor should we
* until the user runs DROP EXTENSION. Therefore, we skip dropping the
* table and only delete the metadata.
*
* We drop the table with cascade since other tables may be referring to it.
*/
performDeletion(&distributedTableObject, DROP_CASCADE,
PERFORM_DELETION_INTERNAL);
}
/*
* If the table is owned by an extension, we cannot drop it, nor should we
* until the user runs DROP EXTENSION. Therefore, we skip dropping the
* table and only delete the metadata.
*
* We drop the table with cascade since other tables may be referring to it.
*/
performDeletion(&distributedTableObject, DROP_CASCADE,
PERFORM_DELETION_INTERNAL);
PG_RETURN_VOID();
}

View File

@ -80,8 +80,8 @@ extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
#define DELETE_ALL_SHARDS "DELETE FROM pg_dist_shard CASCADE"
#define DELETE_ALL_DISTRIBUTED_OBJECTS "TRUNCATE citus.pg_dist_object"
#define DELETE_ALL_PARTITIONS "DELETE FROM pg_dist_partition CASCADE"
#define REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND \
"SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition"
#define REMOVE_ALL_SHELL_TABLES_COMMAND \
"SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define REMOVE_ALL_CITUS_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \

View File

@ -286,7 +286,7 @@ extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
uint64 *availableBytes,
uint64 *totalBytes);
extern void ExecuteQueryViaSPI(char *query, int SPIOK);
extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId);
extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId, Oid ownerRelationId);
extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void MarkSequenceListDistributedAndPropagateWithDependencies(Oid relationId,
List *sequenceList);

View File

@ -1011,8 +1011,8 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_shard_indexes_on_worker() SETOF record
| function citus_shards_on_worker() SETOF record
| function create_distributed_function(regprocedure,text,text,boolean) void
| function worker_drop_distributed_table_only(text) void
| function worker_drop_sequence_dependency(text) void
| function worker_drop_shell_table(text) void
(15 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;

View File

@ -72,7 +72,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT worker_create_or_alter_role('postgres', null, 'ALTER ROLE postgres SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 0 PASSWORD ''md5c53670dddfc3bb4b5675c7872bc2249a'' VALID UNTIL ''2052-05-05 00:00:00-07''')
SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition
SET ROLE postgres
SET ROLE postgres
SET citus.enable_ddl_propagation TO 'off'
@ -141,7 +141,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_create_or_alter_role('postgres', null, 'ALTER ROLE postgres SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 0 PASSWORD ''md5c53670dddfc3bb4b5675c7872bc2249a'' VALID UNTIL ''2052-05-05 00:00:00-07''')
SELECT worker_create_truncate_trigger('public.mx_test_table')
SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition
SET ROLE postgres
SET ROLE postgres
SET citus.enable_ddl_propagation TO 'off'
@ -187,7 +187,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_create_or_alter_role('postgres', null, 'ALTER ROLE postgres SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 0 PASSWORD ''md5c53670dddfc3bb4b5675c7872bc2249a'' VALID UNTIL ''2052-05-05 00:00:00-07''')
SELECT worker_create_truncate_trigger('public.mx_test_table')
SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition
SET ROLE postgres
SET ROLE postgres
SET citus.enable_ddl_propagation TO 'off'
@ -235,7 +235,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_create_or_alter_role('postgres', null, 'ALTER ROLE postgres SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 0 PASSWORD ''md5c53670dddfc3bb4b5675c7872bc2249a'' VALID UNTIL ''2052-05-05 00:00:00-07''')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition
SET ROLE postgres
SET ROLE postgres
SET citus.enable_ddl_propagation TO 'off'
@ -289,7 +289,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_create_or_alter_role('postgres', null, 'ALTER ROLE postgres SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 0 PASSWORD ''md5c53670dddfc3bb4b5675c7872bc2249a'' VALID UNTIL ''2052-05-05 00:00:00-07''')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition
SET ROLE postgres
SET ROLE postgres
SET citus.enable_ddl_propagation TO 'off'
@ -336,7 +336,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_create_or_alter_role('postgres', null, 'ALTER ROLE postgres SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 0 PASSWORD ''md5c53670dddfc3bb4b5675c7872bc2249a'' VALID UNTIL ''2052-05-05 00:00:00-07''')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition
SET ROLE postgres
SET ROLE postgres
SET citus.enable_ddl_propagation TO 'off'
@ -1873,7 +1873,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SELECT worker_create_truncate_trigger('public.dist_table_1')
SELECT worker_create_truncate_trigger('public.mx_ref')
SELECT worker_create_truncate_trigger('public.test_table')
SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition
SET ROLE postgres
SET ROLE postgres
SET citus.enable_ddl_propagation TO 'off'

View File

@ -206,8 +206,8 @@ ORDER BY 1;
function worker_create_schema(bigint,text)
function worker_create_truncate_trigger(regclass)
function worker_drop_distributed_table(text)
function worker_drop_distributed_table_only(text)
function worker_drop_sequence_dependency(text)
function worker_drop_shell_table(text)
function worker_fetch_foreign_file(text,text,bigint,text[],integer[])
function worker_fetch_partition_file(bigint,integer,integer,integer,text,integer)
function worker_fix_partition_shard_index_names(regclass,text,text)