From 8e323494a51c10964ba2aa138ddcf988f5b04572 Mon Sep 17 00:00:00 2001 From: become-nice <995581097@qq.com> Date: Wed, 2 Sep 2020 10:36:39 +0800 Subject: [PATCH] Add the interface about kv store of writenode Signed-off-by: become-nice <995581097@qq.com> --- writer/writer.go | 54 ++++++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/writer/writer.go b/writer/writer.go index d77c404115..56d439df4b 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -10,11 +10,16 @@ import ( "sync" ) +type SegmentIdInfo struct { + CollectionName string + EntityId int64 + SegmentIds *[]string +} + type WriteNode struct { - KvStore *mock.TikvStore - mc *pulsar.MessageClient - deleteTimeSync uint64 - insertTimeSync uint64 + KvStore *mock.TikvStore + mc *pulsar.MessageClient + timeSync uint64 } func NewWriteNode(ctx context.Context, @@ -24,10 +29,9 @@ func NewWriteNode(ctx context.Context, kv, err := mock.NewTikvStore() mc := &pulsar.MessageClient{} return &WriteNode{ - KvStore: kv, - mc: mc, - insertTimeSync: timeSync, - deleteTimeSync: timeSync, + KvStore: kv, + mc: mc, + timeSync: timeSync, }, err } @@ -48,13 +52,17 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*schema.InsertM timeStamp = append(timeStamp, data[i].Timestamp) } - (*wn.KvStore).PutRows(ctx, prefixKeys, timeStamp, suffixKeys, binaryData) - wn.UpdateInsertTimeSync(timeSync) + error := (*wn.KvStore).PutRows(ctx, prefixKeys, timeStamp, suffixKeys, binaryData) + if error != nil { + fmt.Println("Can't insert data!") + return error + } wg.Done() return nil } func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteMsg, timeSync uint64, wg sync.WaitGroup) error { + var segmentInfos []*SegmentIdInfo var prefixKey string var prefixKeys [][]byte var timeStamps []uint64 @@ -64,27 +72,29 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*schema.DeleteM prefixKeys = append(prefixKeys, []byte(prefixKey)) timeStamps = append(timeStamps, data[i].Timestamp) } - + segmentIds := (*wn.KvStore).GetSegment(ctx, prefixKeys) + for i := 0; i < len(prefixKeys); i++ { + segmentInfos = append(segmentInfos, &SegmentIdInfo{ + CollectionName: data[i].CollectionName, + EntityId: data[i].EntityId, + SegmentIds: segmentIds, + }) + } err := (*wn.KvStore).DeleteRows(ctx, prefixKeys, timeStamps) if err != nil { - fmt.Println("Can't insert data") + fmt.Println("Can't delete data") } - wn.UpdateDeleteTimeSync(timeSync) wg.Done() return nil } -func (wn *WriteNode) UpdateInsertTimeSync(timeSync uint64) { - wn.insertTimeSync = timeSync +func (wn *WriteNode) UpdateTimeSync(timeSync uint64) { + wn.timeSync = timeSync } -func (wn *WriteNode) UpdateDeleteTimeSync(timeSync uint64) { - wn.deleteTimeSync = timeSync -} - -func (wn *WriteNode) doWriteNode(ctx context.Context, deleteTimeSync uint64, insertTimeSync uint64, wg sync.WaitGroup) { +func (wn *WriteNode) doWriteNode(ctx context.Context, timeSync uint64, wg sync.WaitGroup) { wg.Add(2) - go wn.InsertBatchData(ctx, wn.mc.InsertMsg, insertTimeSync, wg) - go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, deleteTimeSync, wg) + go wn.InsertBatchData(ctx, wn.mc.InsertMsg, timeSync, wg) + go wn.DeleteBatchData(ctx, wn.mc.DeleteMsg, timeSync, wg) wg.Wait() }