From 3cd0584a04ca6cf886fbe75d99a34c1cdb73a521 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Mon, 8 Nov 2021 19:49:07 +0800 Subject: [PATCH] Add compaction in datanode (#11353) See also: #9530 Signed-off-by: yangxuan --- go.mod | 1 + go.sum | 1 + internal/datanode/binlog_io.go | 32 +- internal/datanode/binlog_io_test.go | 4 +- internal/datanode/compaction_executor.go | 86 +++ internal/datanode/compaction_executor_test.go | 79 +++ internal/datanode/compactor.go | 586 ++++++++++++++++++ internal/datanode/compactor_test.go | 531 ++++++++++++++++ internal/datanode/data_node.go | 61 +- internal/datanode/mock_test.go | 50 +- internal/datanode/segment_replica.go | 13 + 11 files changed, 1420 insertions(+), 24 deletions(-) create mode 100644 internal/datanode/compaction_executor.go create mode 100644 internal/datanode/compaction_executor_test.go create mode 100644 internal/datanode/compactor.go create mode 100644 internal/datanode/compactor_test.go diff --git a/go.mod b/go.mod index c72882270e..161f68136e 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( go.uber.org/zap v1.17.0 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/tools v0.1.7 // indirect google.golang.org/grpc v1.38.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 diff --git a/go.sum b/go.sum index 2cdcb2c1ae..bdeeb7df5a 100644 --- a/go.sum +++ b/go.sum @@ -707,6 +707,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go index a1b5c22420..aedabf17f6 100644 --- a/internal/datanode/binlog_io.go +++ b/internal/datanode/binlog_io.go @@ -49,7 +49,7 @@ type downloader interface { type uploader interface { // upload saves InsertData and DeleteData into blob storage. // stats-binlogs are generated from InsertData. 
- upload(ctx context.Context, segID, partID UniqueID, iData *InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*cpaths, error) + upload(ctx context.Context, segID, partID UniqueID, iData []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*cpaths, error) } type binlogIO struct { @@ -71,7 +71,7 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error case <-ctx.Done(): close(r) - log.Debug("binlog download canceled by context done") + log.Warn("ctx done when downloading kvs from blob storage") return default: @@ -107,16 +107,32 @@ type cpaths struct { func (b *binlogIO) upload( ctx context.Context, segID, partID UniqueID, - iData *InsertData, + iDatas []*InsertData, dData *DeleteData, meta *etcdpb.CollectionMeta) (*cpaths, error) { - kvs, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta) - if err != nil { - log.Warn("generate insert blobs wrong", zap.Error(err)) - return nil, err + var p = &cpaths{ + inPaths: make([]*datapb.FieldBinlog, 0), + statsPaths: make([]*datapb.FieldBinlog, 0), + } + + kvs := make(map[string]string) + + for _, iData := range iDatas { + kv, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta) + if err != nil { + log.Warn("generate insert blobs wrong", zap.Error(err)) + return nil, err + } + + for k, v := range kv { + kvs[k] = v + } + + p.inPaths = append(p.inPaths, inpaths...) + p.statsPaths = append(p.statsPaths, statspaths...) + } - p := &cpaths{inpaths, statspaths, nil} // If there are delta logs if dData != nil { diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go index f1dce73273..d8c5c098ee 100644 --- a/internal/datanode/binlog_io_test.go +++ b/internal/datanode/binlog_io_test.go @@ -44,7 +44,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { Data: map[int64]int64{888: 666666}, } - p, err := b.upload(context.TODO(), 1, 10, iData, dData, meta) + p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta) assert.NoError(t, err) assert.Equal(t, 11, len(p.inPaths)) assert.Equal(t, 3, len(p.statsPaths)) @@ -53,7 +53,7 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - p, err = b.upload(ctx, 1, 10, iData, dData, meta) + p, err = b.upload(ctx, 1, 10, []*InsertData{iData}, dData, meta) assert.EqualError(t, err, errUploadToBlobStorage.Error()) assert.Nil(t, p) }) diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go new file mode 100644 index 0000000000..5a1358b897 --- /dev/null +++ b/internal/datanode/compaction_executor.go @@ -0,0 +1,86 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package datanode
+
+import (
+	"context"
+	"runtime"
+
+	"github.com/milvus-io/milvus/internal/log"
+	"go.uber.org/zap"
+)
+
+const (
+	maxTaskNum = 1024
+)
+
+var maxParallelCompactionNum = calculeateParallel()
+
+type compactionExecutor struct {
+	parallelCh chan struct{}
+	taskCh     chan compactor
+}
+
+// calculeateParallel returns the compaction parallelism: 4 when NumCPU < 16, otherwise NumCPU/2.
+func calculeateParallel() int {
+	cores := runtime.NumCPU()
+	if cores < 16 {
+		return 4
+	}
+	return cores / 2
+}
+
+func newCompactionExecutor() *compactionExecutor {
+	return &compactionExecutor{
+		parallelCh: make(chan struct{}, maxParallelCompactionNum),
+		taskCh:     make(chan compactor, maxTaskNum),
+	}
+}
+
+func (c *compactionExecutor) execute(task compactor) {
+	c.taskCh <- task
+}
+
+func (c *compactionExecutor) start(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case task := <-c.taskCh:
+			go c.executeTask(task)
+		}
+	}
+}
+
+func (c *compactionExecutor) executeTask(task compactor) {
+	c.parallelCh <- struct{}{}
+	defer func() {
+		<-c.parallelCh
+	}()
+
+	log.Info("start to execute compaction", zap.Int64("planID", task.getPlanID()))
+
+	err := task.compact()
+	if err != nil {
+		log.Warn("compaction task failed",
+			zap.Int64("planID", task.getPlanID()),
+			zap.Error(err),
+		)
+	}
+
+	log.Info("end to execute compaction", zap.Int64("planID", task.getPlanID()))
+}
diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction_executor_test.go
new file mode 100644
index 0000000000..e489526381
--- /dev/null
+++ b/internal/datanode/compaction_executor_test.go
@@ -0,0 +1,79 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package datanode
+
+import (
+	"context"
+	"testing"
+)
+
+func TestCompactionExecutor(t *testing.T) {
+	t.Run("Test execute", func(t *testing.T) {
+		ex := newCompactionExecutor()
+		go ex.start(context.TODO())
+		ex.execute(newMockCompactor(true))
+	})
+
+	t.Run("Test start", func(t *testing.T) {
+		ex := newCompactionExecutor()
+		ctx, cancel := context.WithCancel(context.TODO())
+		cancel()
+		go ex.start(ctx)
+	})
+
+	t.Run("Test executeTask", func(t *testing.T) {
+		tests := []struct {
+			isvalid bool
+
+			description string
+		}{
+			{true, "compact return nil"},
+			{false, "compact return error"},
+		}
+
+		ex := newCompactionExecutor()
+		for _, test := range tests {
+			t.Run(test.description, func(t *testing.T) {
+				if test.isvalid {
+					ex.executeTask(newMockCompactor(true))
+				} else {
+					ex.executeTask(newMockCompactor(false))
+				}
+			})
+		}
+	})
+
+}
+
+func newMockCompactor(isvalid bool) compactor {
+	return &mockCompactor{isvalid}
+}
+
+type mockCompactor struct {
+	isvalid bool
+}
+
+func (mc *mockCompactor) compact() error {
+	if !mc.isvalid {
+		return errStart
+	}
+	return nil
+}
+
+func (mc *mockCompactor) getPlanID() UniqueID {
+	return 1
+}
diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go
new file mode 100644
index 0000000000..55e9dc9d0d
--- /dev/null
+++ b/internal/datanode/compactor.go
@@ -0,0 +1,586 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package datanode + +import ( + "context" + "errors" + "fmt" + "math" + "strconv" + "sync" + "time" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/types" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/proto/schemapb" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +var ( + errCompactionTypeUndifined = errors.New("compaction type undefined") + errIllegalCompactionPlan = errors.New("compaction plan illegal") + errTransferType = errors.New("transfer intferface to type wrong") + errUnknownDataType = errors.New("unknown shema DataType") +) + +type iterator = storage.Iterator + +type compactor interface { + compact() error + getPlanID() UniqueID +} + +type compactionTask struct { + downloader + uploader + compactor + Replica + flushManager + allocatorInterface + + dc types.DataCoord + plan *datapb.CompactionPlan +} + +// check if compactionTask implements compactor +var _ compactor = (*compactionTask)(nil) + +func newCompactionTask( + dl downloader, + ul uploader, + replica Replica, + fm flushManager, + alloc allocatorInterface, + dc types.DataCoord, + plan *datapb.CompactionPlan) *compactionTask { + return &compactionTask{ + downloader: dl, + uploader: ul, + Replica: replica, + flushManager: fm, + allocatorInterface: alloc, + dc: dc, + plan: plan, + } +} + +func (t *compactionTask) getPlanID() UniqueID { + return t.plan.GetPlanID() +} + +func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[UniqueID]Timestamp, *DelDataBuf, error) { + + dCodec := storage.NewDeleteCodec() + + var ( + pk2ts = make(map[UniqueID]Timestamp) + dbuff = &DelDataBuf{ + delData: &DeleteData{Data: make(map[UniqueID]UniqueID)}, + tsFrom: math.MaxUint64, + tsTo: 0, + } + ) + + for _, blobs := range dBlobs { + _, _, dData, err := dCodec.Deserialize(blobs) + if err != nil { + log.Warn("merge deltalogs wrong", zap.Error(err)) + return nil, nil, err + } + + for pk, ts := range dData.Data { + if timetravelTs != Timestamp(0) && Timestamp(ts) <= timetravelTs { + pk2ts[pk] = Timestamp(ts) + continue + } + + dbuff.delData.Data[pk] = ts + + if Timestamp(ts) < dbuff.tsFrom { + dbuff.tsFrom = Timestamp(ts) + } + + if Timestamp(ts) > dbuff.tsTo { + dbuff.tsTo = Timestamp(ts) + } + } + } + + dbuff.updateSize(int64(len(dbuff.delData.Data))) + + return pk2ts, dbuff, nil +} + +func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, schema *schemapb.CollectionSchema) ([]*InsertData, int64, error) { + + var ( + dim int // dimension of vector field + num int // numOfRows in each binlog + n int // binlog number + err error + + iDatas = make([]*InsertData, 0) + fID2Type = make(map[UniqueID]schemapb.DataType) + fID2Content = make(map[UniqueID][]interface{}) + ) + + for _, fs := range schema.GetFields() { + fID2Type[fs.GetFieldID()] = fs.GetDataType() + + // get dim + if fs.GetDataType() == schemapb.DataType_FloatVector || + fs.GetDataType() == schemapb.DataType_BinaryVector { + for _, t := range fs.GetTypeParams() { + if t.Key == "dim" { + if dim, err = strconv.Atoi(t.Value); err != nil { + log.Warn("strconv wrong on get dim", zap.Error(err)) + return nil, 0, err + } + break + } + } + } + } + + for mergeItr.HasNext() { + // There will be no error if HasNext() returns true + vInter, _ := mergeItr.Next() + + v, ok := 
vInter.(*storage.Value) + if !ok { + log.Warn("transfer interface to Value wrong") + return nil, 0, errors.New("Unexpected error") + } + + if _, ok := delta[v.ID]; ok { + continue + } + + row, ok := v.Value.(map[UniqueID]interface{}) + if !ok { + log.Warn("transfer interface to map wrong") + return nil, 0, errors.New("Unexpected error") + } + + for fID, vInter := range row { + if _, ok := fID2Content[fID]; !ok { + fID2Content[fID] = make([]interface{}, 0) + } + fID2Content[fID] = append(fID2Content[fID], vInter) + } + } + + // calculate numRows from rowID field, fieldID 0 + numRows := int64(len(fID2Content[0])) + num = int(Params.FlushInsertBufferSize / (int64(dim) * 4)) + n = int(numRows)/num + 1 + + for i := 0; i < n; i++ { + iDatas = append(iDatas, &InsertData{Data: make(map[storage.FieldID]storage.FieldData)}) + } + + for fID, content := range fID2Content { + tp, ok := fID2Type[fID] + if !ok { + log.Warn("no field ID in this schema", zap.Int64("fieldID", fID)) + return nil, 0, errors.New("Unexpected error") + } + + for i := 0; i < n; i++ { + var c []interface{} + + if i == n-1 { + c = content[i*num:] + } else { + c = content[i*num : i*num+num] + } + + fData, err := interface2FieldData(tp, c, int64(len(c))) + + if err != nil { + log.Warn("transfer interface to FieldData wrong", zap.Error(err)) + return nil, 0, err + } + iDatas[i].Data[fID] = fData + } + + } + return iDatas, numRows, nil +} + +func (t *compactionTask) compact() error { + ctxTimeout, cancelAll := context.WithTimeout(context.Background(), time.Duration(t.plan.GetTimeoutInSeconds())*time.Second) + defer cancelAll() + + var targetSegID UniqueID + var err error + switch { + + case t.plan.GetType() == datapb.CompactionType_UndefinedCompaction: + log.Error("compact wrong, compaction type undefined") + return errCompactionTypeUndifined + + case len(t.plan.GetSegmentBinlogs()) < 1: + log.Error("compact wrong, there's no segments in segment binlogs") + return errIllegalCompactionPlan + + case t.plan.GetType() == datapb.CompactionType_MergeCompaction: + targetSegID, err = t.allocID() + if err != nil { + log.Error("compact wrong", zap.Error(err)) + return err + } + + case t.plan.GetType() == datapb.CompactionType_InnerCompaction: + targetSegID = t.plan.GetSegmentBinlogs()[0].GetSegmentID() + } + + log.Debug("compaction start", zap.Int64("planID", t.plan.GetPlanID())) + segIDs := make([]UniqueID, 0, len(t.plan.GetSegmentBinlogs())) + for _, s := range t.plan.GetSegmentBinlogs() { + segIDs = append(segIDs, s.GetSegmentID()) + } + + collID, partID, meta, err := t.getSegmentMeta(segIDs[0]) + if err != nil { + log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) + return err + } + + // Inject to stop flush + ti := taskInjection{ + injected: make(chan struct{}), + injectOver: make(chan bool), + postInjection: func(pack *segmentFlushPack) { + pack.segmentID = targetSegID + }, + } + defer close(ti.injectOver) + + t.injectFlush(ti, segIDs...) 
+ <-ti.injected + + var ( + iItr = make([]iterator, 0) + imu sync.Mutex + + // SegmentID to deltaBlobs + dblobs = make(map[UniqueID][]*Blob) + dmu sync.Mutex + ) + + g, gCtx := errgroup.WithContext(ctxTimeout) + for _, s := range t.plan.GetSegmentBinlogs() { + + // TODO may panic + fieldNum := len(s.GetFieldBinlogs()[0].GetBinlogs()) + + for idx := 0; idx < fieldNum; idx++ { + ps := make([]string, 0, fieldNum) + for _, f := range s.GetFieldBinlogs() { + ps = append(ps, f.GetBinlogs()[idx]) + } + + g.Go(func() error { + bs, err := t.download(gCtx, ps) + if err != nil { + log.Warn("download insertlogs wrong") + return err + } + + itr, err := storage.NewInsertBinlogIterator(bs) + if err != nil { + log.Warn("new insert binlogs Itr wrong") + return err + } + + imu.Lock() + iItr = append(iItr, itr) + imu.Unlock() + + return nil + }) + } + + segID := s.GetSegmentID() + for _, d := range s.GetDeltalogs() { + g.Go(func() error { + bs, err := t.download(gCtx, []string{d.GetDeltaLogPath()}) + if err != nil { + log.Warn("download deltalogs wrong") + return err + } + + dmu.Lock() + dblobs[segID] = append(dblobs[segID], bs...) + dmu.Unlock() + + return nil + }) + } + } + + if err := g.Wait(); err != nil { + log.Error("compaction IO wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) + return err + } + + mergeItr := storage.NewMergeIterator(iItr) + + deltaMap, deltaBuf, err := t.mergeDeltalogs(dblobs, t.plan.GetTimetravel()) + if err != nil { + log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) + return err + } + + iDatas, numRows, err := t.merge(mergeItr, deltaMap, meta.GetSchema()) + if err != nil { + log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) + return err + } + + cpaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, deltaBuf.delData, meta) + if err != nil { + log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) + return err + } + + cpaths.deltaInfo.DeltaLogSize = deltaBuf.size + cpaths.deltaInfo.TimestampFrom = deltaBuf.tsFrom + cpaths.deltaInfo.TimestampTo = deltaBuf.tsTo + + pack := &datapb.CompactionResult{ + PlanID: t.plan.GetPlanID(), + SegmentID: targetSegID, + InsertLogs: cpaths.inPaths, + Field2StatslogPaths: cpaths.statsPaths, + NumOfRows: numRows, + + Deltalogs: []*datapb.DeltaLogInfo{cpaths.deltaInfo}, + } + + status, err := t.dc.CompleteCompaction(ctxTimeout, pack) + if err != nil { + log.Error("complete compaction rpc wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) + return err + } + if status.ErrorCode != commonpb.ErrorCode_Success { + log.Error("complete compaction wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.String("reason", status.GetReason())) + return fmt.Errorf("complete comapction wrong: %s", status.GetReason()) + } + + // Compaction I: update pk range. + // Compaction II: remove the segments and add a new flushed segment with pk range. + fd := []UniqueID{} + for _, iData := range iDatas { + fd = append(fd, iData.Data[0].(*storage.Int64FieldData).Data...) 
+ } + + if t.hasSegment(targetSegID, true) { + t.refreshFlushedSegStatistics(targetSegID, numRows) + t.refreshFlushedSegmentPKRange(targetSegID, fd) + } else { + t.addFlushedSegmentWithPKs(targetSegID, collID, partID, t.plan.GetChannel(), numRows, fd) + + for _, seg := range segIDs { + t.removeSegment(seg) + } + } + + ti.injectOver <- true + log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID())) + return nil +} + +// TODO copy maybe expensive, but this seems to be the only convinent way. +func interface2FieldData(schemaDataType schemapb.DataType, content []interface{}, numRows int64) (storage.FieldData, error) { + var rst storage.FieldData + numOfRows := []int64{numRows} + switch schemaDataType { + case schemapb.DataType_Bool: + var data = &storage.BoolFieldData{ + NumRows: numOfRows, + Data: make([]bool, 0, len(content)), + } + + for _, c := range content { + r, ok := c.(bool) + if !ok { + return nil, errTransferType + } + data.Data = append(data.Data, r) + } + rst = data + + case schemapb.DataType_Int8: + var data = &storage.Int8FieldData{ + NumRows: numOfRows, + Data: make([]int8, 0, len(content)), + } + + for _, c := range content { + r, ok := c.(int8) + if !ok { + return nil, errTransferType + } + data.Data = append(data.Data, r) + } + rst = data + + case schemapb.DataType_Int16: + var data = &storage.Int16FieldData{ + NumRows: numOfRows, + Data: make([]int16, 0, len(content)), + } + + for _, c := range content { + r, ok := c.(int16) + if !ok { + return nil, errTransferType + } + data.Data = append(data.Data, r) + } + rst = data + + case schemapb.DataType_Int32: + var data = &storage.Int32FieldData{ + NumRows: numOfRows, + Data: make([]int32, 0, len(content)), + } + + for _, c := range content { + r, ok := c.(int32) + if !ok { + return nil, errTransferType + } + data.Data = append(data.Data, r) + } + rst = data + + case schemapb.DataType_Int64: + var data = &storage.Int64FieldData{ + NumRows: numOfRows, + Data: make([]int64, 0, len(content)), + } + + for _, c := range content { + r, ok := c.(int64) + if !ok { + return nil, errTransferType + } + data.Data = append(data.Data, r) + } + rst = data + + case schemapb.DataType_Float: + var data = &storage.FloatFieldData{ + NumRows: numOfRows, + Data: make([]float32, 0, len(content)), + } + + for _, c := range content { + r, ok := c.(float32) + if !ok { + return nil, errTransferType + } + data.Data = append(data.Data, r) + } + rst = data + + case schemapb.DataType_Double: + var data = &storage.DoubleFieldData{ + NumRows: numOfRows, + Data: make([]float64, 0, len(content)), + } + + for _, c := range content { + r, ok := c.(float64) + if !ok { + return nil, errTransferType + } + data.Data = append(data.Data, r) + } + rst = data + + case schemapb.DataType_FloatVector: + var data = &storage.FloatVectorFieldData{ + NumRows: numOfRows, + Data: make([]float32, 0, len(content)), + } + + for _, c := range content { + r, ok := c.(float32) + if !ok { + return nil, errTransferType + } + data.Data = append(data.Data, r) + } + + data.Dim = len(data.Data) / int(numRows) + rst = data + + case schemapb.DataType_BinaryVector: + var data = &storage.BinaryVectorFieldData{ + NumRows: numOfRows, + Data: make([]byte, 0, len(content)), + } + + for _, c := range content { + r, ok := c.(byte) + if !ok { + return nil, errTransferType + } + data.Data = append(data.Data, r) + } + + data.Dim = len(data.Data) * 8 / int(numRows) + rst = data + + default: + return nil, errUnknownDataType + } + + return rst, nil +} + +func (t *compactionTask) 
getSegmentMeta(segID UniqueID) (UniqueID, UniqueID, *etcdpb.CollectionMeta, error) { + collID, partID, err := t.getCollectionAndPartitionID(segID) + if err != nil { + return -1, -1, nil, err + } + + // TODO current compaction timestamp replace zero? why? + // Bad desgin of describe collection. + sch, err := t.getCollectionSchema(collID, 0) + if err != nil { + return -1, -1, nil, err + } + + meta := &etcdpb.CollectionMeta{ + ID: collID, + Schema: sch, + } + return collID, partID, meta, nil +} diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go new file mode 100644 index 0000000000..4ceffb8be4 --- /dev/null +++ b/internal/datanode/compactor_test.go @@ -0,0 +1,531 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datanode + +import ( + "context" + "testing" + "time" + + memkv "github.com/milvus-io/milvus/internal/kv/mem" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/storage" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCompactionTaskInnerMethods(t *testing.T) { + t.Run("Test getSegmentMeta", func(t *testing.T) { + rc := &RootCoordFactory{} + replica, err := newReplica(context.TODO(), rc, 1) + require.NoError(t, err) + + task := &compactionTask{ + Replica: replica, + } + + _, _, _, err = task.getSegmentMeta(100) + assert.Error(t, err) + + err = replica.addNewSegment(100, 1, 10, "a", new(internalpb.MsgPosition), nil) + require.NoError(t, err) + + collID, partID, meta, err := task.getSegmentMeta(100) + assert.NoError(t, err) + assert.Equal(t, UniqueID(1), collID) + assert.Equal(t, UniqueID(10), partID) + assert.NotNil(t, meta) + + rc.setCollectionID(-2) + _, _, _, err = task.getSegmentMeta(100) + assert.Error(t, err) + }) + + t.Run("Test.interface2FieldData", func(t *testing.T) { + tests := []struct { + isvalid bool + + tp schemapb.DataType + content []interface{} + + description string + }{ + {true, schemapb.DataType_Bool, []interface{}{true, false}, "valid bool"}, + {true, schemapb.DataType_Int8, []interface{}{int8(1), int8(2)}, "valid int8"}, + {true, schemapb.DataType_Int16, []interface{}{int16(1), int16(2)}, "valid int16"}, + {true, schemapb.DataType_Int32, []interface{}{int32(1), int32(2)}, "valid int32"}, + {true, schemapb.DataType_Int64, []interface{}{int64(1), int64(2)}, "valid int64"}, + {true, schemapb.DataType_Float, []interface{}{float32(1), float32(2)}, "valid float32"}, + {true, schemapb.DataType_Double, []interface{}{float64(1), float64(2)}, "valid float64"}, + {true, schemapb.DataType_FloatVector, []interface{}{float32(1), float32(2)}, 
"valid floatvector"}, + {true, schemapb.DataType_BinaryVector, []interface{}{byte(255), byte(1)}, "valid binaryvector"}, + {false, schemapb.DataType_Bool, []interface{}{1, 2}, "invalid bool"}, + {false, schemapb.DataType_Int8, []interface{}{nil, nil}, "invalid int8"}, + {false, schemapb.DataType_Int16, []interface{}{nil, nil}, "invalid int16"}, + {false, schemapb.DataType_Int32, []interface{}{nil, nil}, "invalid int32"}, + {false, schemapb.DataType_Int64, []interface{}{nil, nil}, "invalid int64"}, + {false, schemapb.DataType_Float, []interface{}{nil, nil}, "invalid float32"}, + {false, schemapb.DataType_Double, []interface{}{nil, nil}, "invalid float64"}, + {false, schemapb.DataType_FloatVector, []interface{}{nil, nil}, "invalid floatvector"}, + {false, schemapb.DataType_BinaryVector, []interface{}{nil, nil}, "invalid binaryvector"}, + {false, schemapb.DataType_String, nil, "invalid data type"}, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + if test.isvalid { + fd, err := interface2FieldData(test.tp, test.content, 2) + assert.NoError(t, err) + assert.Equal(t, 2, fd.Length()) + } else { + fd, err := interface2FieldData(test.tp, test.content, 2) + assert.Error(t, err) + assert.Nil(t, fd) + } + }) + } + + }) + + t.Run("Test mergeDeltalogs", func(t *testing.T) { + t.Run("One segment with timetravel", func(t *testing.T) { + invalidBlobs := map[UniqueID][]*Blob{ + 1: {}, + } + + blobs, err := getDeltaBlobs( + 100, map[int64]int64{ + 1: 20000, + 2: 20001, + 3: 20002, + 4: 30000, + 5: 50000, + }) + require.NoError(t, err) + + validBlobs := map[UniqueID][]*Blob{ + 100: blobs, + } + + tests := []struct { + isvalid bool + + dBlobs map[UniqueID][]*Blob + timetravel Timestamp + + description string + }{ + {false, invalidBlobs, 0, "invalid dBlobs"}, + {true, validBlobs, 21000, "valid blobs"}, + } + + for _, test := range tests { + task := &compactionTask{} + t.Run(test.description, func(t *testing.T) { + if test.isvalid { + pk2ts, db, err := task.mergeDeltalogs(test.dBlobs, test.timetravel) + assert.NoError(t, err) + assert.Equal(t, 3, len(pk2ts)) + assert.Equal(t, int64(2), db.size) + + } else { + + pk2ts, db, err := task.mergeDeltalogs(test.dBlobs, test.timetravel) + assert.Error(t, err) + assert.Nil(t, pk2ts) + assert.Nil(t, db) + } + }) + + } + }) + + t.Run("Multiple segments with timetravel", func(t *testing.T) { + tests := []struct { + segIDA UniqueID + dataA map[int64]int64 + + segIDB UniqueID + dataB map[int64]int64 + + segIDC UniqueID + dataC map[int64]int64 + + timetravel Timestamp + expectedpk2ts int + expecteddb int + description string + }{ + { + 0, nil, + 100, map[int64]int64{ + 1: 20000, + 2: 30000, + 3: 20005}, + 200, map[int64]int64{ + 4: 50000, + 5: 50001, + 6: 50002}, + 40000, 3, 3, "2 segments with timetravel 40000", + }, + { + 300, map[int64]int64{ + 10: 20001, + 20: 40001, + }, + 100, map[int64]int64{ + 1: 20000, + 2: 30000, + 3: 20005}, + 200, map[int64]int64{ + 4: 50000, + 5: 50001, + 6: 50002}, + 40000, 4, 4, "3 segments with timetravel 40000", + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + dBlobs := make(map[UniqueID][]*Blob) + if test.segIDA != UniqueID(0) { + d, err := getDeltaBlobs(test.segIDA, test.dataA) + require.NoError(t, err) + dBlobs[test.segIDA] = d + } + if test.segIDB != UniqueID(0) { + d, err := getDeltaBlobs(test.segIDB, test.dataB) + require.NoError(t, err) + dBlobs[test.segIDB] = d + } + if test.segIDC != UniqueID(0) { + d, err := getDeltaBlobs(test.segIDC, test.dataC) + 
require.NoError(t, err) + dBlobs[test.segIDC] = d + } + + task := &compactionTask{} + pk2ts, db, err := task.mergeDeltalogs(dBlobs, test.timetravel) + assert.NoError(t, err) + assert.Equal(t, test.expectedpk2ts, len(pk2ts)) + assert.Equal(t, test.expecteddb, int(db.size)) + }) + } + }) + + }) + + t.Run("Test merge", func(t *testing.T) { + iData := genInsertData() + meta := NewMetaFactory().GetCollectionMeta(1, "test") + + iblobs, err := getInsertBlobs(100, iData, meta) + require.NoError(t, err) + + iitr, err := storage.NewInsertBinlogIterator(iblobs) + require.NoError(t, err) + + mitr := storage.NewMergeIterator([]iterator{iitr}) + + dm := map[UniqueID]Timestamp{ + 1: 10000, + } + + ct := &compactionTask{} + idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema()) + assert.NoError(t, err) + assert.Equal(t, int64(1), numOfRow) + assert.Equal(t, 1, len(idata)) + + }) +} + +func getDeltaBlobs(segID UniqueID, pk2ts map[int64]int64) ([]*Blob, error) { + deltaData := &DeleteData{Data: pk2ts} + + dCodec := storage.NewDeleteCodec() + blob, err := dCodec.Serialize(1, 10, segID, deltaData) + return []*Blob{blob}, err +} + +func getInsertBlobs(segID UniqueID, iData *InsertData, meta *etcdpb.CollectionMeta) ([]*Blob, error) { + iCodec := storage.NewInsertCodec(meta) + + iblobs, _, err := iCodec.Serialize(10, segID, iData) + return iblobs, err +} + +func TestCompactorInterfaceMethods(t *testing.T) { + notEmptySegmentBinlogs := []*datapb.CompactionSegmentBinlogs{{ + SegmentID: 100, + FieldBinlogs: nil, + Field2StatslogPaths: nil, + Deltalogs: nil, + }} + + t.Run("Test compact invalid", func(t *testing.T) { + invalidAlloc := NewAllocatorFactory(-1) + emptyTask := &compactionTask{} + emptySegmentBinlogs := []*datapb.CompactionSegmentBinlogs{} + + plan := &datapb.CompactionPlan{ + PlanID: 999, + SegmentBinlogs: notEmptySegmentBinlogs, + StartTime: 0, + TimeoutInSeconds: 10, + Type: datapb.CompactionType_UndefinedCompaction, + Channel: "", + } + + emptyTask.plan = plan + err := emptyTask.compact() + assert.Error(t, err) + + plan.Type = datapb.CompactionType_InnerCompaction + plan.SegmentBinlogs = emptySegmentBinlogs + err = emptyTask.compact() + assert.Error(t, err) + + plan.Type = datapb.CompactionType_MergeCompaction + emptyTask.allocatorInterface = invalidAlloc + plan.SegmentBinlogs = notEmptySegmentBinlogs + err = emptyTask.compact() + assert.Error(t, err) + }) + + t.Run("Test typeI compact valid", func(t *testing.T) { + var collID, partID, segID UniqueID = 1, 10, 100 + + alloc := NewAllocatorFactory(1) + rc := &RootCoordFactory{} + dc := &DataCoordFactory{} + mockfm := &mockFlushManager{} + mockKv := memkv.NewMemoryKV() + mockbIO := &binlogIO{mockKv, alloc} + replica, err := newReplica(context.TODO(), rc, collID) + require.NoError(t, err) + replica.addFlushedSegmentWithPKs(segID, collID, partID, "channelname", 2, []UniqueID{1}) + + iData := genInsertData() + meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name") + dData := &DeleteData{Data: map[int64]int64{ + 1: 20000, + }} + + cpaths, err := mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta) + require.NoError(t, err) + require.Equal(t, 11, len(cpaths.inPaths)) + + plan := &datapb.CompactionPlan{ + PlanID: 10080, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: segID, + FieldBinlogs: cpaths.inPaths, + Field2StatslogPaths: cpaths.statsPaths, + Deltalogs: []*datapb.DeltaLogInfo{cpaths.deltaInfo}, + }, + }, + StartTime: 0, + TimeoutInSeconds: 1, + Type: 
datapb.CompactionType_InnerCompaction, + Timetravel: 30000, + Channel: "channelname", + } + + task := newCompactionTask(mockbIO, mockbIO, replica, mockfm, alloc, dc, plan) + err = task.compact() + assert.NoError(t, err) + + updates, err := replica.getSegmentStatisticsUpdates(segID) + assert.NoError(t, err) + assert.Equal(t, int64(1), updates.GetNumRows()) + + // New test, remove all the binlogs in memkv + // Deltas in timetravel range + err = mockKv.RemoveWithPrefix("/") + require.NoError(t, err) + cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta) + require.NoError(t, err) + plan.PlanID++ + + plan.Timetravel = Timestamp(10000) + err = task.compact() + assert.NoError(t, err) + + updates, err = replica.getSegmentStatisticsUpdates(segID) + assert.NoError(t, err) + assert.Equal(t, int64(2), updates.GetNumRows()) + + // New test, remove all the binlogs in memkv + // Timeout + err = mockKv.RemoveWithPrefix("/") + require.NoError(t, err) + cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta) + require.NoError(t, err) + plan.PlanID++ + + mockfm.sleepSeconds = plan.TimeoutInSeconds + int32(1) + err = task.compact() + assert.Error(t, err) + }) + + t.Run("Test typeII compact valid", func(t *testing.T) { + var collID, partID, segID1, segID2 UniqueID = 1, 10, 200, 201 + + alloc := NewAllocatorFactory(1) + rc := &RootCoordFactory{} + dc := &DataCoordFactory{} + mockfm := &mockFlushManager{} + mockKv := memkv.NewMemoryKV() + mockbIO := &binlogIO{mockKv, alloc} + replica, err := newReplica(context.TODO(), rc, collID) + require.NoError(t, err) + + replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1}) + replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{9}) + require.True(t, replica.hasSegment(segID1, true)) + require.True(t, replica.hasSegment(segID2, true)) + + meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name") + iData1 := genInsertDataWithRowIDs([2]int64{1, 2}) + dData1 := &DeleteData{Data: map[int64]int64{ + 1: 20000, + }} + iData2 := genInsertDataWithRowIDs([2]int64{9, 10}) + dData2 := &DeleteData{Data: map[int64]int64{ + 9: 30000, + }} + + cpaths1, err := mockbIO.upload(context.TODO(), segID1, partID, []*InsertData{iData1}, dData1, meta) + require.NoError(t, err) + require.Equal(t, 11, len(cpaths1.inPaths)) + + cpaths2, err := mockbIO.upload(context.TODO(), segID2, partID, []*InsertData{iData2}, dData2, meta) + require.NoError(t, err) + require.Equal(t, 11, len(cpaths2.inPaths)) + + plan := &datapb.CompactionPlan{ + PlanID: 10080, + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: segID1, + FieldBinlogs: cpaths1.inPaths, + Field2StatslogPaths: cpaths1.statsPaths, + Deltalogs: []*datapb.DeltaLogInfo{cpaths1.deltaInfo}, + }, + { + SegmentID: segID2, + FieldBinlogs: cpaths2.inPaths, + Field2StatslogPaths: cpaths2.statsPaths, + Deltalogs: []*datapb.DeltaLogInfo{cpaths2.deltaInfo}, + }, + }, + StartTime: 0, + TimeoutInSeconds: 1, + Type: datapb.CompactionType_MergeCompaction, + Timetravel: 40000, + Channel: "channelname", + } + + alloc.random = false // generated ID = 19530 + task := newCompactionTask(mockbIO, mockbIO, replica, mockfm, alloc, dc, plan) + err = task.compact() + assert.NoError(t, err) + + assert.False(t, replica.hasSegment(segID1, true)) + assert.False(t, replica.hasSegment(segID2, true)) + assert.True(t, replica.hasSegment(19530, true)) + updates, err := 
replica.getSegmentStatisticsUpdates(19530) + assert.NoError(t, err) + assert.Equal(t, int64(2), updates.GetNumRows()) + + // New test, remove all the binlogs in memkv + // Deltas in timetravel range + err = mockKv.RemoveWithPrefix("/") + require.NoError(t, err) + plan.PlanID++ + + plan.Timetravel = Timestamp(25000) + replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1}) + replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{9}) + replica.removeSegment(19530) + require.True(t, replica.hasSegment(segID1, true)) + require.True(t, replica.hasSegment(segID2, true)) + require.False(t, replica.hasSegment(19530, true)) + + err = task.compact() + assert.NoError(t, err) + + assert.False(t, replica.hasSegment(segID1, true)) + assert.False(t, replica.hasSegment(segID2, true)) + assert.True(t, replica.hasSegment(19530, true)) + updates, err = replica.getSegmentStatisticsUpdates(19530) + assert.NoError(t, err) + assert.Equal(t, int64(3), updates.GetNumRows()) + + // New test, remove all the binlogs in memkv + // Deltas in timetravel range + err = mockKv.RemoveWithPrefix("/") + require.NoError(t, err) + plan.PlanID++ + + plan.Timetravel = Timestamp(10000) + replica.addFlushedSegmentWithPKs(segID1, collID, partID, "channelname", 2, []UniqueID{1}) + replica.addFlushedSegmentWithPKs(segID2, collID, partID, "channelname", 2, []UniqueID{9}) + replica.removeSegment(19530) + require.True(t, replica.hasSegment(segID1, true)) + require.True(t, replica.hasSegment(segID2, true)) + require.False(t, replica.hasSegment(19530, true)) + + err = task.compact() + assert.NoError(t, err) + + assert.False(t, replica.hasSegment(segID1, true)) + assert.False(t, replica.hasSegment(segID2, true)) + assert.True(t, replica.hasSegment(19530, true)) + updates, err = replica.getSegmentStatisticsUpdates(19530) + assert.NoError(t, err) + assert.Equal(t, int64(4), updates.GetNumRows()) + }) +} + +type mockFlushManager struct { + sleepSeconds int32 +} + +var _ flushManager = (*mockFlushManager)(nil) + +func (mfm *mockFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool, pos *internalpb.MsgPosition) error { + return nil +} + +func (mfm *mockFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error { + return nil +} + +func (mfm *mockFlushManager) injectFlush(injection taskInjection, segments ...UniqueID) { + go func() { + time.Sleep(time.Second * time.Duration(mfm.sleepSeconds)) + injection.injected <- struct{}{} + <-injection.injectOver + }() +} diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index eb24c70749..e36db94ef9 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -40,6 +40,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + miniokv "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/logutil" "github.com/milvus-io/milvus/internal/metrics" @@ -101,14 +102,16 @@ type DataNode struct { vchan2SyncService map[string]*dataSyncService // vchannel name vchan2FlushChs map[string]chan flushMsg // vchannel name to flush channels - clearSignal chan UniqueID // collection ID - segmentCache *Cache + clearSignal chan UniqueID // collection ID + segmentCache *Cache + compactionExecutor *compactionExecutor rootCoord types.RootCoord dataCoord types.DataCoord session 
*sessionutil.Session watchKv kv.MetaKv + blobKv kv.BaseKV closer io.Closer @@ -124,10 +127,11 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { cancel: cancel2, Role: typeutil.DataNodeRole, - rootCoord: nil, - dataCoord: nil, - msFactory: factory, - segmentCache: newCache(), + rootCoord: nil, + dataCoord: nil, + msFactory: factory, + segmentCache: newCache(), + compactionExecutor: newCompactionExecutor(), vchan2SyncService: make(map[string]*dataSyncService), vchan2FlushChs: make(map[string]chan flushMsg), @@ -406,6 +410,22 @@ func (node *DataNode) Start() error { return errors.New("DataNode fail to connect etcd") } + option := &miniokv.Option{ + Address: Params.MinioAddress, + AccessKeyID: Params.MinioAccessKeyID, + SecretAccessKeyID: Params.MinioSecretAccessKey, + UseSSL: Params.MinioUseSSL, + CreateBucket: true, + BucketName: Params.MinioBucketName, + } + + kv, err := miniokv.NewMinIOKV(node.ctx, option) + if err != nil { + return err + } + + node.blobKv = kv + if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil { return errors.New("DataNode fail to start") } @@ -414,6 +434,8 @@ func (node *DataNode) Start() error { go node.BackGroundGC(node.clearSignal) + go node.compactionExecutor.start(node.ctx) + Params.CreatedTime = time.Now() Params.UpdatedTime = time.Now() @@ -704,5 +726,30 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe } func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { - panic("not implemented") // TODO: Implement + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + } + + ds, ok := node.vchan2SyncService[req.GetChannel()] + if !ok { + log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channel name", req.GetChannel())) + status.Reason = errIllegalCompactionPlan.Error() + return status, nil + } + + binlogIO := &binlogIO{node.blobKv, ds.idAllocator} + task := newCompactionTask( + binlogIO, binlogIO, + ds.replica, + ds.flushManager, + ds.idAllocator, + node.dataCoord, + req, + ) + + node.compactionExecutor.execute(task) + + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, nil } diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 2f8655bbe6..1dee6eb1f6 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -137,6 +137,10 @@ func clearEtcd(rootPath string) error { type MetaFactory struct { } +func NewMetaFactory() *MetaFactory { + return &MetaFactory{} +} + type DataFactory struct { rawData []byte } @@ -151,15 +155,29 @@ type RootCoordFactory struct { type DataCoordFactory struct { types.DataCoord - SaveBinlogPathError bool - SaveBinlogPathNotSucess bool + SaveBinlogPathError bool + SaveBinlogPathNotSuccess bool + + CompleteCompactionError bool + CompleteCompactionNotSuccess bool +} + +func (ds *DataCoordFactory) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) { + if ds.CompleteCompactionError { + return nil, errors.New("Error") + } + if ds.CompleteCompactionNotSuccess { + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil + } + + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil } func (ds *DataCoordFactory) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) { if ds.SaveBinlogPathError { return nil, errors.New("Error") } - if ds.SaveBinlogPathNotSucess { + if 
ds.SaveBinlogPathNotSuccess { return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil } @@ -514,23 +532,34 @@ func genFlowGraphDeleteMsg(pks []int64, chanName string) flowGraphMsg { type AllocatorFactory struct { sync.Mutex - r *rand.Rand + r *rand.Rand + isvalid bool + random bool } var _ allocatorInterface = &AllocatorFactory{} func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory { f := &AllocatorFactory{ - r: rand.New(rand.NewSource(time.Now().UnixNano())), + r: rand.New(rand.NewSource(time.Now().UnixNano())), + isvalid: len(id) == 0 || (len(id) > 0 && id[0] > 0), } - return f } func (alloc *AllocatorFactory) allocID() (UniqueID, error) { alloc.Lock() defer alloc.Unlock() - return alloc.r.Int63n(10000), nil + + if !alloc.isvalid { + return -1, errors.New("allocID error") + } + + if alloc.random { + return alloc.r.Int63n(10000), nil + } + + return 19530, nil } func (alloc *AllocatorFactory) allocIDBatch(count uint32) (UniqueID, uint32, error) { @@ -661,6 +690,13 @@ func (f *FailMessageStreamFactory) NewTtMsgStream(ctx context.Context) (msgstrea return nil, errors.New("mocked failure") } +func genInsertDataWithRowIDs(rowIDs [2]int64) *InsertData { + iD := genInsertData() + iD.Data[0].(*s.Int64FieldData).Data = rowIDs[:] + + return iD +} + func genInsertData() *InsertData { return &InsertData{ Data: map[int64]s.FieldData{ diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go index 4c361fdb19..05dbadb391 100644 --- a/internal/datanode/segment_replica.go +++ b/internal/datanode/segment_replica.go @@ -64,6 +64,7 @@ type Replica interface { removeSegment(segID UniqueID) updateStatistics(segID UniqueID, numRows int64) + refreshFlushedSegStatistics(segID UniqueID, numRows int64) getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) segmentFlushed(segID UniqueID) } @@ -541,6 +542,18 @@ func (replica *SegmentReplica) hasSegment(segID UniqueID, countFlushed bool) boo return inNew || inNormal || inFlush } +func (replica *SegmentReplica) refreshFlushedSegStatistics(segID UniqueID, numRows int64) { + replica.segMu.RLock() + defer replica.segMu.RUnlock() + + if seg, ok := replica.flushedSegments[segID]; ok { + seg.memorySize = 0 + seg.numRows = numRows + return + } + + log.Warn("refesh numRow on not exists segment", zap.Int64("segID", segID)) +} // updateStatistics updates the number of rows of a segment in replica. func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) {
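
The core of the new compaction_executor.go is a small bounded-parallelism worker: taskCh queues pending compactors and parallelCh acts as a counting semaphore that caps how many run at once (maxParallelCompactionNum). The standalone sketch below illustrates that same pattern outside Milvus; the executor, task, and newExecutor names and the 2/16 limits are illustrative only and are not part of this patch.

// Standalone illustration (not part of this patch) of the bounded-parallelism
// pattern used by compactionExecutor: a buffered task channel as the queue and
// a buffered empty-struct channel as a counting semaphore.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type task func() error

type executor struct {
	parallelCh chan struct{} // semaphore: capacity = max concurrent tasks
	taskCh     chan task     // queue: capacity = max pending tasks
}

func newExecutor(maxParallel, maxPending int) *executor {
	return &executor{
		parallelCh: make(chan struct{}, maxParallel),
		taskCh:     make(chan task, maxPending),
	}
}

// start drains the queue until ctx is cancelled, running each task in its own
// goroutine; the semaphore is acquired inside that goroutine, so start itself
// never blocks while all worker slots are busy.
func (e *executor) start(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case t := <-e.taskCh:
			go func() {
				e.parallelCh <- struct{}{}        // acquire a slot
				defer func() { <-e.parallelCh }() // release it when done
				if err := t(); err != nil {
					fmt.Println("task failed:", err)
				}
			}()
		}
	}
}

func main() {
	ex := newExecutor(2, 16) // at most 2 tasks run at once, up to 16 queued
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go ex.start(ctx)

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		i := i
		wg.Add(1)
		ex.taskCh <- func() error {
			defer wg.Done()
			time.Sleep(100 * time.Millisecond)
			fmt.Printf("task %d done\n", i)
			return nil
		}
	}
	wg.Wait()
}

Acquiring the semaphore inside the per-task goroutine, as executeTask does in the patch, keeps the consumer loop responsive to context cancellation even when every worker slot is occupied.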