Check the status of search results before reduce in Proxy

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
Ref: pull/4973/head^2
parent 0cd3e8d86c
commit 2358fae703

Makefile | 8
@@ -55,13 +55,13 @@ verifiers: cppcheck fmt lint ruleguard
 
 # Builds various components locally.
 build-go:
-	@echo "Building each component's binary to './'"
+	@echo "Building each component's binary to './bin'"
 	@echo "Building query node ..."
-	@mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null
+	@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null
 	@echo "Building master ..."
-	@mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null
+	@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null
 	@echo "Building proxy ..."
-	@mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxy $(PWD)/cmd/proxy/proxy.go 1>/dev/null
+	@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxy $(PWD)/cmd/proxy/proxy.go 1>/dev/null
 
 build-cpp:
 	@(env bash $(PWD)/scripts/core_build.sh)
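A note on the CGO_ENABLED split above: the query node is built with cgo enabled, presumably because it is the one binary that links the native core produced by build-cpp, while master and proxy stay pure Go. A minimal sketch of what forces that requirement; core_version here is a stand-in for illustration, not the real core API:

package main

/*
// Any C snippet (or a real native library such as the C++ core) pulls in cgo,
// which is why the query node build needs CGO_ENABLED="1".
static int core_version(void) { return 1; }
*/
import "C"

import "fmt"

func main() {
	// Building this file with CGO_ENABLED=0 fails; binaries that avoid the
	// dependency entirely (master, proxy) can keep CGO_ENABLED="0".
	fmt.Println("linked core version:", int(C.core_version()))
}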
@@ -23,7 +23,7 @@ pipeline {
         stage ('Build and UnitTest') {
             agent {
                 kubernetes {
-                    label "${env.PROJECT_NAME}-${env.BUILD_NUMBER}-build"
+                    label "${env.PROJECT_NAME}-${SEMVER}-${env.BUILD_NUMBER}-build"
                     defaultContainer 'build-env'
                     customWorkspace '/home/jenkins/agent/workspace'
                     yamlFile "build/ci/jenkins/pod/build-env.yaml"

@@ -46,7 +46,7 @@ pipeline {
         stage ('Publish Docker Images') {
             agent {
                 kubernetes {
-                    label "${env.PROJECT_NAME}-${env.BUILD_NUMBER}-publish"
+                    label "${env.PROJECT_NAME}-${SEMVER}-${env.BUILD_NUMBER}-publish"
                     defaultContainer 'publish-images'
                     yamlFile "build/ci/jenkins/pod/docker-pod.yaml"
                 }
@@ -11,7 +11,6 @@ services:
     environment:
       PULSAR_ADDRESS: ${PULSAR_ADDRESS}
       ETCD_ADDRESS: ${ETCD_ADDRESS}
-      MASTER_ADDRESS: ${MASTER_ADDRESS}
     networks:
       - milvus
     ports:

@@ -26,7 +25,6 @@ services:
       - ${SOURCE_REPO}/proxy:${SOURCE_TAG}
     environment:
       PULSAR_ADDRESS: ${PULSAR_ADDRESS}
      ETCD_ADDRESS: ${ETCD_ADDRESS}
-      MASTER_ADDRESS: ${MASTER_ADDRESS}
     ports:
       - "19530:19530"
@@ -11,14 +11,14 @@ import (
 	"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
 )
 
-type MetaCache interface {
+type Cache interface {
 	Hit(collectionName string) bool
 	Get(collectionName string) (*servicepb.CollectionDescription, error)
 	Update(collectionName string) error
 	//Write(collectionName string, schema *servicepb.CollectionDescription) error
 	Remove(collectionName string) error
 }
 
-var globalMetaCache MetaCache
+var globalMetaCache Cache
 
 type SimpleMetaCache struct {
 	mu sync.RWMutex
@@ -30,29 +30,54 @@ type SimpleMetaCache struct {
 	ctx context.Context
 }
 
-func (smc *SimpleMetaCache) Hit(collectionName string) bool {
-	smc.mu.RLock()
-	defer smc.mu.RUnlock()
-	_, ok := smc.metas[collectionName]
+func (metaCache *SimpleMetaCache) Hit(collectionName string) bool {
+	metaCache.mu.RLock()
+	defer metaCache.mu.RUnlock()
+	_, ok := metaCache.metas[collectionName]
 	return ok
 }
 
-func (smc *SimpleMetaCache) Get(collectionName string) (*servicepb.CollectionDescription, error) {
-	smc.mu.RLock()
-	defer smc.mu.RUnlock()
-	schema, ok := smc.metas[collectionName]
+func (metaCache *SimpleMetaCache) Get(collectionName string) (*servicepb.CollectionDescription, error) {
+	metaCache.mu.RLock()
+	defer metaCache.mu.RUnlock()
+	schema, ok := metaCache.metas[collectionName]
 	if !ok {
 		return nil, errors.New("collection meta miss")
 	}
 	return schema, nil
 }
 
-func (smc *SimpleMetaCache) Update(collectionName string) error {
-	reqID, err := smc.reqIDAllocator.AllocOne()
+func (metaCache *SimpleMetaCache) Update(collectionName string) error {
+	reqID, err := metaCache.reqIDAllocator.AllocOne()
 	if err != nil {
 		return err
 	}
-	ts, err := smc.tsoAllocator.AllocOne()
+	ts, err := metaCache.tsoAllocator.AllocOne()
 	if err != nil {
 		return err
 	}
+	hasCollectionReq := &internalpb.HasCollectionRequest{
+		MsgType:   internalpb.MsgType_kHasCollection,
+		ReqID:     reqID,
+		Timestamp: ts,
+		ProxyID:   metaCache.proxyID,
+		CollectionName: &servicepb.CollectionName{
+			CollectionName: collectionName,
+		},
+	}
+	has, err := metaCache.masterClient.HasCollection(metaCache.ctx, hasCollectionReq)
+	if err != nil {
+		return err
+	}
+	if !has.Value {
+		return errors.New("collection " + collectionName + " not exists")
+	}
+
+	reqID, err = metaCache.reqIDAllocator.AllocOne()
+	if err != nil {
+		return err
+	}
+	ts, err = metaCache.tsoAllocator.AllocOne()
+	if err != nil {
+		return err
+	}
@@ -60,20 +85,32 @@ func (smc *SimpleMetaCache) Update(collectionName string) error {
 		MsgType:   internalpb.MsgType_kDescribeCollection,
 		ReqID:     reqID,
 		Timestamp: ts,
-		ProxyID:   smc.proxyID,
+		ProxyID:   metaCache.proxyID,
 		CollectionName: &servicepb.CollectionName{
 			CollectionName: collectionName,
 		},
 	}
 
-	resp, err := smc.masterClient.DescribeCollection(smc.ctx, req)
+	resp, err := metaCache.masterClient.DescribeCollection(metaCache.ctx, req)
 	if err != nil {
 		return err
 	}
 
-	smc.mu.Lock()
-	defer smc.mu.Unlock()
-	smc.metas[collectionName] = resp
+	metaCache.mu.Lock()
+	defer metaCache.mu.Unlock()
+	metaCache.metas[collectionName] = resp
 
 	return nil
 }
+
+func (metaCache *SimpleMetaCache) Remove(collectionName string) error {
+	metaCache.mu.Lock()
+	defer metaCache.mu.Unlock()
+
+	_, ok := metaCache.metas[collectionName]
+	if !ok {
+		return errors.New("cannot find collection: " + collectionName)
+	}
+	delete(metaCache.metas, collectionName)
+
+	return nil
+}
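For readers who only need the shape of the cache change: SimpleMetaCache is a read-through cache over an RWMutex-guarded map, where a miss triggers a round trip to master and DropCollection now evicts the entry. A compressed, self-contained sketch of that pattern; description and the in-memory Update body are placeholders for the servicepb types and the HasCollection/DescribeCollection calls, not the real client code:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// description stands in for *servicepb.CollectionDescription.
type description struct{ name string }

type simpleCache struct {
	mu    sync.RWMutex
	metas map[string]*description
}

// Hit reports whether the collection meta is already cached.
func (c *simpleCache) Hit(collectionName string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	_, ok := c.metas[collectionName]
	return ok
}

// Get returns the cached meta or an error on a miss.
func (c *simpleCache) Get(collectionName string) (*description, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	schema, ok := c.metas[collectionName]
	if !ok {
		return nil, errors.New("collection meta miss")
	}
	return schema, nil
}

// Update stands in for the HasCollection + DescribeCollection round trips.
func (c *simpleCache) Update(collectionName string) error {
	resp := &description{name: collectionName} // pretend this came from master
	c.mu.Lock()
	defer c.mu.Unlock()
	c.metas[collectionName] = resp
	return nil
}

// Remove evicts a cached entry, as DropCollectionTask.PostExecute now does.
func (c *simpleCache) Remove(collectionName string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.metas[collectionName]; !ok {
		return errors.New("cannot find collection: " + collectionName)
	}
	delete(c.metas, collectionName)
	return nil
}

func main() {
	cache := &simpleCache{metas: map[string]*description{}}
	if !cache.Hit("demo") {
		if err := cache.Update("demo"); err != nil {
			panic(err)
		}
	}
	desc, err := cache.Get("demo")
	if err != nil {
		panic(err)
	}
	fmt.Println("cached:", desc.name)
}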
@@ -291,7 +291,7 @@ func (dct *DropCollectionTask) Execute() error {
 }
 
 func (dct *DropCollectionTask) PostExecute() error {
-	return nil
+	return globalMetaCache.Remove(dct.CollectionName.CollectionName)
 }
 
 type QueryTask struct {
@@ -329,6 +329,18 @@ func (qt *QueryTask) SetTs(ts Timestamp) {
 }
 
 func (qt *QueryTask) PreExecute() error {
+	collectionName := qt.query.CollectionName
+	if !globalMetaCache.Hit(collectionName) {
+		err := globalMetaCache.Update(collectionName)
+		if err != nil {
+			return err
+		}
+	}
+	_, err := globalMetaCache.Get(collectionName)
+	if err != nil { // err is not nil if collection not exists
+		return err
+	}
+
 	if err := ValidateCollectionName(qt.query.CollectionName); err != nil {
 		return err
 	}
@@ -382,22 +394,37 @@ func (qt *QueryTask) PostExecute() error {
 		log.Print("wait to finish failed, timeout!")
 		return errors.New("wait to finish failed, timeout")
 	case searchResults := <-qt.resultBuf:
-		rlen := len(searchResults) // query num
-		if rlen <= 0 {
-			qt.result = &servicepb.QueryResult{}
-			return nil
+		filterSearchResult := make([]*internalpb.SearchResult, 0)
+		var filterReason string
+		for _, partialSearchResult := range searchResults {
+			if partialSearchResult.Status.ErrorCode == commonpb.ErrorCode_SUCCESS {
+				filterSearchResult = append(filterSearchResult, partialSearchResult)
+			} else {
+				filterReason += partialSearchResult.Status.Reason + "\n"
+			}
 		}
 
-		n := len(searchResults[0].Hits) // n
+		rlen := len(filterSearchResult) // query node num
+		if rlen <= 0 {
+			qt.result = &servicepb.QueryResult{
+				Status: &commonpb.Status{
+					ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+					Reason:    filterReason,
+				},
+			}
+			return errors.New(filterReason)
+		}
+
+		n := len(filterSearchResult[0].Hits) // n
 		if n <= 0 {
 			qt.result = &servicepb.QueryResult{}
 			return nil
 		}
 
 		hits := make([][]*servicepb.Hits, rlen)
-		for i, searchResult := range searchResults {
+		for i, partialSearchResult := range filterSearchResult {
 			hits[i] = make([]*servicepb.Hits, n)
-			for j, bs := range searchResult.Hits {
+			for j, bs := range partialSearchResult.Hits {
 				hits[i][j] = &servicepb.Hits{}
 				err := proto.Unmarshal(bs, hits[i][j])
 				if err != nil {
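The heart of this commit, isolated from the task plumbing: keep only the partial results whose status is success, accumulate the failure reasons, and refuse to reduce if nothing is left. A minimal sketch with simplified stand-in types; searchResult here is not the real internalpb message:

package main

import (
	"errors"
	"fmt"
)

// searchResult is a stripped-down stand-in for *internalpb.SearchResult.
type searchResult struct {
	OK     bool
	Reason string
	Hits   [][]byte
}

// filterByStatus keeps only successful partial results and collects the
// failure reasons, mirroring the check added before the reduce step.
func filterByStatus(results []*searchResult) ([]*searchResult, error) {
	filtered := make([]*searchResult, 0, len(results))
	var reason string
	for _, r := range results {
		if r.OK {
			filtered = append(filtered, r)
		} else {
			reason += r.Reason + "\n"
		}
	}
	if len(filtered) == 0 {
		return nil, errors.New(reason)
	}
	return filtered, nil
}

func main() {
	results := []*searchResult{
		{OK: true, Hits: [][]byte{[]byte("hit")}},
		{OK: false, Reason: "segment not loaded"},
	}
	filtered, err := filterByStatus(results)
	if err != nil {
		panic(err)
	}
	fmt.Println("query nodes contributing to reduce:", len(filtered))
}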
@@ -433,6 +460,17 @@ func (qt *QueryTask) PostExecute() error {
 			}
 		}
 		choiceOffset := locs[choice]
+		// check if distance is valid, `invalid` here means very very big,
+		// in this process, distance here is the smallest, so the rest of distance are all invalid
+		if hits[choice][i].Scores[choiceOffset] >= float32(math.MaxFloat32) {
+			qt.result = &servicepb.QueryResult{
+				Status: &commonpb.Status{
+					ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+					Reason:    "topk in dsl greater than the row nums of collection",
+				},
+			}
+			return nil
+		}
 		reducedHits.IDs = append(reducedHits.IDs, hits[choice][i].IDs[choiceOffset])
 		if hits[choice][i].RowData != nil && len(hits[choice][i].RowData) > 0 {
 			reducedHits.RowData = append(reducedHits.RowData, hits[choice][i].RowData[choiceOffset])
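The sentinel check above appears to rely on query nodes padding missing hits with a distance of math.MaxFloat32 when topk exceeds the collection's row count; once the smallest remaining distance is already the sentinel, everything after it is invalid too. A rough sketch of that merge step under simplified assumptions: per-node float slices sorted by ascending distance instead of the real Hits protos, and mergeTopK invented for illustration:

package main

import (
	"fmt"
	"math"
)

// mergeTopK merges per-node score lists that are already sorted by ascending
// distance. A score of math.MaxFloat32 marks padded, invalid entries, so once
// the best remaining candidate carries it the merge can stop early.
func mergeTopK(scores [][]float32, topk int) []float32 {
	locs := make([]int, len(scores)) // next unread offset per node
	merged := make([]float32, 0, topk)
	for len(merged) < topk {
		choice := -1
		best := float32(math.MaxFloat32)
		for i, s := range scores {
			if locs[i] < len(s) && s[locs[i]] <= best {
				best = s[locs[i]]
				choice = i
			}
		}
		// The smallest remaining distance is exhausted or already invalid,
		// so every later candidate would be too.
		if choice == -1 || best >= float32(math.MaxFloat32) {
			break
		}
		merged = append(merged, best)
		locs[choice]++
	}
	return merged
}

func main() {
	nodeA := []float32{0.1, 0.7, float32(math.MaxFloat32)}
	nodeB := []float32{0.3, float32(math.MaxFloat32)}
	fmt.Println(mergeTopK([][]float32{nodeA, nodeB}, 4)) // prints [0.1 0.3 0.7]
}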
@@ -82,8 +82,17 @@ func (gp *BaseTable) LoadRange(key, endKey string, limit int) ([]string, []strin
 func (gp *BaseTable) LoadYaml(fileName string) error {
 	config := viper.New()
 	_, fpath, _, _ := runtime.Caller(0)
-	configPath := path.Dir(fpath) + "/../../../configs/"
-	config.SetConfigFile(configPath + fileName)
+	configFile := path.Dir(fpath) + "/../../../configs/" + fileName
+	_, err := os.Stat(configFile)
+	if os.IsNotExist(err) {
+		runPath, err := os.Getwd()
+		if err != nil {
+			panic(err)
+		}
+		configFile = runPath + "/configs/" + fileName
+	}
+
+	config.SetConfigFile(configFile)
 	if err := config.ReadInConfig(); err != nil {
 		panic(err)
 	}
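The LoadYaml change boils down to: prefer the config path relative to the source tree, and fall back to the working directory when the file is not there, so binaries run outside the repo layout can still find their configs. A compact sketch of just that lookup; resolveConfig and the example file name are invented for illustration:

package main

import (
	"fmt"
	"os"
	"path"
	"runtime"
)

// resolveConfig returns the source-tree config path when it exists,
// otherwise a path under the current working directory, mirroring the
// fallback added to LoadYaml.
func resolveConfig(fileName string) (string, error) {
	_, fpath, _, _ := runtime.Caller(0)
	configFile := path.Dir(fpath) + "/../../../configs/" + fileName
	if _, err := os.Stat(configFile); os.IsNotExist(err) {
		runPath, err := os.Getwd()
		if err != nil {
			return "", err
		}
		configFile = runPath + "/configs/" + fileName
	}
	return configFile, nil
}

func main() {
	configFile, err := resolveConfig("example.yaml")
	if err != nil {
		panic(err)
	}
	fmt.Println("config file:", configFile)
}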