From b948a3438ecdeaf63a575dc0eba2ba3647c10b27 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Mon, 7 Sep 2020 17:01:46 +0800
Subject: [PATCH] Add APIs for insertion, deletion, and search

Signed-off-by: bigsheeper
---
 core/src/dog_segment/SegmentBase.h |  35 +--
 reader/index.go                    |   6 +-
 reader/query_node.go               | 401 ++++++++++++++++++-----------
 reader/query_node_time.go          |  35 +++
 reader/result.go                   |  14 +-
 reader/segment.go                  |  48 ++--
 6 files changed, 321 insertions(+), 218 deletions(-)
 create mode 100644 reader/query_node_time.go

diff --git a/core/src/dog_segment/SegmentBase.h b/core/src/dog_segment/SegmentBase.h
index 8bbdc71684..c15cdf1a36 100644
--- a/core/src/dog_segment/SegmentBase.h
+++ b/core/src/dog_segment/SegmentBase.h
@@ -5,6 +5,7 @@
 #include "dog_segment/SegmentDefs.h"
 // #include "knowhere/index/Index.h"
 #include "query/GeneralQuery.h"
+using idx_t = int64_t;
 
 namespace milvus {
 namespace dog_segment {
@@ -24,15 +25,17 @@ class SegmentBase {
  public:
     virtual ~SegmentBase() = default;
     // SegmentBase(std::shared_ptr<Collection> collection);
-
+    // single threaded
     virtual Status
-    Insert(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values) = 0;
+    Insert(int64_t size, const idx_t* primary_keys, const Timestamp* timestamps, const DogDataChunk& values, std::pair<Timestamp, Timestamp> timestamp_range) = 0;
 
     // TODO: add id into delete log, possibly bitmap
+    // single threaded
     virtual Status
-    Delete(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps) = 0;
+    Delete(int64_t size, const idx_t* primary_keys, const Timestamp* timestamps, std::pair<Timestamp, Timestamp> timestamp_range) = 0;
 
     // query contains metadata of
+    // multi-threaded
     virtual Status
     Query(const query::QueryPtr& query, Timestamp timestamp, QueryResult& results) = 0;
 
@@ -41,6 +44,7 @@ class SegmentBase {
     // GetEntityByIds(Timestamp timestamp, const std::vector<idx_t>& ids, DataChunkPtr& results) = 0;
 
     // stop receiving insert requests
+    // single threaded
     virtual Status
     Close() = 0;
 
@@ -75,31 +79,6 @@ class SegmentBase {
     virtual ssize_t
     get_deleted_count() const = 0;
-
- public:
-    // getter and setter
-    Timestamp get_time_begin() {
-        return time_begin_;
-    }
-    void set_time_begin(Timestamp time_begin) {
-        this->time_begin_ = time_begin;
-    }
-    Timestamp get_time_end() {
-        return time_end_;
-    }
-    void set_time_end(Timestamp time_end) {
-        this->time_end_ = time_end;
-    }
-    uint64_t get_segment_id() {
-        return segment_id_;
-    }
-    void set_segment_id(uint64_t segment_id) {
-        this->segment_id_ = segment_id;
-    }
-
- private:
-    Timestamp time_begin_;
-    Timestamp time_end_;
-    uint64_t segment_id_;
 };
 
 using SegmentBasePtr = std::unique_ptr<SegmentBase>;
diff --git a/reader/index.go b/reader/index.go
index 9bc078bb7b..1a125ece22 100644
--- a/reader/index.go
+++ b/reader/index.go
@@ -1,15 +1,15 @@
 package reader
 
 import (
-	schema2 "github.com/czs007/suvlim/pulsar/client-go/schema"
+	schema2 "github.com/czs007/suvlim/pulsar/client-go/pb"
 )
 
 type IndexConfig struct {}
 
 func buildIndex(config IndexConfig) schema2.Status {
-	return schema2.Status{Error_code: schema2.ErrorCode_SUCCESS}
+	return schema2.Status{ErrorCode: schema2.ErrorCode_SUCCESS}
 }
 
 func dropIndex(fieldName string) schema2.Status {
-	return schema2.Status{Error_code: schema2.ErrorCode_SUCCESS}
+	return schema2.Status{ErrorCode: schema2.ErrorCode_SUCCESS}
 }
diff --git a/reader/query_node.go b/reader/query_node.go
index 297af962e6..4911b74fa0 100644
--- a/reader/query_node.go
+++ b/reader/query_node.go
@@ -16,49 +16,61 @@ import "C"
 import (
 	"errors"
 	"fmt"
-	"github.com/czs007/suvlim/pulsar/client-go"
-	"github.com/czs007/suvlim/pulsar/client-go/schema"
+	messageClient "github.com/czs007/suvlim/pkg/message"
+	schema "github.com/czs007/suvlim/pkg/message"
+	"strconv"
 	"sync"
 	"time"
 )
 
-type QueryNodeDataBuffer struct {
-	InsertBuffer      []*schema.InsertMsg
-	DeleteBuffer      []*schema.DeleteMsg
-	SearchBuffer      []*schema.SearchMsg
-	validInsertBuffer []bool
-	validDeleteBuffer []bool
-	validSearchBuffer []bool
+type DeleteRecord struct {
+	entityID  int64
+	timestamp uint64
+	segmentID int64
 }
 
-type QueryNodeTimeSync struct {
-	deleteTimeSync uint64
-	insertTimeSync uint64
-	searchTimeSync uint64
+type DeleteRecords struct {
+	deleteRecords *[]DeleteRecord
+	count         chan int
+}
+
+type QueryNodeDataBuffer struct {
+	InsertDeleteBuffer      []*schema.InsertOrDeleteMsg
+	SearchBuffer            []*schema.SearchMsg
+	validInsertDeleteBuffer []bool
+	validSearchBuffer       []bool
 }
 
 type QueryNode struct {
-	QueryNodeId       uint64
-	Collections       []*Collection
-	messageClient     client_go.MessageClient
-	queryNodeTimeSync *QueryNodeTimeSync
-	buffer            QueryNodeDataBuffer
+	QueryNodeId       uint64
+	Collections       []*Collection
+	SegmentsMap       map[int64]*Segment
+	messageClient     messageClient.MessageClient
+	queryNodeTimeSync *QueryNodeTime
+	deleteRecordsMap  map[TimeRange]DeleteRecords
+	buffer            QueryNodeDataBuffer
 }
 
 func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
-	mc := client_go.MessageClient{}
+	mc := messageClient.MessageClient{}
 
-	queryNodeTimeSync := &QueryNodeTimeSync {
-		deleteTimeSync: timeSync,
-		insertTimeSync: timeSync,
-		searchTimeSync: timeSync,
+	queryNodeTimeSync := &QueryNodeTime{
+		ReadTimeSyncMin: timeSync,
+		ReadTimeSyncMax: timeSync,
+		WriteTimeSync:   timeSync,
+		SearchTimeSync:  timeSync,
+		TSOTimeSync:     timeSync,
 	}
 
+	segmentsMap := make(map[int64]*Segment)
+
 	return &QueryNode{
-		QueryNodeId:       queryNodeId,
-		Collections:       nil,
-		messageClient:     mc,
-		queryNodeTimeSync: queryNodeTimeSync,
+		QueryNodeId:       queryNodeId,
+		Collections:       nil,
+		SegmentsMap:       segmentsMap,
+		messageClient:     mc,
+		queryNodeTimeSync: queryNodeTimeSync,
+		deleteRecordsMap:  make(map[TimeRange]DeleteRecords),
 	}
 }
 
@@ -82,10 +94,15 @@ func (node *QueryNode) DeleteCollection(collection *Collection) {
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 
-func (node *QueryNode) doQueryNode (wg *sync.WaitGroup) {
+func (node *QueryNode) doQueryNode(wg *sync.WaitGroup) {
 	wg.Add(3)
-	go node.Insert(node.messageClient.InsertMsg, wg)
-	go node.Delete(node.messageClient.DeleteMsg, wg)
+	// Sort insert and delete messages, then do the inserts
+	go node.InsertAndDelete(node.messageClient.InsertMsg, wg)
+	// Match delete messages to their segments
+	go node.searchDeleteInMap()
+	// Do delete
+	go node.Delete()
+	// Do search
 	go node.Search(node.messageClient.SearchMsg, wg)
 	wg.Wait()
 }
@@ -102,9 +119,9 @@ func (node *QueryNode) StartMessageClient() {
 	go node.messageClient.ReceiveMessage()
 }
 
-func (node *QueryNode) GetSegmentByEntityId(entityId uint64) *Segment {
+func (node *QueryNode) GetSegmentByEntityId() ([]int64, []uint64, []int64) {
 	// TODO: get id2segment info from pulsar
-	return nil
+	return nil, nil, nil
 }
 
 func (node *QueryNode) GetTargetSegment(collectionName *string, partitionTag *string) (*Segment, error) {
@@ -133,9 +150,14 @@ func (node *QueryNode) GetTargetSegment(collectionName *string, partitionTag *st
 	return nil, errors.New("cannot find target segment")
 }
 
-func (node *QueryNode) GetTSOTime() uint64 {
-	// TODO: Add time sync
-	return 0
+func (node *QueryNode) GetSegmentBySegmentID(segmentID int64) (*Segment, error) {
+	targetSegment := node.SegmentsMap[segmentID]
+
+	if targetSegment == nil {
+		return nil, errors.New("cannot find segment with id = " + strconv.FormatInt(segmentID, 10))
+	}
+
+	return targetSegment, nil
 }
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -150,7 +172,8 @@ func (node *QueryNode) InitQueryNodeCollection() {
 }
 
 func (node *QueryNode) SegmentsManagement() {
-	var timeNow = node.GetTSOTime()
+	node.queryNodeTimeSync.UpdateTSOTimeSync()
+	var timeNow = node.queryNodeTimeSync.TSOTimeSync
 	for _, collection := range node.Collections {
 		for _, partition := range collection.Partitions {
 			for _, oldSegment := range partition.OpenedSegments {
@@ -181,155 +204,227 @@ func (node *QueryNode) SegmentService() {
 }
 
 ///////////////////////////////////////////////////////////////////////////////////////////////////
-func (node *QueryNode) Insert(insertMessages []*schema.InsertMsg, wg *sync.WaitGroup) schema.Status {
-	var timeNow = node.GetTSOTime()
-	var collectionName = insertMessages[0].CollectionName
-	var partitionTag = insertMessages[0].PartitionTag
-	var clientId = insertMessages[0].ClientId
+// TODO: receive delete messages individually
+func (node *QueryNode) InsertAndDelete(insertDeleteMessages []*schema.InsertOrDeleteMsg, wg *sync.WaitGroup) schema.Status {
+	node.queryNodeTimeSync.UpdateReadTimeSync()
 
-	// TODO: prevent Memory copy
-	var entityIds []uint64
-	var timestamps []uint64
-	var vectorRecords [][]*schema.FieldValue
+	var tMin = node.queryNodeTimeSync.ReadTimeSyncMin
+	var tMax = node.queryNodeTimeSync.ReadTimeSyncMax
+	var readTimeSyncRange = TimeRange{timestampMin: tMin, timestampMax: tMax}
 
-	for i, msg := range node.buffer.InsertBuffer {
-		if msg.Timestamp <= timeNow {
-			entityIds = append(entityIds, msg.EntityId)
-			timestamps = append(timestamps, msg.Timestamp)
-			vectorRecords = append(vectorRecords, msg.Fields)
-			node.buffer.validInsertBuffer[i] = false
+	var clientId = insertDeleteMessages[0].ClientId
+
+	var insertIDs = make(map[int64][]int64)
+	var insertTimestamps = make(map[int64][]uint64)
+	var insertRecords = make(map[int64][][]byte)
+
+	// Make sure this time range has an initialized delete-record entry
+	// before anything is appended to it; the channel counter must be
+	// buffered and seeded, or the first receive below blocks forever.
+	if _, ok := node.deleteRecordsMap[readTimeSyncRange]; !ok {
+		records := make([]DeleteRecord, 0)
+		count := make(chan int, 1)
+		count <- 0
+		node.deleteRecordsMap[readTimeSyncRange] = DeleteRecords{deleteRecords: &records, count: count}
+	}
+
+	// 1. Extract messages before readTimeSync from QueryNodeDataBuffer.
+	//    Set their valid-bitmap entries to false.
+	for i, msg := range node.buffer.InsertDeleteBuffer {
+		if msg.Timestamp <= tMax {
+			if msg.Op == schema.OpType_INSERT {
+				insertIDs[msg.SegmentId] = append(insertIDs[msg.SegmentId], msg.Uid)
+				insertTimestamps[msg.SegmentId] = append(insertTimestamps[msg.SegmentId], msg.Timestamp)
+				insertRecords[msg.SegmentId] = append(insertRecords[msg.SegmentId], msg.RowsData.Blob)
+			} else if msg.Op == schema.OpType_DELETE {
+				var r = DeleteRecord{
+					entityID:  msg.Uid,
+					timestamp: msg.Timestamp,
+				}
+				*node.deleteRecordsMap[readTimeSyncRange].deleteRecords = append(*node.deleteRecordsMap[readTimeSyncRange].deleteRecords, r)
+				node.deleteRecordsMap[readTimeSyncRange].count <- <-node.deleteRecordsMap[readTimeSyncRange].count + 1
+			}
+			node.buffer.validInsertDeleteBuffer[i] = false
 		}
 	}
 
-	for i, isValid := range node.buffer.validInsertBuffer {
-		if !isValid {
-			copy(node.buffer.InsertBuffer[i:], node.buffer.InsertBuffer[i+1:]) // Shift a[i+1:] left one index.
-			node.buffer.InsertBuffer[len(node.buffer.InsertBuffer)-1] = nil    // Erase last element (write zero value).
-			node.buffer.InsertBuffer = node.buffer.InsertBuffer[:len(node.buffer.InsertBuffer)-1] // Truncate slice.
+	// 2. Remove invalid messages from buffer.
+	for i, isValid := range node.buffer.validInsertDeleteBuffer {
+		if !isValid {
+			copy(node.buffer.InsertDeleteBuffer[i:], node.buffer.InsertDeleteBuffer[i+1:])                           // Shift a[i+1:] left one index.
+			node.buffer.InsertDeleteBuffer[len(node.buffer.InsertDeleteBuffer)-1] = nil                              // Erase last element (write zero value).
+			node.buffer.InsertDeleteBuffer = node.buffer.InsertDeleteBuffer[:len(node.buffer.InsertDeleteBuffer)-1] // Truncate slice.
 		}
 	}
 
-	for _, msg := range insertMessages {
-		if msg.Timestamp <= timeNow {
-			entityIds = append(entityIds, msg.EntityId)
-			timestamps = append(timestamps, msg.Timestamp)
-			vectorRecords = append(vectorRecords, msg.Fields)
+	// 3. Extract messages before readTimeSync from the current messageClient.
+	//    Move messages after readTimeSync to QueryNodeDataBuffer.
+	//    Set their valid-bitmap entries to true.
+	for _, msg := range insertDeleteMessages {
+		if msg.Timestamp <= tMax {
+			if msg.Op == schema.OpType_INSERT {
+				insertIDs[msg.SegmentId] = append(insertIDs[msg.SegmentId], msg.Uid)
+				insertTimestamps[msg.SegmentId] = append(insertTimestamps[msg.SegmentId], msg.Timestamp)
+				insertRecords[msg.SegmentId] = append(insertRecords[msg.SegmentId], msg.RowsData.Blob)
+			} else if msg.Op == schema.OpType_DELETE {
+				var r = DeleteRecord{
+					entityID:  msg.Uid,
+					timestamp: msg.Timestamp,
+				}
+				*node.deleteRecordsMap[readTimeSyncRange].deleteRecords = append(*node.deleteRecordsMap[readTimeSyncRange].deleteRecords, r)
+				node.deleteRecordsMap[readTimeSyncRange].count <- <-node.deleteRecordsMap[readTimeSyncRange].count + 1
+			}
 		} else {
-			node.buffer.InsertBuffer = append(node.buffer.InsertBuffer, msg)
-			node.buffer.validInsertBuffer = append(node.buffer.validInsertBuffer, true)
+			node.buffer.InsertDeleteBuffer = append(node.buffer.InsertDeleteBuffer, msg)
+			node.buffer.validInsertDeleteBuffer = append(node.buffer.validInsertDeleteBuffer, true)
 		}
 	}
 
-	var targetSegment, err = node.GetTargetSegment(&collectionName, &partitionTag)
-	if err != nil {
-		// TODO: throw runtime error
-		fmt.Println(err.Error())
-		return schema.Status{}
+	// 4. Do inserts.
+	// TODO: multi-thread insert
+	for segmentID, records := range insertRecords {
+		var targetSegment, err = node.GetSegmentBySegmentID(segmentID)
+		if err != nil {
+			fmt.Println(err.Error())
+			return schema.Status{ErrorCode: 1}
+		}
+		ids := insertIDs[segmentID]
+		timestamps := insertTimestamps[segmentID]
+		err = targetSegment.SegmentInsert(&ids, &timestamps, &records, tMin, tMax)
+		if err != nil {
+			fmt.Println(err.Error())
+			return schema.Status{ErrorCode: 1}
+		}
 	}
 
-	// TODO: check error
-	var result, _ = SegmentInsert(targetSegment, &entityIds, &timestamps, vectorRecords)
-
 	wg.Done()
-	return publishResult(&result, clientId)
+	return publishResult(nil, clientId)
 }
 
-func (node *QueryNode) Delete(deleteMessages []*schema.DeleteMsg, wg *sync.WaitGroup) schema.Status {
-	var timeNow = node.GetTSOTime()
-	var clientId = deleteMessages[0].ClientId
+//func (node *QueryNode) Insert(insertMessages []*schema.InsertMsg, wg *sync.WaitGroup) schema.Status {
+//	var timeNow = node.GetTSOTime()
+//	var collectionName = insertMessages[0].CollectionName
+//	var partitionTag = insertMessages[0].PartitionTag
+//	var clientId = insertMessages[0].ClientId
+//
+//	// TODO: prevent Memory copy
+//	var entityIds []uint64
+//	var timestamps []uint64
+//	var vectorRecords [][]*schema.FieldValue
+//
+//	for i, msg := range node.buffer.InsertBuffer {
+//		if msg.Timestamp <= timeNow {
+//			entityIds = append(entityIds, msg.EntityId)
+//			timestamps = append(timestamps, msg.Timestamp)
+//			vectorRecords = append(vectorRecords, msg.Fields)
+//			node.buffer.validInsertBuffer[i] = false
+//		}
+//	}
+//
+//	for i, isValid := range node.buffer.validInsertBuffer {
+//		if !isValid {
+//			copy(node.buffer.InsertBuffer[i:], node.buffer.InsertBuffer[i+1:]) // Shift a[i+1:] left one index.
+//			node.buffer.InsertBuffer[len(node.buffer.InsertBuffer)-1] = nil    // Erase last element (write zero value).
+//			node.buffer.InsertBuffer = node.buffer.InsertBuffer[:len(node.buffer.InsertBuffer)-1] // Truncate slice.
+//		}
+//	}
+//
+//	for _, msg := range insertMessages {
+//		if msg.Timestamp <= timeNow {
+//			entityIds = append(entityIds, msg.EntityId)
+//			timestamps = append(timestamps, msg.Timestamp)
+//			vectorRecords = append(vectorRecords, msg.Fields)
+//		} else {
+//			node.buffer.InsertBuffer = append(node.buffer.InsertBuffer, msg)
+//			node.buffer.validInsertBuffer = append(node.buffer.validInsertBuffer, true)
+//		}
+//	}
+//
+//	var targetSegment, err = node.GetTargetSegment(&collectionName, &partitionTag)
+//	if err != nil {
+//		// TODO: throw runtime error
+//		fmt.Println(err.Error())
+//		return schema.Status{}
+//	}
+//
+//	// TODO: check error
+//	var result, _ = SegmentInsert(targetSegment, &entityIds, &timestamps, vectorRecords)
+//
+//	wg.Done()
+//	return publishResult(&result, clientId)
+//}
 
-	// TODO: prevent Memory copy
-	var entityIds []uint64
-	var timestamps []uint64
+func (node *QueryNode) searchDeleteInMap() {
+	var ids, timestamps, segmentIDs = node.GetSegmentByEntityId()
 
-	for i, msg := range node.buffer.DeleteBuffer {
-		if msg.Timestamp <= timeNow {
-			entityIds = append(entityIds, msg.EntityId)
-			timestamps = append(timestamps, msg.Timestamp)
-			node.buffer.validDeleteBuffer[i] = false
+	for i := 0; i < len(ids); i++ {
+		id := ids[i]
+		timestamp := timestamps[i]
+		segmentID := segmentIDs[i]
+		for timeRange, records := range node.deleteRecordsMap {
+			if timestamp < timeRange.timestampMax && timestamp > timeRange.timestampMin {
+				for j := range *records.deleteRecords {
+					// Index into the slice so the segmentID assignment
+					// modifies the stored record, not a loop copy.
+					r := &(*records.deleteRecords)[j]
+					if r.timestamp == timestamp && r.entityID == id {
+						r.segmentID = segmentID
+						records.count <- <-records.count - 1
+					}
+				}
+			}
+		}
+	}
+}
+
+func (node *QueryNode) Delete() schema.Status {
+	type DeleteData struct {
+		ids       *[]int64
+		timestamp *[]uint64
+	}
+	for _, records := range node.deleteRecordsMap {
+		// TODO: multi-thread delete
+		if <-records.count == 0 {
+			// 1. Sort delete records by segment id
+			segment2records := make(map[int64]DeleteData)
+			for _, r := range *records.deleteRecords {
+				// Initialize each per-segment entry on first use to avoid
+				// dereferencing the nil pointers of a zero-value DeleteData.
+				if _, ok := segment2records[r.segmentID]; !ok {
+					segment2records[r.segmentID] = DeleteData{ids: &[]int64{}, timestamp: &[]uint64{}}
+				}
+				*segment2records[r.segmentID].ids = append(*segment2records[r.segmentID].ids, r.entityID)
+				*segment2records[r.segmentID].timestamp = append(*segment2records[r.segmentID].timestamp, r.timestamp)
+			}
+			// 2. Do batched delete
+			for segmentID, deleteData := range segment2records {
+				var segment, err = node.GetSegmentBySegmentID(segmentID)
+				if err != nil {
+					fmt.Println(err.Error())
+					return schema.Status{ErrorCode: 1}
+				}
+				err = segment.SegmentDelete(deleteData.ids, deleteData.timestamp)
+				if err != nil {
+					fmt.Println(err.Error())
+					return schema.Status{ErrorCode: 1}
+				}
+			}
 		}
 	}
 
-	for i, isValid := range node.buffer.validDeleteBuffer {
-		if !isValid {
-			copy(node.buffer.DeleteBuffer[i:], node.buffer.DeleteBuffer[i+1:]) // Shift a[i+1:] left one index.
-			node.buffer.DeleteBuffer[len(node.buffer.DeleteBuffer)-1] = nil    // Erase last element (write zero value).
-			node.buffer.DeleteBuffer = node.buffer.DeleteBuffer[:len(node.buffer.DeleteBuffer)-1] // Truncate slice.
-		}
-	}
-
-	for _, msg := range deleteMessages {
-		if msg.Timestamp <= timeNow {
-			entityIds = append(entityIds, msg.EntityId)
-			timestamps = append(timestamps, msg.Timestamp)
-		} else {
-			node.buffer.DeleteBuffer = append(node.buffer.DeleteBuffer, msg)
-			node.buffer.validDeleteBuffer = append(node.buffer.validDeleteBuffer, true)
-		}
-	}
-
-	if entityIds == nil {
-		// TODO: throw runtime error
-		fmt.Println("no entities found")
-		return schema.Status{}
-	}
-	// TODO: does all entities from a common batch are in the same segment?
-	var targetSegment = node.GetSegmentByEntityId(entityIds[0])
-
-	// TODO: check error
-	var result, _ = SegmentDelete(targetSegment, &entityIds, &timestamps)
-
-	wg.Done()
-	return publishResult(&result, clientId)
+	return schema.Status{ErrorCode: 0}
 }
 
 func (node *QueryNode) Search(searchMessages []*schema.SearchMsg, wg *sync.WaitGroup) schema.Status {
-	var timeNow = node.GetTSOTime()
-	var collectionName = searchMessages[0].CollectionName
-	var partitionTag = searchMessages[0].PartitionTag
 	var clientId = searchMessages[0].ClientId
-	var queryString = searchMessages[0].VectorParam.Json
-
-	// TODO: prevent Memory copy
-	var records []schema.VectorRecord
-	var timestamps []uint64
-
-	for i, msg := range node.buffer.SearchBuffer {
-		if msg.Timestamp <= timeNow {
-			records = append(records, *msg.VectorParam.RowRecord)
-			timestamps = append(timestamps, msg.Timestamp)
-			node.buffer.validSearchBuffer[i] = false
-		}
-	}
-
-	for i, isValid := range node.buffer.validSearchBuffer {
-		if !isValid {
-			copy(node.buffer.SearchBuffer[i:], node.buffer.SearchBuffer[i+1:]) // Shift a[i+1:] left one index.
-			node.buffer.SearchBuffer[len(node.buffer.SearchBuffer)-1] = nil    // Erase last element (write zero value).
-			node.buffer.SearchBuffer = node.buffer.SearchBuffer[:len(node.buffer.SearchBuffer)-1] // Truncate slice.
-		}
-	}
 
+	// Traverse all messages in the current messageClient.
+	// TODO: Do not receive batched search requests
 	for _, msg := range searchMessages {
-		if msg.Timestamp <= timeNow {
-			records = append(records, *msg.VectorParam.RowRecord)
-			timestamps = append(timestamps, msg.Timestamp)
-		} else {
-			node.buffer.SearchBuffer = append(node.buffer.SearchBuffer, msg)
-			node.buffer.validSearchBuffer = append(node.buffer.validSearchBuffer, true)
+		var results []*SearchResult
+		// TODO: get top-k's k from queryString
+		const TopK = 1
+
+		// 1. Do search in all segments
+		var timestamp = msg.Timestamp
+		var vector = msg.Records
+		for _, segment := range node.SegmentsMap {
+			var res, err = segment.SegmentSearch("", timestamp, vector)
+			if err != nil {
+				fmt.Println(err.Error())
+				return schema.Status{ErrorCode: 1}
+			}
+			results = append(results, res)
 		}
-	}
 
-	var targetSegment, err = node.GetTargetSegment(&collectionName, &partitionTag)
-	if err != nil {
-		// TODO: throw runtime error
-		fmt.Println(err.Error())
-		return schema.Status{}
-	}
+		// 2. Reduce results
 
-	// TODO: check error
-	var result, _ = SegmentSearch(targetSegment, queryString, &timestamps, &records)
+		// 3. Publish result to pulsar
+		publishSearchResult(&results, clientId)
+	}
 
 	wg.Done()
-	return publishSearchResult(result, clientId)
+	return schema.Status{ErrorCode: 0}
 }
diff --git a/reader/query_node_time.go b/reader/query_node_time.go
new file mode 100644
index 0000000000..1c9afd3f18
--- /dev/null
+++ b/reader/query_node_time.go
@@ -0,0 +1,35 @@
+package reader
+
+type QueryNodeTime struct {
+	ReadTimeSyncMin uint64
+	ReadTimeSyncMax uint64
+	WriteTimeSync   uint64
+	SearchTimeSync  uint64
+	TSOTimeSync     uint64
+}
+
+type TimeRange struct {
+	timestampMin uint64
+	timestampMax uint64
+}
+
+func (t *QueryNodeTime) UpdateReadTimeSync() {
+	t.ReadTimeSyncMin = t.ReadTimeSyncMax
+	// TODO: Add time sync
+	t.ReadTimeSyncMax = 1
+}
+
+func (t *QueryNodeTime) UpdateWriteTimeSync() {
+	// TODO: Add time sync
+	t.WriteTimeSync = 0
+}
+
+func (t *QueryNodeTime) UpdateSearchTimeSync() {
+	// TODO: Add time sync
+	t.SearchTimeSync = 0
+}
+
+func (t *QueryNodeTime) UpdateTSOTimeSync() {
+	// TODO: Add time sync
+	t.TSOTimeSync = 0
+}
diff --git a/reader/result.go b/reader/result.go
index 20e9da48ce..7e1a509de4 100644
--- a/reader/result.go
+++ b/reader/result.go
@@ -2,7 +2,7 @@ package reader
 
 import (
 	"fmt"
-	schema2 "github.com/czs007/suvlim/pulsar/client-go/schema"
+	msgpb "github.com/czs007/suvlim/pkg/message"
 	"strconv"
 )
 
@@ -18,24 +18,24 @@ func getResultTopicByClientId(clientId int64) string {
 	return "result-topic/partition-" + strconv.FormatInt(clientId, 10)
 }
 
-func publishResult(ids *ResultEntityIds, clientId int64) schema2.Status {
+func publishResult(ids *ResultEntityIds, clientId int64) msgpb.Status {
 	// TODO: Pulsar publish
 	var resultTopic = getResultTopicByClientId(clientId)
 	fmt.Println(resultTopic)
-	return schema2.Status{Error_code: schema2.ErrorCode_SUCCESS}
+	return msgpb.Status{ErrorCode: msgpb.ErrorCode_SUCCESS}
 }
 
-func publishSearchResult(searchResults *[]SearchResult, clientId int64) schema2.Status {
+func publishSearchResult(searchResults *[]*SearchResult, clientId int64) msgpb.Status {
 	// TODO: Pulsar publish
 	var resultTopic = getResultTopicByClientId(clientId)
 	fmt.Println(resultTopic)
-	return schema2.Status{Error_code: schema2.ErrorCode_SUCCESS}
+	return msgpb.Status{ErrorCode: msgpb.ErrorCode_SUCCESS}
 }
 
-func publicStatistic(statisticTopic string) schema2.Status {
+func publicStatistic(statisticTopic string) msgpb.Status {
 	// TODO: get statistic info
 	// getStatisticInfo()
 	// var info = getStatisticInfo()
 	// TODO: Pulsar publish
-	return schema2.Status{Error_code: schema2.ErrorCode_SUCCESS}
+	return msgpb.Status{ErrorCode: msgpb.ErrorCode_SUCCESS}
 }
diff --git a/reader/segment.go b/reader/segment.go
index 756e5b3342..0c3423b064 100644
--- a/reader/segment.go
+++ b/reader/segment.go
@@ -14,7 +14,7 @@ package reader
 import "C"
 import (
 	"github.com/czs007/suvlim/errors"
-	"github.com/czs007/suvlim/pulsar/client-go/schema"
+	schema "github.com/czs007/suvlim/pkg/message"
 	"strconv"
 	"unsafe"
 )
@@ -28,7 +28,7 @@ const (
 
 type Segment struct {
 	SegmentPtr       C.CSegmentBase
-	SegmentId        uint64
+	SegmentId        int64
 	SegmentCloseTime uint64
 }
 
@@ -45,21 +45,21 @@ func (s *Segment) GetStatus() int {
 	}
 }
 
-func (s *Segment) GetSegmentID() uint64 {
+func (s *Segment) GetSegmentID() int64 {
 	/*C.GetSegmentId
 	unsigned long
 	GetSegmentId(CSegmentBase c_segment);
 	*/
 	var segmentID = C.GetSegmentId(s.SegmentPtr)
-	return uint64(segmentID)
+	return int64(segmentID)
 }
 
-func (s *Segment) SetSegmentID(segmentID uint64) {
+func (s *Segment) SetSegmentID(segmentID int64) {
 	/*C.SetSegmentId
 	void
 	SetSegmentId(CSegmentBase c_segment, unsigned long segment_id);
 	*/
-	C.SetSegmentId(s.SegmentPtr, C.ulong(segmentID))
+	C.SetSegmentId(s.SegmentPtr, C.long(segmentID))
 }
 
 func (s *Segment) GetMaxTimestamp() uint64 {
@@ -127,7 +127,7 @@ func (s *Segment) Close() error {
 }
 
 ////////////////////////////////////////////////////////////////////////////
-func SegmentInsert(segment *Segment, entityIds *[]uint64, timestamps *[]uint64, dataChunk [][]*schema.FieldValue) (ResultEntityIds, error) {
+func (s *Segment) SegmentInsert(entityIds *[]int64, timestamps *[]uint64, records *[][]byte, timestampMin uint64, timestampMax uint64) error {
 	/*C.Insert
 	int
 	Insert(CSegmentBase c_segment,
@@ -156,16 +156,16 @@ func SegmentInsert(segment *Segment, entityIds *[]uint64, timestamps *[]uint64,
 	}
 
 	const sizeofPerRow = 4 + DIM * 4
 
-	var status = C.Insert(segment.SegmentPtr, C.long(N), (*C.ulong)(&(*entityIds)[0]), (*C.ulong)(&(*timestamps)[0]), unsafe.Pointer(&rawData[0]), C.int(sizeofPerRow), C.long(N))
+	var status = C.Insert(s.SegmentPtr, C.long(N), (*C.ulong)(&(*entityIds)[0]), (*C.ulong)(&(*timestamps)[0]), unsafe.Pointer(&rawData[0]), C.int(sizeofPerRow), C.long(N))
 
 	if status != 0 {
-		return nil, errors.New("Insert failed, error code = " + strconv.Itoa(int(status)))
+		return errors.New("Insert failed, error code = " + strconv.Itoa(int(status)))
 	}
 
-	return ResultEntityIds{}, nil
+	return nil
 }
 
-func SegmentDelete(segment *Segment, entityIds *[]uint64, timestamps *[]uint64) (ResultEntityIds, error) {
+func (s *Segment) SegmentDelete(entityIds *[]int64, timestamps *[]uint64) error {
 	/*C.Delete
 	int
 	Delete(CSegmentBase c_segment,
@@ -175,16 +175,16 @@ func SegmentDelete(segment *Segment, entityIds *[]uint64, timestamps *[]uint64)
 	*/
 	size := len(*entityIds)
 
-	var status = C.Delete(segment.SegmentPtr, C.long(size), (*C.ulong)(&(*entityIds)[0]), (*C.ulong)(&(*timestamps)[0]))
+	var status = C.Delete(s.SegmentPtr, C.long(size), (*C.ulong)(&(*entityIds)[0]), (*C.ulong)(&(*timestamps)[0]))
 
 	if status != 0 {
-		return nil, errors.New("Delete failed, error code = " + strconv.Itoa(int(status)))
+		return errors.New("Delete failed, error code = " + strconv.Itoa(int(status)))
 	}
 
-	return ResultEntityIds{}, nil
+	return nil
 }
 
-func SegmentSearch(segment *Segment, queryString string, timestamps *[]uint64, vectorRecord *[]schema.VectorRecord) (*[]SearchResult, error) {
+func (s *Segment) SegmentSearch(queryString string, timestamp uint64, vectorRecord *schema.VectorRowRecord) (*SearchResult, error) {
 	/*C.Search
 	int
 	Search(CSegmentBase c_segment,
 	       void* fake_query,
 	       unsigned long timestamp,
 	       long int* result_ids,
 	       float* result_distances);
 	*/
-	var results []SearchResult
-
 	// TODO: get top-k's k from queryString
 	const TopK = 1
 
-	for timestamp := range *timestamps {
-		resultIds := make([]int64, TopK)
-		resultDistances := make([]float32, TopK)
+	resultIds := make([]int64, TopK)
+	resultDistances := make([]float32, TopK)
 
-		var status = C.Search(segment.SegmentPtr, unsafe.Pointer(nil), C.ulong(timestamp), (*C.long)(&resultIds[0]), (*C.float)(&resultDistances[0]))
-		if status != 0 {
-			return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status)))
-		}
-
-		results = append(results, SearchResult{ResultIds: resultIds, ResultDistances: resultDistances})
+	var status = C.Search(s.SegmentPtr, unsafe.Pointer(nil), C.ulong(timestamp), (*C.long)(&resultIds[0]), (*C.float)(&resultDistances[0]))
+	if status != 0 {
+		return nil, errors.New("Search failed, error code = " + strconv.Itoa(int(status)))
 	}
 
-	return &results, nil
+	return &SearchResult{ResultIds: resultIds, ResultDistances: resultDistances}, nil
 }
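
Notes (not part of the diff above):

The DeleteRecords.count channel is used as a counter through expressions
like records.count <- <-records.count + 1. That pattern only works when the
channel is buffered (capacity at least 1) and seeded with an initial value,
which is why InsertAndDelete initializes each deleteRecordsMap entry before
appending to it; a receive on an empty channel blocks forever. A minimal
standalone sketch of the pattern (values are illustrative only):

package main

import "fmt"

func main() {
	// A buffered channel of capacity 1 acting as a counter, mirroring
	// DeleteRecords.count in reader/query_node.go.
	count := make(chan int, 1)
	count <- 0 // seed; without this the first receive would block forever

	count <- <-count + 1 // increment: take the value out, put value+1 back
	count <- <-count + 1
	count <- <-count - 1 // decrement, as searchDeleteInMap does

	fmt.Println(<-count) // prints 1
}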
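Step 2 of InsertAndDelete shifts and truncates InsertDeleteBuffer inside a
range over the validity slice, but validInsertDeleteBuffer itself is never
compacted, so the indices drift apart after the first removal, and with two
or more invalid entries the re-slice can go out of range. A sketch of an
in-place filter that keeps the buffer and its bitmap in sync; Msg and
compact are hypothetical stand-ins, not part of the change:

package reader

// Msg stands in for *schema.InsertOrDeleteMsg in this sketch.
type Msg struct{ Timestamp uint64 }

// compact keeps only the entries whose valid flag is still true,
// rewriting the buffer and the bitmap together in one pass.
func compact(buf []*Msg, valid []bool) ([]*Msg, []bool) {
	n := 0
	for i, ok := range valid {
		if ok {
			buf[n], valid[n] = buf[i], true
			n++
		}
	}
	for i := n; i < len(buf); i++ {
		buf[i] = nil // zero the tail so dropped pointers can be collected
	}
	return buf[:n], valid[:n]
}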
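Step 2 of Search ("Reduce results") is left empty in this patch. One
possible shape for that reduction, assuming smaller distances rank higher
and a single global top-k is wanted; reduceResults is a hypothetical helper,
sketched here only to show the intent of the TODO:

package reader

import "sort"

// reduceResults merges per-segment search results and keeps the global
// top-k hits by ascending distance.
func reduceResults(results []*SearchResult, topK int) SearchResult {
	var ids []int64
	var distances []float32
	for _, r := range results {
		ids = append(ids, r.ResultIds...)
		distances = append(distances, r.ResultDistances...)
	}
	order := make([]int, len(ids))
	for i := range order {
		order[i] = i
	}
	sort.Slice(order, func(a, b int) bool {
		return distances[order[a]] < distances[order[b]]
	})
	if topK > len(order) {
		topK = len(order)
	}
	merged := SearchResult{
		ResultIds:       make([]int64, topK),
		ResultDistances: make([]float32, topK),
	}
	for i := 0; i < topK; i++ {
		merged.ResultIds[i] = ids[order[i]]
		merged.ResultDistances[i] = distances[order[i]]
	}
	return merged
}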