mirror of https://github.com/milvus-io/milvus.git
Correct auto-flush behaviour (#5282)
Before this PR, DataNode treated an auto-flush as a valid, complete flush, which is wrong. This PR corrects that behaviour in DataNode: binlog paths produced by an auto-flush are now buffered in the replica and wait until the next manual flush, when they are saved into etcd all together. See also: #5220, #5268 A follow up job of #5271 Signed-off-by: yangxuan <xuan.yang@zilliz.com> pull/5286/head
parent
6dba0fd9cf
commit
fc630bc1c8
|
@ -13,6 +13,8 @@ package datanode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"path"
|
||||
"strconv"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
|
||||
|
@ -22,11 +24,15 @@ import (
|
|||
|
||||
// allocatorInterface describes what an ID allocator must provide: allocating
// a single UniqueID and generating key strings from lists of UniqueIDs.
type allocatorInterface interface {
|
||||
// allocID returns one UniqueID, or an error.
allocID() (UniqueID, error)
|
||||
// genKey returns a key string built from ids; when alloc is true a
// generated unique ID is placed at the end of the key.
genKey(alloc bool, ids ...UniqueID) (key string, err error)
|
||||
}
|
||||
|
||||
// allocator is the allocatorInterface implementation built by newAllocator.
type allocator struct {
|
||||
// masterService is the service handle passed to newAllocator.
// NOTE(review): presumably the service allocID requests IDs from — confirm.
masterService types.MasterService
|
||||
}
|
||||
|
||||
var _ allocatorInterface = &allocator{}
|
||||
|
||||
func newAllocator(s types.MasterService) *allocator {
|
||||
return &allocator{
|
||||
masterService: s,
|
||||
|
@ -49,3 +55,24 @@ func (alloc *allocator) allocID() (UniqueID, error) {
|
|||
}
|
||||
return resp.ID, nil
|
||||
}
|
||||
|
||||
// genKey gives a valid key string for lists of UniqueIDs:
|
||||
// if alloc is true, the returned keys will have a generated-unique ID at the end.
|
||||
// if alloc is false, the returned keys will only consist of provided ids.
|
||||
func (alloc *allocator) genKey(isalloc bool, ids ...UniqueID) (key string, err error) {
|
||||
if isalloc {
|
||||
idx, err := alloc.allocID()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
ids = append(ids, idx)
|
||||
}
|
||||
|
||||
idStr := make([]string, len(ids))
|
||||
for _, id := range ids {
|
||||
idStr = append(idStr, strconv.FormatInt(id, 10))
|
||||
}
|
||||
|
||||
key = path.Join(idStr...)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -63,20 +63,22 @@ func (bm *binlogMeta) genKey(alloc bool, ids ...UniqueID) (key string, err error
|
|||
// SaveSegmentBinlogMetaTxn stores all fields' binlog paths of a segment in a transaction.
|
||||
// segment binlog etcd meta key:
|
||||
// ${prefix}/${segmentID}/${fieldID}/${idx}
|
||||
func (bm *binlogMeta) SaveSegmentBinlogMetaTxn(segmentID UniqueID, field2Path map[UniqueID]string) error {
|
||||
func (bm *binlogMeta) SaveSegmentBinlogMetaTxn(segmentID UniqueID, field2Path map[UniqueID][]string) error {
|
||||
|
||||
etcdKey2binlogPath := make(map[string]string, len(field2Path))
|
||||
for fieldID, p := range field2Path {
|
||||
key, err := bm.genKey(true, segmentID, fieldID)
|
||||
if err != nil {
|
||||
return err
|
||||
etcdKey2binlogPath := make(map[string]string)
|
||||
for fieldID, paths := range field2Path {
|
||||
for _, p := range paths {
|
||||
key, err := bm.genKey(true, segmentID, fieldID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
binlogPath := proto.MarshalTextString(&datapb.SegmentFieldBinlogMeta{
|
||||
FieldID: fieldID,
|
||||
BinlogPath: p,
|
||||
})
|
||||
etcdKey2binlogPath[path.Join(Params.SegFlushMetaSubPath, key)] = binlogPath
|
||||
}
|
||||
|
||||
binlogPath := proto.MarshalTextString(&datapb.SegmentFieldBinlogMeta{
|
||||
FieldID: fieldID,
|
||||
BinlogPath: p,
|
||||
})
|
||||
etcdKey2binlogPath[path.Join(Params.SegFlushMetaSubPath, key)] = binlogPath
|
||||
}
|
||||
return bm.client.MultiSave(etcdKey2binlogPath)
|
||||
}
|
||||
|
|
|
@ -63,10 +63,10 @@ func TestMetaTable_Basic(t *testing.T) {
|
|||
|
||||
t.Run("TestBasic_SaveSegmentBinlogMetaTxn", func(t *testing.T) {
|
||||
segID := UniqueID(999999)
|
||||
fieldID2Path := map[UniqueID]string{
|
||||
100: "a",
|
||||
200: "b",
|
||||
300: "c",
|
||||
fieldID2Path := map[UniqueID][]string{
|
||||
100: {"a"},
|
||||
200: {"b"},
|
||||
300: {"c"},
|
||||
}
|
||||
|
||||
err := meta.SaveSegmentBinlogMetaTxn(segID, fieldID2Path)
|
||||
|
@ -87,10 +87,10 @@ func TestMetaTable_Basic(t *testing.T) {
|
|||
assert.Equal(t, 1, len(metas))
|
||||
assert.Equal(t, "c", metas[0].GetBinlogPath())
|
||||
|
||||
fieldID2Path2 := map[UniqueID]string{
|
||||
100: "aa",
|
||||
200: "bb",
|
||||
300: "cc",
|
||||
fieldID2Path2 := map[UniqueID][]string{
|
||||
100: {"aa"},
|
||||
200: {"bb"},
|
||||
300: {"cc"},
|
||||
}
|
||||
|
||||
err = meta.SaveSegmentBinlogMetaTxn(segID, fieldID2Path2)
|
||||
|
|
|
@ -21,10 +21,12 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
)
|
||||
|
||||
func TestDataNode(t *testing.T) {
|
||||
func TestMain(t *testing.M) {
|
||||
Params.Init()
|
||||
refreshChannelNames()
|
||||
}
|
||||
|
||||
func TestDataNode(t *testing.T) {
|
||||
node := newDataNodeMock()
|
||||
node.Start()
|
||||
|
||||
|
|
|
@ -107,7 +107,7 @@ func (dsService *dataSyncService) initNodes() {
|
|||
|
||||
var filterDmNode Node = newFilteredDmNode()
|
||||
var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica)
|
||||
var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.msFactory)
|
||||
var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.msFactory, dsService.idAllocator)
|
||||
var gcNode Node = newGCNode(dsService.replica)
|
||||
|
||||
dsService.fg.AddNode(dmStreamNode)
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
@ -50,7 +51,8 @@ type insertBufferNode struct {
|
|||
BaseNode
|
||||
insertBuffer *insertBuffer
|
||||
replica Replica
|
||||
flushMeta *binlogMeta
|
||||
flushMeta *binlogMeta // GOOSE TODO remove
|
||||
idAllocator allocatorInterface
|
||||
flushMap sync.Map
|
||||
|
||||
minIOKV kv.BaseKV
|
||||
|
@ -163,9 +165,9 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs))
|
||||
}
|
||||
|
||||
segIDs := make([]UniqueID, 0, len(uniqueSeg))
|
||||
segToUpdate := make([]UniqueID, 0, len(uniqueSeg))
|
||||
for id, num := range uniqueSeg {
|
||||
segIDs = append(segIDs, id)
|
||||
segToUpdate = append(segToUpdate, id)
|
||||
|
||||
err := ibNode.replica.updateStatistics(id, num)
|
||||
if err != nil {
|
||||
|
@ -173,8 +175,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
}
|
||||
}
|
||||
|
||||
if len(segIDs) > 0 {
|
||||
err := ibNode.updateSegStatistics(segIDs)
|
||||
if len(segToUpdate) > 0 {
|
||||
err := ibNode.updateSegStatistics(segToUpdate)
|
||||
if err != nil {
|
||||
log.Error("update segment statistics error", zap.Error(err))
|
||||
}
|
||||
|
@ -465,37 +467,6 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
ibNode.replica.setEndPosition(currentSegID, endPosition)
|
||||
}
|
||||
|
||||
// 1.4 if full, auto flush
|
||||
if ibNode.insertBuffer.full(currentSegID) {
|
||||
log.Debug(". Insert Buffer full, auto flushing ",
|
||||
zap.Int32("num of rows", ibNode.insertBuffer.size(currentSegID)))
|
||||
|
||||
collSch, err := ibNode.getCollectionSchemaByID(collection.GetID())
|
||||
if err != nil {
|
||||
log.Error("Auto flush failed .. cannot get collection schema ..", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
collMeta := &etcdpb.CollectionMeta{
|
||||
Schema: collSch,
|
||||
ID: collection.GetID(),
|
||||
}
|
||||
|
||||
ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
|
||||
delete(ibNode.insertBuffer.insertData, currentSegID)
|
||||
|
||||
finishCh := make(chan bool)
|
||||
go flushSegmentTxn(collMeta, currentSegID, msg.GetPartitionID(), collection.GetID(),
|
||||
&ibNode.flushMap, ibNode.flushMeta, ibNode.minIOKV,
|
||||
finishCh)
|
||||
|
||||
go func(finishCh <-chan bool) {
|
||||
if finished := <-finishCh; !finished {
|
||||
log.Debug(".. Auto Flush failed ..")
|
||||
return
|
||||
}
|
||||
log.Debug(".. Auto Flush completed ..")
|
||||
}(finishCh)
|
||||
}
|
||||
}
|
||||
|
||||
if len(iMsg.insertMessages) > 0 {
|
||||
|
@ -517,12 +488,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
for _, currentSegID := range msg.segmentIDs {
|
||||
log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))
|
||||
|
||||
finishCh := make(chan bool)
|
||||
// finishCh := make(chan bool)
|
||||
finishCh := make(chan map[UniqueID]string)
|
||||
go ibNode.completeFlush(currentSegID, finishCh)
|
||||
|
||||
if ibNode.insertBuffer.size(currentSegID) <= 0 {
|
||||
log.Debug(".. Buffer empty ...")
|
||||
finishCh <- true
|
||||
finishCh <- make(map[UniqueID]string)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -530,7 +502,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID])
|
||||
delete(ibNode.insertBuffer.insertData, currentSegID)
|
||||
clearFn := func() {
|
||||
finishCh <- false
|
||||
finishCh <- nil
|
||||
log.Debug(".. Clearing flush Buffer ..")
|
||||
ibNode.flushMap.Delete(currentSegID)
|
||||
}
|
||||
|
@ -554,8 +526,36 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
ID: seg.collectionID,
|
||||
}
|
||||
|
||||
go flushSegmentTxn(collMeta, currentSegID, seg.partitionID, seg.collectionID,
|
||||
&ibNode.flushMap, ibNode.flushMeta, ibNode.minIOKV, finishCh)
|
||||
go flushSegment(collMeta, currentSegID, seg.partitionID, seg.collectionID,
|
||||
&ibNode.flushMap, ibNode.minIOKV, finishCh, ibNode.idAllocator)
|
||||
}
|
||||
}
|
||||
|
||||
for _, segToFlush := range segToUpdate {
|
||||
// If full, auto flush
|
||||
if ibNode.insertBuffer.full(segToFlush) {
|
||||
log.Debug(". Insert Buffer full, auto flushing ",
|
||||
zap.Int32("num of rows", ibNode.insertBuffer.size(segToFlush)))
|
||||
|
||||
collMeta, err := ibNode.getCollMetabySegID(segToFlush)
|
||||
if err != nil {
|
||||
log.Error("Auto flush failed .. cannot get collection meta ..", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
ibNode.flushMap.Store(segToFlush, ibNode.insertBuffer.insertData[segToFlush])
|
||||
delete(ibNode.insertBuffer.insertData, segToFlush)
|
||||
|
||||
collID, partitionID, err := ibNode.getCollectionandPartitionIDbySegID(segToFlush)
|
||||
if err != nil {
|
||||
log.Error("Auto flush failed .. cannot get collection ID or partition ID..", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
finishCh := make(chan map[UniqueID]string)
|
||||
go flushSegment(collMeta, segToFlush, partitionID, collID,
|
||||
&ibNode.flushMap, ibNode.minIOKV, finishCh, ibNode.idAllocator)
|
||||
go ibNode.bufferAutoFlushPaths(finishCh, segToFlush)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -574,11 +574,14 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
return []Msg{res}
|
||||
}
|
||||
|
||||
func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionID UniqueID, collID UniqueID,
|
||||
insertData *sync.Map, meta *binlogMeta, kv kv.BaseKV, finishCh chan<- bool) {
|
||||
func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID UniqueID,
|
||||
insertData *sync.Map, kv kv.BaseKV, field2PathCh chan<- map[UniqueID]string, idAllocator allocatorInterface) {
|
||||
|
||||
clearFn := func(isSuccess bool) {
|
||||
finishCh <- isSuccess
|
||||
if !isSuccess {
|
||||
field2PathCh <- nil
|
||||
}
|
||||
|
||||
log.Debug(".. Clearing flush Buffer ..")
|
||||
insertData.Delete(segID)
|
||||
}
|
||||
|
@ -612,7 +615,7 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
|
|||
return
|
||||
}
|
||||
|
||||
k, err := meta.genKey(true, collID, partitionID, segID, fieldID)
|
||||
k, err := idAllocator.genKey(true, collID, partitionID, segID, fieldID)
|
||||
if err != nil {
|
||||
log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err))
|
||||
clearFn(false)
|
||||
|
@ -633,20 +636,38 @@ func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionI
|
|||
return
|
||||
}
|
||||
|
||||
log.Debug(".. Saving binlog paths to etcd ..", zap.Int("number", len(binLogs)))
|
||||
err = meta.SaveSegmentBinlogMetaTxn(segID, field2Path)
|
||||
if err != nil {
|
||||
log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err))
|
||||
_ = kv.MultiRemove(paths)
|
||||
clearFn(false)
|
||||
return
|
||||
}
|
||||
|
||||
field2PathCh <- field2Path
|
||||
clearFn(true)
|
||||
}
|
||||
|
||||
func (ibNode *insertBufferNode) completeFlush(segID UniqueID, finishCh <-chan bool) {
|
||||
if finished := <-finishCh; !finished {
|
||||
func (ibNode *insertBufferNode) bufferAutoFlushPaths(wait <-chan map[UniqueID]string, segID UniqueID) error {
|
||||
field2Path := <-wait
|
||||
if field2Path == nil {
|
||||
return errors.New("Nil field2Path")
|
||||
}
|
||||
|
||||
return ibNode.replica.bufferAutoFlushBinlogPaths(segID, field2Path)
|
||||
}
|
||||
|
||||
func (ibNode *insertBufferNode) completeFlush(segID UniqueID, wait <-chan map[UniqueID]string) {
|
||||
field2Path := <-wait
|
||||
|
||||
if field2Path == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO Call DataService RPC SaveBinlogPaths
|
||||
ibNode.replica.bufferAutoFlushBinlogPaths(segID, field2Path)
|
||||
bufferField2Paths, err := ibNode.replica.getBufferPaths(segID)
|
||||
if err != nil {
|
||||
log.Error("Flush failed ... cannot get buffered paths", zap.Error(err))
|
||||
}
|
||||
|
||||
// GOOSE TODO remove the below
|
||||
log.Debug(".. Saving binlog paths to etcd ..", zap.Int("number of fields", len(field2Path)))
|
||||
err = ibNode.flushMeta.SaveSegmentBinlogMetaTxn(segID, bufferField2Paths)
|
||||
if err != nil {
|
||||
log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -672,7 +693,7 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID, finishCh <-chan bo
|
|||
}
|
||||
|
||||
msgPack.Msgs = append(msgPack.Msgs, msg)
|
||||
err := ibNode.completeFlushStream.Produce(&msgPack)
|
||||
err = ibNode.completeFlushStream.Produce(&msgPack)
|
||||
if err != nil {
|
||||
log.Error(".. Produce complete flush msg failed ..", zap.Error(err))
|
||||
}
|
||||
|
@ -742,8 +763,33 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (
|
|||
return ret.schema, nil
|
||||
}
|
||||
|
||||
func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID) (meta *etcdpb.CollectionMeta, err error) {
|
||||
ret, err := ibNode.replica.getSegmentByID(segmentID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
meta.ID = ret.collectionID
|
||||
|
||||
coll, err := ibNode.replica.getCollectionByID(ret.collectionID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
meta.Schema = coll.GetSchema()
|
||||
return
|
||||
}
|
||||
|
||||
func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
|
||||
seg, err := ibNode.replica.getSegmentByID(segmentID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
collID = seg.collectionID
|
||||
partitionID = seg.partitionID
|
||||
return
|
||||
}
|
||||
|
||||
func newInsertBufferNode(ctx context.Context, flushMeta *binlogMeta,
|
||||
replica Replica, factory msgstream.Factory) *insertBufferNode {
|
||||
replica Replica, factory msgstream.Factory, idAllocator allocatorInterface) *insertBufferNode {
|
||||
maxQueueLength := Params.FlowGraphMaxQueueLength
|
||||
maxParallelism := Params.FlowGraphMaxParallelism
|
||||
|
||||
|
@ -803,5 +849,6 @@ func newInsertBufferNode(ctx context.Context, flushMeta *binlogMeta,
|
|||
replica: replica,
|
||||
flushMeta: flushMeta,
|
||||
flushMap: sync.Map{},
|
||||
idAllocator: idAllocator,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,7 +61,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
|
|||
err = msFactory.SetParams(m)
|
||||
assert.Nil(t, err)
|
||||
|
||||
iBNode := newInsertBufferNode(ctx, newBinlogMeta(), replica, msFactory)
|
||||
iBNode := newInsertBufferNode(ctx, newBinlogMeta(), replica, msFactory, NewAllocatorFactory())
|
||||
inMsg := genInsertMsg()
|
||||
var iMsg flowgraph.Msg = &inMsg
|
||||
iBNode.Operate([]flowgraph.Msg{iMsg})
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"encoding/binary"
|
||||
"math"
|
||||
"math/rand"
|
||||
"path"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -111,6 +112,7 @@ func newBinlogMeta() *binlogMeta {
|
|||
|
||||
func clearEtcd(rootPath string) error {
|
||||
etcdAddr := Params.EtcdAddress
|
||||
log.Info("etcd tests address", zap.String("address", etcdAddr))
|
||||
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -421,6 +423,8 @@ type AllocatorFactory struct {
|
|||
r *rand.Rand
|
||||
}
|
||||
|
||||
var _ allocatorInterface = &AllocatorFactory{}
|
||||
|
||||
func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory {
|
||||
f := &AllocatorFactory{
|
||||
r: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
|
@ -435,6 +439,24 @@ func (alloc *AllocatorFactory) allocID() (UniqueID, error) {
|
|||
return alloc.r.Int63n(1000000), nil
|
||||
}
|
||||
|
||||
func (alloc *AllocatorFactory) genKey(isalloc bool, ids ...UniqueID) (key string, err error) {
|
||||
if isalloc {
|
||||
idx, err := alloc.allocID()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
ids = append(ids, idx)
|
||||
}
|
||||
|
||||
idStr := make([]string, len(ids))
|
||||
for _, id := range ids {
|
||||
idStr = append(idStr, strconv.FormatInt(id, 10))
|
||||
}
|
||||
|
||||
key = path.Join(idStr...)
|
||||
return
|
||||
}
|
||||
|
||||
// setID stores id in the factory's ID field.
func (m *MasterServiceFactory) setID(id UniqueID) {
|
||||
m.ID = id // GOOSE TODO: random ID generator
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue