Add ut and comments for datacoord meta (#7560)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/7557/merge
congqixia 2021-09-08 11:35:59 +08:00 committed by GitHub
parent 58700dedbc
commit 664aeb09c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 96 additions and 55 deletions

View File

@ -34,6 +34,7 @@ type meta struct {
segments *SegmentsInfo // segment id to segment info
}
// NewMeta create meta from provided `kv.TxnKV`
func NewMeta(kv kv.TxnKV) (*meta, error) {
mt := &meta{
client: kv,
@ -47,6 +48,7 @@ func NewMeta(kv kv.TxnKV) (*meta, error) {
return mt, nil
}
// realodFromKV load meta from KV storage
func (m *meta) reloadFromKV() error {
_, values, err := m.client.LoadWithPrefix(segmentPrefix)
if err != nil {
@ -55,6 +57,7 @@ func (m *meta) reloadFromKV() error {
for _, value := range values {
segmentInfo := &datapb.SegmentInfo{}
// TODO deprecate all proto text marshal/unmarsahl
err = proto.UnmarshalText(value, segmentInfo)
if err != nil {
return fmt.Errorf("DataCoord reloadFromKV UnMarshalText datapb.SegmentInfo err:%w", err)
@ -65,12 +68,15 @@ func (m *meta) reloadFromKV() error {
return nil
}
// AddCollection add collection into meta
// Note that collection info is just for caching and will not be set into etcd from datacoord
func (m *meta) AddCollection(collection *datapb.CollectionInfo) {
m.Lock()
defer m.Unlock()
m.collections[collection.ID] = collection
}
// GetCollection get collection info with provided collection id from local cache
func (m *meta) GetCollection(collectionID UniqueID) *datapb.CollectionInfo {
m.RLock()
defer m.RUnlock()
@ -81,6 +87,7 @@ func (m *meta) GetCollection(collectionID UniqueID) *datapb.CollectionInfo {
return collection
}
// GetNumRowsOfCollection returns total rows count of segments belongs to provided collection
func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
m.RLock()
defer m.RUnlock()
@ -94,6 +101,7 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) int64 {
return ret
}
// AddSegment records segment info, persisting info into kv store
func (m *meta) AddSegment(segment *SegmentInfo) error {
m.Lock()
defer m.Unlock()
@ -104,30 +112,14 @@ func (m *meta) AddSegment(segment *SegmentInfo) error {
return nil
}
func (m *meta) SetRowCount(segmentID UniqueID, rowCount int64) error {
m.Lock()
defer m.Unlock()
m.segments.SetRowCount(segmentID, rowCount)
if segment := m.segments.GetSegment(segmentID); segment != nil {
return m.saveSegmentInfo(segment)
}
return nil
}
func (m *meta) SetLastExpireTime(segmentID UniqueID, expireTs Timestamp) error {
m.Lock()
defer m.Unlock()
m.segments.SetLasteExpiraTime(segmentID, expireTs)
if segment := m.segments.GetSegment(segmentID); segment != nil {
return m.saveSegmentInfo(segment)
}
return nil
}
// DropSegment remove segment with provided id, etcd persistence also removed
func (m *meta) DropSegment(segmentID UniqueID) error {
m.Lock()
defer m.Unlock()
segment := m.segments.GetSegment(segmentID)
if segment == nil {
return nil
}
m.segments.DropSegment(segmentID)
if err := m.removeSegmentInfo(segment); err != nil {
return err
@ -135,12 +127,15 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
return nil
}
// GetSegment returns segment info with provided id
// if not segment is found, nil will be returned
func (m *meta) GetSegment(segID UniqueID) *SegmentInfo {
m.RLock()
defer m.RUnlock()
return m.segments.GetSegment(segID)
}
// SetState setting segment with provided ID state
func (m *meta) SetState(segmentID UniqueID, state commonpb.SegmentState) error {
m.Lock()
defer m.Unlock()
@ -151,6 +146,9 @@ func (m *meta) SetState(segmentID UniqueID, state commonpb.SegmentState) error {
return nil
}
// UpdateFlushSegmentsInfo update segment partial/completed flush info
// `flushed` parameter indicating whether segment is flushed completely or partially
// `binlogs`, `checkpoints` and `statPositions` are persistence data for segment
func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
binlogs []*datapb.FieldBinlog, checkpoints []*datapb.CheckPoint,
startPositions []*datapb.SegmentStartPosition) error {
@ -222,7 +220,8 @@ func (m *meta) UpdateFlushSegmentsInfo(segmentID UniqueID, flushed bool,
return nil
}
func (m *meta) ListSegmentIds() []UniqueID {
// ListSegmentIDs list all segment ids stored in meta (no collection filter)
func (m *meta) ListSegmentIDs() []UniqueID {
m.RLock()
defer m.RUnlock()
@ -231,10 +230,11 @@ func (m *meta) ListSegmentIds() []UniqueID {
for _, segment := range segments {
infos = append(infos, segment.GetID())
}
return infos
return infos
}
// GetSegmentsByChannel returns all segment info which insert channel equals provided `dmlCh`
func (m *meta) GetSegmentsByChannel(dmlCh string) []*SegmentInfo {
m.RLock()
defer m.RUnlock()
@ -249,6 +249,7 @@ func (m *meta) GetSegmentsByChannel(dmlCh string) []*SegmentInfo {
return infos
}
// GetSegmentsOfCollection returns all segment ids which collection equals to provided `collectionID`
func (m *meta) GetSegmentsOfCollection(collectionID UniqueID) []UniqueID {
m.RLock()
defer m.RUnlock()
@ -262,6 +263,7 @@ func (m *meta) GetSegmentsOfCollection(collectionID UniqueID) []UniqueID {
return ret
}
// GetSegmentsOfPartition returns all segments ids which collection & partition equals to provided `collectionID`, `partitionID`
func (m *meta) GetSegmentsOfPartition(collectionID, partitionID UniqueID) []UniqueID {
m.RLock()
defer m.RUnlock()
@ -275,6 +277,7 @@ func (m *meta) GetSegmentsOfPartition(collectionID, partitionID UniqueID) []Uniq
return ret
}
// GetNumRowsOfPartition returns row count of segments belongs to provided collection & partition
func (m *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID) int64 {
m.RLock()
defer m.RUnlock()
@ -288,6 +291,7 @@ func (m *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID
return ret
}
// GetUnFlushedSegments get all segments which state is not `Flushing` nor `Flushed`
func (m *meta) GetUnFlushedSegments() []*SegmentInfo {
m.RLock()
defer m.RUnlock()
@ -301,6 +305,7 @@ func (m *meta) GetUnFlushedSegments() []*SegmentInfo {
return ret
}
// GetFlushingSegments get all segments which state is `Flushing`
func (m *meta) GetFlushingSegments() []*SegmentInfo {
m.RLock()
defer m.RUnlock()
@ -314,6 +319,7 @@ func (m *meta) GetFlushingSegments() []*SegmentInfo {
return ret
}
// AddAllocation add allocation in segment
func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
m.Lock()
defer m.Unlock()
@ -324,24 +330,31 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
return nil
}
// SetAllocations set Segment allocations, will overwrite ALL original allocations
// Note that allocations is not persisted in KV store
func (m *meta) SetAllocations(segmentID UniqueID, allocations []*Allocation) {
m.Lock()
defer m.Unlock()
m.segments.SetAllocations(segmentID, allocations)
}
// SetCurrentRows set current row count for segment with provided `segmentID`
// Note that currRows is not persisted in KV store
func (m *meta) SetCurrentRows(segmentID UniqueID, rows int64) {
m.Lock()
defer m.Unlock()
m.segments.SetCurrentRows(segmentID, rows)
}
// SetLastFlushTime set LastFlushTime for segment with provided `segmentID`
// Note that lastFlushTime is not persisted in KV store
func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) {
m.Lock()
defer m.Unlock()
m.segments.SetFlushTime(segmentID, t)
}
// MoveSegmentBinlogs migration logic, moving segment binlong information for legacy keys
func (m *meta) MoveSegmentBinlogs(segmentID UniqueID, oldPathPrefix string, field2Binlogs map[UniqueID][]string) error {
m.Lock()
defer m.Unlock()
@ -359,6 +372,7 @@ func (m *meta) MoveSegmentBinlogs(segmentID UniqueID, oldPathPrefix string, fiel
return nil
}
// saveSegmentInfo utility function saving segment info into kv store
func (m *meta) saveSegmentInfo(segment *SegmentInfo) error {
segBytes := proto.MarshalTextString(segment.SegmentInfo)
@ -366,27 +380,24 @@ func (m *meta) saveSegmentInfo(segment *SegmentInfo) error {
return m.client.Save(key, segBytes)
}
// removeSegmentInfo utility function removing segment info from kv store
// Note that nil parameter will cause panicking
func (m *meta) removeSegmentInfo(segment *SegmentInfo) error {
key := buildSegmentPath(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID())
return m.client.Remove(key)
}
// saveKvTxn batch save kvs
func (m *meta) saveKvTxn(kv map[string]string) error {
return m.client.MultiSave(kv)
}
// buildSegmentPath common logic mapping segment info to corresponding key in kv store
func buildSegmentPath(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", segmentPrefix, collectionID, partitionID, segmentID)
}
func buildCollectionPath(collectionID UniqueID) string {
return fmt.Sprintf("%s/%d/", segmentPrefix, collectionID)
}
func buildPartitionPath(collectionID UniqueID, partitionID UniqueID) string {
return fmt.Sprintf("%s/%d/%d/", segmentPrefix, collectionID, partitionID)
}
// buildSegment utility function for compose datapb.SegmentInfo struct with provided info
func buildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) *SegmentInfo {
info := &datapb.SegmentInfo{
ID: segmentID,

View File

@ -15,6 +15,7 @@ import (
"testing"
"github.com/golang/protobuf/proto"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/stretchr/testify/assert"
@ -119,6 +120,30 @@ func TestMeta_Basic(t *testing.T) {
assert.EqualValues(t, commonpb.SegmentState_Flushed, info0_0.State)
})
t.Run("Test segment with kv fails", func(t *testing.T) {
// inject error for `Save`
memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKV{TxnKV: memoryKV}
meta, err := NewMeta(fkv)
assert.Nil(t, err)
err = meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{}))
assert.NotNil(t, err)
fkv2 := &removeFailKV{TxnKV: memoryKV}
meta, err = NewMeta(fkv2)
assert.Nil(t, err)
// nil, since no segment yet
err = meta.DropSegment(0)
assert.Nil(t, err)
// nil, since Save error not injected
err = meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{}))
assert.Nil(t, err)
// error injected
err = meta.DropSegment(0)
assert.NotNil(t, err)
})
t.Run("Test GetCount", func(t *testing.T) {
const rowCount0 = 100
const rowCount1 = 300

View File

@ -16,6 +16,7 @@ import (
"sync/atomic"
"time"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -54,6 +55,8 @@ func (m *MockAllocator) allocID(ctx context.Context) (UniqueID, error) {
return val, nil
}
var _ allocator = (*FailsAllocator)(nil)
// FailsAllocator allocator that fails
type FailsAllocator struct{}
@ -65,6 +68,22 @@ func (a *FailsAllocator) allocID(_ context.Context) (UniqueID, error) {
return 0, errors.New("always fail")
}
// a mock kv that always fail when do `Save`
type saveFailKV struct{ kv.TxnKV }
// Save override behavior
func (kv *saveFailKV) Save(key, value string) error {
return errors.New("mocked fail")
}
// a mock kv that always fail when do `Remove`
type removeFailKV struct{ kv.TxnKV }
// Remove override behavior, inject error
func (kv *removeFailKV) Remove(key string) error {
return errors.New("mocked fail")
}
func newMockAllocator() *MockAllocator {
return &MockAllocator{}
}

View File

@ -25,7 +25,7 @@ func NewMoveBinlogPathHelper(kv kv.TxnKV, meta *meta) *MoveBinlogPathHelper {
}
func (h *MoveBinlogPathHelper) Execute() error {
segmentIds := h.meta.ListSegmentIds()
segmentIds := h.meta.ListSegmentIDs()
if len(segmentIds) == 1 {
log.Debug("there's 1 segment's binlogs to move", zap.Int64("segmentID", segmentIds[0]))

View File

@ -20,6 +20,10 @@ type SegmentInfo struct {
lastFlushTime time.Time
}
// NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo`
// assign current rows to 0 and pre-allocate `allocations` slice
// Note that the allocation information is not preserved,
// the worst case scenario is to have a segment with twice size we expects
func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
return &SegmentInfo{
SegmentInfo: info,
@ -62,12 +66,6 @@ func (s *SegmentsInfo) SetRowCount(segmentID UniqueID, rowCount int64) {
}
}
func (s *SegmentsInfo) SetLasteExpiraTime(segmentID UniqueID, expireTs Timestamp) {
if segment, ok := s.segments[segmentID]; ok {
s.segments[segmentID] = segment.ShadowClone(SetExpireTime(expireTs))
}
}
func (s *SegmentsInfo) SetState(segmentID UniqueID, state commonpb.SegmentState) {
if segment, ok := s.segments[segmentID]; ok {
s.segments[segmentID] = segment.ShadowClone(SetState(state))

View File

@ -11,14 +11,11 @@ package datacoord
import (
"context"
"errors"
"fmt"
"math"
"sync"
"testing"
"time"
"github.com/milvus-io/milvus/internal/kv"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -316,17 +313,6 @@ func TestGetFlushableSegments(t *testing.T) {
})
}
// a mock kv that always fail when do `Save`
type saveFailKv struct {
kv.TxnKV
}
// LoadWithPrefix override behavior
func (kv *saveFailKv) Save(key, value string) error {
fmt.Println("here")
return errors.New("mocked fail")
}
func TestTryToSealSegment(t *testing.T) {
t.Run("normal seal with segment policies", func(t *testing.T) {
Params.Init()
@ -409,7 +395,7 @@ func TestTryToSealSegment(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKv{TxnKV: memoryKV}
fkv := &saveFailKV{TxnKV: memoryKV}
meta, err := NewMeta(memoryKV)
assert.Nil(t, err)
@ -435,7 +421,7 @@ func TestTryToSealSegment(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKv{TxnKV: memoryKV}
fkv := &saveFailKV{TxnKV: memoryKV}
meta, err := NewMeta(memoryKV)
assert.Nil(t, err)

View File

@ -56,13 +56,15 @@ var (
)
type (
UniqueID = typeutil.UniqueID
// UniqueID shortcut for typeutil.UniqueID
UniqueID = typeutil.UniqueID
// Timestamp shortcurt for typeutil.Timestamp
Timestamp = typeutil.Timestamp
)
var errNilKvClient = errors.New("kv client not initialized")
// ServerState type alias
// ServerState type alias, presents datacoord Server State
type ServerState = int64
const (