mirror of https://github.com/milvus-io/milvus.git
Fix segment insertion error, and improve search strategy
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
parent
567232b10f
commit
5e429b2a94
|
@ -55,7 +55,8 @@ func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error {
|
||||||
}
|
}
|
||||||
//fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
|
//fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
|
||||||
// msg.ID(), m.SegementID)
|
// msg.ID(), m.SegementID)
|
||||||
fmt.Println("Received SegmentStats -- segmentID:", m.SegementID, ",memSize:", m.MemorySize, ",memRate:", m.MemoryRate)
|
fmt.Println("Received SegmentStats -- segmentID:", m.SegementID,
|
||||||
|
",memSize:", m.MemorySize, ",memRate:", m.MemoryRate, ",numRows:", m.Rows, ",status:", m.Status)
|
||||||
ssChan <- m
|
ssChan <- m
|
||||||
consumer.Ack(msg)
|
consumer.Ack(msg)
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,7 +175,7 @@ func UpdateSegmentStatus(ss mock.SegmentStats, kvbase kv.Base) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = kvbase.Save("segment/"+strconv.Itoa(int(seg.CollectionID)), segData)
|
err = kvbase.Save("segment/"+strconv.Itoa(int(seg.SegmentID)), segData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ import "C"
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/czs007/suvlim/conf"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"log"
|
"log"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -332,16 +333,16 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
|
||||||
node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
|
node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
|
||||||
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
|
node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
|
||||||
fmt.Println("Do Search...")
|
fmt.Println("Do Search...")
|
||||||
for {
|
//for {
|
||||||
if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync {
|
//if node.messageClient.SearchMsg[0].Timestamp < node.queryNodeTimeSync.ServiceTimeSync {
|
||||||
var status = node.Search(node.messageClient.SearchMsg)
|
var status = node.Search(node.messageClient.SearchMsg)
|
||||||
if status.ErrorCode != 0 {
|
if status.ErrorCode != 0 {
|
||||||
fmt.Println("Search Failed")
|
fmt.Println("Search Failed")
|
||||||
node.PublishFailedSearchResult()
|
node.PublishFailedSearchResult()
|
||||||
}
|
}
|
||||||
break
|
//break
|
||||||
}
|
//}
|
||||||
}
|
//}
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -583,8 +584,8 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
|
||||||
// So the ServiceTimeSync is always less than searchTimestamp.
|
// So the ServiceTimeSync is always less than searchTimestamp.
|
||||||
// Here, we manually make searchTimestamp's logic time minus `conf.Config.Timesync.Interval` milliseconds.
|
// Here, we manually make searchTimestamp's logic time minus `conf.Config.Timesync.Interval` milliseconds.
|
||||||
// Which means `searchTimestamp.logicTime = searchTimestamp.logicTime - conf.Config.Timesync.Interval`.
|
// Which means `searchTimestamp.logicTime = searchTimestamp.logicTime - conf.Config.Timesync.Interval`.
|
||||||
// var logicTimestamp = searchTimestamp << 46 >> 46
|
var logicTimestamp = searchTimestamp << 46 >> 46
|
||||||
// searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval)) << 18 + logicTimestamp
|
searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval + 600)) << 18 + logicTimestamp
|
||||||
|
|
||||||
var vector = msg.Records
|
var vector = msg.Records
|
||||||
// We now only the first Json is valid.
|
// We now only the first Json is valid.
|
||||||
|
@ -602,6 +603,11 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
|
||||||
|
|
||||||
// 3. Do search in all segments
|
// 3. Do search in all segments
|
||||||
for _, segment := range node.SegmentsMap {
|
for _, segment := range node.SegmentsMap {
|
||||||
|
if segment.GetRowCount() <= 0 {
|
||||||
|
// Skip empty segment
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
fmt.Println("Search in segment:", segment.SegmentId, ",segment rows:", segment.GetRowCount())
|
fmt.Println("Search in segment:", segment.SegmentId, ",segment rows:", segment.GetRowCount())
|
||||||
var res, err = segment.SegmentSearch(query, searchTimestamp, vector)
|
var res, err = segment.SegmentSearch(query, searchTimestamp, vector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -618,7 +624,9 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
|
||||||
sort.Slice(resultsTmp, func(i, j int) bool {
|
sort.Slice(resultsTmp, func(i, j int) bool {
|
||||||
return resultsTmp[i].ResultDistance < resultsTmp[j].ResultDistance
|
return resultsTmp[i].ResultDistance < resultsTmp[j].ResultDistance
|
||||||
})
|
})
|
||||||
resultsTmp = resultsTmp[:query.TopK]
|
if len(resultsTmp) > query.TopK {
|
||||||
|
resultsTmp = resultsTmp[:query.TopK]
|
||||||
|
}
|
||||||
var entities = msgPb.Entities{
|
var entities = msgPb.Entities{
|
||||||
Ids: make([]int64, 0),
|
Ids: make([]int64, 0),
|
||||||
}
|
}
|
||||||
|
|
|
@ -144,8 +144,10 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]
|
||||||
var sizeofPerRow = len((*records)[0])
|
var sizeofPerRow = len((*records)[0])
|
||||||
|
|
||||||
var rawData = make([]byte, numOfRow*sizeofPerRow)
|
var rawData = make([]byte, numOfRow*sizeofPerRow)
|
||||||
|
var copyOffset = 0
|
||||||
for i := 0; i < len(*records); i++ {
|
for i := 0; i < len(*records); i++ {
|
||||||
copy(rawData, (*records)[i])
|
copy(rawData[copyOffset:], (*records)[i])
|
||||||
|
copyOffset += len((*records)[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
var cOffset = C.long(offset)
|
var cOffset = C.long(offset)
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
|
|
||||||
const int N = 200000;
|
const int N = 200000;
|
||||||
const int DIM = 16;
|
const int DIM = 16;
|
||||||
const int LOOP = 10;
|
const int LOOP = 1;
|
||||||
|
|
||||||
const milvus::FieldValue GetData() {
|
const milvus::FieldValue GetData() {
|
||||||
milvus::FieldValue value_map;
|
milvus::FieldValue value_map;
|
||||||
|
@ -32,7 +32,7 @@ const milvus::FieldValue GetData() {
|
||||||
for (int i = 0; i < N; i++) {
|
for (int i = 0; i < N; i++) {
|
||||||
int32_data.push_back(i);
|
int32_data.push_back(i);
|
||||||
}
|
}
|
||||||
std::default_random_engine eng(rand() % 20);
|
std::default_random_engine eng(42);
|
||||||
std::normal_distribution<float> dis(0, 1);
|
std::normal_distribution<float> dis(0, 1);
|
||||||
std::vector<milvus::VectorData> vector_data;
|
std::vector<milvus::VectorData> vector_data;
|
||||||
for (int i = 0; i < N; i++) {
|
for (int i = 0; i < N; i++) {
|
||||||
|
|
|
@ -41,13 +41,13 @@ int main(int argc , char**argv) {
|
||||||
milvus::VectorParam vectorParam;
|
milvus::VectorParam vectorParam;
|
||||||
std::vector<milvus::VectorData> vector_records;
|
std::vector<milvus::VectorData> vector_records;
|
||||||
|
|
||||||
std::default_random_engine eng(rand() % 20);
|
std::default_random_engine eng(42);
|
||||||
std::normal_distribution<float> dis(0, 1);
|
std::normal_distribution<float> dis(0, 1);
|
||||||
|
|
||||||
for (int j = 0; j < 10; ++j) {
|
for (int j = 0; j < 1; ++j) {
|
||||||
milvus::VectorData vectorData;
|
milvus::VectorData vectorData;
|
||||||
std::vector<float> float_data;
|
std::vector<float> float_data;
|
||||||
for (int i = 0; i < 100; ++i) {
|
for (int i = 0; i < 16; ++i) {
|
||||||
float_data.emplace_back(dis(eng));
|
float_data.emplace_back(dis(eng));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue