mirror of https://github.com/milvus-io/milvus.git
Update the interface of writenode
Signed-off-by: become-nice <995581097@qq.com>pull/4973/head^2
parent
bb9c906ef6
commit
558a81dcbe
4
go.mod
4
go.mod
|
@ -22,7 +22,7 @@ require (
|
|||
github.com/stretchr/testify v1.6.1
|
||||
github.com/tikv/client-go v0.0.0-20200824032810-95774393107b
|
||||
github.com/tikv/pd v2.1.19+incompatible
|
||||
go.etcd.io/etcd v3.3.25+incompatible
|
||||
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
|
||||
go.uber.org/zap v1.15.0
|
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202
|
||||
google.golang.org/grpc v1.31.1
|
||||
|
@ -30,3 +30,5 @@ require (
|
|||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||
sigs.k8s.io/yaml v1.2.0 // indirect
|
||||
)
|
||||
|
||||
replace go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5
|
||||
|
|
134
writer/writer.go
134
writer/writer.go
|
@ -1,7 +1,6 @@
|
|||
package writer
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/czs007/suvlim/pulsar"
|
||||
|
@ -11,29 +10,11 @@ import (
|
|||
"sync"
|
||||
)
|
||||
|
||||
//type PartitionMeta struct {
|
||||
// collectionName string
|
||||
// partitionName string
|
||||
// openSegmentId string
|
||||
// segmentCloseTime uint64
|
||||
// nextSegmentId string
|
||||
// nextSegmentCloseTime uint64
|
||||
//}
|
||||
//
|
||||
//type CollectionMeta struct {
|
||||
// collionName string
|
||||
// partitionMetaMap map[string]*PartitionMeta
|
||||
// deleteTimeSync uint64
|
||||
// insertTimeSync uint64
|
||||
//}
|
||||
|
||||
type WriteNode struct {
|
||||
KvStore *mock.TikvStore
|
||||
mc *pulsar.MessageClient
|
||||
gtInsertMsgBuffer *list.List
|
||||
gtDeleteMsgBuffer *list.List
|
||||
deleteTimeSync uint64
|
||||
insertTimeSync uint64
|
||||
KvStore *mock.TikvStore
|
||||
mc *pulsar.MessageClient
|
||||
deleteTimeSync uint64
|
||||
insertTimeSync uint64
|
||||
}
|
||||
|
||||
func NewWriteNode(ctx context.Context,
|
||||
|
@ -43,12 +24,10 @@ func NewWriteNode(ctx context.Context,
|
|||
kv, err := mock.NewTikvStore()
|
||||
mc := &pulsar.MessageClient{}
|
||||
return &WriteNode{
|
||||
KvStore: kv,
|
||||
mc: mc,
|
||||
gtInsertMsgBuffer: list.New(),
|
||||
gtDeleteMsgBuffer: list.New(),
|
||||
insertTimeSync: timeSync,
|
||||
deleteTimeSync: timeSync,
|
||||
KvStore: kv,
|
||||
mc: mc,
|
||||
insertTimeSync: timeSync,
|
||||
deleteTimeSync: timeSync,
|
||||
}, err
|
||||
}
|
||||
|
||||
|
@ -60,19 +39,13 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertM
|
|||
var binaryData [][]byte
|
||||
var timeStamp []uint64
|
||||
|
||||
wn.AddInsertMsgBufferData(&prefixKeys, &suffixKeys, &binaryData, &timeStamp, timeSync)
|
||||
|
||||
for i := 0; i < len(data); i++ {
|
||||
if data[i].Timestamp <= timeSync {
|
||||
prefixKey = data[i].CollectionName + "_" + strconv.FormatInt(data[i].EntityId, 10)
|
||||
suffixKey = data[i].PartitionTag + "_" + strconv.FormatUint(data[i].SegmentId, 10)
|
||||
prefixKeys = append(prefixKeys, []byte(prefixKey))
|
||||
suffixKeys = append(suffixKeys, []byte(suffixKey))
|
||||
binaryData = append(binaryData, data[i].Serialization())
|
||||
timeStamp = append(timeStamp, data[i].Timestamp)
|
||||
} else {
|
||||
wn.gtInsertMsgBuffer.PushBack(data[i])
|
||||
}
|
||||
prefixKey = data[i].CollectionName + "_" + strconv.FormatInt(data[i].EntityId, 10)
|
||||
suffixKey = data[i].PartitionTag + "_" + strconv.FormatUint(data[i].SegmentId, 10)
|
||||
prefixKeys = append(prefixKeys, []byte(prefixKey))
|
||||
suffixKeys = append(suffixKeys, []byte(suffixKey))
|
||||
binaryData = append(binaryData, data[i].Serialization())
|
||||
timeStamp = append(timeStamp, data[i].Timestamp)
|
||||
}
|
||||
|
||||
(*wn.KvStore).PutRows(ctx, prefixKeys, timeStamp, suffixKeys, binaryData)
|
||||
|
@ -86,16 +59,10 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteM
|
|||
var prefixKeys [][]byte
|
||||
var timeStamps []uint64
|
||||
|
||||
wn.AddDeleteMsgBufferData(&prefixKeys, &timeStamps, timeSync)
|
||||
|
||||
for i := 0; i < len(data); i++ {
|
||||
if data[i].Timestamp <= timeSync {
|
||||
prefixKey = data[i].CollectionName + "_" + strconv.FormatInt(data[i].EntityId, 10) + "_"
|
||||
prefixKeys = append(prefixKeys, []byte(prefixKey))
|
||||
timeStamps = append(timeStamps, data[i].Timestamp)
|
||||
} else {
|
||||
wn.gtDeleteMsgBuffer.PushBack(data[i])
|
||||
}
|
||||
prefixKey = data[i].CollectionName + "_" + strconv.FormatInt(data[i].EntityId, 10) + "_"
|
||||
prefixKeys = append(prefixKeys, []byte(prefixKey))
|
||||
timeStamps = append(timeStamps, data[i].Timestamp)
|
||||
}
|
||||
|
||||
err := (*wn.KvStore).DeleteRows(ctx, prefixKeys, timeStamps)
|
||||
|
@ -115,66 +82,9 @@ func (wn *WriteNode) UpdateDeleteTimeSync(timeSync uint64) {
|
|||
wn.deleteTimeSync = timeSync
|
||||
}
|
||||
|
||||
func (wn *WriteNode) AddInsertMsgBufferData(
|
||||
prefixKeys *[][]byte,
|
||||
suffixKeys *[][]byte,
|
||||
data *[][]byte,
|
||||
timeStamp *[]uint64,
|
||||
timeSync uint64) {
|
||||
var prefixKey string
|
||||
var suffixKey string
|
||||
var selectElement []*list.Element
|
||||
for e := wn.gtInsertMsgBuffer.Front(); e != nil; e = e.Next() {
|
||||
collectionName := e.Value.(*schema.InsertMsg).CollectionName
|
||||
partitionTag := e.Value.(*schema.InsertMsg).PartitionTag
|
||||
segmentId := e.Value.(*schema.InsertMsg).SegmentId
|
||||
if e.Value.(*schema.InsertMsg).Timestamp <= timeSync {
|
||||
prefixKey = collectionName + "_" + strconv.FormatInt(e.Value.(*schema.InsertMsg).EntityId, 10)
|
||||
suffixKey = partitionTag + "_" + strconv.FormatUint(segmentId, 10)
|
||||
*prefixKeys = append(*prefixKeys, []byte(prefixKey))
|
||||
*suffixKeys = append(*suffixKeys, []byte(suffixKey))
|
||||
*data = append(*data, e.Value.(*schema.InsertMsg).Serialization())
|
||||
*timeStamp = append(*timeStamp, e.Value.(*schema.InsertMsg).Timestamp)
|
||||
selectElement = append(selectElement, e)
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(selectElement); i++ {
|
||||
wn.gtInsertMsgBuffer.Remove(selectElement[i])
|
||||
}
|
||||
}
|
||||
|
||||
func (wn *WriteNode) AddDeleteMsgBufferData(prefixKeys *[][]byte,
|
||||
timeStamps *[]uint64,
|
||||
timeSync uint64) {
|
||||
var prefixKey string
|
||||
var selectElement []*list.Element
|
||||
for e := wn.gtDeleteMsgBuffer.Front(); e != nil; e = e.Next() {
|
||||
collectionName := e.Value.(*schema.InsertMsg).CollectionName
|
||||
if e.Value.(*schema.DeleteMsg).Timestamp <= timeSync {
|
||||
prefixKey = collectionName + "_" + strconv.FormatInt(e.Value.(*schema.InsertMsg).EntityId, 10) + "_"
|
||||
*prefixKeys = append(*prefixKeys, []byte(prefixKey))
|
||||
*timeStamps = append(*timeStamps, e.Value.(*schema.DeleteMsg).Timestamp)
|
||||
selectElement = append(selectElement, e)
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(selectElement); i++ {
|
||||
wn.gtDeleteMsgBuffer.Remove(selectElement[i])
|
||||
}
|
||||
}
|
||||
|
||||
func (wn *WriteNode) GetInsertBuffer() *list.List {
|
||||
return wn.gtInsertMsgBuffer
|
||||
}
|
||||
|
||||
func (wn *WriteNode) GetDeleteBuffer() *list.List {
|
||||
return wn.gtDeleteMsgBuffer
|
||||
}
|
||||
|
||||
func (wn *WriteNode) doWriteNode(ctx context.Context, wg sync.WaitGroup) {
|
||||
//deleteTimeSync := make(map[string]uint64)
|
||||
//insertTimeSync := make(map[string]uint64)
|
||||
//wg.Add(2)
|
||||
//go wn.InsertBatchData(ctx, wn.mc.InsertMsg, insertTimeSync, wg)
|
||||
//go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, deleteTimeSync, wg)
|
||||
//wg.Wait()
|
||||
func (wn *WriteNode) doWriteNode(ctx context.Context, deleteTimeSync uint64, insertTimeSync uint64, wg sync.WaitGroup) {
|
||||
wg.Add(2)
|
||||
go wn.InsertBatchData(ctx, wn.mc.InsertMsg, insertTimeSync, wg)
|
||||
go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, deleteTimeSync, wg)
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue