Use new stream segment reader in clustering compaction (#34232)

#32939

Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/34276/head
wayblink 2024-06-30 20:26:07 +08:00 committed by GitHub
parent 73ffc1b424
commit e5d691d854
1 changed file with 20 additions and 19 deletions

@@ -19,6 +19,7 @@ package compaction
 import (
 	"context"
 	"fmt"
+	sio "io"
 	"math"
 	"path"
 	"sort"
@@ -481,33 +482,33 @@ func (t *clusteringCompactionTask) mappingSegment(
 		fieldBinlogPaths = append(fieldBinlogPaths, ps)
 	}
-	for _, path := range fieldBinlogPaths {
-		bytesArr, err := t.binlogIO.Download(ctx, path)
-		blobs := make([]*storage.Blob, len(bytesArr))
-		var segmentSize int64
-		for i := range bytesArr {
-			blobs[i] = &storage.Blob{Value: bytesArr[i]}
-			segmentSize = segmentSize + int64(len(bytesArr[i]))
-		}
+	for _, paths := range fieldBinlogPaths {
+		allValues, err := t.binlogIO.Download(ctx, paths)
 		if err != nil {
-			log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
+			log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
 			return err
 		}
-		pkIter, err := storage.NewInsertBinlogIterator(blobs, t.primaryKeyField.GetFieldID(), t.primaryKeyField.GetDataType())
+		blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
+			return &storage.Blob{Key: paths[i], Value: v}
+		})
+		pkIter, err := storage.NewBinlogDeserializeReader(blobs, t.primaryKeyField.GetFieldID())
 		if err != nil {
-			log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err))
+			log.Warn("new insert binlogs Itr wrong", zap.Strings("paths", paths), zap.Error(err))
 			return err
 		}
 		var offset int64 = -1
-		for pkIter.HasNext() {
-			vInter, _ := pkIter.Next()
-			v, ok := vInter.(*storage.Value)
-			if !ok {
-				log.Warn("transfer interface to Value wrong", zap.Strings("path", path))
-				return errors.New("unexpected error")
+		for {
+			err := pkIter.Next()
+			if err != nil {
+				if err == sio.EOF {
+					break
+				} else {
+					log.Warn("compact wrong, failed to iter through data", zap.Error(err))
+					return err
+				}
 			}
+			v := pkIter.Value()
 			offset++
 			// Filtering deleted entity
@@ -524,7 +525,7 @@ func (t *clusteringCompactionTask) mappingSegment(
 			row, ok := v.Value.(map[typeutil.UniqueID]interface{})
 			if !ok {
-				log.Warn("transfer interface to map wrong", zap.Strings("path", path))
+				log.Warn("transfer interface to map wrong", zap.Strings("paths", paths))
 				return errors.New("unexpected error")
 			}
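
For readers following the migration: the old path materialized blobs and iterated with HasNext()/Next() plus a per-row type assertion, while the new stream segment reader advances with Next(), which returns sio.EOF on a clean end of stream, and exposes the current row via Value(). Below is a minimal sketch of that consumption pattern, assuming the Milvus internal storage package and samber/lo as imported in the diff; the drainReader helper name and its signature are illustrative, not part of this commit.

package compaction

import (
	sio "io"

	"github.com/samber/lo"

	"github.com/milvus-io/milvus/internal/storage"
)

// drainReader is an illustrative helper (not part of this commit) showing the
// stream-reader pattern used above: wrap downloaded binlog bytes as blobs
// keyed by their paths, then read rows until Next reports sio.EOF.
func drainReader(paths []string, allValues [][]byte, pkFieldID int64) ([]*storage.Value, error) {
	// Key each blob by its binlog path, as the new code in this commit does.
	blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
		return &storage.Blob{Key: paths[i], Value: v}
	})
	reader, err := storage.NewBinlogDeserializeReader(blobs, pkFieldID)
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	var rows []*storage.Value
	for {
		// Next advances the reader; sio.EOF signals a clean end of stream,
		// and any other error is a real failure.
		if err := reader.Next(); err != nil {
			if err == sio.EOF {
				break
			}
			return nil, err
		}
		rows = append(rows, reader.Value())
	}
	return rows, nil
}

Signaling end-of-data with sio.EOF rather than a HasNext() check follows Go's io.Reader convention and removes the per-row interface assertion that the old iterator required.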