mirror of https://github.com/milvus-io/milvus.git

Fix empty segment compact and load (#12710)

1. Fix compaction saving empty segments
2. Fix loading of empty segments
3. Add unit tests to bring dmlchannels coverage to 100%

Resolves: #12450
Signed-off-by: yangxuan <xuan.yang@zilliz.com>

parent 434cc49706
commit a9a332dbcf

@@ -1856,7 +1856,7 @@ func TestCompleteCompaction(t *testing.T) {
 func TestManualCompaction(t *testing.T) {
 	Params.EnableCompaction = true
 	t.Run("test manual compaction successfully", func(t *testing.T) {
-		svr := &Server{}
+		svr := &Server{allocator: &MockAllocator{}}
 		svr.isServing = ServerStateHealthy
 		svr.compactionTrigger = &mockCompactionTrigger{
 			methods: map[string]interface{}{

@@ -1875,7 +1875,7 @@ func TestManualCompaction(t *testing.T) {
 	})

 	t.Run("test manual compaction failure", func(t *testing.T) {
-		svr := &Server{}
+		svr := &Server{allocator: &MockAllocator{}}
 		svr.isServing = ServerStateHealthy
 		svr.compactionTrigger = &mockCompactionTrigger{
 			methods: map[string]interface{}{

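The two test hunks above inject a MockAllocator into the Server under test, since ManualCompaction now needs an allocator to derive the time-travel timestamp. For illustration only, a minimal sketch of the dependency-injection pattern these tests rely on; idAllocator, fakeAllocator and service are hypothetical stand-ins, not the Milvus types:

package example

import "testing"

// idAllocator is a hypothetical stand-in for the allocator interface the server depends on.
type idAllocator interface {
	allocTimestamp() (uint64, error)
}

// fakeAllocator returns a fixed timestamp so tests never touch a real TSO service.
type fakeAllocator struct{}

func (fakeAllocator) allocTimestamp() (uint64, error) { return 42, nil }

// service depends on the interface, not a concrete allocator, so tests can swap it out.
type service struct {
	allocator idAllocator
}

func (s *service) handle() (uint64, error) {
	return s.allocator.allocTimestamp()
}

func TestHandleUsesInjectedAllocator(t *testing.T) {
	svc := &service{allocator: fakeAllocator{}}
	ts, err := svc.handle()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if ts != 42 {
		t.Fatalf("want 42, got %d", ts)
	}
}
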
@@ -515,12 +515,17 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
 		if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing {
 			continue
 		}
+		binlogs := segment.GetBinlogs()
+
+		if len(binlogs) == 0 {
+			continue
+		}
+
 		_, ok := flushedIDs[id]
 		if !ok {
 			flushedIDs[id] = struct{}{}
 		}

-		binlogs := segment.GetBinlogs()
 		field2Binlog := make(map[UniqueID][]string)
 		for _, field := range binlogs {
 			field2Binlog[field.GetFieldID()] = append(field2Binlog[field.GetFieldID()], field.GetBinlogs()...)

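The GetRecoveryInfo hunk above skips flushed segments that have no binlogs, so recovery never hands an empty segment to a loader. A minimal sketch of the same filtering idea, using the hypothetical names segmentInfo and recoverableSegments rather than the Milvus structures:

package example

// segmentInfo is a hypothetical, simplified view of a segment's catalog entry.
type segmentInfo struct {
	ID      int64
	Flushed bool
	Binlogs []string
}

// recoverableSegments keeps only segments that are flushed AND actually carry
// binlogs; an empty flushed segment holds nothing worth loading.
func recoverableSegments(segments []segmentInfo) []int64 {
	ids := make([]int64, 0, len(segments))
	seen := make(map[int64]struct{})
	for _, seg := range segments {
		if !seg.Flushed {
			continue
		}
		if len(seg.Binlogs) == 0 {
			continue
		}
		if _, ok := seen[seg.ID]; ok {
			continue
		}
		seen[seg.ID] = struct{}{}
		ids = append(ids, seg.ID)
	}
	return ids
}
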
@@ -766,7 +771,14 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
 		return resp, nil
 	}

-	id, err := s.compactionTrigger.forceTriggerCompaction(req.CollectionID, &timetravel{req.Timetravel})
+	tt, err := getTimetravelReverseTime(ctx, s.allocator)
+	if err != nil {
+		log.Warn("failed to get timetravel reverse time", zap.Int64("collectionID", req.GetCollectionID()), zap.Error(err))
+		resp.Status.Reason = err.Error()
+		return resp, nil
+	}
+
+	id, err := s.compactionTrigger.forceTriggerCompaction(req.CollectionID, tt)
 	if err != nil {
 		log.Error("failed to trigger manual compaction", zap.Int64("collectionID", req.GetCollectionID()), zap.Error(err))
 		resp.Status.Reason = err.Error()

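ManualCompaction now derives the time-travel point on the server side (getTimetravelReverseTime with the injected allocator) instead of trusting req.Timetravel. The sketch below shows one plausible way such a reverse time can be computed; it is not the body of getTimetravelReverseTime, and it assumes a hybrid timestamp that stores physical milliseconds above an 18-bit logical counter plus a caller-supplied retention window:

package example

import "time"

// Assumed hybrid-timestamp layout: physical milliseconds shifted left by 18 logical bits.
const logicalBits = 18

// composeTS packs a physical time in milliseconds and a logical counter into one uint64.
func composeTS(physicalMs int64, logical int64) uint64 {
	return uint64((physicalMs << logicalBits) + logical)
}

// reverseTime subtracts a retention window from the current time, giving the point
// before which deleted entries may be compacted away.
func reverseTime(now time.Time, retention time.Duration) uint64 {
	travel := now.Add(-retention)
	return composeTS(travel.UnixNano()/int64(time.Millisecond), 0)
}
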
@@ -24,6 +24,7 @@ import (
 	"strconv"
 	"time"

+	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/kv"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/storage"

@@ -122,6 +123,15 @@ func (b *binlogIO) upload(
 	kvs := make(map[string]string)

 	for _, iData := range iDatas {
+		tf, ok := iData.Data[common.TimeStampField]
+		if !ok || tf.RowNum() == 0 {
+			log.Warn("binlog io uploading empty insert data",
+				zap.Int64("segmentID", segID),
+				zap.Int64("collectionID", meta.GetID()),
+			)
+			continue
+		}
+
 		kv, inpaths, statspaths, err := b.genInsertBlobs(iData, partID, segID, meta)
 		if err != nil {
 			log.Warn("generate insert blobs wrong", zap.Error(err))

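The upload hunk above skips any InsertData whose timestamp column is missing or has zero rows, so an empty segment never produces insert binlogs. A minimal sketch of that guard with hypothetical types (fieldData, insertData, uploadAll and timestampField are illustrative, not the datanode definitions):

package example

import "log"

// fieldData is a hypothetical column of values for one field.
type fieldData struct {
	rows []int64
}

func (f *fieldData) RowNum() int { return len(f.rows) }

// insertData maps field IDs to columns; field 1 plays the role of the timestamp field here.
type insertData struct {
	Data map[int64]*fieldData
}

const timestampField int64 = 1 // assumed ID of the timestamp field

// uploadAll serializes every non-empty batch and silently skips empty ones,
// so empty data never turns into binlog paths.
func uploadAll(batches []*insertData, serialize func(*insertData) (string, error)) ([]string, error) {
	paths := make([]string, 0, len(batches))
	for _, b := range batches {
		ts, ok := b.Data[timestampField]
		if !ok || ts.RowNum() == 0 {
			log.Println("skipping empty insert data")
			continue
		}
		p, err := serialize(b)
		if err != nil {
			return nil, err
		}
		paths = append(paths, p)
	}
	return paths, nil
}
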
@@ -26,6 +26,7 @@ import (
 	"github.com/milvus-io/milvus/internal/kv"
 	memkv "github.com/milvus-io/milvus/internal/kv/mem"
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/storage"

 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

@@ -71,8 +72,17 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {

 		iData := genEmptyInsertData()
 		p, err := b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
-		assert.Error(t, err)
-		assert.Empty(t, p)
+		assert.NoError(t, err)
+		assert.Empty(t, p.inPaths)
+		assert.Empty(t, p.statsPaths)
+		assert.Empty(t, p.deltaInfo)
+
+		iData = &InsertData{Data: make(map[int64]storage.FieldData)}
+		p, err = b.upload(context.TODO(), 1, 10, []*InsertData{iData}, dData, meta)
+		assert.NoError(t, err)
+		assert.Empty(t, p.inPaths)
+		assert.Empty(t, p.statsPaths)
+		assert.Empty(t, p.deltaInfo)

 		iData = genInsertData()
 		dData = &DeleteData{

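With the skip in place, the test's expectation flips: uploading empty data is no longer an error, it simply yields no paths. A self-contained testify sketch of that style of assertion, against a hypothetical uploadRows helper rather than binlogIO.upload:

package example

import (
	"fmt"
	"testing"

	"github.com/stretchr/testify/assert"
)

// uploadRows is a hypothetical uploader: non-empty batches yield one path each,
// empty batches are skipped rather than treated as an error.
func uploadRows(batches [][]int64) ([]string, error) {
	paths := make([]string, 0, len(batches))
	for i, b := range batches {
		if len(b) == 0 {
			continue
		}
		paths = append(paths, fmt.Sprintf("binlog/%d", i))
	}
	return paths, nil
}

func TestUploadEmptyBatch(t *testing.T) {
	paths, err := uploadRows([][]int64{{}})
	assert.NoError(t, err) // empty input is not an error...
	assert.Empty(t, paths) // ...it just produces no binlog paths
}
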
@@ -25,6 +25,7 @@ import (
 	"sync"
 	"time"

+	"github.com/milvus-io/milvus/internal/common"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/types"

@@ -439,11 +440,13 @@ func (t *compactionTask) compact() error {

 	// Compaction I: update pk range.
 	// Compaction II: remove the segments and add a new flushed segment with pk range.
-	fd := []UniqueID{}
-	for _, iData := range iDatas {
-		fd = append(fd, iData.Data[0].(*storage.Int64FieldData).Data...)
-	}
+	fd := make([]UniqueID, 0, numRows)
+	if numRows > 0 {
+		for _, iData := range iDatas {
+			fd = append(fd, iData.Data[common.TimeStampField].(*storage.Int64FieldData).Data...)
+		}

+	}
 	if t.hasSegment(targetSegID, true) {
 		t.refreshFlushedSegStatistics(targetSegID, numRows)
 		t.refreshFlushedSegmentPKRange(targetSegID, fd)

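The compaction hunk above only walks the merged batches when numRows > 0, so the Int64FieldData type assertion never runs against a field that was never populated, and the slice is preallocated to the known row count. A minimal sketch of that guard with hypothetical types (column, int64Column, collectIDs are illustrative):

package example

// column is a hypothetical field column; int64Column is its int64 implementation.
type column interface{ RowNum() int }

type int64Column struct{ Data []int64 }

func (c *int64Column) RowNum() int { return len(c.Data) }

// collectIDs gathers the int64 values of one field across several batches.
// When numRows is zero the batches may not contain the field at all, so the
// type assertion is skipped entirely and an empty (but non-nil) slice is returned.
func collectIDs(batches []map[int64]column, fieldID int64, numRows int64) []int64 {
	ids := make([]int64, 0, numRows)
	if numRows == 0 {
		return ids
	}
	for _, batch := range batches {
		if col, ok := batch[fieldID].(*int64Column); ok {
			ids = append(ids, col.Data...)
		}
	}
	return ids
}
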
@@ -13,11 +13,15 @@ package rootcoord

 import (
 	"context"
+	"errors"
 	"testing"

 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/util/funcutil"
+	"github.com/milvus-io/milvus/internal/util/mqclient"

 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )

 func TestDmlChannels(t *testing.T) {

@@ -71,3 +75,82 @@ func TestDmlChannels(t *testing.T) {
 	dml.removeChannels(chanName0)
 	assert.Equal(t, 0, dml.getChannelNum())
 }
+
+func TestDmChannelsFailure(t *testing.T) {
+	t.Run("Test newDmlChannels", func(t *testing.T) {
+		mockFactory := &FailMessageStreamFactory{}
+		assert.Panics(t, func() { newDmlChannels(context.TODO(), mockFactory, "test-newdmlchannel-root", 1) })
+	})
+
+	t.Run("Test broadcast", func(t *testing.T) {
+		mockFactory := &FailMessageStreamFactory{errBroadcast: true}
+		dml := newDmlChannels(context.TODO(), mockFactory, "test-newdmlchannel-root", 1)
+		chanName0 := dml.getChannelName()
+		dml.addChannels(chanName0)
+		require.Equal(t, 1, dml.getChannelNum())
+
+		err := dml.broadcast([]string{chanName0}, nil)
+		assert.Error(t, err)
+
+		v, err := dml.broadcastMark([]string{chanName0}, nil)
+		assert.Empty(t, v)
+		assert.Error(t, err)
+	})
+}
+
+// FailMessageStreamFactory mocks a MessageStreamFactory that fails.
+type FailMessageStreamFactory struct {
+	msgstream.Factory
+	errBroadcast bool
+}
+
+func (f *FailMessageStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
+	if f.errBroadcast {
+		return &FailMsgStream{errBroadcast: true}, nil
+	}
+	return nil, errors.New("mocked failure")
+}
+
+func (f *FailMessageStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error) {
+	return nil, errors.New("mocked failure")
+}
+
+type FailMsgStream struct {
+	msgstream.MsgStream
+	errBroadcast bool
+}
+
+func (ms *FailMsgStream) Start() {}
+func (ms *FailMsgStream) Close() {}
+func (ms *FailMsgStream) Chan() <-chan *msgstream.MsgPack { return nil }
+func (ms *FailMsgStream) AsProducer(channels []string) {}
+func (ms *FailMsgStream) AsConsumer(channels []string, subName string) {}
+func (ms *FailMsgStream) AsReader(channels []string, subName string) {}
+func (ms *FailMsgStream) AsConsumerWithPosition(channels []string, subName string, position mqclient.SubscriptionInitialPosition) {
+}
+func (ms *FailMsgStream) SetRepackFunc(repackFunc msgstream.RepackFunc) {}
+func (ms *FailMsgStream) ComputeProduceChannelIndexes(tsMsgs []msgstream.TsMsg) [][]int32 { return nil }
+func (ms *FailMsgStream) GetProduceChannels() []string { return nil }
+func (ms *FailMsgStream) Produce(*msgstream.MsgPack) error { return nil }
+func (ms *FailMsgStream) ProduceMark(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
+	return nil, nil
+}
+func (ms *FailMsgStream) Broadcast(*msgstream.MsgPack) error {
+	if ms.errBroadcast {
+		return errors.New("broadcast error")
+	}
+	return nil
+}
+func (ms *FailMsgStream) BroadcastMark(*msgstream.MsgPack) (map[string][]msgstream.MessageID, error) {
+	if ms.errBroadcast {
+		return nil, errors.New("broadcastMark error")
+	}
+	return nil, nil
+}
+func (ms *FailMsgStream) Consume() *msgstream.MsgPack { return nil }
+func (ms *FailMsgStream) Next(ctx context.Context, channelName string) (msgstream.TsMsg, error) {
+	return nil, nil
+}
+func (ms *FailMsgStream) HasNext(channelName string) bool { return true }
+func (ms *FailMsgStream) Seek(offset []*msgstream.MsgPosition) error { return nil }
+func (ms *FailMsgStream) SeekReaders(msgPositions []*msgstream.MsgPosition) error { return nil }

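FailMessageStreamFactory and FailMsgStream use a common Go testing trick: embedding the interface in the mock struct makes the type satisfy the whole interface while only the methods the test cares about are overridden; anything else dispatches to the nil embedded value and panics. A self-contained sketch of the pattern with a hypothetical stream interface, unrelated to msgstream:

package example

import (
	"errors"
	"testing"
)

// stream is a hypothetical interface with more methods than a test cares about.
type stream interface {
	Produce(msg string) error
	Broadcast(msg string) error
	Close()
}

// failingStream embeds the interface: it satisfies stream, but only Broadcast
// is overridden; calling an unimplemented method such as Close panics because
// the embedded value is nil.
type failingStream struct {
	stream
}

func (failingStream) Broadcast(msg string) error {
	return errors.New("broadcast error")
}

func TestFailingStream(t *testing.T) {
	var s stream = failingStream{}

	if err := s.Broadcast("hello"); err == nil {
		t.Fatal("expected broadcast to fail")
	}

	// The unimplemented method dispatches to the nil embedded interface and panics.
	defer func() {
		if recover() == nil {
			t.Fatal("expected Close to panic")
		}
	}()
	s.Close()
}

The same shape scales to wide interfaces such as msgstream.MsgStream, where writing out every method by hand would bury the one behavior the test actually exercises.
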