diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go index 20a7524167..40a3cc0405 100644 --- a/internal/datanode/binlog_io.go +++ b/internal/datanode/binlog_io.go @@ -40,17 +40,20 @@ var ( ) type downloader interface { - // load downloads binlogs from blob storage for given paths. + // download downloads insert-binlogs, stats-binlogs, and delta-binlogs from blob storage for given paths. // The paths are 1 group of binlog paths generated by 1 `Serialize`. // - // download downloads insert-binlogs, stats-binlogs, and delta-binlogs. + // errDownloadFromBlobStorage is returned if ctx is canceled from outside while a download is in progress. + // Beware of the ctx here, if no timeout or cancel is applied to this ctx, this download may retry forever. download(ctx context.Context, paths []string) ([]*Blob, error) } type uploader interface { - // upload saves InsertData and DeleteData into blob storage. - // stats-binlogs are generated from InsertData. - upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*cpaths, error) + // upload saves InsertData and DeleteData into blob storage; stats binlogs are generated from InsertData. + // + // errUploadToBlobStorage is returned if ctx is canceled from outside while an upload is in progress. + // Beware of the ctx here, if no timeout or cancel is applied to this ctx, this upload may retry forever. 
+ upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*segPaths, error) } type binlogIO struct { @@ -78,8 +81,8 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error default: if err != errStart { + log.Warn("downloading failed, retry in 50ms", zap.Strings("paths", paths)) <-time.After(50 * time.Millisecond) - log.Warn("Try multiloading again", zap.Strings("paths", paths)) } vs, err = b.MultiLoad(paths) } @@ -99,7 +102,7 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error return rst, nil } -type cpaths struct { +type segPaths struct { inPaths []*datapb.FieldBinlog statsPaths []*datapb.FieldBinlog deltaInfo []*datapb.FieldBinlog @@ -110,14 +113,14 @@ func (b *binlogIO) upload( segID, partID UniqueID, iDatas []*InsertData, dData *DeleteData, - meta *etcdpb.CollectionMeta) (*cpaths, error) { - - p := &cpaths{} + meta *etcdpb.CollectionMeta) (*segPaths, error) { var ( - inPathm = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its FieldBinlog - statsPathm = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its statsBinlog - kvs = make(map[string]string) + p = &segPaths{} // The returns + kvs = make(map[string]string) // Key values to store in minIO + + insertField2Path = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its FieldBinlog + statsField2Path = make(map[UniqueID]*datapb.FieldBinlog) // FieldID > its statsBinlog ) for _, iData := range iDatas { @@ -132,7 +135,10 @@ func (b *binlogIO) upload( kv, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta) if err != nil { - log.Warn("generate insert blobs wrong", zap.Error(err)) + log.Warn("generate insert blobs wrong", + zap.Int64("collectionID", meta.GetID()), + zap.Int64("segmentID", segID), + zap.Error(err)) return nil, err } @@ -140,79 +146,73 @@ func (b *binlogIO) upload( kvs[k] = v } - for fID, fieldBinlog := range inpaths { - tmpfb, ok := inPathm[fID] + 
for fID, path := range inpaths { + tmpBinlog, ok := insertField2Path[fID] if !ok { - tmpfb = fieldBinlog + tmpBinlog = path } else { - tmpfb.Binlogs = append(tmpfb.Binlogs, fieldBinlog.GetBinlogs()...) + tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...) } - inPathm[fID] = tmpfb + insertField2Path[fID] = tmpBinlog } - for fID, fieldBinlog := range statspaths { - tmpfb, ok := statsPathm[fID] + for fID, path := range statspaths { + tmpBinlog, ok := statsField2Path[fID] if !ok { - tmpfb = fieldBinlog + tmpBinlog = path } else { - tmpfb.Binlogs = append(tmpfb.Binlogs, fieldBinlog.GetBinlogs()...) + tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...) } - statsPathm[fID] = tmpfb + statsField2Path[fID] = tmpBinlog } } - for _, bs := range inPathm { - p.inPaths = append(p.inPaths, bs) + for _, path := range insertField2Path { + p.inPaths = append(p.inPaths, path) } - for _, bs := range statsPathm { - p.statsPaths = append(p.statsPaths, bs) + for _, path := range statsField2Path { + p.statsPaths = append(p.statsPaths, path) } - // If there are delta logs + // If there are delta binlogs if dData.RowCount > 0 { k, v, err := b.genDeltaBlobs(dData, meta.GetID(), partID, segID) if err != nil { - log.Warn("generate delta blobs wrong", zap.Error(err)) + log.Warn("generate delta blobs wrong", + zap.Int64("collectionID", meta.GetID()), + zap.Int64("segmentID", segID), + zap.Error(err)) return nil, err } kvs[k] = bytes.NewBuffer(v).String() p.deltaInfo = append(p.deltaInfo, &datapb.FieldBinlog{ - //Field id shall be primary key id - Binlogs: []*datapb.Binlog{ - { - EntriesNum: dData.RowCount, - LogPath: k, - LogSize: int64(len(v)), - }, - }, + FieldID: 0, // TODO: Not useful on deltalogs, FieldID shall be ID of primary key field + Binlogs: []*datapb.Binlog{{ + EntriesNum: dData.RowCount, + LogPath: k, + LogSize: int64(len(v)), + }}, }) } var err = errStart - g, gCtx := errgroup.WithContext(ctx) - g.Go(func() error { - for err != nil { - select { - 
case <-gCtx.Done(): - log.Warn("ctx done when saving kvs to blob storage") - return errUploadToBlobStorage - default: - if err != errStart { - <-time.After(50 * time.Millisecond) - log.Info("retry save binlogs") - } - err = b.MultiSave(kvs) + for err != nil { + select { + case <-ctx.Done(): + log.Warn("ctx done when saving kvs to blob storage", zap.Any("key values", kvs)) + return nil, errUploadToBlobStorage + default: + if err != errStart { + log.Info("save binlog failed, retry in 50ms", + zap.Int64("collectionID", meta.GetID()), + zap.Int64("segmentID", segID)) + <-time.After(50 * time.Millisecond) } + err = b.MultiSave(kvs) } - return nil - }) - - if err := g.Wait(); err != nil { - return nil, err } - return p, nil }