diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 9951bd1cf..ce9f1bf9f 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1947,6 +1947,44 @@ DeletePartitionRow(Oid distributedRelationId) } +void +DeleteShardgroupRow(uint32 shardgroupId) +{ + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + + Relation pgDistShardgroup = table_open(DistShardgroupRelationId(), RowExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_shardid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistShardgroup, + DistShardShardidIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for shard " + UINT64_FORMAT, shardId))); + } + + Form_pg_dist_shard pgDistShardForm = (Form_pg_dist_shard) GETSTRUCT(heapTuple); + Oid distributedRelationId = pgDistShardForm->logicalrelid; + + simple_heap_delete(pgDistShardgroup, &heapTuple->t_self); + + systable_endscan(scanDescriptor); + + /* invalidate previous cache entry */ + CitusInvalidateRelcacheByRelid(distributedRelationId); + + CommandCounterIncrement(); + table_close(pgDistShardgroup, NoLock); +} + + /* * DeleteShardRow opens the shard system catalog, finds the unique row that has * the given shardId, and deletes this row. diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 6678fbdb1..d1ea324d5 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -92,19 +92,22 @@ static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitInterval static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfPlacementToDummyShardList); static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerNode); -static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, - List *splitPointsForShard); -static void CreateSplitIntervalsForShard(ShardInterval *sourceShard, - List *splitPointsForShard, - List **shardSplitChildrenIntervalList); +static List * CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval, + List *splitPointsForShard); +static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, + List *newShardgroups); +static List * CreateShardIntervalsForNewShardgroups(ShardInterval *sourceShard, + List *newShardgroups); static void BlockingShardSplit(SplitOperation splitOperation, uint64 splitWorkflowId, + uint32 colocationId, List *sourceColocatedShardIntervalList, List *shardSplitPointsList, List *workersForPlacementList, DistributionColumnMap *distributionColumnOverrides); static void NonBlockingShardSplit(SplitOperation splitOperation, uint64 splitWorkflowId, + uint32 colocationId, List *sourceColocatedShardIntervalList, List *shardSplitPointsList, List *workersForPlacementList, @@ -127,6 +130,7 @@ static void UpdateDistributionColumnsForShardGroup(List *colocatedShardList, char distributionMethod, int shardCount, uint32 colocationId); +static void InsertSplitChildrenShardgroupMetadata(List *newShardgroups); static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void CreatePartitioningHierarchyForBlockingSplit( @@ -155,6 +159,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 static uint64 GetNextShardIdForSplitChild(void); static void AcquireNonblockingSplitLock(Oid relationId); static List * GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList); +static void DropShardgroupMetadata(uint32 shardgroupId); static void DropShardListMetadata(List *shardIntervalList); /* Customize error message strings based on operation type */ @@ -493,6 +498,10 @@ SplitShard(SplitMode splitMode, { sourceColocatedShardIntervalList = colocatedShardIntervalList; } + + /* Find the colocationId we are splitting in */ + CitusTableCacheEntry *citusTableCacheEntry = GetCitusTableCacheEntry(relationId); + uint32 colocationId = citusTableCacheEntry->colocationId; DropOrphanedResourcesInSeparateTransaction(); @@ -509,6 +518,7 @@ SplitShard(SplitMode splitMode, BlockingShardSplit( splitOperation, splitWorkflowId, + colocationId, sourceColocatedShardIntervalList, shardSplitPointsList, workersForPlacementList, @@ -521,6 +531,7 @@ SplitShard(SplitMode splitMode, NonBlockingShardSplit( splitOperation, splitWorkflowId, + colocationId, sourceColocatedShardIntervalList, shardSplitPointsList, workersForPlacementList, @@ -549,6 +560,7 @@ SplitShard(SplitMode splitMode, static void BlockingShardSplit(SplitOperation splitOperation, uint64 splitWorkflowId, + uint32 colocationId, List *sourceColocatedShardIntervalList, List *shardSplitPointsList, List *workersForPlacementList, @@ -558,10 +570,17 @@ BlockingShardSplit(SplitOperation splitOperation, BlockWritesToShardList(sourceColocatedShardIntervalList); + ShardInterval *sampleShardInterval = + (ShardInterval *) linitial(sourceColocatedShardIntervalList); + List *shardgroupSplits = CreateNewShardgroups( + colocationId, + sampleShardInterval, + shardSplitPointsList); + /* First create shard interval metadata for split children */ List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup( sourceColocatedShardIntervalList, - shardSplitPointsList); + shardgroupSplits); /* Only single placement allowed (already validated RelationReplicationFactor = 1) */ ShardInterval *firstShard = linitial(sourceColocatedShardIntervalList); @@ -611,9 +630,10 @@ BlockingShardSplit(SplitOperation splitOperation, InsertDeferredDropCleanupRecordsForShards(sourceColocatedShardIntervalList); + DropShardgroupMetadata(sampleShardInterval->shardGroupId); DropShardListMetadata(sourceColocatedShardIntervalList); - /* Insert new shard and placement metdata */ + InsertSplitChildrenShardgroupMetadata(shardgroupSplits); InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList); @@ -1010,28 +1030,82 @@ CreateObjectOnPlacement(List *objectCreationCommandList, /* - * Create split children intervals for a shardgroup given list of split points. + * CreateNewShardgroups returns new shardgroup structs given a list of splitpoints * Example: - * 'sourceColocatedShardIntervalList': Colocated shard S1[-2147483648, 2147483647] & S2[-2147483648, 2147483647] + * 'sourceColocatedShardIntervalList': sampleInterval S1[-2147483648, 2147483647] * 'splitPointsForShard': [0] (2 way split) * 'shardGroupSplitIntervalListList': - * [ - * [ S1_1(-2147483648, 0), S1_2(1, 2147483647) ], // Split Interval List for S1. - * [ S2_1(-2147483648, 0), S2_2(1, 2147483647) ] // Split Interval List for S2. - * ] + * [ S1_1(-2147483648, 0), S1_2(1, 2147483647) ] + * TODO fix comment above to reflect shardgroups instead of colocated tables + */ +static List * +CreateNewShardgroups(uint32 colocationId, ShardInterval *sampleInterval, + List *splitPointsForShard) +{ + + int32 splitParentMaxValue = DatumGetInt32(sampleInterval->maxValue); + int32 currentSplitChildMinValue = DatumGetInt32(sampleInterval->minValue); + + /* if we are splitting a Citus local table, assume whole shard range */ + /* TODO before splitting a local table we should assign it the hashrange correctly */ + if (!sampleInterval->maxValueExists) + { + splitParentMaxValue = PG_INT32_MAX; + } + if (!sampleInterval->minValueExists) + { + currentSplitChildMinValue = PG_INT32_MIN; + } + + List *newShardgroups = NIL; + ListCell *splitPointCell = NULL; + foreach(splitPointCell, splitPointsForShard) + { + Datum splitPoint = (Datum) lfirst(splitPointCell); + + Shardgroup *shardgroup = palloc0(sizeof(Shardgroup)); + shardgroup->shardgroupId = GetNextShardIdForSplitChild(); + shardgroup->colocationId = colocationId; + shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue); + shardgroup->maxShardValue = splitPoint; + + /* increment for next shardgroup */ + currentSplitChildMinValue = Int32GetDatum(DatumGetInt32(splitPoint) + 1); + newShardgroups = lappend(newShardgroups, shardgroup); + } + + /* + * We have added ranges for start of sampleInterval till the last entry in the + * splitpoints. Now we need to add one more from the last splitpoint till the end to + * complete all splits. + */ + + Shardgroup *shardgroup = palloc0(sizeof(Shardgroup)); + shardgroup->shardgroupId = GetNextShardIdForSplitChild(); + shardgroup->colocationId = colocationId; + shardgroup->minShardValue = Int32GetDatum(currentSplitChildMinValue); + shardgroup->maxShardValue = splitParentMaxValue; + newShardgroups = lappend(newShardgroups, shardgroup); + + return newShardgroups; +} + + +/* + * Create split children intervals for a shardgroup given list of split points. */ static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList, - List *splitPointsForShard) + List *newShardgroups) { List *shardGroupSplitIntervalListList = NIL; ShardInterval *shardToSplitInterval = NULL; foreach_ptr(shardToSplitInterval, sourceColocatedShardIntervalList) { - List *shardSplitIntervalList = NIL; - CreateSplitIntervalsForShard(shardToSplitInterval, splitPointsForShard, - &shardSplitIntervalList); + List *shardSplitIntervalList = + CreateShardIntervalsForNewShardgroups(shardToSplitInterval, + newShardgroups); shardGroupSplitIntervalListList = lappend(shardGroupSplitIntervalListList, shardSplitIntervalList); @@ -1045,58 +1119,31 @@ CreateSplitIntervalsForShardGroup(List *sourceColocatedShardIntervalList, * Create split children intervals given a sourceshard and a list of split points. * Example: SourceShard is range [0, 100] and SplitPoints are (15, 30) will give us: * [(0, 15) (16, 30) (31, 100)] + * TODO Fix comment */ -static void -CreateSplitIntervalsForShard(ShardInterval *sourceShard, - List *splitPointsForShard, - List **shardSplitChildrenIntervalList) +static List * +CreateShardIntervalsForNewShardgroups(ShardInterval *sourceShard, + List *newShardgroups) { - /* For 'N' split points, we will have N+1 shard intervals created. */ - int shardIntervalCount = list_length(splitPointsForShard) + 1; - ListCell *splitPointCell = list_head(splitPointsForShard); - int32 splitParentMaxValue = DatumGetInt32(sourceShard->maxValue); - int32 currentSplitChildMinValue = DatumGetInt32(sourceShard->minValue); + List *shardSplitChildrenIntervalList = NIL; - /* if we are splitting a Citus local table, assume whole shard range */ - if (!sourceShard->maxValueExists) - { - splitParentMaxValue = PG_INT32_MAX; - } - - if (!sourceShard->minValueExists) - { - currentSplitChildMinValue = PG_INT32_MIN; - } - - for (int index = 0; index < shardIntervalCount; index++) + Shardgroup *shardgroup = NULL; + foreach_ptr(shardgroup, newShardgroups) { ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard); splitChildShardInterval->shardIndex = -1; splitChildShardInterval->shardId = GetNextShardIdForSplitChild(); - - /* TODO find shardgroup id */ - + splitChildShardInterval->shardGroupId = shardgroup->shardgroupId; splitChildShardInterval->minValueExists = true; - splitChildShardInterval->minValue = currentSplitChildMinValue; + splitChildShardInterval->minValue = shardgroup->minShardValue; splitChildShardInterval->maxValueExists = true; + splitChildShardInterval->maxValue = shardgroup->maxShardValue; - /* Length of splitPointsForShard is one less than 'shardIntervalCount' and we need to account */ - /* for 'splitPointCell' being NULL for last iteration. */ - if (splitPointCell) - { - splitChildShardInterval->maxValue = DatumGetInt32((Datum) lfirst( - splitPointCell)); - splitPointCell = lnext(splitPointsForShard, splitPointCell); - } - else - { - splitChildShardInterval->maxValue = splitParentMaxValue; - } - - currentSplitChildMinValue = splitChildShardInterval->maxValue + 1; - *shardSplitChildrenIntervalList = lappend(*shardSplitChildrenIntervalList, - splitChildShardInterval); + shardSplitChildrenIntervalList = lappend(shardSplitChildrenIntervalList, + splitChildShardInterval); } + + return shardSplitChildrenIntervalList; } @@ -1148,6 +1195,23 @@ UpdateDistributionColumnsForShardGroup(List *colocatedShardList, } +static void +InsertSplitChildrenShardgroupMetadata(List *newShardgroups) +{ + Shardgroup *shardgroup = NULL; + foreach_ptr(shardgroup, newShardgroups) + { + InsertShardGroupRow( + shardgroup->shardgroupId, + shardgroup->colocationId, + IntegerToText(DatumGetInt32(shardgroup->minShardValue)), + IntegerToText(DatumGetInt32(shardgroup->maxShardValue))); + } + + /* TODO sync new shardgroups to workers */ +} + + /* * Insert new shard and placement metadata. * Sync the Metadata with all nodes if enabled. @@ -1178,7 +1242,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, shardInterval->storageType, IntegerToText(DatumGetInt32(shardInterval->minValue)), IntegerToText(DatumGetInt32(shardInterval->maxValue)), - NULL); + &shardInterval->shardGroupId); InsertShardPlacementRow( shardInterval->shardId, @@ -1294,6 +1358,13 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, } +static void +DropShardgroupMetadata(uint32 shardgroupId) +{ + DeleteShardgroupRow(shardgroupId); +} + + /* * DropShardListMetadata drops shard metadata from both the coordinator and * mx nodes. @@ -1383,6 +1454,7 @@ AcquireNonblockingSplitLock(Oid relationId) void NonBlockingShardSplit(SplitOperation splitOperation, uint64 splitWorkflowId, + uint32 colocationId, List *sourceColocatedShardIntervalList, List *shardSplitPointsList, List *workersForPlacementList, @@ -1396,10 +1468,15 @@ NonBlockingShardSplit(SplitOperation splitOperation, char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); + List *shardgroupSplits = CreateNewShardgroups( + colocationId, + (ShardInterval *) linitial(sourceColocatedShardIntervalList), + shardSplitPointsList); + /* First create shard interval metadata for split children */ List *shardGroupSplitIntervalListList = CreateSplitIntervalsForShardGroup( sourceColocatedShardIntervalList, - shardSplitPointsList); + shardgroupSplits); ShardInterval *firstShard = linitial(sourceColocatedShardIntervalList); @@ -1598,9 +1675,9 @@ NonBlockingShardSplit(SplitOperation splitOperation, distributionMethod, shardCount, targetColocationId); - } - /* 12) Insert new shard and placement metdata */ + /* 12) Insert new shardgroup, shard and placement metadata */ + InsertSplitChildrenShardgroupMetadata(shardgroupSplits); InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList); diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index 2ac847471..b93d1eccb 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -184,7 +184,13 @@ master_create_empty_shard(PG_FUNCTION_ARGS) candidateNodeIndex++; } - InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, NULL); + CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId); + + uint64 shardgroupId = shardId; + InsertShardGroupRow(shardgroupId, tableEntry->colocationId, + nullMinValue, nullMaxValue); + + InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue, &shardgroupId); CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList, ShardReplicationFactor); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 6ee02ae95..a8b0cb1f5 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -71,6 +71,15 @@ typedef struct ShardInterval } ShardInterval; +typedef struct Shardgroup +{ + uint64 shardgroupId; + uint32 colocationId; + Datum minShardValue; /* a shard's typed min value datum */ + Datum maxShardValue; /* a shard's typed max value datum */ +} Shardgroup; + + /* In-memory representation of a tuple in pg_dist_placement. */ typedef struct GroupShardPlacement { @@ -323,6 +332,7 @@ extern void UpdateDistributionColumnGlobally(Oid relationId, char distributionMe extern void UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distributionColumn, int colocationId); extern void DeletePartitionRow(Oid distributedRelationId); +extern void DeleteShardgroupRow(uint32 shardgroupId); extern void DeleteShardRow(uint64 shardId); extern void UpdatePlacementGroupId(uint64 placementId, int groupId); extern void DeleteShardPlacementRow(uint64 placementId); diff --git a/src/test/regress/sql/multi_behavioral_analytics_create_table_superuser.sql b/src/test/regress/sql/multi_behavioral_analytics_create_table_superuser.sql index bc4715760..2632656d1 100644 --- a/src/test/regress/sql/multi_behavioral_analytics_create_table_superuser.sql +++ b/src/test/regress/sql/multi_behavioral_analytics_create_table_superuser.sql @@ -147,21 +147,29 @@ SELECT master_create_empty_shard('events') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); SELECT master_create_empty_shard('events') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); SELECT master_create_empty_shard('events') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); SELECT master_create_empty_shard('events') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); COPY events FROM STDIN WITH CSV; "(1,1001)",20001,click,1472807012 @@ -191,21 +199,29 @@ SELECT master_create_empty_shard('users') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = '(1,1)', shardmaxvalue = '(1,2000000000)' +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); SELECT master_create_empty_shard('users') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = '(1,2000000001)', shardmaxvalue = '(1,4300000000)' +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); SELECT master_create_empty_shard('users') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = '(2,1)', shardmaxvalue = '(2,2000000000)' +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); SELECT master_create_empty_shard('users') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = '(2,2000000001)', shardmaxvalue = '(2,4300000000)' +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); COPY users FROM STDIN WITH CSV; "(1,1001)",1472807115 @@ -274,21 +290,29 @@ SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986 WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = 1, shardmaxvalue = 5986 +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); SELECT master_create_empty_shard('lineitem_subquery') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947 WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = 8997, shardmaxvalue = 14947 +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); SELECT master_create_empty_shard('orders_subquery') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 5986 WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = 1, shardmaxvalue = 5986 +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); SELECT master_create_empty_shard('orders_subquery') AS new_shard_id \gset UPDATE pg_dist_shard SET shardminvalue = 8997, shardmaxvalue = 14947 WHERE shardid = :new_shard_id; +UPDATE pg_dist_shardgroup SET shardminvalue = 8997, shardmaxvalue = 14947 +WHERE shardgroupid = (SELECT shardgroupid FROM pg_dist_shard WHERE shardid = :new_shard_id); \set lineitem_1_data_file :abs_srcdir '/data/lineitem.1.data' \set client_side_copy_command '\\copy lineitem_subquery FROM ' :'lineitem_1_data_file' ' with delimiter '''|''';'