mirror of https://github.com/milvus-io/milvus.git
Refine code of DataNode binlogIO (#15759)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/15770/head
parent
867cf620ee
commit
e8edaa02fa
|
@ -40,17 +40,20 @@ var (
|
|||
)
|
||||
|
||||
type downloader interface {
|
||||
// load downloads binlogs from blob storage for given paths.
|
||||
// donload downloads insert-binlogs, stats-binlogs, and, delta-binlogs from blob storage for given paths.
|
||||
// The paths are 1 group of binlog paths generated by 1 `Serialize`.
|
||||
//
|
||||
// download downloads insert-binlogs, stats-binlogs, and delta-binlogs.
|
||||
// errDownloadFromBlobStorage is returned if ctx is canceled from outside while a downloading is inprogress.
|
||||
// Beware of the ctx here, if no timeout or cancel is applied to this ctx, this downloading may retry forever.
|
||||
download(ctx context.Context, paths []string) ([]*Blob, error)
|
||||
}
|
||||
|
||||
type uploader interface {
|
||||
// upload saves InsertData and DeleteData into blob storage.
|
||||
// stats-binlogs are generated from InsertData.
|
||||
upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*cpaths, error)
|
||||
// upload saves InsertData and DeleteData into blob storage, stats binlogs are generated from InsertData.
|
||||
//
|
||||
// errUploadToBlobStorage is returned if ctx is canceled from outside while a uploading is inprogress.
|
||||
// Beware of the ctx here, if no timeout or cancel is applied to this ctx, this uploading may retry forever.
|
||||
upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*segPaths, error)
|
||||
}
|
||||
|
||||
type binlogIO struct {
|
||||
|
@ -78,8 +81,8 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error
|
|||
|
||||
default:
|
||||
if err != errStart {
|
||||
log.Warn("downloading failed, retry in 50ms", zap.Strings("paths", paths))
|
||||
<-time.After(50 * time.Millisecond)
|
||||
log.Warn("Try multiloading again", zap.Strings("paths", paths))
|
||||
}
|
||||
vs, err = b.MultiLoad(paths)
|
||||
}
|
||||
|
@ -99,7 +102,7 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error
|
|||
return rst, nil
|
||||
}
|
||||
|
||||
type cpaths struct {
|
||||
type segPaths struct {
|
||||
inPaths []*datapb.FieldBinlog
|
||||
statsPaths []*datapb.FieldBinlog
|
||||
deltaInfo []*datapb.FieldBinlog
|
||||
|
@ -110,14 +113,14 @@ func (b *binlogIO) upload(
|
|||
segID, partID UniqueID,
|
||||
iDatas []*InsertData,
|
||||
dData *DeleteData,
|
||||
meta *etcdpb.CollectionMeta) (*cpaths, error) {
|
||||
|
||||
p := &cpaths{}
|
||||
meta *etcdpb.CollectionMeta) (*segPaths, error) {
|
||||
|
||||
var (
|
||||
inPathm = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its FieldBinlog
|
||||
statsPathm = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its statsBinlog
|
||||
kvs = make(map[string]string)
|
||||
p = &segPaths{} // The returns
|
||||
kvs = make(map[string]string) // Key values to store in minIO
|
||||
|
||||
insertField2Path = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its FieldBinlog
|
||||
statsField2Path = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its statsBinlog
|
||||
)
|
||||
|
||||
for _, iData := range iDatas {
|
||||
|
@ -132,7 +135,10 @@ func (b *binlogIO) upload(
|
|||
|
||||
kv, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
|
||||
if err != nil {
|
||||
log.Warn("generate insert blobs wrong", zap.Error(err))
|
||||
log.Warn("generate insert blobs wrong",
|
||||
zap.Int64("collectionID", meta.GetID()),
|
||||
zap.Int64("segmentID", segID),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -140,79 +146,73 @@ func (b *binlogIO) upload(
|
|||
kvs[k] = v
|
||||
}
|
||||
|
||||
for fID, fieldBinlog := range inpaths {
|
||||
tmpfb, ok := inPathm[fID]
|
||||
for fID, path := range inpaths {
|
||||
tmpBinlog, ok := insertField2Path[fID]
|
||||
if !ok {
|
||||
tmpfb = fieldBinlog
|
||||
tmpBinlog = path
|
||||
} else {
|
||||
tmpfb.Binlogs = append(tmpfb.Binlogs, fieldBinlog.GetBinlogs()...)
|
||||
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
|
||||
}
|
||||
inPathm[fID] = tmpfb
|
||||
insertField2Path[fID] = tmpBinlog
|
||||
}
|
||||
|
||||
for fID, fieldBinlog := range statspaths {
|
||||
tmpfb, ok := statsPathm[fID]
|
||||
for fID, path := range statspaths {
|
||||
tmpBinlog, ok := statsField2Path[fID]
|
||||
if !ok {
|
||||
tmpfb = fieldBinlog
|
||||
tmpBinlog = path
|
||||
} else {
|
||||
tmpfb.Binlogs = append(tmpfb.Binlogs, fieldBinlog.GetBinlogs()...)
|
||||
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
|
||||
}
|
||||
statsPathm[fID] = tmpfb
|
||||
statsField2Path[fID] = tmpBinlog
|
||||
}
|
||||
}
|
||||
|
||||
for _, bs := range inPathm {
|
||||
p.inPaths = append(p.inPaths, bs)
|
||||
for _, path := range insertField2Path {
|
||||
p.inPaths = append(p.inPaths, path)
|
||||
}
|
||||
|
||||
for _, bs := range statsPathm {
|
||||
p.statsPaths = append(p.statsPaths, bs)
|
||||
for _, path := range statsField2Path {
|
||||
p.statsPaths = append(p.statsPaths, path)
|
||||
}
|
||||
|
||||
// If there are delta logs
|
||||
// If there are delta binlogs
|
||||
if dData.RowCount > 0 {
|
||||
k, v, err := b.genDeltaBlobs(dData, meta.GetID(), partID, segID)
|
||||
if err != nil {
|
||||
log.Warn("generate delta blobs wrong", zap.Error(err))
|
||||
log.Warn("generate delta blobs wrong",
|
||||
zap.Int64("collectionID", meta.GetID()),
|
||||
zap.Int64("segmentID", segID),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
kvs[k] = bytes.NewBuffer(v).String()
|
||||
p.deltaInfo = append(p.deltaInfo, &datapb.FieldBinlog{
|
||||
//Field id shall be primary key id
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{
|
||||
EntriesNum: dData.RowCount,
|
||||
LogPath: k,
|
||||
LogSize: int64(len(v)),
|
||||
},
|
||||
},
|
||||
FieldID: 0, // TODO: Not useful on deltalogs, FieldID shall be ID of primary key field
|
||||
Binlogs: []*datapb.Binlog{{
|
||||
EntriesNum: dData.RowCount,
|
||||
LogPath: k,
|
||||
LogSize: int64(len(v)),
|
||||
}},
|
||||
})
|
||||
}
|
||||
|
||||
var err = errStart
|
||||
g, gCtx := errgroup.WithContext(ctx)
|
||||
g.Go(func() error {
|
||||
for err != nil {
|
||||
select {
|
||||
case <-gCtx.Done():
|
||||
log.Warn("ctx done when saving kvs to blob storage")
|
||||
return errUploadToBlobStorage
|
||||
default:
|
||||
if err != errStart {
|
||||
<-time.After(50 * time.Millisecond)
|
||||
log.Info("retry save binlogs")
|
||||
}
|
||||
err = b.MultiSave(kvs)
|
||||
for err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Warn("ctx done when saving kvs to blob storage", zap.Any("key values", kvs))
|
||||
return nil, errUploadToBlobStorage
|
||||
default:
|
||||
if err != errStart {
|
||||
log.Info("save binlog failed, retry in 50ms",
|
||||
zap.Int64("collectionID", meta.GetID()),
|
||||
zap.Int64("segmentID", segID))
|
||||
<-time.After(50 * time.Millisecond)
|
||||
}
|
||||
err = b.MultiSave(kvs)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue