Use exception instead of runtime error

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-11-30 17:58:23 +08:00 committed by yefu.chen
parent 51a9f49d35
commit 0cd3e8d86c
17 changed files with 164 additions and 155 deletions

View File

@ -55,13 +55,13 @@ verifiers: cppcheck fmt lint ruleguard
# Builds various components locally.
build-go:
@echo "Building each component's binary to './bin'"
@echo "Building each component's binary to './'"
@echo "Building query node ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null
@mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/query_node.go 1>/dev/null
@echo "Building master ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null
@mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/master $(PWD)/cmd/master/main.go 1>/dev/null
@echo "Building proxy ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="0" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxy $(PWD)/cmd/proxy/proxy.go 1>/dev/null
@mkdir -p $(INSTALL_PATH) && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxy $(PWD)/cmd/proxy/proxy.go 1>/dev/null
build-cpp:
@(env bash $(PWD)/scripts/core_build.sh)

View File

@ -23,7 +23,7 @@ pipeline {
stage ('Build and UnitTest') {
agent {
kubernetes {
label "${env.PROJECT_NAME}-${SEMVER}-${env.BUILD_NUMBER}-build"
label "${env.PROJECT_NAME}-${env.BUILD_NUMBER}-build"
defaultContainer 'build-env'
customWorkspace '/home/jenkins/agent/workspace'
yamlFile "build/ci/jenkins/pod/build-env.yaml"
@ -46,7 +46,7 @@ pipeline {
stage ('Publish Docker Images') {
agent {
kubernetes {
label "${env.PROJECT_NAME}-${SEMVER}-${env.BUILD_NUMBER}-publish"
label "${env.PROJECT_NAME}-${env.BUILD_NUMBER}-publish"
defaultContainer 'publish-images'
yamlFile "build/ci/jenkins/pod/docker-pod.yaml"
}

View File

@ -11,6 +11,7 @@ services:
environment:
PULSAR_ADDRESS: ${PULSAR_ADDRESS}
ETCD_ADDRESS: ${ETCD_ADDRESS}
MASTER_ADDRESS: ${MASTER_ADDRESS}
networks:
- milvus
ports:
@ -25,6 +26,7 @@ services:
- ${SOURCE_REPO}/proxy:${SOURCE_TAG}
environment:
PULSAR_ADDRESS: ${PULSAR_ADDRESS}
ETCD_ADDRESS: ${ETCD_ADDRESS}
MASTER_ADDRESS: ${MASTER_ADDRESS}
ports:
- "19530:19530"

View File

@ -9,10 +9,22 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
enum ErrorCode {
Success = 0,
UnexpectedException = 1,
};
typedef struct CStatus {
int error_code;
const char* error_msg;
} CStatus;
typedef void* CCollection;
CCollection

View File

@ -13,21 +13,52 @@
#include "query/Plan.h"
#include "segcore/Collection.h"
CPlan
CreatePlan(CCollection c_col, const char* dsl) {
CStatus
CreatePlan(CCollection c_col, const char* dsl, CPlan* res_plan) {
auto col = (milvus::segcore::Collection*)c_col;
auto res = milvus::query::CreatePlan(*col->get_schema(), dsl);
return (CPlan)res.release();
try {
auto res = milvus::query::CreatePlan(*col->get_schema(), dsl);
auto status = CStatus();
status.error_code = Success;
status.error_msg = "";
auto plan = (CPlan)res.release();
*res_plan = plan;
return status;
} catch (std::exception& e) {
auto status = CStatus();
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
*res_plan = nullptr;
return status;
}
}
CPlaceholderGroup
ParsePlaceholderGroup(CPlan c_plan, void* placeholder_group_blob, int64_t blob_size) {
CStatus
ParsePlaceholderGroup(CPlan c_plan,
void* placeholder_group_blob,
int64_t blob_size,
CPlaceholderGroup* res_placeholder_group) {
std::string blob_string((char*)placeholder_group_blob, (char*)placeholder_group_blob + blob_size);
auto plan = (milvus::query::Plan*)c_plan;
auto res = milvus::query::ParsePlaceholderGroup(plan, blob_string);
return (CPlaceholderGroup)res.release();
try {
auto res = milvus::query::ParsePlaceholderGroup(plan, blob_string);
auto status = CStatus();
status.error_code = Success;
status.error_msg = "";
auto group = (CPlaceholderGroup)res.release();
*res_placeholder_group = group;
return status;
} catch (std::exception& e) {
auto status = CStatus();
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
*res_placeholder_group = nullptr;
return status;
}
}
int64_t

View File

@ -20,11 +20,14 @@ extern "C" {
typedef void* CPlan;
typedef void* CPlaceholderGroup;
CPlan
CreatePlan(CCollection col, const char* dsl);
CStatus
CreatePlan(CCollection col, const char* dsl, CPlan* res_plan);
CPlaceholderGroup
ParsePlaceholderGroup(CPlan plan, void* placeholder_group_blob, int64_t blob_size);
CStatus
ParsePlaceholderGroup(CPlan plan,
void* placeholder_group_blob,
int64_t blob_size,
CPlaceholderGroup* res_placeholder_group);
int64_t
GetNumOfQueries(CPlaceholderGroup placeholder_group);

View File

@ -71,7 +71,7 @@ Insert(CSegmentBase c_segment,
status.error_code = Success;
status.error_msg = "";
return status;
} catch (std::runtime_error& e) {
} catch (std::exception& e) {
auto status = CStatus();
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
@ -103,7 +103,7 @@ Delete(
status.error_code = Success;
status.error_msg = "";
return status;
} catch (std::runtime_error& e) {
} catch (std::exception& e) {
auto status = CStatus();
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
@ -141,7 +141,7 @@ Search(CSegmentBase c_segment,
auto res = segment->Search(plan, placeholder_groups.data(), timestamps, num_groups, *query_result);
status.error_code = Success;
status.error_msg = "";
} catch (std::runtime_error& e) {
} catch (std::exception& e) {
status.error_code = UnexpectedException;
status.error_msg = strdup(e.what());
}

View File

@ -17,22 +17,11 @@ extern "C" {
#include <stdlib.h>
#include <stdint.h>
#include "segcore/collection_c.h"
#include "segcore/plan_c.h"
typedef void* CSegmentBase;
typedef void* CQueryResult;
enum ErrorCode {
Success = 0,
UnexpectedException = 1,
};
typedef struct CStatus {
int error_code;
const char* error_msg;
} CStatus;
CSegmentBase
NewSegment(CCollection collection, uint64_t segment_id);

View File

@ -14,7 +14,6 @@
#include <random>
#include <gtest/gtest.h>
#include "segcore/collection_c.h"
#include "pb/service_msg.pb.h"
#include "segcore/reduce_c.h"
@ -151,8 +150,15 @@ TEST(CApiTest, SearchTest) {
}
auto blob = raw_group.SerializeAsString();
auto plan = CreatePlan(collection, dsl_string);
auto placeholderGroup = ParsePlaceholderGroup(plan, blob.data(), blob.length());
void *plan = nullptr;
auto status = CreatePlan(collection, dsl_string, &plan);
assert(status.error_code == Success);
void *placeholderGroup = nullptr;
status = ParsePlaceholderGroup(plan, blob.data(), blob.length(), &placeholderGroup);
assert(status.error_code == Success);
std::vector<CPlaceholderGroup> placeholderGroups;
placeholderGroups.push_back(placeholderGroup);
timestamps.clear();
@ -611,8 +617,15 @@ TEST(CApiTest, Reduce) {
}
auto blob = raw_group.SerializeAsString();
auto plan = CreatePlan(collection, dsl_string);
auto placeholderGroup = ParsePlaceholderGroup(plan, blob.data(), blob.length());
void *plan = nullptr;
auto status = CreatePlan(collection, dsl_string, &plan);
assert(status.error_code == Success);
void *placeholderGroup = nullptr;
status = ParsePlaceholderGroup(plan, blob.data(), blob.length(), &placeholderGroup);
assert(status.error_code == Success);
std::vector<CPlaceholderGroup> placeholderGroups;
placeholderGroups.push_back(placeholderGroup);
timestamps.clear();

View File

@ -11,14 +11,14 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
)
type Cache interface {
type MetaCache interface {
Hit(collectionName string) bool
Get(collectionName string) (*servicepb.CollectionDescription, error)
Update(collectionName string) error
Remove(collectionName string) error
//Write(collectionName string, schema *servicepb.CollectionDescription) error
}
var globalMetaCache Cache
var globalMetaCache MetaCache
type SimpleMetaCache struct {
mu sync.RWMutex
@ -30,54 +30,29 @@ type SimpleMetaCache struct {
ctx context.Context
}
func (metaCache *SimpleMetaCache) Hit(collectionName string) bool {
metaCache.mu.RLock()
defer metaCache.mu.RUnlock()
_, ok := metaCache.metas[collectionName]
func (smc *SimpleMetaCache) Hit(collectionName string) bool {
smc.mu.RLock()
defer smc.mu.RUnlock()
_, ok := smc.metas[collectionName]
return ok
}
func (metaCache *SimpleMetaCache) Get(collectionName string) (*servicepb.CollectionDescription, error) {
metaCache.mu.RLock()
defer metaCache.mu.RUnlock()
schema, ok := metaCache.metas[collectionName]
func (smc *SimpleMetaCache) Get(collectionName string) (*servicepb.CollectionDescription, error) {
smc.mu.RLock()
defer smc.mu.RUnlock()
schema, ok := smc.metas[collectionName]
if !ok {
return nil, errors.New("collection meta miss")
}
return schema, nil
}
func (metaCache *SimpleMetaCache) Update(collectionName string) error {
reqID, err := metaCache.reqIDAllocator.AllocOne()
func (smc *SimpleMetaCache) Update(collectionName string) error {
reqID, err := smc.reqIDAllocator.AllocOne()
if err != nil {
return err
}
ts, err := metaCache.tsoAllocator.AllocOne()
if err != nil {
return err
}
hasCollectionReq := &internalpb.HasCollectionRequest{
MsgType: internalpb.MsgType_kHasCollection,
ReqID: reqID,
Timestamp: ts,
ProxyID: metaCache.proxyID,
CollectionName: &servicepb.CollectionName{
CollectionName: collectionName,
},
}
has, err := metaCache.masterClient.HasCollection(metaCache.ctx, hasCollectionReq)
if err != nil {
return err
}
if !has.Value {
return errors.New("collection " + collectionName + " not exists")
}
reqID, err = metaCache.reqIDAllocator.AllocOne()
if err != nil {
return err
}
ts, err = metaCache.tsoAllocator.AllocOne()
ts, err := smc.tsoAllocator.AllocOne()
if err != nil {
return err
}
@ -85,32 +60,20 @@ func (metaCache *SimpleMetaCache) Update(collectionName string) error {
MsgType: internalpb.MsgType_kDescribeCollection,
ReqID: reqID,
Timestamp: ts,
ProxyID: metaCache.proxyID,
ProxyID: smc.proxyID,
CollectionName: &servicepb.CollectionName{
CollectionName: collectionName,
},
}
resp, err := metaCache.masterClient.DescribeCollection(metaCache.ctx, req)
resp, err := smc.masterClient.DescribeCollection(smc.ctx, req)
if err != nil {
return err
}
metaCache.mu.Lock()
defer metaCache.mu.Unlock()
metaCache.metas[collectionName] = resp
return nil
}
func (metaCache *SimpleMetaCache) Remove(collectionName string) error {
metaCache.mu.Lock()
defer metaCache.mu.Unlock()
_, ok := metaCache.metas[collectionName]
if !ok {
return errors.New("cannot find collection: " + collectionName)
}
delete(metaCache.metas, collectionName)
smc.mu.Lock()
defer smc.mu.Unlock()
smc.metas[collectionName] = resp
return nil
}

View File

@ -291,7 +291,7 @@ func (dct *DropCollectionTask) Execute() error {
}
func (dct *DropCollectionTask) PostExecute() error {
return globalMetaCache.Remove(dct.CollectionName.CollectionName)
return nil
}
type QueryTask struct {
@ -329,18 +329,6 @@ func (qt *QueryTask) SetTs(ts Timestamp) {
}
func (qt *QueryTask) PreExecute() error {
collectionName := qt.query.CollectionName
if !globalMetaCache.Hit(collectionName) {
err := globalMetaCache.Update(collectionName)
if err != nil {
return err
}
}
_, err := globalMetaCache.Get(collectionName)
if err != nil { // err is not nil if collection not exists
return err
}
if err := ValidateCollectionName(qt.query.CollectionName); err != nil {
return err
}
@ -394,29 +382,22 @@ func (qt *QueryTask) PostExecute() error {
log.Print("wait to finish failed, timeout!")
return errors.New("wait to finish failed, timeout")
case searchResults := <-qt.resultBuf:
filterSearchResult := make([]*internalpb.SearchResult, 0)
for _, partialSearchResult := range searchResults {
if partialSearchResult.Status.ErrorCode == commonpb.ErrorCode_SUCCESS {
filterSearchResult = append(filterSearchResult, partialSearchResult)
}
}
rlen := len(filterSearchResult) // query num
rlen := len(searchResults) // query num
if rlen <= 0 {
qt.result = &servicepb.QueryResult{}
return nil
}
n := len(filterSearchResult[0].Hits) // n
n := len(searchResults[0].Hits) // n
if n <= 0 {
qt.result = &servicepb.QueryResult{}
return nil
}
hits := make([][]*servicepb.Hits, rlen)
for i, partialSearchResult := range filterSearchResult {
for i, searchResult := range searchResults {
hits[i] = make([]*servicepb.Hits, n)
for j, bs := range partialSearchResult.Hits {
for j, bs := range searchResult.Hits {
hits[i][j] = &servicepb.Hits{}
err := proto.Unmarshal(bs, hits[i][j])
if err != nil {
@ -452,17 +433,6 @@ func (qt *QueryTask) PostExecute() error {
}
}
choiceOffset := locs[choice]
// check if distance is valid, `invalid` here means very very big,
// in this process, distance here is the smallest, so the rest of distance are all invalid
if hits[choice][i].Scores[choiceOffset] >= float32(math.MaxFloat32) {
qt.result = &servicepb.QueryResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "topk in dsl greater than the row nums of collection",
},
}
return nil
}
reducedHits.IDs = append(reducedHits.IDs, hits[choice][i].IDs[choiceOffset])
if hits[choice][i].RowData != nil && len(hits[choice][i].RowData) > 0 {
reducedHits.RowData = append(reducedHits.RowData, hits[choice][i].RowData[choiceOffset])

View File

@ -10,6 +10,8 @@ package querynode
*/
import "C"
import (
"errors"
"strconv"
"unsafe"
)
@ -17,11 +19,21 @@ type Plan struct {
cPlan C.CPlan
}
func createPlan(col Collection, dsl string) *Plan {
func createPlan(col Collection, dsl string) (*Plan, error) {
cDsl := C.CString(dsl)
cPlan := C.CreatePlan(col.collectionPtr, cDsl)
var cPlan C.CPlan
status := C.CreatePlan(col.collectionPtr, cDsl, &cPlan)
errorCode := status.error_code
if errorCode != 0 {
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
return nil, errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
}
var newPlan = &Plan{cPlan: cPlan}
return newPlan
return newPlan, nil
}
func (plan *Plan) getTopK() int64 {
@ -37,12 +49,22 @@ type PlaceholderGroup struct {
cPlaceholderGroup C.CPlaceholderGroup
}
func parserPlaceholderGroup(plan *Plan, placeHolderBlob []byte) *PlaceholderGroup {
func parserPlaceholderGroup(plan *Plan, placeHolderBlob []byte) (*PlaceholderGroup, error) {
var blobPtr = unsafe.Pointer(&placeHolderBlob[0])
blobSize := C.long(len(placeHolderBlob))
cPlaceholderGroup := C.ParsePlaceholderGroup(plan.cPlan, blobPtr, blobSize)
var cPlaceholderGroup C.CPlaceholderGroup
status := C.ParsePlaceholderGroup(plan.cPlan, blobPtr, blobSize, &cPlaceholderGroup)
errorCode := status.error_code
if errorCode != 0 {
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
return nil, errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
}
var newPlaceholderGroup = &PlaceholderGroup{cPlaceholderGroup: cPlaceholderGroup}
return newPlaceholderGroup
return newPlaceholderGroup, nil
}
func (pg *PlaceholderGroup) getNumOfQuery() int64 {

View File

@ -64,7 +64,8 @@ func TestPlan_Plan(t *testing.T) {
dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"L2\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }"
plan := createPlan(*collection, dslString)
plan, err := createPlan(*collection, dslString)
assert.NoError(t, err)
assert.NotEqual(t, plan, nil)
topk := plan.getTopK()
assert.Equal(t, int(topk), 10)
@ -122,7 +123,8 @@ func TestPlan_PlaceholderGroup(t *testing.T) {
dslString := "{\"bool\": { \n\"vector\": {\n \"vec\": {\n \"metric_type\": \"L2\", \n \"params\": {\n \"nprobe\": 10 \n},\n \"query\": \"$0\",\"topk\": 10 \n } \n } \n } \n }"
plan := createPlan(*collection, dslString)
plan, err := createPlan(*collection, dslString)
assert.NoError(t, err)
assert.NotNil(t, plan)
var searchRawData1 []byte
@ -151,7 +153,8 @@ func TestPlan_PlaceholderGroup(t *testing.T) {
placeGroupByte, err := proto.Marshal(&placeholderGroup)
assert.Nil(t, err)
holder := parserPlaceholderGroup(plan, placeGroupByte)
holder, err := parserPlaceholderGroup(plan, placeGroupByte)
assert.NoError(t, err)
assert.NotNil(t, holder)
numQueries := holder.getNumOfQuery()
assert.Equal(t, int(numQueries), 2)

View File

@ -99,8 +99,10 @@ func TestReduce_AllFunc(t *testing.T) {
log.Print("marshal placeholderGroup failed")
}
plan := createPlan(*collection, dslString)
holder := parserPlaceholderGroup(plan, placeGroupByte)
plan, err := createPlan(*collection, dslString)
assert.NoError(t, err)
holder, err := parserPlaceholderGroup(plan, placeGroupByte)
assert.NoError(t, err)
placeholderGroups := make([]*PlaceholderGroup, 0)
placeholderGroups = append(placeholderGroups, holder)

View File

@ -225,9 +225,15 @@ func (ss *searchService) search(msg msgstream.TsMsg) error {
}
collectionID := collection.ID()
dsl := query.Dsl
plan := createPlan(*collection, dsl)
plan, err := createPlan(*collection, dsl)
if err != nil {
return err
}
placeHolderGroupBlob := query.PlaceholderGroup
placeholderGroup := parserPlaceholderGroup(plan, placeHolderGroupBlob)
placeholderGroup, err := parserPlaceholderGroup(plan, placeHolderGroupBlob)
if err != nil {
return err
}
placeholderGroups := make([]*PlaceholderGroup, 0)
placeholderGroups = append(placeholderGroups, placeholderGroup)

View File

@ -690,8 +690,10 @@ func TestSegment_segmentSearch(t *testing.T) {
}
searchTimestamp := Timestamp(1020)
plan := createPlan(*collection, dslString)
holder := parserPlaceholderGroup(plan, placeHolderGroupBlob)
plan, err := createPlan(*collection, dslString)
assert.NoError(t, err)
holder, err := parserPlaceholderGroup(plan, placeHolderGroupBlob)
assert.NoError(t, err)
placeholderGroups := make([]*PlaceholderGroup, 0)
placeholderGroups = append(placeholderGroups, holder)

View File

@ -82,17 +82,8 @@ func (gp *BaseTable) LoadRange(key, endKey string, limit int) ([]string, []strin
func (gp *BaseTable) LoadYaml(fileName string) error {
config := viper.New()
_, fpath, _, _ := runtime.Caller(0)
configFile := path.Dir(fpath) + "/../../../configs/" + fileName
_, err := os.Stat(configFile)
if os.IsNotExist(err) {
runPath, err := os.Getwd()
if err != nil {
panic(err)
}
configFile = runPath + "/configs/" + fileName
}
config.SetConfigFile(configFile)
configPath := path.Dir(fpath) + "/../../../configs/"
config.SetConfigFile(configPath + fileName)
if err := config.ReadInConfig(); err != nil {
panic(err)
}