milvus/internal/datanode/binlog_io.go

274 lines
6.9 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"bytes"
"context"
"errors"
"path"
"strconv"
"time"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// errUploadToBlobStorage is returned by upload when its context ends before
// the binlogs have been saved to blob storage.
var errUploadToBlobStorage = errors.New("upload to blob storage wrong")

// errDownloadFromBlobStorage is returned by download when its context ends
// before the binlogs have been loaded from blob storage.
var errDownloadFromBlobStorage = errors.New("download from blob storage wrong")
// downloader reads previously stored binlogs back from blob storage.
type downloader interface {
	// download fetches binlogs from blob storage for the given paths and
	// returns them as blobs.
	// The paths are 1 group of binlog paths generated by 1 `Serialize`.
	//
	// It is used for insert-binlogs, stats-binlogs, and delta-binlogs alike.
	download(ctx context.Context, paths []string) ([]*Blob, error)
}
// uploader writes insert and delete data to blob storage as binlogs.
type uploader interface {
	// upload saves InsertData and DeleteData into blob storage.
	// stats-binlogs are generated from InsertData.
	//
	// On success the returned cpaths describes the binlogs that were written.
	upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*cpaths, error)
}
// binlogIO implements both downloader and uploader.
//
// It has no fields of its own; everything it needs is promoted from the two
// embedded values. Its methods call MultiLoad, MultiSave and Close
// (presumably provided by kv.BaseKV) and allocIDBatch and genKey
// (presumably provided by allocatorInterface) — confirm against those
// declarations.
type binlogIO struct {
	kv.BaseKV
	allocatorInterface
}
// Compile-time checks that *binlogIO satisfies both interfaces.
var (
	_ downloader = (*binlogIO)(nil)
	_ uploader   = (*binlogIO)(nil)
)
// download loads the values stored under paths from blob storage with a
// single MultiLoad call and wraps each returned value in a Blob, keeping the
// order in which MultiLoad returned them.
//
// A failed MultiLoad is retried after a 50ms pause until it succeeds or ctx
// is done. The pause is interrupted as soon as ctx ends. When ctx ends
// before a load succeeds, download returns errDownloadFromBlobStorage and no
// blobs.
func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error) {
	var (
		// errStart marks "no attempt made yet"; any other non-nil value is
		// the error of the most recent MultiLoad attempt.
		err = errStart
		vs  []string
	)

	g, gCtx := errgroup.WithContext(ctx)
	g.Go(func() error {
		for err != nil {
			retrying := err != errStart

			if retrying {
				// Back off before the next attempt, but wake up right away
				// when the context ends so cancellation is not delayed.
				timer := time.NewTimer(50 * time.Millisecond)
				select {
				case <-gCtx.Done():
					timer.Stop()
				case <-timer.C:
				}
			}

			if gCtx.Err() != nil {
				log.Warn("ctx done when downloading kvs from blob storage")
				return errDownloadFromBlobStorage
			}

			if retrying {
				// Log the cause of the previous failure; it is not reported
				// anywhere else.
				log.Warn("Try multiloading again", zap.Strings("paths", paths), zap.Error(err))
			}

			vs, err = b.MultiLoad(paths)
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		return nil, err
	}

	rst := make([]*Blob, len(vs))
	for i, v := range vs {
		rst[i] = &Blob{Value: bytes.NewBufferString(v).Bytes()}
	}
	return rst, nil
}
// cpaths describes the binlogs written by one upload call.
type cpaths struct {
	// inPaths holds one entry per insert binlog, each carrying the field ID
	// and the storage key of that binlog.
	inPaths []*datapb.FieldBinlog
	// statsPaths holds one entry per stats binlog, in the same shape as
	// inPaths.
	statsPaths []*datapb.FieldBinlog
	// deltaInfo describes the delta binlog. upload only fills it in when
	// there is delete data to write; otherwise it is left at its zero value.
	deltaInfo *datapb.DeltaLogInfo
}
// upload serializes the given insert and delete data into binlogs and saves
// all of them to blob storage with a single MultiSave call.
//
// Each element of iDatas produces insert binlogs and stats binlogs. dData
// produces one delta binlog when it is non-nil and holds at least one row.
// The returned cpaths lists the storage keys of everything that was written.
//
// A serialization failure is returned as-is and nothing is saved. A failed
// MultiSave is retried after a 50ms pause until it succeeds or ctx is done.
// The pause is interrupted as soon as ctx ends. When ctx ends before a save
// succeeds, upload returns errUploadToBlobStorage.
func (b *binlogIO) upload(
	ctx context.Context,
	segID, partID UniqueID,
	iDatas []*InsertData,
	dData *DeleteData,
	meta *etcdpb.CollectionMeta,
) (*cpaths, error) {
	p := &cpaths{
		inPaths:    make([]*datapb.FieldBinlog, 0),
		statsPaths: make([]*datapb.FieldBinlog, 0),
		deltaInfo:  &datapb.DeltaLogInfo{},
	}

	kvs := make(map[string]string)

	for _, iData := range iDatas {
		// Named blobs rather than kv so the imported kv package is not shadowed.
		blobs, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
		if err != nil {
			log.Warn("generate insert blobs wrong", zap.Error(err))
			return nil, err
		}

		for k, v := range blobs {
			kvs[k] = v
		}
		p.inPaths = append(p.inPaths, inpaths...)
		p.statsPaths = append(p.statsPaths, statspaths...)
	}

	// If there are delta logs. A nil dData is treated like an empty one
	// instead of causing a nil-pointer panic.
	if dData != nil && dData.RowCount > 0 {
		k, v, err := b.genDeltaBlobs(dData, meta.GetID(), partID, segID)
		if err != nil {
			log.Warn("generate delta blobs wrong", zap.Error(err))
			return nil, err
		}

		kvs[k] = bytes.NewBuffer(v).String()
		// Report the number of delete rows, not the byte length of the
		// serialized blob.
		p.deltaInfo.RecordEntries = uint64(dData.RowCount)
		p.deltaInfo.DeltaLogPath = k
	}

	// errStart marks "no attempt made yet"; any other non-nil value is the
	// error of the most recent MultiSave attempt.
	var err = errStart
	g, gCtx := errgroup.WithContext(ctx)
	g.Go(func() error {
		for err != nil {
			retrying := err != errStart

			if retrying {
				// Back off before the next attempt, but wake up right away
				// when the context ends so cancellation is not delayed.
				timer := time.NewTimer(50 * time.Millisecond)
				select {
				case <-gCtx.Done():
					timer.Stop()
				case <-timer.C:
				}
			}

			if gCtx.Err() != nil {
				log.Warn("ctx done when saving kvs to blob storage")
				return errUploadToBlobStorage
			}

			if retrying {
				// Log the cause of the previous failure; it is not reported
				// anywhere else.
				log.Info("retry save binlogs", zap.Error(err))
			}

			err = b.MultiSave(kvs)
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return p, nil
}
// genDeltaBlobs serializes data with a delete codec and returns the storage
// key for the resulting delta binlog together with its raw bytes.
//
// The key is the value produced by genKey for the given collection,
// partition and segment, joined under Params.DeleteBinlogRootPath. A failure
// of either the serialization or the key generation is returned unchanged,
// with an empty key and nil bytes.
func (b *binlogIO) genDeltaBlobs(data *DeleteData, collID, partID, segID UniqueID) (string, []byte, error) {
	serialized, err := storage.NewDeleteCodec().Serialize(collID, partID, segID, data)
	if err != nil {
		return "", nil, err
	}

	suffix, err := b.genKey(true, collID, partID, segID)
	if err != nil {
		return "", nil, err
	}

	return path.Join(Params.DeleteBinlogRootPath, suffix), serialized.GetValue(), nil
}
// genInsertBlobs serializes data with an insert codec and prepares the
// result for storage.
//
// It returns, in order:
//   - a map from storage key to binlog content covering both the insert
//     binlogs and the stats binlogs,
//   - one FieldBinlog per insert binlog,
//   - one FieldBinlog per stats binlog.
//
// Every binlog gets its own ID from idxGenerator as the last element of its
// key; insert binlogs take their IDs first, stats binlogs afterwards. Insert
// keys live under Params.InsertBinlogRootPath and stats keys under
// Params.StatsBinlogRootPath.
func (b *binlogIO) genInsertBlobs(data *InsertData, partID, segID UniqueID, meta *etcdpb.CollectionMeta) (map[string]string, []*datapb.FieldBinlog, []*datapb.FieldBinlog, error) {
	inlogs, statslogs, err := storage.NewInsertCodec(meta).Serialize(partID, segID, data)
	if err != nil {
		return nil, nil, nil, err
	}

	total := len(inlogs) + len(statslogs)
	var (
		kvs        = make(map[string]string, total)
		inpaths    = make([]*datapb.FieldBinlog, 0, len(inlogs))
		statspaths = make([]*datapb.FieldBinlog, 0, len(statslogs))
	)

	// Closing stop on return tells the generator goroutine to exit even if
	// not every ID was consumed.
	stop := make(chan struct{})
	defer close(stop)

	ids, err := b.idxGenerator(total, stop)
	if err != nil {
		return nil, nil, nil, err
	}

	for i := range inlogs {
		blob := inlogs[i]
		// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
		fieldID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
		logID := <-ids

		key := path.Join(Params.InsertBinlogRootPath, JoinIDPath(meta.GetID(), partID, segID, fieldID, logID))
		kvs[key] = string(blob.GetValue())

		entry := &datapb.FieldBinlog{
			FieldID: fieldID,
			Binlogs: []string{key},
		}
		inpaths = append(inpaths, entry)
	}

	for i := range statslogs {
		blob := statslogs[i]
		// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
		fieldID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
		logID := <-ids

		key := path.Join(Params.StatsBinlogRootPath, JoinIDPath(meta.GetID(), partID, segID, fieldID, logID))
		kvs[key] = string(blob.GetValue())

		entry := &datapb.FieldBinlog{
			FieldID: fieldID,
			Binlogs: []string{key},
		}
		statspaths = append(statspaths, entry)
	}

	return kvs, inpaths, statspaths, nil
}
// idxGenerator allocates a batch of n IDs with allocIDBatch and returns a
// channel that yields them one by one in ascending order, starting at the
// first ID of the batch.
//
// The channel is fed by a goroutine that stops, and closes the channel,
// either after all n IDs have been delivered or as soon as done is closed or
// receives a value. An allocation failure is returned with a nil channel and
// no goroutine is started.
func (b *binlogIO) idxGenerator(n int, done <-chan struct{}) (<-chan UniqueID, error) {
	first, _, err := b.allocIDBatch(uint32(n))
	if err != nil {
		return nil, err
	}

	out := make(chan UniqueID)
	go func() {
		// Closed on every exit path so receivers never block forever.
		defer close(out)

		for offset := 0; offset < n; offset++ {
			select {
			case out <- first + UniqueID(offset):
			case <-done:
				return
			}
		}
	}()

	return out, nil
}
// close forwards to the Close method that binlogIO gets from its embedded
// fields (presumably kv.BaseKV — confirm against that interface).
func (b *binlogIO) close() {
	b.Close()
}