mirror of https://github.com/milvus-io/milvus.git
Add the interface about kv store of writenode
Signed-off-by: become-nice <995581097@qq.com>pull/4973/head^2
parent
29d6a3ad0a
commit
8e323494a5
|
@ -10,11 +10,16 @@ import (
|
|||
"sync"
|
||||
)
|
||||
|
||||
type SegmentIdInfo struct {
|
||||
CollectionName string
|
||||
EntityId int64
|
||||
SegmentIds *[]string
|
||||
}
|
||||
|
||||
type WriteNode struct {
|
||||
KvStore *mock.TikvStore
|
||||
mc *pulsar.MessageClient
|
||||
deleteTimeSync uint64
|
||||
insertTimeSync uint64
|
||||
timeSync uint64
|
||||
}
|
||||
|
||||
func NewWriteNode(ctx context.Context,
|
||||
|
@ -26,8 +31,7 @@ func NewWriteNode(ctx context.Context,
|
|||
return &WriteNode{
|
||||
KvStore: kv,
|
||||
mc: mc,
|
||||
insertTimeSync: timeSync,
|
||||
deleteTimeSync: timeSync,
|
||||
timeSync: timeSync,
|
||||
}, err
|
||||
}
|
||||
|
||||
|
@ -48,13 +52,17 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertM
|
|||
timeStamp = append(timeStamp, data[i].Timestamp)
|
||||
}
|
||||
|
||||
(*wn.KvStore).PutRows(ctx, prefixKeys, timeStamp, suffixKeys, binaryData)
|
||||
wn.UpdateInsertTimeSync(timeSync)
|
||||
error := (*wn.KvStore).PutRows(ctx, prefixKeys, timeStamp, suffixKeys, binaryData)
|
||||
if error != nil {
|
||||
fmt.Println("Can't insert data!")
|
||||
return error
|
||||
}
|
||||
wg.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteMsg, timeSync uint64, wg sync.WaitGroup) error {
|
||||
var segmentInfos []*SegmentIdInfo
|
||||
var prefixKey string
|
||||
var prefixKeys [][]byte
|
||||
var timeStamps []uint64
|
||||
|
@ -64,27 +72,29 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteM
|
|||
prefixKeys = append(prefixKeys, []byte(prefixKey))
|
||||
timeStamps = append(timeStamps, data[i].Timestamp)
|
||||
}
|
||||
|
||||
segmentIds := (*wn.KvStore).GetSegment(ctx, prefixKeys)
|
||||
for i := 0; i < len(prefixKeys); i++ {
|
||||
segmentInfos = append(segmentInfos, &SegmentIdInfo{
|
||||
CollectionName: data[i].CollectionName,
|
||||
EntityId: data[i].EntityId,
|
||||
SegmentIds: segmentIds,
|
||||
})
|
||||
}
|
||||
err := (*wn.KvStore).DeleteRows(ctx, prefixKeys, timeStamps)
|
||||
if err != nil {
|
||||
fmt.Println("Can't insert data")
|
||||
fmt.Println("Can't delete data")
|
||||
}
|
||||
wn.UpdateDeleteTimeSync(timeSync)
|
||||
wg.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wn *WriteNode) UpdateInsertTimeSync(timeSync uint64) {
|
||||
wn.insertTimeSync = timeSync
|
||||
func (wn *WriteNode) UpdateTimeSync(timeSync uint64) {
|
||||
wn.timeSync = timeSync
|
||||
}
|
||||
|
||||
func (wn *WriteNode) UpdateDeleteTimeSync(timeSync uint64) {
|
||||
wn.deleteTimeSync = timeSync
|
||||
}
|
||||
|
||||
func (wn *WriteNode) doWriteNode(ctx context.Context, deleteTimeSync uint64, insertTimeSync uint64, wg sync.WaitGroup) {
|
||||
func (wn *WriteNode) doWriteNode(ctx context.Context, timeSync uint64, wg sync.WaitGroup) {
|
||||
wg.Add(2)
|
||||
go wn.InsertBatchData(ctx, wn.mc.InsertMsg, insertTimeSync, wg)
|
||||
go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, deleteTimeSync, wg)
|
||||
go wn.InsertBatchData(ctx, wn.mc.InsertMsg, timeSync, wg)
|
||||
go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, timeSync, wg)
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue