Fix bulkinsert row count error (#25869)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
pull/25893/head
groot 2023-07-25 11:29:00 +08:00 committed by GitHub
parent 5e3cbb584a
commit a6808e6484
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 13 deletions

View File

@ -679,6 +679,7 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil
colID := req.GetImportTask().GetCollectionId()
segmentIDReq := composeAssignSegmentIDRequest(1, shardID, chNames, colID, partID)
targetChName := segmentIDReq.GetSegmentIDRequests()[0].GetChannelName()
logFields = append(logFields, zap.Int64("collection ID", colID))
logFields = append(logFields, zap.String("target channel name", targetChName))
log.Info("assign segment for the import task", logFields...)
resp, err := node.dataCoord.AssignSegmentID(context.Background(), segmentIDReq)
@ -739,6 +740,7 @@ func createBinLogsFunc(node *DataNode, req *datapb.ImportTaskRequest, schema *sc
log.Info("fields data is empty, no need to generate binlog", logFields...)
return nil, nil, nil
}
logFields = append(logFields, zap.Int("row count", rowNum))
colID := req.GetImportTask().GetCollectionId()
fieldInsert, fieldStats, err := createBinLogs(rowNum, schema, ts, fields, node, segmentID, colID, partID)
@ -832,12 +834,6 @@ func composeAssignSegmentIDRequest(rowNum int, shardID int, chNames []string,
// use the first field's row count as segment row count
// all the fields row count are same, checked by ImportWrapper
// ask DataCoord to alloc a new segment
log.Info("import task flush segment",
zap.Int("rowCount", rowNum),
zap.Any("channel names", chNames),
zap.Int64("collectionID", collID),
zap.Int64("partitionID", partID),
zap.Int("shardID", shardID))
segReqs := []*datapb.SegmentIDRequest{
{
ChannelName: chNames[shardID],

View File

@ -455,33 +455,39 @@ func (p *ImportWrapper) flushFunc(fields BlockData, shardID int, partitionID int
}
// if fields data is empty, do nothing
var rowNum int
rowNum := 0
memSize := 0
for _, field := range fields {
rowNum = field.RowNum()
memSize += field.GetMemorySize()
break
}
if rowNum <= 0 {
log.Warn("import wrapper: fields data is empty", logFields...)
return nil
}
logFields = append(logFields, zap.Int("rowNum", rowNum), zap.Int("memSize", memSize))
log.Info("import wrapper: flush block data to binlog", logFields...)
// if there is no segment for this shard, create a new one
// if the segment exists and its size almost exceed segmentSize, close it and create a new one
var segment *WorkingSegment
if shard, ok := p.workingSegments[shardID]; ok {
if segment, exists := shard[partitionID]; exists {
if segmentTemp, exists := shard[partitionID]; exists {
log.Info("import wrapper: compare working segment memSize with segmentSize",
zap.Int("memSize", segmentTemp.memSize), zap.Int64("segmentSize", p.segmentSize))
if int64(segmentTemp.memSize)+int64(memSize) >= p.segmentSize {
// the segment already exists, check its size, if the size exceeds(or almost) segmentSize, close the segment
if int64(segment.memSize)+int64(memSize) >= p.segmentSize {
err := p.closeWorkingSegment(segment)
err := p.closeWorkingSegment(segmentTemp)
if err != nil {
logFields = append(logFields, zap.Error(err))
log.Warn("import wrapper: failed to close working segment", logFields...)
return err
}
segment = nil
p.workingSegments[shardID][partitionID] = nil
} else {
// the exist segment size is small, no need to close
segment = segmentTemp
}
}
} else {