diff --git a/pkg/master/informer/pulsar.go b/pkg/master/informer/pulsar.go
index 1ce93bcb4c..49f9b729b5 100644
--- a/pkg/master/informer/pulsar.go
+++ b/pkg/master/informer/pulsar.go
@@ -55,7 +55,8 @@ func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error {
 		}
 		//fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
 		//	msg.ID(), m.SegementID)
-		fmt.Println("Received SegmentStats -- segmentID:", m.SegementID, ",memSize:", m.MemorySize, ",memRate:", m.MemoryRate)
+		fmt.Println("Received SegmentStats -- segmentID:", m.SegementID,
+			",memSize:", m.MemorySize, ",memRate:", m.MemoryRate, ",numRows:", m.Rows, ",status:", m.Status)
 		ssChan <- m
 		consumer.Ack(msg)
 	}
diff --git a/pkg/master/server.go b/pkg/master/server.go
index aa06c0600f..f147828045 100644
--- a/pkg/master/server.go
+++ b/pkg/master/server.go
@@ -175,7 +175,7 @@ func UpdateSegmentStatus(ss mock.SegmentStats, kvbase kv.Base) error {
 	if err != nil {
 		return err
 	}
-	err = kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), segData)
+	err = kvbase.Save("segment/"+strconv.Itoa(int(seg.SegmentID)), segData)
 	if err != nil {
 		return err
 	}
diff --git a/reader/read_node/query_node.go b/reader/read_node/query_node.go
index a0e75158d5..d766d0cd38 100644
--- a/reader/read_node/query_node.go
+++ b/reader/read_node/query_node.go
@@ -16,6 +16,7 @@ import "C"
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/czs007/suvlim/conf"
 	"github.com/stretchr/testify/assert"
 	"log"
 	"sort"
@@ -332,16 +333,16 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
 			node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
 			node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
 			fmt.Println("Do Search...")
-			for {
-				if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync {
+			//for {
+			//if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync {
 			var status = node.Search(node.messageClient.SearchMsg)
 			if status.ErrorCode != 0 {
 				fmt.Println("Search Failed")
 				node.PublishFailedSearchResult()
 			}
-			break
-			}
-			}
+			//break
+			//}
+			//}
 		default:
 		}
 	}
@@ -583,8 +584,8 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 		// So the ServiceTimeSync is always less than searchTimestamp.
 		// Here, we manually make searchTimestamp's logic time minus `conf.Config.Timesync.Interval` milliseconds.
 		// Which means `searchTimestamp.logicTime = searchTimestamp.logicTime - conf.Config.Timesync.Interval`.
-		// var logicTimestamp = searchTimestamp << 46 >> 46
-		// searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval)) << 18 + logicTimestamp
+		var logicTimestamp = searchTimestamp << 46 >> 46
+		searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval + 600)) << 18 + logicTimestamp
 
 		var vector = msg.Records
 		// We now only the first Json is valid.
@@ -602,6 +603,11 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 
 		// 3. Do search in all segments
 		for _, segment := range node.SegmentsMap {
+			if segment.GetRowCount() <= 0 {
+				// Skip empty segment
+				continue
+			}
+			fmt.Println("Search in segment:", segment.SegmentId, ",segment rows:", segment.GetRowCount())
 			var res, err = segment.SegmentSearch(query, searchTimestamp, vector)
 			if err != nil {
@@ -618,7 +624,9 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 		sort.Slice(resultsTmp, func(i, j int) bool {
 			return resultsTmp[i].ResultDistance < resultsTmp[j].ResultDistance
 		})
-		resultsTmp = resultsTmp[:query.TopK]
+		if len(resultsTmp) > query.TopK {
+			resultsTmp = resultsTmp[:query.TopK]
+		}
 		var entities = msgPb.Entities{
 			Ids: make([]int64, 0),
 		}
diff --git a/reader/read_node/segment.go b/reader/read_node/segment.go
index f6f8044c3c..3da047a4e8 100644
--- a/reader/read_node/segment.go
+++ b/reader/read_node/segment.go
@@ -144,8 +144,10 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]
 	var sizeofPerRow = len((*records)[0])
 
 	var rawData = make([]byte, numOfRow*sizeofPerRow)
+	var copyOffset = 0
 	for i := 0; i < len(*records); i++ {
-		copy(rawData, (*records)[i])
+		copy(rawData[copyOffset:], (*records)[i])
+		copyOffset += len((*records)[i])
 	}
 
 	var cOffset = C.long(offset)
diff --git a/sdk/examples/simple/insert.cpp b/sdk/examples/simple/insert.cpp
index 86c1967d28..7d4c8859d5 100644
--- a/sdk/examples/simple/insert.cpp
+++ b/sdk/examples/simple/insert.cpp
@@ -23,7 +23,7 @@
 
 const int N = 200000;
 const int DIM = 16;
-const int LOOP = 10;
+const int LOOP = 1;
 
 const milvus::FieldValue GetData() {
     milvus::FieldValue value_map;
@@ -32,7 +32,7 @@ const milvus::FieldValue GetData() {
     for (int i = 0; i < N; i++) {
         int32_data.push_back(i);
     }
-    std::default_random_engine eng(rand() % 20);
+    std::default_random_engine eng(42);
     std::normal_distribution<float> dis(0, 1);
     std::vector<float> vector_data;
     for (int i = 0; i < N; i++) {
diff --git a/sdk/examples/simple/search.cpp b/sdk/examples/simple/search.cpp
index 1da315232a..f4d75eec72 100644
--- a/sdk/examples/simple/search.cpp
+++ b/sdk/examples/simple/search.cpp
@@ -41,13 +41,13 @@ int main(int argc , char**argv) {
     milvus::VectorParam vectorParam;
    std::vector<milvus::VectorData> vector_records;
 
-    std::default_random_engine eng(rand() % 20);
+    std::default_random_engine eng(42);
     std::normal_distribution<float> dis(0, 1);
 
-    for (int j = 0; j < 10; ++j) {
+    for (int j = 0; j < 1; ++j) {
         milvus::VectorData vectorData;
         std::vector<float> float_data;
-        for (int i = 0; i < 100; ++i) {
+        for (int i = 0; i < 16; ++i) {
             float_data.emplace_back(dis(eng));
         }
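
A note on the server.go hunk: segment stats were previously saved under "segment/<CollectionID>", so every segment of a collection wrote to the same etcd key and the stats overwrote one another. A minimal sketch of the collision, using a plain map as a hypothetical stand-in for the kv.Base store (the store type and IDs are invented for illustration):

```go
package main

import (
	"fmt"
	"strconv"
)

// store is a toy stand-in for the kv.Base interface used in server.go;
// the real code persists to etcd, this map only illustrates the key collision.
type store map[string]string

func main() {
	segments := []struct{ CollectionID, SegmentID int64 }{{1, 100}, {1, 101}}

	// Before the fix: keyed by CollectionID, so the second segment's stats
	// overwrite the first segment's stats under the same key.
	byCollection := store{}
	for _, seg := range segments {
		byCollection["segment/"+strconv.Itoa(int(seg.CollectionID))] = fmt.Sprint(seg.SegmentID)
	}
	fmt.Println(len(byCollection)) // 1 -- one entry for two segments

	// After the fix: keyed by SegmentID, one entry per segment.
	bySegment := store{}
	for _, seg := range segments {
		bySegment["segment/"+strconv.Itoa(int(seg.SegmentID))] = fmt.Sprint(seg.SegmentID)
	}
	fmt.Println(len(bySegment)) // 2
}
```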
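A note on the re-enabled timestamp arithmetic in query_node.go: the shifts only make sense for a hybrid timestamp whose physical time sits in the high 46 bits and whose logical counter sits in the low 18 bits, which is what `<< 46 >> 46` and `>> 18` imply. A sketch under that assumption (the constant and function names are mine, not the project's):

```go
package main

import "fmt"

// Assumed layout, inferred from the shifts in query_node.go:
// physical time in the high 46 bits, logical counter in the low 18 bits.
const logicalBits = 18

func decompose(ts uint64) (physical, logical uint64) {
	return ts >> logicalBits, ts << (64 - logicalBits) >> (64 - logicalBits)
}

func compose(physical, logical uint64) uint64 {
	return physical<<logicalBits + logical
}

func main() {
	searchTimestamp := compose(1_000_000, 7)
	physical, logical := decompose(searchTimestamp)

	// The diff rewinds the physical part (by Timesync.Interval + 600 in the
	// patch; a flat 600 here) while keeping the logical part, so the search
	// reads slightly older, already-synced state.
	adjusted := compose(physical-600, logical)
	fmt.Println(physical, logical, adjusted)
}
```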
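A note on the TopK guard in query_node.go: the unconditional reslice `resultsTmp[:query.TopK]` panics whenever fewer than TopK results come back, which is now likely since empty segments are skipped. The same pattern in isolation, with hypothetical names:

```go
package main

import (
	"fmt"
	"sort"
)

// clampTopK sorts candidate distances ascending and keeps at most topK of
// them, guarding the slice bound the way the query_node.go fix does.
func clampTopK(distances []float32, topK int) []float32 {
	sort.Slice(distances, func(i, j int) bool { return distances[i] < distances[j] })
	if len(distances) > topK {
		distances = distances[:topK]
	}
	return distances
}

func main() {
	// Two hits but topK = 10: without the guard this would be a
	// slice-bounds-out-of-range panic.
	fmt.Println(clampTopK([]float32{0.9, 0.1}, 10))
}
```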
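A note on the segment.go hunk: the old loop copied every record to the start of rawData, so each record clobbered the previous one and only the last survived. The fix advances a write offset so records land back to back; a self-contained sketch of that flattening pattern:

```go
package main

import "fmt"

// flatten concatenates records into one contiguous buffer, advancing a
// write offset per record -- the pattern the SegmentInsert fix adopts.
func flatten(records [][]byte) []byte {
	total := 0
	for _, r := range records {
		total += len(r)
	}
	rawData := make([]byte, total)
	offset := 0
	for _, r := range records {
		copy(rawData[offset:], r) // copy(rawData, r) here would overwrite offset 0 every time
		offset += len(r)
	}
	return rawData
}

func main() {
	fmt.Printf("%q\n", flatten([][]byte{[]byte("ab"), []byte("cd")})) // "abcd"
}
```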