Refine save dropped segments into kv metastore (#18779)

Signed-off-by: yun.zhang <yun.zhang@zilliz.com>

Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
pull/18812/head
jaime 2022-08-25 16:48:53 +08:00 committed by GitHub
parent dcf45df029
commit ef9098f84a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 138 additions and 61 deletions

View File

@ -503,48 +503,27 @@ func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) *SegmentInfo {
// since the flag is not marked, DataNode can re-consume the drop collection msg
// 2. when a failure occurs between saving meta and unwatching the channel, the removal flag shall be checked before letting DataNode watch this channel
// batchSaveDropSegments persists the dropped-segment records of `channel` into the
// metastore in transaction-sized batches, and marks the channel's removal flag
// together with the final batch so that the flag is only set once every segment
// has been saved.
// NOTE(review): loop termination relies on saveDropSegmentAndRemove shrinking
// modSegments as entries are persisted — confirm, otherwise this loops forever.
func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*SegmentInfo) error {
// the limitation of etcd operations number per transaction is 128, since segment number might be enormous so we shall split
// all save operations into batches
// since the removal flag shall always be with the last batch, so the last batch shall be maxOperationNumber - 1
for len(modSegments) > maxOperationsPerTxn-1 {
// withFlag=false: intermediate batch, do not mark the channel removed yet
err := m.saveDropSegmentAndRemove(channel, modSegments, false)
if err != nil {
return err
}
}
// removal flag should be saved with last batch
return m.saveDropSegmentAndRemove(channel, modSegments, true)
}
func (m *meta) saveDropSegmentAndRemove(channel string, segments map[int64]*SegmentInfo, withFlag bool) error {
segmentMap := make(map[int64]*datapb.SegmentInfo)
for id, seg := range segments {
segmentMap[id] = seg.SegmentInfo
}
// TODO: RootCoord supports read-write prohibit when dropping collection
// divides two api calls: save dropped segments & mark channel deleted
updateIDs, err := m.catalog.SaveDroppedSegmentsInBatch(m.ctx, segmentMap)
segments := make([]*datapb.SegmentInfo, 0)
for _, seg := range modSegments {
segments = append(segments, seg.SegmentInfo)
}
err := m.catalog.SaveDroppedSegmentsInBatch(m.ctx, segments)
if err != nil {
return err
}
if withFlag {
err = m.catalog.MarkChannelDeleted(m.ctx, channel)
if err != nil {
return err
}
if err = m.catalog.MarkChannelDeleted(m.ctx, channel); err != nil {
return err
}
// update memory info
for _, id := range updateIDs {
m.segments.SetSegment(id, segments[id])
delete(segments, id)
for id, segment := range modSegments {
m.segments.SetSegment(id, segment)
}
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Add(float64(len(updateIDs)))
metrics.DataCoordNumSegments.WithLabelValues(metrics.DropedSegmentLabel).Add(float64(len(segments)))
return nil
}

View File

@ -58,7 +58,7 @@ type DataCoordCatalog interface {
AlterSegments(ctx context.Context, segments []*datapb.SegmentInfo) error
// AlterSegmentsAndAddNewSegment for transaction
AlterSegmentsAndAddNewSegment(ctx context.Context, segments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error
SaveDroppedSegmentsInBatch(ctx context.Context, modSegments map[int64]*datapb.SegmentInfo) ([]int64, error)
SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error
DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error
MarkChannelDeleted(ctx context.Context, channel string) error
IsChannelDropped(ctx context.Context, channel string) bool

View File

@ -25,6 +25,7 @@ const (
ChannelRemovePrefix = MetaPrefix + "/channel-removal"
RemoveFlagTomestone = "removed"
MaxOperationsPerTxn = 64
MaxBytesPerTxn = 1024 * 1024
maxOperationsPerTxn = 64
maxBytesPerTxn = 1024 * 1024
)

View File

@ -179,38 +179,54 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [
return nil
}
func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, modSegments map[int64]*datapb.SegmentInfo) ([]int64, error) {
func (kc *Catalog) SaveDroppedSegmentsInBatch(ctx context.Context, segments []*datapb.SegmentInfo) error {
kvs := make(map[string]string)
batchIDs := make([]int64, 0, MaxOperationsPerTxn)
batchIDs := make([]int64, 0, maxOperationsPerTxn)
size := 0
for id, s := range modSegments {
multiSave := func() error {
if len(kvs) == 0 {
return nil
}
if err := kc.Txn.MultiSave(kvs); err != nil {
log.Error("Failed to save segments in batch for DropChannel",
zap.Any("segmentIDs", batchIDs),
zap.Error(err))
return err
}
return nil
}
// the limitation of etcd operations number per transaction is 128,
// since segment number might be enormous, so we shall split all save operations into batches
splitCount := 0
for _, s := range segments {
key := buildSegmentPath(s.GetCollectionID(), s.GetPartitionID(), s.GetID())
segBytes, err := proto.Marshal(s)
if err != nil {
return nil, fmt.Errorf("failed to marshal segment: %d, err: %w", s.GetID(), err)
return fmt.Errorf("failed to marshal segment: %d, err: %w", s.GetID(), err)
}
kvSize := len(key) + len(segBytes)
splitCount += kvSize
if len(kvs) == maxOperationsPerTxn || (len(kvs) > 0 && splitCount >= maxBytesPerTxn) {
if err := multiSave(); err != nil {
return err
}
kvs = make(map[string]string)
batchIDs = make([]int64, 0, maxOperationsPerTxn)
if splitCount >= maxBytesPerTxn {
splitCount = kvSize
}
}
kvs[key] = string(segBytes)
batchIDs = append(batchIDs, s.ID)
size += len(key) + len(segBytes)
// remove record from map `modSegments`
delete(modSegments, id)
// batch stops when one of conditions matched:
// 1. number of records is equals MaxOperationsPerTxn
// 2. left number of modSegments is equals 1
// 3. bytes size is greater than MaxBytesPerTxn
if len(kvs) == MaxOperationsPerTxn || len(modSegments) == 1 || size >= MaxBytesPerTxn {
break
}
}
err := kc.Txn.MultiSave(kvs)
if err != nil {
log.Error("Failed to save segments in batch for DropChannel", zap.Any("segmentIDs", batchIDs), zap.Error(err))
return nil, err
}
return batchIDs, nil
return multiSave()
}
func (kc *Catalog) DropSegment(ctx context.Context, segment *datapb.SegmentInfo) error {

View File

@ -3,6 +3,7 @@ package datacoord
import (
"context"
"errors"
"math/rand"
"testing"
"github.com/milvus-io/milvus/internal/kv"
@ -56,17 +57,97 @@ func Test_SaveDroppedSegmentsInBatch_SaveError(t *testing.T) {
}
catalog := &Catalog{txn}
segments := map[int64]*datapb.SegmentInfo{
1: {
segments := []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: 1000,
},
}
ids, err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments)
assert.Nil(t, ids)
err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments)
assert.Error(t, err)
}
// Test_SaveDroppedSegmentsInBatch_MultiSave verifies the transaction-splitting
// behavior of Catalog.SaveDroppedSegmentsInBatch. The mocked MultiSave records
// how many transactions were issued (count) and how many kv pairs were saved in
// total across them (kvSize).
func Test_SaveDroppedSegmentsInBatch_MultiSave(t *testing.T) {
txn := &MockedTxnKV{}
// count: number of MultiSave transactions issued by the call under test
count := 0
// kvSize: total number of kv pairs saved across all transactions
kvSize := 0
txn.multiSave = func(kvs map[string]string) error {
count++
kvSize += len(kvs)
return nil
}
catalog := &Catalog{txn}
// testing for no splitting
{
// a single small segment fits in one transaction
segments1 := []*datapb.SegmentInfo{
{
ID: 1,
CollectionID: 1000,
PartitionID: 100,
},
}
err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments1)
assert.Nil(t, err)
assert.Equal(t, 1, count)
assert.Equal(t, 1, kvSize)
}
// testing for reaching max operation
{
// 65 segments exceed the per-transaction operation cap (presumably
// maxOperationsPerTxn = 64 — confirm against the kv constants), so the
// save is expected to split into two transactions
segments2 := make([]*datapb.SegmentInfo, 65)
for i := 0; i < 65; i++ {
segments2[i] = &datapb.SegmentInfo{
ID: int64(i),
CollectionID: 1000,
PartitionID: 100,
}
}
// reset accumulators between sub-cases
count = 0
kvSize = 0
err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments2)
assert.Nil(t, err)
assert.Equal(t, 2, count)
assert.Equal(t, 65, kvSize)
}
// testing for reaching max bytes size
{
// the first segment's 2 MiB InsertChannel pushes the batch past the byte
// budget (presumably maxBytesPerTxn = 1 MiB — confirm), forcing a flush
// before the second segment is saved: 2 transactions, 1 kv pair each
segments3 := []*datapb.SegmentInfo{
{
ID: int64(1),
CollectionID: 1000,
PartitionID: 100,
InsertChannel: randomString(1024 * 1024 * 2),
},
{
ID: int64(2),
CollectionID: 1000,
PartitionID: 100,
InsertChannel: randomString(1024),
},
}
count = 0
kvSize = 0
err := catalog.SaveDroppedSegmentsInBatch(context.TODO(), segments3)
assert.Nil(t, err)
assert.Equal(t, 2, count)
assert.Equal(t, 2, kvSize)
}
}
// randomString returns a string of n pseudo-random uppercase ASCII letters
// ('A'-'Z'). It is a test helper for building payloads of a known size.
//
// Fixes over the original: the parameter was named `len`, shadowing the
// builtin, and the local `bytes` shadowed the stdlib package name; also
// rand.Intn(25) produced only 'A'-'Y', silently excluding 'Z' from the
// intended alphabet — Intn(26) covers the full range.
func randomString(n int) string {
	buf := make([]byte, n)
	for i := range buf {
		buf[i] = byte('A' + rand.Intn(26))
	}
	return string(buf)
}
func Test_MarkChannelDeleted_SaveError(t *testing.T) {
txn := &MockedTxnKV{}
txn.save = func(key, value string) error {