Add compaction in datanode (#11353)

See also: #9530

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/11455/head
XuanYang-cn 2021-11-08 19:49:07 +08:00 committed by GitHub
parent dfbb4c33b4
commit 3cd0584a04
11 changed files with 1420 additions and 24 deletions
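
At a glance, this commit wires DataNode-side compaction end to end. The sketch below paraphrases the flow implemented across the files in this diff (all names are from the diff itself):

// Flow overview (paraphrase of this diff, not code copied from it):
//
//   DataCoord --Compaction(plan)--> DataNode.Compaction
//     -> compactionExecutor.execute(newCompactionTask(...))   // queued, runs async
//       -> compactionTask.compact():
//            download insert/delta binlogs (errgroup fan-out)
//            -> mergeDeltalogs (split deletes by timetravel)
//            -> merge (re-chunk surviving rows into []*InsertData)
//            -> upload (new batched binlogIO.upload signature)
//            -> DataCoord.CompleteCompaction
//            -> refresh/replace segments in the SegmentReplica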

1
go.mod
View File

@@ -44,6 +44,7 @@ require (
go.uber.org/zap v1.17.0
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/tools v0.1.7 // indirect
google.golang.org/grpc v1.38.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0

1
go.sum
View File

@@ -707,6 +707,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

internal/datanode/binlog_io.go
View File

@@ -49,7 +49,7 @@ type downloader interface {
type uploader interface {
// upload saves InsertData and DeleteData into blob storage.
// stats-binlogs are generated from InsertData.
upload(ctx context.Context, segID, partID UniqueID, iData *InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*cpaths, error)
upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*cpaths, error)
}
type binlogIO struct {
@@ -71,7 +71,7 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error
case <-ctx.Done():
close(r)
log.Debug("binlog download canceled by context done")
log.Warn("ctx done when downloading kvs from blob storage")
return
default:
@@ -107,16 +107,32 @@ type cpaths struct {
func (b *binlogIO) upload(
ctx context.Context,
segID, partID UniqueID,
iData *InsertData,
iDatas []*InsertData,
dData *DeleteData,
meta *etcdpb.CollectionMeta) (*cpaths, error) {
kvs, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
if err != nil {
log.Warn("generate insert blobs wrong", zap.Error(err))
return nil, err
var p = &cpaths{
inPaths: make([]*datapb.FieldBinlog, 0),
statsPaths: make([]*datapb.FieldBinlog, 0),
}
kvs := make(map[string]string)
for _, iData := range iDatas {
kv, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
if err != nil {
log.Warn("generate insert blobs wrong", zap.Error(err))
return nil, err
}
for k, v := range kv {
kvs[k] = v
}
p.inPaths = append(p.inPaths, inpaths...)
p.statsPaths = append(p.statsPaths, statspaths...)
}
p := &cpaths{inpaths, statspaths, nil}
// If there are delta logs
if dData != nil {
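
upload now accepts a slice of InsertData, so a compaction that rewrites a segment as several buffer-sized chunks persists them in one call; the kv blobs and binlog paths of every chunk are merged into a single cpaths value. A hedged usage sketch (iData1, iData2, and the surrounding variables are illustrative, not from the diff):

	chunks := []*InsertData{iData1, iData2}
	cp, err := b.upload(ctx, segID, partID, chunks, dData, meta)
	if err != nil {
		return err
	}
	// cp.inPaths / cp.statsPaths now aggregate field binlogs across all chunks.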

internal/datanode/binlog_io_test.go
View File

@@ -44,7 +44,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
Data: map[int64]int64{888: 666666},
}
p, err := b.upload(context.TODO(), 1, 10, iData, dData, meta)
p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
assert.NoError(t, err)
assert.Equal(t, 11, len(p.inPaths))
assert.Equal(t, 3, len(p.statsPaths))
@@ -53,7 +53,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
p, err = b.upload(ctx, 1, 10, iData, dData, meta)
p, err = b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta)
assert.EqualError(t, err, errUploadToBlobStorage.Error())
assert.Nil(t, p)
})

internal/datanode/compaction_executor.go
View File

@@ -0,0 +1,86 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"runtime"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
const (
maxTaskNum = 1024
)
var maxParallelCompactionNum = calculateParallel()
type compactionExecutor struct {
parallelCh chan struct{}
taskCh chan compactor
}
// returns 4 when NumCPU < 16, otherwise NumCPU/2
func calculateParallel() int {
cores := runtime.NumCPU()
if cores < 16 {
return 4
}
return cores / 2
}
func newCompactionExecutor() *compactionExecutor {
return &compactionExecutor{
parallelCh: make(chan struct{}, maxParallelCompactionNum),
taskCh: make(chan compactor, maxTaskNum),
}
}
func (c *compactionExecutor) execute(task compactor) {
c.taskCh <- task
}
func (c *compactionExecutor) start(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case task := <-c.taskCh:
go c.executeTask(task)
}
}
}
func (c *compactionExecutor) executeTask(task compactor) {
c.parallelCh <- struct{}{}
defer func() {
<-c.parallelCh
}()
log.Info("start to execute compaction", zap.Int64("planID", task.getPlanID()))
err := task.compact()
if err != nil {
log.Warn("compaction task failed",
zap.Int64("planID", task.getPlanID()),
zap.Error(err),
)
}
log.Info("end to execute compaction", zap.Int64("planID", task.getPlanID()))
}
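
A minimal usage sketch for the executor (assumed wiring, mirroring how DataNode uses it later in this diff): start runs the dispatch loop until the context is canceled, execute enqueues, and parallelCh caps how many compact() calls run at once.

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ex := newCompactionExecutor()
	go ex.start(ctx) // dispatch loop; returns when ctx is canceled
	ex.execute(task) // task is any compactor; runs once a parallelCh slot frees up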

internal/datanode/compaction_executor_test.go
View File

@@ -0,0 +1,79 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"testing"
)
func TestCompactionExecutor(t *testing.T) {
t.Run("Test execute", func(t *testing.T) {
ex := newCompactionExecutor()
go ex.start(context.TODO())
ex.execute(newMockCompactor(true))
})
t.Run("Test start", func(t *testing.T) {
ex := newCompactionExecutor()
ctx, cancel := context.WithCancel(context.TODO())
cancel()
go ex.start(ctx)
})
t.Run("Test excuteTask", func(t *testing.T) {
tests := []struct {
isvalid bool
description string
}{
{true, "compact return nil"},
{false, "compact return error"},
}
ex := newCompactionExecutor()
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
ex.executeTask(newMockCompactor(true))
} else {
ex.executeTask(newMockCompactor(false))
}
})
}
})
}
func newMockCompactor(isvalid bool) compactor {
return &mockCompactor{isvalid}
}
type mockCompactor struct {
isvalid bool
}
func (mc *mockCompactor) compact() error {
if !mc.isvalid {
return errStart
}
return nil
}
func (mc *mockCompactor) getPlanID() UniqueID {
return 1
}

internal/datanode/compactor.go
View File

@@ -0,0 +1,586 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
errCompactionTypeUndefined = errors.New("compaction type undefined")
errIllegalCompactionPlan = errors.New("compaction plan illegal")
errTransferType = errors.New("transfer interface to type wrong")
errUnknownDataType = errors.New("unknown schema DataType")
)
type iterator = storage.Iterator
type compactor interface {
compact() error
getPlanID() UniqueID
}
type compactionTask struct {
downloader
uploader
compactor
Replica
flushManager
allocatorInterface
dc types.DataCoord
plan *datapb.CompactionPlan
}
// check if compactionTask implements compactor
var _ compactor = (*compactionTask)(nil)
func newCompactionTask(
dl downloader,
ul uploader,
replica Replica,
fm flushManager,
alloc allocatorInterface,
dc types.DataCoord,
plan *datapb.CompactionPlan) *compactionTask {
return &compactionTask{
downloader: dl,
uploader: ul,
Replica: replica,
flushManager: fm,
allocatorInterface: alloc,
dc: dc,
plan: plan,
}
}
func (t *compactionTask) getPlanID() UniqueID {
return t.plan.GetPlanID()
}
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[UniqueID]Timestamp, *DelDataBuf, error) {
dCodec := storage.NewDeleteCodec()
var (
pk2ts = make(map[UniqueID]Timestamp)
dbuff = &DelDataBuf{
delData: &DeleteData{Data: make(map[UniqueID]UniqueID)},
tsFrom: math.MaxUint64,
tsTo: 0,
}
)
for _, blobs := range dBlobs {
_, _, dData, err := dCodec.Deserialize(blobs)
if err != nil {
log.Warn("merge deltalogs wrong", zap.Error(err))
return nil, nil, err
}
for pk, ts := range dData.Data {
if timetravelTs != Timestamp(0) && Timestamp(ts) <= timetravelTs {
pk2ts[pk] = Timestamp(ts)
continue
}
dbuff.delData.Data[pk] = ts
if Timestamp(ts) < dbuff.tsFrom {
dbuff.tsFrom = Timestamp(ts)
}
if Timestamp(ts) > dbuff.tsTo {
dbuff.tsTo = Timestamp(ts)
}
}
}
dbuff.updateSize(int64(len(dbuff.delData.Data)))
return pk2ts, dbuff, nil
}
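
mergeDeltalogs splits every deleted primary key on the plan's timetravel timestamp: deletes at or before timetravel are applied during the merge (pk2ts), later ones survive as deltalogs of the compacted segment (dbuff). A self-contained sketch of that partitioning rule, with illustrative values:

	package main

	import "fmt"

	// splitByTimetravel mirrors the mergeDeltalogs rule: ts <= timetravel means
	// the delete is applied while merging; later deletes are kept as deltalogs.
	func splitByTimetravel(pk2ts map[int64]uint64, timetravel uint64) (applied, kept map[int64]uint64) {
		applied, kept = map[int64]uint64{}, map[int64]uint64{}
		for pk, ts := range pk2ts {
			if timetravel != 0 && ts <= timetravel {
				applied[pk] = ts
			} else {
				kept[pk] = ts
			}
		}
		return
	}

	func main() {
		applied, kept := splitByTimetravel(map[int64]uint64{1: 20000, 2: 30000}, 25000)
		fmt.Println(applied, kept) // map[1:20000] map[2:30000]
	}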
func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, schema *schemapb.CollectionSchema) ([]*InsertData, int64, error) {
var (
dim int // dimension of vector field
num int // numOfRows in each binlog
n int // binlog number
err error
iDatas = make([]*InsertData, 0)
fID2Type = make(map[UniqueID]schemapb.DataType)
fID2Content = make(map[UniqueID][]interface{})
)
for _, fs := range schema.GetFields() {
fID2Type[fs.GetFieldID()] = fs.GetDataType()
// get dim
if fs.GetDataType() == schemapb.DataType_FloatVector ||
fs.GetDataType() == schemapb.DataType_BinaryVector {
for _, t := range fs.GetTypeParams() {
if t.Key == "dim" {
if dim, err = strconv.Atoi(t.Value); err != nil {
log.Warn("strconv wrong on get dim", zap.Error(err))
return nil, 0, err
}
break
}
}
}
}
for mergeItr.HasNext() {
// There will be no error if HasNext() returns true
vInter, _ := mergeItr.Next()
v, ok := vInter.(*storage.Value)
if !ok {
log.Warn("transfer interface to Value wrong")
return nil, 0, errors.New("Unexpected error")
}
if _, ok := delta[v.ID]; ok {
continue
}
row, ok := v.Value.(map[UniqueID]interface{})
if !ok {
log.Warn("transfer interface to map wrong")
return nil, 0, errors.New("Unexpected error")
}
for fID, vInter := range row {
if _, ok := fID2Content[fID]; !ok {
fID2Content[fID] = make([]interface{}, 0)
}
fID2Content[fID] = append(fID2Content[fID], vInter)
}
}
// calculate numRows from rowID field, fieldID 0
numRows := int64(len(fID2Content[0]))
num = int(Params.FlushInsertBufferSize / (int64(dim) * 4))
n = int(numRows)/num + 1
for i := 0; i < n; i++ {
iDatas = append(iDatas, &InsertData{Data: make(map[storage.FieldID]storage.FieldData)})
}
for fID, content := range fID2Content {
tp, ok := fID2Type[fID]
if !ok {
log.Warn("no field ID in this schema", zap.Int64("fieldID", fID))
return nil, 0, errors.New("Unexpected error")
}
for i := 0; i < n; i++ {
var c []interface{}
if i == n-1 {
c = content[i*num:]
} else {
c = content[i*num : i*num+num]
}
fData, err := interface2FieldData(tp, c, int64(len(c)))
if err != nil {
log.Warn("transfer interface to FieldData wrong", zap.Error(err))
return nil, 0, err
}
iDatas[i].Data[fID] = fData
}
}
return iDatas, numRows, nil
}
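
merge re-chunks the surviving rows so that each output InsertData stays within the flush buffer: rows per chunk is FlushInsertBufferSize divided by the byte size of one vector (dim float32 values). A standalone sketch of the arithmetic with assumed parameter values:

	package main

	import "fmt"

	func main() {
		flushInsertBufferSize := int64(16 << 20) // assumed 16 MB flush buffer
		dim := int64(128)                        // assumed 128-dim float vectors

		num := int(flushInsertBufferSize / (dim * 4)) // rows per chunk = 32768
		numRows := 100000                             // surviving rows after deletes
		n := numRows/num + 1                          // chunks allocated = 4
		fmt.Println(num, n)
	}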
func (t *compactionTask) compact() error {
ctxTimeout, cancelAll := context.WithTimeout(context.Background(), time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
defer cancelAll()
var targetSegID UniqueID
var err error
switch {
case t.plan.GetType() == datapb.CompactionType_UndefinedCompaction:
log.Error("compact wrong, compaction type undefined")
return errCompactionTypeUndefined
case len(t.plan.GetSegmentBinlogs()) < 1:
log.Error("compact wrong, there's no segments in segment binlogs")
return errIllegalCompactionPlan
case t.plan.GetType() == datapb.CompactionType_MergeCompaction:
targetSegID, err = t.allocID()
if err != nil {
log.Error("compact wrong", zap.Error(err))
return err
}
case t.plan.GetType() == datapb.CompactionType_InnerCompaction:
targetSegID = t.plan.GetSegmentBinlogs()[0].GetSegmentID()
}
log.Debug("compaction start", zap.Int64("planID", t.plan.GetPlanID()))
segIDs := make([]UniqueID, 0, len(t.plan.GetSegmentBinlogs()))
for _, s := range t.plan.GetSegmentBinlogs() {
segIDs = append(segIDs, s.GetSegmentID())
}
collID, partID, meta, err := t.getSegmentMeta(segIDs[0])
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return err
}
// Inject to stop flush
ti := taskInjection{
injected: make(chan struct{}),
injectOver: make(chan bool),
postInjection: func(pack *segmentFlushPack) {
pack.segmentID = targetSegID
},
}
defer close(ti.injectOver)
t.injectFlush(ti, segIDs...)
<-ti.injected
var (
iItr = make([]iterator, 0)
imu sync.Mutex
// SegmentID to deltaBlobs
dblobs = make(map[UniqueID][]*Blob)
dmu sync.Mutex
)
g, gCtx := errgroup.WithContext(ctxTimeout)
for _, s := range t.plan.GetSegmentBinlogs() {
// TODO may panic
fieldNum := len(s.GetFieldBinlogs()[0].GetBinlogs())
for idx := 0; idx < fieldNum; idx++ {
ps := make([]string, 0, fieldNum)
for _, f := range s.GetFieldBinlogs() {
ps = append(ps, f.GetBinlogs()[idx])
}
g.Go(func() error {
bs, err := t.download(gCtx, ps)
if err != nil {
log.Warn("download insertlogs wrong")
return err
}
itr, err := storage.NewInsertBinlogIterator(bs)
if err != nil {
log.Warn("new insert binlogs Itr wrong")
return err
}
imu.Lock()
iItr = append(iItr, itr)
imu.Unlock()
return nil
})
}
segID := s.GetSegmentID()
for _, d := range s.GetDeltalogs() {
d := d // shadow the loop variable: the goroutine below must capture a per-iteration copy
g.Go(func() error {
bs, err := t.download(gCtx, []string{d.GetDeltaLogPath()})
if err != nil {
log.Warn("download deltalogs wrong")
return err
}
dmu.Lock()
dblobs[segID] = append(dblobs[segID], bs...)
dmu.Unlock()
return nil
})
}
}
if err := g.Wait(); err != nil {
log.Error("compaction IO wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return err
}
mergeItr := storage.NewMergeIterator(iItr)
deltaMap, deltaBuf, err := t.mergeDeltalogs(dblobs, t.plan.GetTimetravel())
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return err
}
iDatas, numRows, err := t.merge(mergeItr, deltaMap, meta.GetSchema())
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return err
}
cpaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, deltaBuf.delData, meta)
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return err
}
cpaths.deltaInfo.DeltaLogSize = deltaBuf.size
cpaths.deltaInfo.TimestampFrom = deltaBuf.tsFrom
cpaths.deltaInfo.TimestampTo = deltaBuf.tsTo
pack := &datapb.CompactionResult{
PlanID: t.plan.GetPlanID(),
SegmentID: targetSegID,
InsertLogs: cpaths.inPaths,
Field2StatslogPaths: cpaths.statsPaths,
NumOfRows: numRows,
Deltalogs: []*datapb.DeltaLogInfo{cpaths.deltaInfo},
}
status, err := t.dc.CompleteCompaction(ctxTimeout, pack)
if err != nil {
log.Error("complete compaction rpc wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return err
}
if status.ErrorCode != commonpb.ErrorCode_Success {
log.Error("complete compaction wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.String("reason", status.GetReason()))
return fmt.Errorf("complete comapction wrong: %s", status.GetReason())
}
// Compaction I: update pk range.
// Compaction II: remove the segments and add a new flushed segment with pk range.
fd := []UniqueID{}
for _, iData := range iDatas {
fd = append(fd, iData.Data[0].(*storage.Int64FieldData).Data...)
}
if t.hasSegment(targetSegID, true) {
t.refreshFlushedSegStatistics(targetSegID, numRows)
t.refreshFlushedSegmentPKRange(targetSegID, fd)
} else {
t.addFlushedSegmentWithPKs(targetSegID, collID, partID, t.plan.GetChannel(), numRows, fd)
for _, seg := range segIDs {
t.removeSegment(seg)
}
}
ti.injectOver <- true
log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID()))
return nil
}
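
The binlog downloads above fan out through golang.org/x/sync/errgroup (the dependency promoted in go.mod by this commit), with mutex-guarded slices collecting results. A self-contained sketch of that pattern; the paths and the fake "download" are illustrative:

	package main

	import (
		"context"
		"fmt"
		"sync"

		"golang.org/x/sync/errgroup"
	)

	func main() {
		paths := []string{"insertlog/1", "insertlog/2", "deltalog/1"}

		var (
			mu      sync.Mutex
			results []string
		)
		g, gCtx := errgroup.WithContext(context.Background())
		for _, p := range paths {
			p := p // per-iteration copy for the closure (pre-Go 1.22)
			g.Go(func() error {
				if err := gCtx.Err(); err != nil {
					return err // a sibling failed or the deadline was hit
				}
				mu.Lock()
				results = append(results, "downloaded:"+p)
				mu.Unlock()
				return nil
			})
		}
		if err := g.Wait(); err != nil {
			fmt.Println("compaction IO wrong:", err)
			return
		}
		fmt.Println(results)
	}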
// TODO: copying may be expensive, but it seems to be the only convenient way.
func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}, numRows int64) (storage.FieldData, error) {
var rst storage.FieldData
numOfRows := []int64{numRows}
switch schemaDataType {
case schemapb.DataType_Bool:
var data = &storage.BoolFieldData{
NumRows: numOfRows,
Data: make([]bool, 0, len(content)),
}
for _, c := range content {
r, ok := c.(bool)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
rst = data
case schemapb.DataType_Int8:
var data = &storage.Int8FieldData{
NumRows: numOfRows,
Data: make([]int8, 0, len(content)),
}
for _, c := range content {
r, ok := c.(int8)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
rst = data
case schemapb.DataType_Int16:
var data = &storage.Int16FieldData{
NumRows: numOfRows,
Data: make([]int16, 0, len(content)),
}
for _, c := range content {
r, ok := c.(int16)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
rst = data
case schemapb.DataType_Int32:
var data = &storage.Int32FieldData{
NumRows: numOfRows,
Data: make([]int32, 0, len(content)),
}
for _, c := range content {
r, ok := c.(int32)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
rst = data
case schemapb.DataType_Int64:
var data = &storage.Int64FieldData{
NumRows: numOfRows,
Data: make([]int64, 0, len(content)),
}
for _, c := range content {
r, ok := c.(int64)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
rst = data
case schemapb.DataType_Float:
var data = &storage.FloatFieldData{
NumRows: numOfRows,
Data: make([]float32, 0, len(content)),
}
for _, c := range content {
r, ok := c.(float32)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
rst = data
case schemapb.DataType_Double:
var data = &storage.DoubleFieldData{
NumRows: numOfRows,
Data: make([]float64, 0, len(content)),
}
for _, c := range content {
r, ok := c.(float64)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
rst = data
case schemapb.DataType_FloatVector:
var data = &storage.FloatVectorFieldData{
NumRows: numOfRows,
Data: make([]float32, 0, len(content)),
}
for _, c := range content {
r, ok := c.(float32)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
data.Dim = len(data.Data) / int(numRows)
rst = data
case schemapb.DataType_BinaryVector:
var data = &storage.BinaryVectorFieldData{
NumRows: numOfRows,
Data: make([]byte, 0, len(content)),
}
for _, c := range content {
r, ok := c.(byte)
if !ok {
return nil, errTransferType
}
data.Data = append(data.Data, r)
}
data.Dim = len(data.Data) * 8 / int(numRows)
rst = data
default:
return nil, errUnknownDataType
}
return rst, nil
}
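
A hedged usage sketch for interface2FieldData: column values come out of the insert-binlog iterator as []interface{} and must carry the exact concrete type, otherwise errTransferType is returned (col and the surrounding fragment are illustrative):

	col := []interface{}{int64(7), int64(8), int64(9)}
	fd, err := interface2FieldData(schemapb.DataType_Int64, col, int64(len(col)))
	if err != nil {
		// a value of the wrong concrete type (e.g. int instead of int64)
		// would have produced errTransferType
		return err
	}
	_ = fd.(*storage.Int64FieldData) // NumRows: [3], Data: [7 8 9]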
func (t *compactionTask) getSegmentMeta(segID UniqueID) (UniqueID, UniqueID, *etcdpb.CollectionMeta, error) {
collID, partID, err := t.getCollectionAndPartitionID(segID)
if err != nil {
return -1, -1, nil, err
}
// TODO: why does compaction pass a zero timestamp here?
// Bad design of describe collection.
sch, err := t.getCollectionSchema(collID, 0)
if err != nil {
return -1, -1, nil, err
}
meta := &etcdpb.CollectionMeta{
ID: collID,
Schema: sch,
}
return collID, partID, meta, nil
}

internal/datanode/compactor_test.go
View File

@@ -0,0 +1,531 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"context"
"testing"
"time"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCompactionTaskInnerMethods(t *testing.T) {
t.Run("Test getSegmentMeta", func(t *testing.T) {
rc := &RootCoordFactory{}
replica, err := newReplica(context.TODO(), rc, 1)
require.NoError(t, err)
task := &compactionTask{
Replica: replica,
}
_, _, _, err = task.getSegmentMeta(100)
assert.Error(t, err)
err = replica.addNewSegment(100, 1, 10, "a", new(internalpb.MsgPosition), nil)
require.NoError(t, err)
collID, partID, meta, err := task.getSegmentMeta(100)
assert.NoError(t, err)
assert.Equal(t, UniqueID(1), collID)
assert.Equal(t, UniqueID(10), partID)
assert.NotNil(t, meta)
rc.setCollectionID(-2)
_, _, _, err = task.getSegmentMeta(100)
assert.Error(t, err)
})
t.Run("Test.interface2FieldData", func(t *testing.T) {
tests := []struct {
isvalid bool
tp schemapb.DataType
content []interface{}
description string
}{
{true, schemapb.DataType_Bool, []interface{}{true, false}, "valid bool"},
{true, schemapb.DataType_Int8, []interface{}{int8(1), int8(2)}, "valid int8"},
{true, schemapb.DataType_Int16, []interface{}{int16(1), int16(2)}, "valid int16"},
{true, schemapb.DataType_Int32, []interface{}{int32(1), int32(2)}, "valid int32"},
{true, schemapb.DataType_Int64, []interface{}{int64(1), int64(2)}, "valid int64"},
{true, schemapb.DataType_Float, []interface{}{float32(1), float32(2)}, "valid float32"},
{true, schemapb.DataType_Double, []interface{}{float64(1), float64(2)}, "valid float64"},
{true, schemapb.DataType_FloatVector, []interface{}{float32(1), float32(2)}, "valid floatvector"},
{true, schemapb.DataType_BinaryVector, []interface{}{byte(255), byte(1)}, "valid binaryvector"},
{false, schemapb.DataType_Bool, []interface{}{1, 2}, "invalid bool"},
{false, schemapb.DataType_Int8, []interface{}{nil, nil}, "invalid int8"},
{false, schemapb.DataType_Int16, []interface{}{nil, nil}, "invalid int16"},
{false, schemapb.DataType_Int32, []interface{}{nil, nil}, "invalid int32"},
{false, schemapb.DataType_Int64, []interface{}{nil, nil}, "invalid int64"},
{false, schemapb.DataType_Float, []interface{}{nil, nil}, "invalid float32"},
{false, schemapb.DataType_Double, []interface{}{nil, nil}, "invalid float64"},
{false, schemapb.DataType_FloatVector, []interface{}{nil, nil}, "invalid floatvector"},
{false, schemapb.DataType_BinaryVector, []interface{}{nil, nil}, "invalid binaryvector"},
{false, schemapb.DataType_String, nil, "invalid data type"},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
fd, err := interface2FieldData(test.tp, test.content, 2)
assert.NoError(t, err)
assert.Equal(t, 2, fd.Length())
} else {
fd, err := interface2FieldData(test.tp, test.content, 2)
assert.Error(t, err)
assert.Nil(t, fd)
}
})
}
})
t.Run("Test mergeDeltalogs", func(t *testing.T) {
t.Run("One segment with timetravel", func(t *testing.T) {
invalidBlobs := map[UniqueID][]*Blob{
1: {},
}
blobs, err := getDeltaBlobs(
100, map[int64]int64{
1: 20000,
2: 20001,
3: 20002,
4: 30000,
5: 50000,
})
require.NoError(t, err)
validBlobs := map[UniqueID][]*Blob{
100: blobs,
}
tests := []struct {
isvalid bool
dBlobs map[UniqueID][]*Blob
timetravel Timestamp
description string
}{
{false, invalidBlobs, 0, "invalid dBlobs"},
{true, validBlobs, 21000, "valid blobs"},
}
for _, test := range tests {
task := &compactionTask{}
t.Run(test.description, func(t *testing.T) {
if test.isvalid {
pk2ts, db, err := task.mergeDeltalogs(test.dBlobs, test.timetravel)
assert.NoError(t, err)
assert.Equal(t, 3, len(pk2ts))
assert.Equal(t, int64(2), db.size)
} else {
pk2ts, db, err := task.mergeDeltalogs(test.dBlobs, test.timetravel)
assert.Error(t, err)
assert.Nil(t, pk2ts)
assert.Nil(t, db)
}
})
}
})
t.Run("Multiple segments with timetravel", func(t *testing.T) {
tests := []struct {
segIDA UniqueID
dataA map[int64]int64
segIDB UniqueID
dataB map[int64]int64
segIDC UniqueID
dataC map[int64]int64
timetravel Timestamp
expectedpk2ts int
expecteddb int
description string
}{
{
0, nil,
100, map[int64]int64{
1: 20000,
2: 30000,
3: 20005},
200, map[int64]int64{
4: 50000,
5: 50001,
6: 50002},
40000, 3, 3, "2 segments with timetravel 40000",
},
{
300, map[int64]int64{
10: 20001,
20: 40001,
},
100, map[int64]int64{
1: 20000,
2: 30000,
3: 20005},
200, map[int64]int64{
4: 50000,
5: 50001,
6: 50002},
40000, 4, 4, "3 segments with timetravel 40000",
},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
dBlobs := make(map[UniqueID][]*Blob)
if test.segIDA != UniqueID(0) {
d, err := getDeltaBlobs(test.segIDA, test.dataA)
require.NoError(t, err)
dBlobs[test.segIDA] = d
}
if test.segIDB != UniqueID(0) {
d, err := getDeltaBlobs(test.segIDB, test.dataB)
require.NoError(t, err)
dBlobs[test.segIDB] = d
}
if test.segIDC != UniqueID(0) {
d, err := getDeltaBlobs(test.segIDC, test.dataC)
require.NoError(t, err)
dBlobs[test.segIDC] = d
}
task := &compactionTask{}
pk2ts, db, err := task.mergeDeltalogs(dBlobs, test.timetravel)
assert.NoError(t, err)
assert.Equal(t, test.expectedpk2ts, len(pk2ts))
assert.Equal(t, test.expecteddb, int(db.size))
})
}
})
})
t.Run("Test merge", func(t *testing.T) {
iData := genInsertData()
meta := NewMetaFactory().GetCollectionMeta(1, "test")
iblobs, err := getInsertBlobs(100, iData, meta)
require.NoError(t, err)
iitr, err := storage.NewInsertBinlogIterator(iblobs)
require.NoError(t, err)
mitr := storage.NewMergeIterator([]iterator{iitr})
dm := map[UniqueID]Timestamp{
1: 10000,
}
ct := &compactionTask{}
idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema())
assert.NoError(t, err)
assert.Equal(t, int64(1), numOfRow)
assert.Equal(t, 1, len(idata))
})
}
func getDeltaBlobs(segID UniqueID, pk2ts map[int64]int64) ([]*Blob, error) {
deltaData := &DeleteData{Data: pk2ts}
dCodec := storage.NewDeleteCodec()
blob, err := dCodec.Serialize(1, 10, segID, deltaData)
return []*Blob{blob}, err
}
func getInsertBlobs(segID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) ([]*Blob, error) {
iCodec := storage.NewInsertCodec(meta)
iblobs, _, err := iCodec.Serialize(10, segID, iData)
return iblobs, err
}
func TestCompactorInterfaceMethods(t *testing.T) {
notEmptySegmentBinlogs := []*datapb.CompactionSegmentBinlogs{{
SegmentID: 100,
FieldBinlogs: nil,
Field2StatslogPaths: nil,
Deltalogs: nil,
}}
t.Run("Test compact invalid", func(t *testing.T) {
invalidAlloc := NewAllocatorFactory(-1)
emptyTask := &compactionTask{}
emptySegmentBinlogs := []*datapb.CompactionSegmentBinlogs{}
plan := &datapb.CompactionPlan{
PlanID: 999,
SegmentBinlogs: notEmptySegmentBinlogs,
StartTime: 0,
TimeoutInSeconds: 10,
Type: datapb.CompactionType_UndefinedCompaction,
Channel: "",
}
emptyTask.plan = plan
err := emptyTask.compact()
assert.Error(t, err)
plan.Type = datapb.CompactionType_InnerCompaction
plan.SegmentBinlogs = emptySegmentBinlogs
err = emptyTask.compact()
assert.Error(t, err)
plan.Type = datapb.CompactionType_MergeCompaction
emptyTask.allocatorInterface = invalidAlloc
plan.SegmentBinlogs = notEmptySegmentBinlogs
err = emptyTask.compact()
assert.Error(t, err)
})
t.Run("Test typeI compact valid", func(t *testing.T) {
var collID, partID, segID UniqueID = 1, 10, 100
alloc := NewAllocatorFactory(1)
rc := &RootCoordFactory{}
dc := &DataCoordFactory{}
mockfm := &mockFlushManager{}
mockKv := memkv.NewMemoryKV()
mockbIO := &binlogIO{mockKv, alloc}
replica, err := newReplica(context.TODO(), rc, collID)
require.NoError(t, err)
replica.addFlushedSegmentWithPKs(segID, collID, partID, "channelname", 2, []UniqueID{1})
iData := genInsertData()
meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name")
dData := &DeleteData{Data: map[int64]int64{
1: 20000,
}}
cpaths, err := mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
require.NoError(t, err)
require.Equal(t, 11, len(cpaths.inPaths))
plan := &datapb.CompactionPlan{
PlanID: 10080,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: segID,
FieldBinlogs: cpaths.inPaths,
Field2StatslogPaths: cpaths.statsPaths,
Deltalogs: []*datapb.DeltaLogInfo{cpaths.deltaInfo},
},
},
StartTime: 0,
TimeoutInSeconds: 1,
Type: datapb.CompactionType_InnerCompaction,
Timetravel: 30000,
Channel: "channelname",
}
task := newCompactionTask(mockbIO, mockbIO, replica, mockfm, alloc, dc, plan)
err = task.compact()
assert.NoError(t, err)
updates, err := replica.getSegmentStatisticsUpdates(segID)
assert.NoError(t, err)
assert.Equal(t, int64(1), updates.GetNumRows())
// New test, remove all the binlogs in memkv
// Deltas in timetravel range
err = mockKv.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
require.NoError(t, err)
plan.PlanID++
plan.Timetravel = Timestamp(10000)
err = task.compact()
assert.NoError(t, err)
updates, err = replica.getSegmentStatisticsUpdates(segID)
assert.NoError(t, err)
assert.Equal(t, int64(2), updates.GetNumRows())
// New test, remove all the binlogs in memkv
// Timeout
err = mockKv.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
require.NoError(t, err)
plan.PlanID++
mockfm.sleepSeconds = plan.TimeoutInSeconds + int32(1)
err = task.compact()
assert.Error(t, err)
})
t.Run("Test typeII compact valid", func(t *testing.T) {
var collID, partID, segID1, segID2 UniqueID = 1, 10, 200, 201
alloc := NewAllocatorFactory(1)
rc := &RootCoordFactory{}
dc := &DataCoordFactory{}
mockfm := &mockFlushManager{}
mockKv := memkv.NewMemoryKV()
mockbIO := &binlogIO{mockKv, alloc}
replica, err := newReplica(context.TODO(), rc, collID)
require.NoError(t, err)
replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1})
replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{9})
require.True(t, replica.hasSegment(segID1, true))
require.True(t, replica.hasSegment(segID2, true))
meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name")
iData1 := genInsertDataWithRowIDs([2]int64{1, 2})
dData1 := &DeleteData{Data: map[int64]int64{
1: 20000,
}}
iData2 := genInsertDataWithRowIDs([2]int64{9, 10})
dData2 := &DeleteData{Data: map[int64]int64{
9: 30000,
}}
cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta)
require.NoError(t, err)
require.Equal(t, 11, len(cpaths1.inPaths))
cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, dData2, meta)
require.NoError(t, err)
require.Equal(t, 11, len(cpaths2.inPaths))
plan := &datapb.CompactionPlan{
PlanID: 10080,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: segID1,
FieldBinlogs: cpaths1.inPaths,
Field2StatslogPaths: cpaths1.statsPaths,
Deltalogs: []*datapb.DeltaLogInfo{cpaths1.deltaInfo},
},
{
SegmentID: segID2,
FieldBinlogs: cpaths2.inPaths,
Field2StatslogPaths: cpaths2.statsPaths,
Deltalogs: []*datapb.DeltaLogInfo{cpaths2.deltaInfo},
},
},
StartTime: 0,
TimeoutInSeconds: 1,
Type: datapb.CompactionType_MergeCompaction,
Timetravel: 40000,
Channel: "channelname",
}
alloc.random = false // generated ID = 19530
task := newCompactionTask(mockbIO, mockbIO, replica, mockfm, alloc, dc, plan)
err = task.compact()
assert.NoError(t, err)
assert.False(t, replica.hasSegment(segID1, true))
assert.False(t, replica.hasSegment(segID2, true))
assert.True(t, replica.hasSegment(19530, true))
updates, err := replica.getSegmentStatisticsUpdates(19530)
assert.NoError(t, err)
assert.Equal(t, int64(2), updates.GetNumRows())
// New test, remove all the binlogs in memkv
// Deltas in timetravel range
err = mockKv.RemoveWithPrefix("/")
require.NoError(t, err)
plan.PlanID++
plan.Timetravel = Timestamp(25000)
replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1})
replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{9})
replica.removeSegment(19530)
require.True(t, replica.hasSegment(segID1, true))
require.True(t, replica.hasSegment(segID2, true))
require.False(t, replica.hasSegment(19530, true))
err = task.compact()
assert.NoError(t, err)
assert.False(t, replica.hasSegment(segID1, true))
assert.False(t, replica.hasSegment(segID2, true))
assert.True(t, replica.hasSegment(19530, true))
updates, err = replica.getSegmentStatisticsUpdates(19530)
assert.NoError(t, err)
assert.Equal(t, int64(3), updates.GetNumRows())
// New test, remove all the binlogs in memkv
// Deltas in timetravel range
err = mockKv.RemoveWithPrefix("/")
require.NoError(t, err)
plan.PlanID++
plan.Timetravel = Timestamp(10000)
replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1})
replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{9})
replica.removeSegment(19530)
require.True(t, replica.hasSegment(segID1, true))
require.True(t, replica.hasSegment(segID2, true))
require.False(t, replica.hasSegment(19530, true))
err = task.compact()
assert.NoError(t, err)
assert.False(t, replica.hasSegment(segID1, true))
assert.False(t, replica.hasSegment(segID2, true))
assert.True(t, replica.hasSegment(19530, true))
updates, err = replica.getSegmentStatisticsUpdates(19530)
assert.NoError(t, err)
assert.Equal(t, int64(4), updates.GetNumRows())
})
}
type mockFlushManager struct {
sleepSeconds int32
}
var _ flushManager = (*mockFlushManager)(nil)
func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, pos *internalpb.MsgPosition) error {
return nil
}
func (mfm *mockFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error {
return nil
}
func (mfm *mockFlushManager) injectFlush(injection taskInjection, segments ...UniqueID) {
go func() {
time.Sleep(time.Second * time.Duration(mfm.sleepSeconds))
injection.injected <- struct{}{}
<-injection.injectOver
}()
}
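
The mock above exercises the flush-injection handshake that compact() relies on: the flush manager signals on injected once the flush path is paused, and the task releases it through injectOver after CompleteCompaction succeeds. Condensed from the compactor.go hunk earlier in this diff:

	ti := taskInjection{
		injected:   make(chan struct{}),
		injectOver: make(chan bool),
	}
	t.injectFlush(ti, segIDs...) // register the injection for every input segment
	<-ti.injected                // block until flushes are paused
	// ... download, merge, upload, CompleteCompaction ...
	ti.injectOver <- true        // resume the flush path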

internal/datanode/data_node.go
View File

@@ -40,6 +40,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/logutil"
"github.com/milvus-io/milvus/internal/metrics"
@@ -101,14 +102,16 @@ type DataNode struct {
vchan2SyncService map[string]*dataSyncService // vchannel name
vchan2FlushChs map[string]chan flushMsg // vchannel name to flush channels
clearSignal chan UniqueID // collection ID
segmentCache *Cache
clearSignal chan UniqueID // collection ID
segmentCache *Cache
compactionExecutor *compactionExecutor
rootCoord types.RootCoord
dataCoord types.DataCoord
session *sessionutil.Session
watchKv kv.MetaKv
blobKv kv.BaseKV
closer io.Closer
@@ -124,10 +127,11 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
cancel: cancel2,
Role: typeutil.DataNodeRole,
rootCoord: nil,
dataCoord: nil,
msFactory: factory,
segmentCache: newCache(),
rootCoord: nil,
dataCoord: nil,
msFactory: factory,
segmentCache: newCache(),
compactionExecutor: newCompactionExecutor(),
vchan2SyncService: make(map[string]*dataSyncService),
vchan2FlushChs: make(map[string]chan flushMsg),
@@ -406,6 +410,22 @@ func (node *DataNode) Start() error {
return errors.New("DataNode fail to connect etcd")
}
option := &miniokv.Option{
Address: Params.MinioAddress,
AccessKeyID: Params.MinioAccessKeyID,
SecretAccessKeyID: Params.MinioSecretAccessKey,
UseSSL: Params.MinioUseSSL,
CreateBucket: true,
BucketName: Params.MinioBucketName,
}
kv, err := miniokv.NewMinIOKV(node.ctx, option)
if err != nil {
return err
}
node.blobKv = kv
if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil {
return errors.New("DataNode fail to start")
}
@@ -414,6 +434,8 @@ func (node *DataNode) Start() error {
go node.BackGroundGC(node.clearSignal)
go node.compactionExecutor.start(node.ctx)
Params.CreatedTime = time.Now()
Params.UpdatedTime = time.Now()
@@ -704,5 +726,30 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
}
func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
ds, ok := node.vchan2SyncService[req.GetChannel()]
if !ok {
log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channel name", req.GetChannel()))
status.Reason = errIllegalCompactionPlan.Error()
return status, nil
}
binlogIO := &binlogIO{node.blobKv, ds.idAllocator}
task := newCompactionTask(
binlogIO, binlogIO,
ds.replica,
ds.flushManager,
ds.idAllocator,
node.dataCoord,
req,
)
node.compactionExecutor.execute(task)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
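
Note that Compaction returns Success as soon as the plan is queued on the executor; the actual outcome reaches DataCoord later through CompleteCompaction. A caller-side sketch (dataNodeClient is a hypothetical client handle, not part of this diff):

	status, err := dataNodeClient.Compaction(ctx, plan)
	if err != nil || status.GetErrorCode() != commonpb.ErrorCode_Success {
		// plan was rejected up front, e.g. the channel is not served here;
		// otherwise the result arrives asynchronously via CompleteCompaction.
	}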

internal/datanode/mock_test.go
View File

@@ -137,6 +137,10 @@ func clearEtcd(rootPath string) error {
type MetaFactory struct {
}
func NewMetaFactory() *MetaFactory {
return &MetaFactory{}
}
type DataFactory struct {
rawData []byte
}
@@ -151,15 +155,29 @@ type RootCoordFactory struct {
type DataCoordFactory struct {
types.DataCoord
SaveBinlogPathError bool
SaveBinlogPathNotSucess bool
SaveBinlogPathError bool
SaveBinlogPathNotSuccess bool
CompleteCompactionError bool
CompleteCompactionNotSuccess bool
}
func (ds *DataCoordFactory) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) {
if ds.CompleteCompactionError {
return nil, errors.New("Error")
}
if ds.CompleteCompactionNotSuccess {
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil
}
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
func (ds *DataCoordFactory) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
if ds.SaveBinlogPathError {
return nil, errors.New("Error")
}
if ds.SaveBinlogPathNotSucess {
if ds.SaveBinlogPathNotSuccess {
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil
}
@@ -514,23 +532,34 @@ func genFlowGraphDeleteMsg(pks []int64, chanName string) flowGraphMsg {
type AllocatorFactory struct {
sync.Mutex
r *rand.Rand
r *rand.Rand
isvalid bool
random bool
}
var _ allocatorInterface = &AllocatorFactory{}
func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory {
f := &AllocatorFactory{
r: rand.New(rand.NewSource(time.Now().UnixNano())),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
isvalid: len(id) == 0 || (len(id) > 0 && id[0] > 0),
}
return f
}
func (alloc *AllocatorFactory) allocID() (UniqueID, error) {
alloc.Lock()
defer alloc.Unlock()
return alloc.r.Int63n(10000), nil
if !alloc.isvalid {
return -1, errors.New("allocID error")
}
if alloc.random {
return alloc.r.Int63n(10000), nil
}
return 19530, nil
}
func (alloc *AllocatorFactory) allocIDBatch(count uint32) (UniqueID, uint32, error) {
@@ -661,6 +690,13 @@ func (f *FailMessageStreamFactory) NewTtMsgStream(ctx context.Context) (msgstrea
return nil, errors.New("mocked failure")
}
func genInsertDataWithRowIDs(rowIDs [2]int64) *InsertData {
iD := genInsertData()
iD.Data[0].(*s.Int64FieldData).Data = rowIDs[:]
return iD
}
func genInsertData() *InsertData {
return &InsertData{
Data: map[int64]s.FieldData{

internal/datanode/segment_replica.go
View File

@@ -64,6 +64,7 @@ type Replica interface {
removeSegment(segID UniqueID)
updateStatistics(segID UniqueID, numRows int64)
refreshFlushedSegStatistics(segID UniqueID, numRows int64)
getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
segmentFlushed(segID UniqueID)
}
@@ -541,6 +542,18 @@ func (replica *SegmentReplica) hasSegment(segID UniqueID, countFlushed bool) boo
return inNew || inNormal || inFlush
}
func (replica *SegmentReplica) refreshFlushedSegStatistics(segID UniqueID, numRows int64) {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
if seg, ok := replica.flushedSegments[segID]; ok {
seg.memorySize = 0
seg.numRows = numRows
return
}
log.Warn("refesh numRow on not exists segment", zap.Int64("segID", segID))
}
// updateStatistics updates the number of rows of a segment in replica.
func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) {