Refactor query node deletion and insertion behavior

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-09-09 10:28:16 +08:00 committed by yefu.chen
parent 6b8c82dede
commit 5dd9a82434
10 changed files with 317 additions and 245 deletions


@ -9,10 +9,15 @@ using idx_t = int64_t;
namespace milvus {
namespace dog_segment {
using engine::DataChunk;
using engine::DataChunkPtr;
using engine::QueryResult;
using DogDataChunkPtr = std::shared_ptr<DataChunk>;
int
TestABI();
class SegmentBase {
public:
// definitions
@ -25,17 +30,19 @@ class SegmentBase {
public:
virtual ~SegmentBase() = default;
// SegmentBase(std::shared_ptr<FieldsInfo> collection);
// single threaded
virtual Status
Insert(int64_t size, const idx_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values, std::pair<Timestamp, Timestamp> timestamp_range) = 0;
// TODO: add id into delete log, possibly bitmap
// single threaded
virtual int64_t PreInsert(int64_t size) = 0;
virtual Status
Delete(int64_t size, const idx_t* primary_keys, const Timestamp* timestamps, std::pair<Timestamp, Timestamp> timestamp_range) = 0;
Insert(int64_t reserved_offset, int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) = 0;
virtual int64_t PreDelete(int64_t size) = 0;
// TODO: add id into delete log, possibly bitmap
virtual Status
Delete(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps) = 0;
// query contains metadata of
// multi-threaded
virtual Status
Query(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results) = 0;
@ -44,7 +51,6 @@ class SegmentBase {
// GetEntityByIds(Timestamp timestamp, const std::vector<Id>& ids, DataChunkPtr& results) = 0;
// stop receive insert requests
// single threaded
virtual Status
Close() = 0;
@ -53,15 +59,8 @@ class SegmentBase {
// virtual Status
// Flush(Timestamp timestamp) = 0;
// BuildIndex with parameters; must be in the Frozen state
// This function is atomic
// NOTE: index_params contains several policies for several indexes
virtual Status
BuildIndex(std::shared_ptr<IndexConfig> index_params) = 0;
// Remove Index
virtual Status
DropIndex(std::string_view field_name) = 0;
// watch changes
// NOTE: Segment will use this ptr as correct
virtual Status
DropRawData(std::string_view field_name) = 0;
@ -69,6 +68,9 @@ class SegmentBase {
virtual Status
LoadRawData(std::string_view field_name, const char* blob, int64_t blob_size) = 0;
virtual Status
BuildIndex() = 0;
public:
virtual ssize_t
get_row_count() const = 0;
@ -78,12 +80,12 @@ class SegmentBase {
virtual ssize_t
get_deleted_count() const = 0;
};
using SegmentBasePtr = std::unique_ptr<SegmentBase>;
SegmentBasePtr CreateSegment(SchemaPtr& ptr);
SegmentBasePtr
CreateSegment(SchemaPtr schema, IndexMetaPtr index_meta);
} // namespace dog_segment
} // namespace milvus


@ -14,25 +14,35 @@ package reader
import "C"
import (
"errors"
"fmt"
msgPb "github.com/czs007/suvlim/pkg/message"
"github.com/czs007/suvlim/reader/message_client"
"sort"
"strconv"
"sync"
"time"
)
type InsertData struct {
insertIDs map[int64][]int64
insertTimestamps map[int64][]uint64
insertRecords map[int64][][]byte
insertOffset map[int64]int64
}
type DeleteData struct {
deleteIDs map[int64][]int64
deleteTimestamps map[int64][]uint64
deleteOffset map[int64]int64
}
type DeleteRecord struct {
entityID int64
timestamp uint64
segmentID int64
}
type DeleteRecords struct {
deleteRecords *[]DeleteRecord
count chan int
type DeletePreprocessData struct {
deleteRecords []*DeleteRecord
count chan int
}
type QueryNodeDataBuffer struct {
@ -43,13 +53,15 @@ type QueryNodeDataBuffer struct {
}
type QueryNode struct {
QueryNodeId uint64
Collections []*Collection
SegmentsMap map[int64]*Segment
messageClient message_client.MessageClient
queryNodeTimeSync *QueryNodeTime
deleteRecordsMap map[TimeRange]DeleteRecords
buffer QueryNodeDataBuffer
QueryNodeId uint64
Collections []*Collection
SegmentsMap map[int64]*Segment
messageClient message_client.MessageClient
queryNodeTimeSync *QueryNodeTime
buffer QueryNodeDataBuffer
deletePreprocessData DeletePreprocessData
deleteData DeleteData
insertData InsertData
}
func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
@ -71,7 +83,6 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
SegmentsMap: segmentsMap,
messageClient: mc,
queryNodeTimeSync: queryNodeTimeSync,
deleteRecordsMap: make(map[TimeRange]DeleteRecords),
}
}
@ -95,19 +106,6 @@ func (node *QueryNode) DeleteCollection(collection *Collection) {
////////////////////////////////////////////////////////////////////////////////////////////////////
func (node *QueryNode) doQueryNode(wg *sync.WaitGroup) {
wg.Add(3)
// Do insert and delete messages sort, do insert
go node.InsertAndDelete(node.messageClient.InsertOrDeleteMsg, wg)
// Do delete messages sort
go node.searchDeleteInMap()
// Do delete
go node.Delete()
// Do search
go node.Search(node.messageClient.SearchMsg, wg)
wg.Wait()
}
func (node *QueryNode) PrepareBatchMsg() {
node.messageClient.PrepareBatchMsg()
}
@ -119,60 +117,6 @@ func (node *QueryNode) StartMessageClient() {
go node.messageClient.ReceiveMessage()
}
// Function `GetSegmentByEntityId` should return entityIDs, timestamps and segmentIDs
func (node *QueryNode) GetKey2Segments() ([]int64, []uint64, []int64) {
// TODO: get id2segment info from pulsar
return nil, nil, nil
}
func (node *QueryNode) GetTargetSegment(collectionName *string, partitionTag *string) (*Segment, error) {
var targetPartition *Partition
for _, collection := range node.Collections {
if *collectionName == collection.CollectionName {
for _, partition := range collection.Partitions {
if *partitionTag == partition.PartitionName {
targetPartition = partition
break
}
}
}
}
if targetPartition == nil {
return nil, errors.New("cannot found target partition")
}
for _, segment := range targetPartition.OpenedSegments {
// TODO: add other conditions
return segment, nil
}
return nil, errors.New("cannot found target segment")
}
func (node *QueryNode) GetCollectionByCollectionName(collectionName string) (*Collection, error) {
for _, collection := range node.Collections {
if collection.CollectionName == collectionName {
return collection, nil
}
}
return nil, errors.New("Cannot found collection: " + collectionName)
}
func (node *QueryNode) GetSegmentBySegmentID(segmentID int64) (*Segment, error) {
targetSegment := node.SegmentsMap[segmentID]
if targetSegment == nil {
return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10))
}
return targetSegment, nil
}
////////////////////////////////////////////////////////////////////////////////////////////////////
func (node *QueryNode) InitQueryNodeCollection() {
// TODO: remove hard code, add collection creation request
// TODO: error handle
@ -182,70 +126,47 @@ func (node *QueryNode) InitQueryNodeCollection() {
var _ = newPartition.NewSegment(0)
}
func (node *QueryNode) SegmentsManagement() {
node.queryNodeTimeSync.UpdateTSOTimeSync()
var timeNow = node.queryNodeTimeSync.TSOTimeSync
for _, collection := range node.Collections {
for _, partition := range collection.Partitions {
for _, oldSegment := range partition.OpenedSegments {
// TODO: check segment status
if timeNow >= oldSegment.SegmentCloseTime {
// start new segment and add it into partition.OpenedSegments
// TODO: get segmentID from master
var segmentID int64 = 0
var newSegment = partition.NewSegment(segmentID)
newSegment.SegmentCloseTime = timeNow + SegmentLifetime
partition.OpenedSegments = append(partition.OpenedSegments, newSegment)
node.SegmentsMap[segmentID] = newSegment
////////////////////////////////////////////////////////////////////////////////////////////////////
// close old segment and move it into partition.ClosedSegments
// TODO: check status
var _ = oldSegment.Close()
partition.ClosedSegments = append(partition.ClosedSegments, oldSegment)
}
}
}
}
}
func (node *QueryNode) SegmentService() {
func (node *QueryNode) RunInsertDelete() {
for {
time.Sleep(200 * time.Millisecond)
node.SegmentsManagement()
fmt.Println("do segments management in 200ms")
// TODO: get timeRange from message client
var timeRange = TimeRange{0, 0}
node.PrepareBatchMsg()
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
node.WriterDelete()
node.PreInsertAndDelete()
node.DoInsertAndDelete()
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// TODO: receive delete messages individually
func (node *QueryNode) InsertAndDelete(insertDeleteMessages []*msgPb.InsertOrDeleteMsg, wg *sync.WaitGroup) msgPb.Status {
node.queryNodeTimeSync.UpdateReadTimeSync()
func (node *QueryNode) RunSearch() {
for {
node.Search(node.messageClient.SearchMsg)
}
}
var tMin = node.queryNodeTimeSync.ReadTimeSyncMin
var tMax = node.queryNodeTimeSync.ReadTimeSyncMax
var readTimeSyncRange = TimeRange{timestampMin: tMin, timestampMax: tMax}
////////////////////////////////////////////////////////////////////////////////////////////////////
var clientId = insertDeleteMessages[0].ClientId
var insertIDs = make(map[int64][]int64)
var insertTimestamps = make(map[int64][]uint64)
var insertRecords = make(map[int64][][]byte)
func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOrDeleteMsg, timeRange TimeRange) msgPb.Status {
var tMax = timeRange.timestampMax
// 1. Extract messages before readTimeSync from QueryNodeDataBuffer.
// Set valid bitmap to false.
for i, msg := range node.buffer.InsertDeleteBuffer {
if msg.Timestamp <= tMax {
if msg.Timestamp < tMax {
if msg.Op == msgPb.OpType_INSERT {
insertIDs[msg.SegmentId] = append(insertIDs[msg.SegmentId], msg.Uid)
insertTimestamps[msg.SegmentId] = append(insertTimestamps[msg.SegmentId], msg.Timestamp)
insertRecords[msg.SegmentId] = append(insertRecords[msg.SegmentId], msg.RowsData.Blob)
node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid)
node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp)
node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob)
} else if msg.Op == msgPb.OpType_DELETE {
var r = DeleteRecord {
entityID: msg.Uid,
timestamp: msg.Timestamp,
}
*node.deleteRecordsMap[readTimeSyncRange].deleteRecords = append(*node.deleteRecordsMap[readTimeSyncRange].deleteRecords, r)
node.deleteRecordsMap[readTimeSyncRange].count <- <- node.deleteRecordsMap[readTimeSyncRange].count + 1
node.deletePreprocessData.deleteRecords = append(node.deletePreprocessData.deleteRecords, &r)
node.deletePreprocessData.count <- <- node.deletePreprocessData.count + 1
}
node.buffer.validInsertDeleteBuffer[i] = false
}
@ -264,18 +185,18 @@ func (node *QueryNode) InsertAndDelete(insertDeleteMessages []*msgPb.InsertOrDel
// Move messages after readTimeSync to QueryNodeDataBuffer.
// Set valid bitmap to true.
for _, msg := range insertDeleteMessages {
if msg.Timestamp <= tMax {
if msg.Timestamp < tMax {
if msg.Op == msgPb.OpType_INSERT {
insertIDs[msg.SegmentId] = append(insertIDs[msg.SegmentId], msg.Uid)
insertTimestamps[msg.SegmentId] = append(insertTimestamps[msg.SegmentId], msg.Timestamp)
insertRecords[msg.SegmentId] = append(insertRecords[msg.SegmentId], msg.RowsData.Blob)
node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid)
node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp)
node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob)
} else if msg.Op == msgPb.OpType_DELETE {
var r = DeleteRecord {
entityID: msg.Uid,
timestamp: msg.Timestamp,
}
*node.deleteRecordsMap[readTimeSyncRange].deleteRecords = append(*node.deleteRecordsMap[readTimeSyncRange].deleteRecords, r)
node.deleteRecordsMap[readTimeSyncRange].count <- <- node.deleteRecordsMap[readTimeSyncRange].count + 1
node.deletePreprocessData.deleteRecords = append(node.deletePreprocessData.deleteRecords, &r)
node.deletePreprocessData.count <- <- node.deletePreprocessData.count + 1
}
} else {
node.buffer.InsertDeleteBuffer = append(node.buffer.InsertDeleteBuffer, msg)
@ -283,81 +204,118 @@ func (node *QueryNode) InsertAndDelete(insertDeleteMessages []*msgPb.InsertOrDel
}
}
// 4. Do insert
// TODO: multi-thread insert
for segmentID, records := range insertRecords {
return msgPb.Status{ErrorCode: 0}
}
func (node *QueryNode) WriterDelete() msgPb.Status {
// TODO: set timeout
for {
var ids, timestamps, segmentIDs = node.GetKey2Segments()
for i := 0; i <= len(ids); i++ {
id := ids[i]
timestamp := timestamps[i]
segmentID := segmentIDs[i]
for _, r := range node.deletePreprocessData.deleteRecords {
if r.timestamp == timestamp && r.entityID == id {
r.segmentID = segmentID
node.deletePreprocessData.count <- <- node.deletePreprocessData.count - 1
}
}
}
if <- node.deletePreprocessData.count == 0 {
return msgPb.Status{ErrorCode: 0}
}
}
}
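The `count chan int` field on DeletePreprocessData acts as a shared counter: MessagesPreprocess increments it for every buffered delete record, and WriterDelete decrements it as records are matched to segments, returning once it drains to zero. The `count <- <-count + 1` form reads the current value from a buffered channel and writes the updated value back. A minimal, self-contained sketch of that pattern (an illustration, not part of this commit; it assumes the channel has capacity 1 and is seeded with 0 before use, which the commit itself does not show):

package main

import "fmt"

func main() {
	count := make(chan int, 1)
	count <- 0 // seed the counter; receiving and re-sending also serializes concurrent updates

	count <- <-count + 1 // increment, as MessagesPreprocess does per delete record
	count <- <-count - 1 // decrement, as WriterDelete does once a record is matched to a segment

	if c := <-count; c == 0 {
		fmt.Println("all delete records matched, count =", c)
	}
}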
func (node *QueryNode) PreInsertAndDelete() msgPb.Status {
// 1. Do PreInsert
for segmentID := range node.insertData.insertRecords {
var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
ids := insertIDs[segmentID]
timestamps := insertTimestamps[segmentID]
err = targetSegment.SegmentInsert(&ids, &timestamps, &records, tMin, tMax)
var numOfRecords = len(node.insertData.insertRecords[segmentID])
var offset = targetSegment.SegmentPreInsert(numOfRecords)
node.insertData.insertOffset[segmentID] = offset
}
// 2. Sort delete preprocess data by segment id
for _, r := range node.deletePreprocessData.deleteRecords {
node.deleteData.deleteIDs[r.segmentID] = append(node.deleteData.deleteIDs[r.segmentID], r.entityID)
node.deleteData.deleteTimestamps[r.segmentID] = append(node.deleteData.deleteTimestamps[r.segmentID], r.timestamp)
}
// 3. Do PreDelete
for segmentID := range node.deleteData.deleteIDs {
var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
}
wg.Done()
return publishResult(nil, clientId)
}
func (node *QueryNode) searchDeleteInMap() {
var ids, timestamps, segmentIDs = node.GetKey2Segments()
for i := 0; i <= len(ids); i++ {
id := ids[i]
timestamp := timestamps[i]
segmentID := segmentIDs[i]
for timeRange, records := range node.deleteRecordsMap {
if timestamp < timeRange.timestampMax && timestamp > timeRange.timestampMin {
for _, r := range *records.deleteRecords {
if r.timestamp == timestamp && r.entityID == id {
r.segmentID = segmentID
records.count <- <- records.count - 1
}
}
}
}
}
}
func (node *QueryNode) Delete() msgPb.Status {
type DeleteData struct {
ids *[]int64
timestamp *[]uint64
}
for timeRange, records := range node.deleteRecordsMap {
// TODO: multi-thread delete
if <- records.count == 0 {
// 1. Sort delete records by segment id
segment2records := make(map[int64]DeleteData)
for _, r := range *records.deleteRecords {
*segment2records[r.segmentID].ids = append(*segment2records[r.segmentID].ids, r.entityID)
*segment2records[r.segmentID].timestamp = append(*segment2records[r.segmentID].timestamp, r.timestamp)
}
// 2. Do batched delete
for segmentID, deleteData := range segment2records {
var segment, err = node.GetSegmentBySegmentID(segmentID)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
err = segment.SegmentDelete(deleteData.ids, deleteData.timestamp, timeRange.timestampMin, timeRange.timestampMax)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
}
}
var numOfRecords = len(node.deleteData.deleteIDs[segmentID])
var offset = targetSegment.SegmentPreDelete(numOfRecords)
node.deleteData.deleteOffset[segmentID] = offset
}
return msgPb.Status{ErrorCode: 0}
}
func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg, wg *sync.WaitGroup) msgPb.Status {
func (node *QueryNode) DoInsertAndDelete() msgPb.Status {
var wg sync.WaitGroup
// Do insert
for segmentID, records := range node.insertData.insertRecords {
wg.Add(1)
go node.DoInsert(segmentID, &records, &wg)
}
// Do delete
for segmentID, deleteIDs := range node.deleteData.deleteIDs {
wg.Add(1)
var deleteTimestamps = node.deleteData.deleteTimestamps[segmentID]
go node.DoDelete(segmentID, &deleteIDs, &deleteTimestamps, &wg)
}
wg.Wait()
return msgPb.Status{ErrorCode: 0}
}
func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.WaitGroup) msgPb.Status {
var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
ids := node.insertData.insertIDs[segmentID]
timestamps := node.insertData.insertTimestamps[segmentID]
err = targetSegment.SegmentInsert(&ids, &timestamps, records)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
wg.Done()
return msgPb.Status{ErrorCode: 0}
}
func (node *QueryNode) DoDelete(segmentID int64, deleteIDs *[]int64, deleteTimestamps *[]uint64, wg *sync.WaitGroup) msgPb.Status {
var segment, err = node.GetSegmentBySegmentID(segmentID)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
err = segment.SegmentDelete(deleteIDs, deleteTimestamps)
if err != nil {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
wg.Done()
return msgPb.Status{ErrorCode: 0}
}
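DoInsertAndDelete above fans the per-segment batches out to one goroutine per segment and joins them with a sync.WaitGroup; the msgPb.Status returned by each DoInsert/DoDelete goroutine is currently discarded. A minimal sketch of the fan-out idiom (illustration only; applyToSegment is a hypothetical stand-in for DoInsert or DoDelete, and passing the segment ID as an argument avoids sharing the loop variable between goroutines):

package main

import (
	"fmt"
	"sync"
)

func applyToSegment(segmentID int64, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Println("apply batch to segment", segmentID)
}

func main() {
	var wg sync.WaitGroup
	for _, segmentID := range []int64{0, 1, 2} {
		wg.Add(1)
		go applyToSegment(segmentID, &wg)
	}
	wg.Wait()
}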
func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
var clientId = searchMessages[0].ClientId
type SearchResultTmp struct {
@ -379,9 +337,16 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg, wg *sync.WaitGr
// TODO: get top-k's k from queryString
const TopK = 1
// 1. Do search in all segments
var timestamp = msg.Timestamp
var vector = msg.Records
// 1. Timestamp check
// TODO: return or wait? Or add graceful time
if timestamp > node.queryNodeTimeSync.SearchTimeSync {
return msgPb.Status{ErrorCode: 1}
}
// 2. Do search in all segments
for _, partition := range targetCollection.Partitions {
for _, openSegment := range partition.OpenedSegments {
var res, err = openSegment.SegmentSearch("", timestamp, vector)
@ -420,6 +385,5 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg, wg *sync.WaitGr
publishSearchResult(&results, clientId)
}
wg.Done()
return msgPb.Status{ErrorCode: 0}
}
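Search now gates each request on the node's search watermark: a query whose timestamp is ahead of SearchTimeSync is rejected, and RunInsertDelete advances that watermark via UpdateSearchTimeSync(timeRange) only after a batch has been applied. A minimal sketch of the gating relationship (illustration only, not this package's types):

package main

import "fmt"

type nodeTime struct{ searchTimeSync uint64 }

// canServe mirrors the check in Search: reject while queryTs > SearchTimeSync.
func (t *nodeTime) canServe(queryTs uint64) bool {
	return queryTs <= t.searchTimeSync
}

func main() {
	t := &nodeTime{}
	fmt.Println(t.canServe(10)) // false: the batch covering ts 10 has not been applied yet
	t.searchTimeSync = 20       // RunInsertDelete advances the watermark after a batch
	fmt.Println(t.canServe(10)) // true
}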


@ -24,9 +24,8 @@ func (t *QueryNodeTime) UpdateWriteTimeSync() {
t.WriteTimeSync = 0
}
func (t *QueryNodeTime) UpdateSearchTimeSync() {
// TODO: Add time sync
t.SearchTimeSync = 0
func (t *QueryNodeTime) UpdateSearchTimeSync(timeRange TimeRange) {
t.SearchTimeSync = timeRange.timestampMax
}
func (t *QueryNodeTime) UpdateTSOTimeSync() {


@ -1,22 +1,11 @@
package reader
import (
"fmt"
"sync"
"time"
)
func startQueryNode() {
qn := NewQueryNode(0, 0)
qn.InitQueryNodeCollection()
go qn.SegmentService()
qn.StartMessageClient()
var wg sync.WaitGroup
for {
time.Sleep(200 * time.Millisecond)
qn.PrepareBatchMsg()
qn.doQueryNode(&wg)
fmt.Println("do a batch in 200ms")
}
go qn.RunInsertDelete()
go qn.RunSearch()
}


@ -76,7 +76,19 @@ func (s *Segment) Close() error {
}
////////////////////////////////////////////////////////////////////////////
func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, records *[][]byte, timestampMin uint64, timestampMax uint64) error {
func (s *Segment) SegmentPreInsert(numOfRecords int) int64 {
var offset = C.PreInsert(numOfRecords)
return offset
}
func (s *Segment) SegmentPreDelete(numOfRecords int) int64 {
var offset = C.PreDelete(numOfRecords)
return offset
}
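The new wrappers turn insertion and deletion into a reserve-then-apply sequence: SegmentPreInsert/SegmentPreDelete reserve capacity and return the starting offset, which the query node records in insertOffset/deleteOffset before SegmentInsert/SegmentDelete are called. A hypothetical usage sketch inside package reader (it mirrors PreInsertAndDelete and DoInsert in query_node.go; note that in this commit the reserved offset is stored but not yet passed to SegmentInsert):

// exampleReserveThenInsert is illustrative only and not part of this commit.
func exampleReserveThenInsert(s *Segment, ids []int64, timestamps []uint64, records [][]byte) error {
	offset := s.SegmentPreInsert(len(records)) // reserve len(records) slots in the segment
	_ = offset                                 // the node keeps this in insertData.insertOffset
	return s.SegmentInsert(&ids, &timestamps, &records)
}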
func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, records *[][]byte) error {
/*C.Insert
int
Insert(CSegmentBase c_segment,
@ -121,7 +133,7 @@ func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, record
return nil
}
func (s *Segment) SegmentDelete(entityIds *[]int64, timestamps *[]uint64, timestampMin uint64, timestampMax uint64) error {
func (s *Segment) SegmentDelete(entityIds *[]int64, timestamps *[]uint64) error {
/*C.Delete
int
Delete(CSegmentBase c_segment,


@ -0,0 +1,40 @@
package reader
import (
"fmt"
"time"
)
func (node *QueryNode) SegmentsManagement() {
node.queryNodeTimeSync.UpdateTSOTimeSync()
var timeNow = node.queryNodeTimeSync.TSOTimeSync
for _, collection := range node.Collections {
for _, partition := range collection.Partitions {
for _, oldSegment := range partition.OpenedSegments {
// TODO: check segment status
if timeNow >= oldSegment.SegmentCloseTime {
// start new segment and add it into partition.OpenedSegments
// TODO: get segmentID from master
var segmentID int64 = 0
var newSegment = partition.NewSegment(segmentID)
newSegment.SegmentCloseTime = timeNow + SegmentLifetime
partition.OpenedSegments = append(partition.OpenedSegments, newSegment)
node.SegmentsMap[segmentID] = newSegment
// close old segment and move it into partition.ClosedSegments
// TODO: check status
var _ = oldSegment.Close()
partition.ClosedSegments = append(partition.ClosedSegments, oldSegment)
}
}
}
}
}
func (node *QueryNode) SegmentService() {
for {
time.Sleep(200 * time.Millisecond)
node.SegmentsManagement()
fmt.Println("do segments management in 200ms")
}
}
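The segment lifecycle loop now lives in its own file, but the refactored startQueryNode only launches RunInsertDelete and RunSearch. A hypothetical launch sketch, assuming the lifecycle loop is still meant to run alongside them (not part of this commit):

func startQueryNodeWithSegmentService() {
	qn := NewQueryNode(0, 0)
	qn.InitQueryNodeCollection()
	qn.StartMessageClient()
	go qn.RunInsertDelete()
	go qn.RunSearch()
	go qn.SegmentService()
}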


@ -26,7 +26,7 @@ func TestSegmentInsert(t *testing.T) {
ids := []int64{1, 2, 3}
timestamps := []uint64{0, 0, 0}
var err = segment.SegmentInsert(&ids, &timestamps, nil, 0, 0)
var err = segment.SegmentInsert(&ids, &timestamps, nil)
assert.NoError(t, err)
partition.DeleteSegment(segment)
@ -43,7 +43,7 @@ func TestSegmentDelete(t *testing.T) {
ids := []int64{1, 2, 3}
timestamps := []uint64{0, 0, 0}
var err = segment.SegmentDelete(&ids, &timestamps, 0, 0)
var err = segment.SegmentDelete(&ids, &timestamps)
assert.NoError(t, err)
partition.DeleteSegment(segment)
@ -60,7 +60,7 @@ func TestSegmentSearch(t *testing.T) {
ids := []int64{1, 2, 3}
timestamps := []uint64{0, 0, 0}
var insertErr = segment.SegmentInsert(&ids, &timestamps, nil, 0, 0)
var insertErr = segment.SegmentInsert(&ids, &timestamps, nil)
assert.NoError(t, insertErr)
var searchRes, searchErr = segment.SegmentSearch("fake query string", timestamps[0], nil)
@ -109,7 +109,7 @@ func TestSegment_GetRowCount(t *testing.T) {
ids := []int64{1, 2, 3}
timestamps := []uint64{0, 0, 0}
var err = segment.SegmentInsert(&ids, &timestamps, nil, 0, 0)
var err = segment.SegmentInsert(&ids, &timestamps, nil)
assert.NoError(t, err)
var rowCount = segment.GetRowCount()
@ -129,7 +129,7 @@ func TestSegment_GetDeletedCount(t *testing.T) {
ids := []int64{1, 2, 3}
timestamps := []uint64{0, 0, 0}
var err = segment.SegmentDelete(&ids, &timestamps, 0, 0)
var err = segment.SegmentDelete(&ids, &timestamps)
assert.NoError(t, err)
var deletedCount = segment.GetDeletedCount()

reader/util_functions.go (new file, +58)

@ -0,0 +1,58 @@
package reader
import (
"errors"
"strconv"
)
// Function `GetSegmentByEntityId` should return entityIDs, timestamps and segmentIDs
func (node *QueryNode) GetKey2Segments() ([]int64, []uint64, []int64) {
// TODO: get id2segment info from pulsar
return nil, nil, nil
}
func (node *QueryNode) GetTargetSegment(collectionName *string, partitionTag *string) (*Segment, error) {
var targetPartition *Partition
for _, collection := range node.Collections {
if *collectionName == collection.CollectionName {
for _, partition := range collection.Partitions {
if *partitionTag == partition.PartitionName {
targetPartition = partition
break
}
}
}
}
if targetPartition == nil {
return nil, errors.New("cannot found target partition")
}
for _, segment := range targetPartition.OpenedSegments {
// TODO: add other conditions
return segment, nil
}
return nil, errors.New("cannot found target segment")
}
func (node *QueryNode) GetCollectionByCollectionName(collectionName string) (*Collection, error) {
for _, collection := range node.Collections {
if collection.CollectionName == collectionName {
return collection, nil
}
}
return nil, errors.New("Cannot found collection: " + collectionName)
}
func (node *QueryNode) GetSegmentBySegmentID(segmentID int64) (*Segment, error) {
targetSegment := node.SegmentsMap[segmentID]
if targetSegment == nil {
return nil, errors.New("cannot found segment with id = " + strconv.FormatInt(segmentID, 10))
}
return targetSegment, nil
}


@ -17,6 +17,7 @@ type ReaderTimeSync interface {
Close()
TimeSync() <-chan TimeSyncMsg
InsertOrDelete() <-chan *pb.InsertOrDeleteMsg
IsInsertDeleteChanFull() bool
}
type TimeSyncMsg struct {
@ -45,9 +46,9 @@ type readerTimeSyncCfg struct {
cancel context.CancelFunc
}
func toTimeStamp(ts *pb.TimeSyncMsg) int {
func toMillisecond(ts *pb.TimeSyncMsg) int {
// get the physical time in milliseconds (the low 18 bits carry the logical counter)
return int(ts.GetTimestamp()>>18) % 1000
return int(ts.GetTimestamp() >> 18)
}
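toMillisecond now returns the full physical time in milliseconds instead of only the millisecond within the current second, so the time-gap comparison in alignTimeSync no longer wraps around at second boundaries. This assumes the hybrid-timestamp layout used elsewhere in these tests, physical milliseconds shifted left by 18 bits with an 18-bit logical counter in the low bits; a small sketch of that layout (illustration only):

package main

import "fmt"

func main() {
	var physicalMs uint64 = 1599
	var logical uint64 = 7
	ts := physicalMs<<18 | logical // low 18 bits carry a logical counter

	fmt.Println(ts >> 18)           // 1599: full milliseconds, as the new toMillisecond returns
	fmt.Println(int(ts>>18) % 1000) // 599: millisecond within the second, the old behavior
}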
func NewReaderTimeSync(
@ -86,7 +87,7 @@ func NewReaderTimeSync(
}
//set default value
if r.readerQueueSize == 0 {
r.readerQueueSize = 128
r.readerQueueSize = 1024
}
r.timesyncMsgChan = make(chan TimeSyncMsg, len(readTopics)*r.readerQueueSize)
@ -158,13 +159,17 @@ func (r *readerTimeSyncCfg) TimeSync() <-chan TimeSyncMsg {
return r.timesyncMsgChan
}
func (r *readerTimeSyncCfg) IsInsertDeleteChanFull() bool {
return len(r.insertOrDeleteChan) == len(r.readerProducer)*r.readerQueueSize
}
func (r *readerTimeSyncCfg) alignTimeSync(ts []*pb.TimeSyncMsg) []*pb.TimeSyncMsg {
if len(r.proxyIdList) > 1 {
if len(ts) > 1 {
for i := 1; i < len(r.proxyIdList); i++ {
curIdx := len(ts) - 1 - i
preIdx := len(ts) - i
timeGap := toTimeStamp(ts[curIdx]) - toTimeStamp(ts[preIdx])
timeGap := toMillisecond(ts[curIdx]) - toMillisecond(ts[preIdx])
if timeGap >= (r.interval/2) || timeGap <= (-r.interval/2) {
ts = ts[preIdx:]
return ts
@ -274,6 +279,9 @@ func (r *readerTimeSyncCfg) startReadTopics() {
r.revTimesyncFromReader[imsg.Timestamp] = gval
}
} else {
if r.IsInsertDeleteChanFull() {
log.Printf("WARN : Insert or delete chan is full ...")
}
tsm.NumRecorders++
r.insertOrDeleteChan <- &imsg
}


@ -320,7 +320,7 @@ func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64
t.Fatalf("send msg error %v", err)
}
log.Printf("send msg id = [ %d %d ], timestamp = %d", prid1, prid2, timestamp)
//log.Printf("send msg id = [ %d %d ], timestamp = %d", prid1, prid2, timestamp)
if i%20 == 0 {
tm := pb.TimeSyncMsg{Peer_Id: ptid, Timestamp: timestamp << 18}
@ -331,7 +331,7 @@ func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64
if _, err := pt.Send(context.Background(), &pulsar.ProducerMessage{Payload: tb}); err != nil {
t.Fatalf("send msg error %v", err)
}
log.Printf("send timestamp id = %d, timestamp = %d", ptid, timestamp)
//log.Printf("send timestamp id = %d, timestamp = %d", ptid, timestamp)
}
}
}