enhance: find collection schema from cache (#28782)

issue: #28781 #28329

1. There is no need to call `DescribeCollection` if the collection's
schema is already cached in the globalMetaCache
2. Call `GetProperties` to verify access to the Azure Blob Service while
constructing the ChunkManager

Signed-off-by: PowderLi <min.li@zilliz.com>
pull/28900/head
PowderLi 2023-12-03 19:22:33 +08:00 committed by GitHub
parent 342635ed61
commit 20fc90c591
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 115 additions and 117 deletions

View File

@ -71,14 +71,23 @@ AzureChunkManager::AzureChunkManager(const StorageConfig& storage_config)
azure::AzureBlobChunkManager::InitLog(storage_config.log_level,
AzureLogger);
}
client_ = std::make_shared<azure::AzureBlobChunkManager>(
storage_config.access_key_id,
storage_config.access_key_value,
storage_config.address,
storage_config.requestTimeoutMs == 0
? DEFAULT_CHUNK_MANAGER_REQUEST_TIMEOUT_MS
: storage_config.requestTimeoutMs,
storage_config.useIAM);
try {
client_ = std::make_shared<azure::AzureBlobChunkManager>(
storage_config.access_key_id,
storage_config.access_key_value,
storage_config.address,
storage_config.requestTimeoutMs == 0
? DEFAULT_CHUNK_MANAGER_REQUEST_TIMEOUT_MS
: storage_config.requestTimeoutMs,
storage_config.useIAM);
} catch (std::exception& err) {
ThrowAzureError(
"PreCheck",
err,
"precheck chunk manager client failed, error:{}, configuration:{}",
err.what(),
storage_config.ToString());
}
}
AzureChunkManager::~AzureChunkManager() {
@ -106,14 +115,7 @@ AzureChunkManager::ListWithPrefix(const std::string& filepath) {
uint64_t
AzureChunkManager::Read(const std::string& filepath, void* buf, uint64_t size) {
try {
return GetObjectBuffer(default_bucket_name_, filepath, buf, size);
} catch (const std::exception& e) {
std::stringstream err_msg;
err_msg << "read object('" << default_bucket_name_ << "', '" << filepath
<< "' fail: " << e.what();
throw SegcoreError(ObjectNotExist, err_msg.str());
}
return GetObjectBuffer(default_bucket_name_, filepath, buf, size);
}
void

View File

@ -79,6 +79,7 @@ AzureBlobChunkManager::AzureBlobChunkManager(
CreateFromConnectionString(GetConnectionString(
access_key_id, access_key_value, address)));
}
client_->GetProperties();
}
AzureBlobChunkManager::~AzureBlobChunkManager() {

View File

@ -22,7 +22,7 @@ using namespace milvus;
using namespace milvus::storage;
StorageConfig
get_default_storage_config() {
get_default_storage_config(bool useIam) {
auto endpoint = "core.windows.net";
auto accessKey = "devstoreaccount1";
auto accessValue =
@ -30,7 +30,6 @@ get_default_storage_config() {
"K1SZFPTOtr/KBHBeksoGMGw==";
auto rootPath = "files";
auto useSSL = false;
auto useIam = false;
auto iamEndPoint = "";
auto bucketName = "a-bucket";
@ -57,7 +56,7 @@ class AzureChunkManagerTest : public testing::Test {
virtual void
SetUp() {
configs_ = get_default_storage_config();
configs_ = get_default_storage_config(false);
chunk_manager_ = make_unique<AzureChunkManager>(configs_);
chunk_manager_ptr_ = CreateChunkManager(configs_);
}
@ -68,6 +67,18 @@ class AzureChunkManagerTest : public testing::Test {
StorageConfig configs_;
};
// Constructing an AzureChunkManager with useIAM=true against the local test
// endpoint cannot authenticate, so the constructor's connectivity precheck
// (GetProperties) must throw. Verify the failure surfaces as a SegcoreError
// whose message carries the "precheck" tag added by ThrowAzureError.
TEST_F(AzureChunkManagerTest, WrongConfig) {
    StorageConfig configs = get_default_storage_config(true);
    try {
        AzureChunkManagerPtr chunk_manager =
            make_unique<AzureChunkManager>(configs);
        // Reaching here means the precheck silently passed — fail loudly
        // instead of EXPECT_TRUE(false), which gives no context.
        FAIL() << "expected AzureChunkManager construction to throw";
    } catch (const SegcoreError& e) {  // catch by const reference (idiomatic)
        EXPECT_TRUE(std::string(e.what()).find("precheck") != string::npos);
    }
}
TEST_F(AzureChunkManagerTest, AzureLogger) {
AzureLogger(Azure::Core::Diagnostics::Logger::Level::Error, "");
AzureLogger(Azure::Core::Diagnostics::Logger::Level::Warning, "");

View File

@ -112,8 +112,7 @@ TEST_F(RemoteChunkManagerTest, BucketNegtive) {
try {
aws_chunk_manager_->CreateBucket(testBucketName);
} catch (SegcoreError& e) {
EXPECT_TRUE(std::string(e.what()).find("exists") !=
string::npos);
EXPECT_TRUE(std::string(e.what()).find("exists") != string::npos);
}
aws_chunk_manager_->DeleteBucket(testBucketName);
}
@ -138,9 +137,9 @@ TEST_F(RemoteChunkManagerTest, WritePositive) {
aws_chunk_manager_->SetBucketName(testBucketName);
EXPECT_EQ(aws_chunk_manager_->GetBucketName(), testBucketName);
if (!aws_chunk_manager_->BucketExists(testBucketName)) {
aws_chunk_manager_->CreateBucket(testBucketName);
}
if (!aws_chunk_manager_->BucketExists(testBucketName)) {
aws_chunk_manager_->CreateBucket(testBucketName);
}
uint8_t data[5] = {0x17, 0x32, 0x45, 0x34, 0x23};
string path = "1";
aws_chunk_manager_->Write(path, data, sizeof(data));

View File

@ -46,6 +46,9 @@ func (h *Handlers) checkDatabase(ctx context.Context, c *gin.Context, dbName str
if dbName == DefaultDbName {
return nil
}
if proxy.CheckDatabase(ctx, dbName) {
return nil
}
response, err := h.proxy.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{})
if err == nil {
err = merr.Error(response.GetStatus())
@ -66,7 +69,11 @@ func (h *Handlers) checkDatabase(ctx context.Context, c *gin.Context, dbName str
return RestRequestInterceptorErr
}
func (h *Handlers) describeCollection(ctx context.Context, c *gin.Context, dbName string, collectionName string) (*milvuspb.DescribeCollectionResponse, error) {
func (h *Handlers) describeCollection(ctx context.Context, c *gin.Context, dbName string, collectionName string) (*schemapb.CollectionSchema, error) {
collSchema, err := proxy.GetCachedCollectionSchema(ctx, dbName, collectionName)
if err == nil {
return collSchema, nil
}
req := milvuspb.DescribeCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
@ -84,7 +91,7 @@ func (h *Handlers) describeCollection(ctx context.Context, c *gin.Context, dbNam
log.Warn("primary filed autoID VS schema autoID", zap.String("collectionName", collectionName), zap.Bool("primary Field", primaryField.AutoID), zap.Bool("schema", response.Schema.AutoID))
response.Schema.AutoID = EnableAutoID
}
return response, nil
return response.Schema, nil
}
func (h *Handlers) hasCollection(ctx context.Context, c *gin.Context, dbName string, collectionName string) (bool, error) {
@ -541,12 +548,12 @@ func (h *Handlers) get(c *gin.Context) {
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
coll, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || coll == nil {
collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || collSchema == nil {
return nil, RestRequestInterceptorErr
}
body, _ := c.Get(gin.BodyBytesKey)
filter, err := checkGetPrimaryKey(coll.Schema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
filter, err := checkGetPrimaryKey(collSchema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
if err != nil {
c.JSON(http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
@ -609,15 +616,15 @@ func (h *Handlers) delete(c *gin.Context) {
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
coll, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || coll == nil {
collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || collSchema == nil {
return nil, RestRequestInterceptorErr
}
deleteReq := req.(*milvuspb.DeleteRequest)
deleteReq.Expr = httpReq.Filter
if deleteReq.Expr == "" {
body, _ := c.Get(gin.BodyBytesKey)
filter, err := checkGetPrimaryKey(coll.Schema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
filter, err := checkGetPrimaryKey(collSchema, gjson.Get(string(body.([]byte)), DefaultPrimaryFieldName))
if err != nil {
c.JSON(http.StatusOK, gin.H{
HTTPReturnCode: merr.Code(merr.ErrCheckPrimaryKey),
@ -679,12 +686,12 @@ func (h *Handlers) insert(c *gin.Context) {
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
coll, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || coll == nil {
collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || collSchema == nil {
return nil, RestRequestInterceptorErr
}
body, _ := c.Get(gin.BodyBytesKey)
err, httpReq.Data = checkAndSetData(string(body.([]byte)), coll)
err, httpReq.Data = checkAndSetData(string(body.([]byte)), collSchema)
if err != nil {
log.Warn("high level restful api, fail to deal with insert data", zap.Any("body", body), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
@ -694,7 +701,7 @@ func (h *Handlers) insert(c *gin.Context) {
return nil, RestRequestInterceptorErr
}
insertReq := req.(*milvuspb.InsertRequest)
insertReq.FieldsData, err = anyToColumns(httpReq.Data, coll.Schema)
insertReq.FieldsData, err = anyToColumns(httpReq.Data, collSchema)
if err != nil {
log.Warn("high level restful api, fail to deal with insert data", zap.Any("data", httpReq.Data), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
@ -771,17 +778,17 @@ func (h *Handlers) upsert(c *gin.Context) {
username, _ := c.Get(ContextUsername)
ctx := proxy.NewContextWithMetadata(c, username.(string), req.DbName)
response, err := h.executeRestRequestInterceptor(ctx, c, req, func(reqCtx context.Context, req any) (any, error) {
coll, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || coll == nil {
collSchema, err := h.describeCollection(ctx, c, httpReq.DbName, httpReq.CollectionName)
if err != nil || collSchema == nil {
return nil, RestRequestInterceptorErr
}
if coll.Schema.AutoID {
if collSchema.AutoID {
err := merr.WrapErrParameterInvalid("autoID: false", "autoID: true", "cannot upsert an autoID collection")
c.AbortWithStatusJSON(http.StatusOK, gin.H{HTTPReturnCode: merr.Code(err), HTTPReturnMessage: err.Error()})
return nil, RestRequestInterceptorErr
}
body, _ := c.Get(gin.BodyBytesKey)
err, httpReq.Data = checkAndSetData(string(body.([]byte)), coll)
err, httpReq.Data = checkAndSetData(string(body.([]byte)), collSchema)
if err != nil {
log.Warn("high level restful api, fail to deal with upsert data", zap.Any("body", body), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{
@ -791,7 +798,7 @@ func (h *Handlers) upsert(c *gin.Context) {
return nil, RestRequestInterceptorErr
}
upsertReq := req.(*milvuspb.UpsertRequest)
upsertReq.FieldsData, err = anyToColumns(httpReq.Data, coll.Schema)
upsertReq.FieldsData, err = anyToColumns(httpReq.Data, collSchema)
if err != nil {
log.Warn("high level restful api, fail to deal with upsert data", zap.Any("data", httpReq.Data), zap.Error(err))
c.AbortWithStatusJSON(http.StatusOK, gin.H{

View File

@ -1726,73 +1726,6 @@ func wrapWithDescribeIndex(t *testing.T, mp *mocks.MockProxy, returnType int, ti
return mp, testCases
}
// getCollectionSchema builds a minimal test CollectionSchema: the given
// collection name, autoID disabled, and the fixed field set produced by
// getFieldSchema.
func getCollectionSchema(collectionName string) *schemapb.CollectionSchema {
	return &schemapb.CollectionSchema{
		Name:   collectionName,
		AutoID: false,
		Fields: getFieldSchema(),
	}
}
// getFieldSchema returns the fixed four-field schema used by these tests:
// the two system fields (RowID, Timestamp), a 2-dimensional float-vector
// field, and an int64 primary-key field.
func getFieldSchema() []*schemapb.FieldSchema {
fields := []*schemapb.FieldSchema{
// System field 0: row identifier.
{
FieldID: 0,
Name: "RowID",
Description: "RowID field",
DataType: schemapb.DataType_Int64,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "f0_tk1",
Value: "f0_tv1",
},
},
},
// System field 1: insertion timestamp.
{
FieldID: 1,
Name: "Timestamp",
Description: "Timestamp field",
DataType: schemapb.DataType_Int64,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "f1_tk1",
Value: "f1_tv1",
},
},
},
// User field 100: float vector with dim=2 and a dummy index param.
{
FieldID: 100,
Name: "float_vector_field",
Description: "field 100",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "2",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "indexkey",
Value: "indexvalue",
},
},
},
// User field 101: int64 primary key (description says 106; the ID is 101).
{
FieldID: 101,
Name: "int64_field",
Description: "field 106",
DataType: schemapb.DataType_Int64,
TypeParams: []*commonpb.KeyValuePair{},
IndexParams: []*commonpb.KeyValuePair{},
IsPrimaryKey: true,
},
}
return fields
}
func TestInterceptor(t *testing.T) {
h := Handlers{}
v := atomic.NewInt32(0)

View File

@ -172,7 +172,7 @@ func printIndexes(indexes []*milvuspb.IndexDescription) []gin.H {
// --------------------- insert param --------------------- //
func checkAndSetData(body string, collDescResp *milvuspb.DescribeCollectionResponse) (error, []map[string]interface{}) {
func checkAndSetData(body string, collSchema *schemapb.CollectionSchema) (error, []map[string]interface{}) {
var reallyDataArray []map[string]interface{}
dataResult := gjson.Get(body, "data")
dataResultArray := dataResult.Array()
@ -181,7 +181,7 @@ func checkAndSetData(body string, collDescResp *milvuspb.DescribeCollectionRespo
}
var fieldNames []string
for _, field := range collDescResp.Schema.Fields {
for _, field := range collSchema.Fields {
fieldNames = append(fieldNames, field.Name)
}
@ -190,13 +190,13 @@ func checkAndSetData(body string, collDescResp *milvuspb.DescribeCollectionRespo
var vectorArray []float32
var binaryArray []byte
if data.Type == gjson.JSON {
for _, field := range collDescResp.Schema.Fields {
for _, field := range collSchema.Fields {
fieldType := field.DataType
fieldName := field.Name
dataString := gjson.Get(data.Raw, fieldName).String()
if field.IsPrimaryKey && collDescResp.Schema.AutoID {
if field.IsPrimaryKey && collSchema.AutoID {
if dataString != "" {
return merr.WrapErrParameterInvalid("", "set primary key but autoID == true"), reallyDataArray
}
@ -268,7 +268,7 @@ func checkAndSetData(body string, collDescResp *milvuspb.DescribeCollectionRespo
}
// fill dynamic schema
if collDescResp.Schema.EnableDynamicField {
if collSchema.EnableDynamicField {
for mapKey, mapValue := range data.Map() {
if !containsString(fieldNames, mapKey) {
mapValueStr := mapValue.String()

View File

@ -350,10 +350,7 @@ func TestInsertWithDynamicFields(t *testing.T) {
req := InsertReq{}
coll := generateCollectionSchema(schemapb.DataType_Int64, false)
var err error
err, req.Data = checkAndSetData(body, &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Schema: coll,
})
err, req.Data = checkAndSetData(body, coll)
assert.Equal(t, nil, err)
assert.Equal(t, int64(0), req.Data[0]["id"])
assert.Equal(t, int64(1), req.Data[0]["book_id"])

View File

@ -84,6 +84,7 @@ type Cache interface {
InitPolicyInfo(info []string, userRoles []string)
RemoveDatabase(ctx context.Context, database string)
HasDatabase(ctx context.Context, database string) bool
}
type collectionBasicInfo struct {
@ -991,3 +992,8 @@ func (m *MetaCache) RemoveDatabase(ctx context.Context, database string) {
defer m.mu.Unlock()
delete(m.collInfo, database)
}
// HasDatabase reports whether the given database currently has an entry in
// the collection-info cache (i.e. at least one of its collections has been
// cached, and it has not been removed via RemoveDatabase).
func (m *MetaCache) HasDatabase(ctx context.Context, database string) bool {
	// collInfo is mutated under m.mu elsewhere (e.g. RemoveDatabase takes the
	// lock before delete), so an unguarded map read here is a data race.
	m.mu.RLock()
	defer m.mu.RUnlock()
	_, ok := m.collInfo[database]
	return ok
}

View File

@ -892,3 +892,26 @@ func TestGlobalMetaCache_ShuffleShardLeaders(t *testing.T) {
assert.Len(t, result["channel-1"], 3)
assert.Equal(t, int64(3), result["channel-1"][0].nodeID)
}
// TestMetaCache_Database verifies that a database becomes visible to
// HasDatabase/CheckDatabase only after one of its collections has been
// pulled into the global meta cache.
func TestMetaCache_Database(t *testing.T) {
ctx := context.Background()
rootCoord := &MockRootCoordClientInterface{}
queryCoord := &mocks.MockQueryCoordClient{}
shardMgr := newShardClientMgr()
err := InitMetaCache(ctx, rootCoord, queryCoord, shardMgr)
assert.NoError(t, err)
// Freshly initialized cache: no database entries yet.
assert.Equal(t, globalMetaCache.HasDatabase(ctx, dbName), false)
queryCoord.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
Status: merr.Success(),
CollectionIDs: []UniqueID{1, 2},
InMemoryPercentages: []int64{100, 50},
}, nil)
// Fetching collection info populates the cache for dbName...
_, err = globalMetaCache.GetCollectionInfo(ctx, dbName, "collection1", 1)
assert.NoError(t, err)
// ...so the schema lookup now succeeds from cache,
_, err = GetCachedCollectionSchema(ctx, dbName, "collection1")
assert.NoError(t, err)
// and the database is reported as present via both entry points.
assert.Equal(t, globalMetaCache.HasDatabase(ctx, dbName), true)
assert.Equal(t, CheckDatabase(ctx, dbName), true)
}

View File

@ -844,6 +844,10 @@ func (_m *MockCache) RemoveDatabase(ctx context.Context, database string) {
_m.Called(ctx, database)
}
// HasDatabase always reports true, regardless of mock expectations.
// NOTE(review): unlike the generated MockCache methods around it, this
// hand-written stub does not call _m.Called(ctx, database), so tests can
// neither stub its return value nor verify the call — confirm this
// shortcut is intentional before relying on it.
func (_m *MockCache) HasDatabase(ctx context.Context, database string) bool {
return true
}
// MockCache_RemoveDatabase_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveDatabase'
type MockCache_RemoveDatabase_Call struct {
*mock.Call

View File

@ -46,6 +46,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/crypto"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -1583,3 +1584,17 @@ func SendReplicateMessagePack(ctx context.Context, replicateMsgStream msgstream.
log.Warn("send replicate msg failed", zap.Any("pack", msgPack), zap.Error(msgErr))
}
}
// GetCachedCollectionSchema fetches a collection's schema through the
// proxy's global meta cache. It returns a "service not ready" error when
// the cache has not been initialized yet.
func GetCachedCollectionSchema(ctx context.Context, dbName string, colName string) (*schemapb.CollectionSchema, error) {
	// Guard clause: before InitMetaCache runs, no schema can be served.
	if globalMetaCache == nil {
		return nil, merr.WrapErrServiceNotReady(paramtable.GetRole(), paramtable.GetNodeID(), "initialization")
	}
	return globalMetaCache.GetCollectionSchema(ctx, dbName, colName)
}
// CheckDatabase reports whether the global meta cache already knows the
// given database; it answers false whenever the cache is uninitialized.
func CheckDatabase(ctx context.Context, dbName string) bool {
	// Guard clause: an uninitialized cache cannot contain any database.
	if globalMetaCache == nil {
		return false
	}
	return globalMetaCache.HasDatabase(ctx, dbName)
}