mirror of https://github.com/milvus-io/milvus.git
fix: jsonstats check if cache schema is nil lazy describecollection (#41075)
fix: jsonstats check if cache schema is nil lazy describecollection pr:https://github.com/milvus-io/milvus/pull/38039 pr:https://github.com/milvus-io/milvus/pull/41068 issue:https://github.com/milvus-io/milvus/issues/36995 --------- Signed-off-by: chyezh <chyezh@outlook.com> Signed-off-by: Xianhui.Lin <xianhui.lin@zilliz.com> Co-authored-by: Zhen Ye <chyezh@outlook.com>pull/41076/head
parent
21069e2a8c
commit
a360a07919
|
@ -879,7 +879,7 @@ LoadArrowReaderFromRemote(const std::vector<std::string>& remote_files,
|
|||
futures;
|
||||
futures.reserve(remote_files.size());
|
||||
for (const auto& file : remote_files) {
|
||||
auto future = pool.Submit([&]() {
|
||||
auto future = pool.Submit([rcm, file]() {
|
||||
auto fileSize = rcm->Size(file);
|
||||
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[fileSize]);
|
||||
rcm->Read(file, buf.get(), fileSize);
|
||||
|
|
|
@ -97,6 +97,13 @@ func (c *IndexChecker) Check(ctx context.Context) []task.Task {
|
|||
log.Warn("collection released during check index", zap.Int64("collection", collectionID))
|
||||
continue
|
||||
}
|
||||
if schema == nil && paramtable.Get().CommonCfg.EnabledJSONKeyStats.GetAsBool() {
|
||||
collectionSchema, err1 := c.broker.DescribeCollection(ctx, collectionID)
|
||||
if err1 == nil {
|
||||
schema = collectionSchema.GetSchema()
|
||||
c.meta.PutCollectionSchema(ctx, collectionID, collectionSchema.GetSchema())
|
||||
}
|
||||
}
|
||||
replicas := c.meta.ReplicaManager.GetByCollection(ctx, collectionID)
|
||||
for _, replica := range replicas {
|
||||
tasks = append(tasks, c.checkReplica(ctx, collection, replica, indexInfos, schema)...)
|
||||
|
|
|
@ -266,6 +266,17 @@ func (m *CollectionManager) GetCollectionSchema(ctx context.Context, collectionI
|
|||
return collection.Schema
|
||||
}
|
||||
|
||||
func (m *CollectionManager) PutCollectionSchema(ctx context.Context, collectionID typeutil.UniqueID, schema *schemapb.CollectionSchema) {
|
||||
m.rwmutex.Lock()
|
||||
defer m.rwmutex.Unlock()
|
||||
|
||||
collection, ok := m.collections[collectionID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
collection.Schema = schema
|
||||
}
|
||||
|
||||
func (m *CollectionManager) GetPartition(ctx context.Context, partitionID typeutil.UniqueID) *Partition {
|
||||
m.rwmutex.RLock()
|
||||
defer m.rwmutex.RUnlock()
|
||||
|
|
Loading…
Reference in New Issue