mirror of https://github.com/milvus-io/milvus.git
/kind improvement
Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
Branch: pull/26750/head
parent 5f08e3a72b
commit 8330c18dc9
@@ -188,18 +188,23 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
        auto parallel_degree = static_cast<uint64_t>(
            DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
        field_data_info.channel->set_capacity(parallel_degree * 2);

        auto& pool =
            ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
        auto load_future = pool.Submit(
            LoadFieldDatasFromRemote, insert_files, field_data_info.channel);

        LOG_SEGCORE_INFO_ << "finish submitting LoadFieldDatasFromRemote task "
                             "to thread pool, "
                          << "segmentID:" << this->id_
                          << ", fieldID:" << info.field_id;
        if (load_info.mmap_dir_path.empty() ||
            SystemProperty::Instance().IsSystem(field_id)) {
            LoadFieldData(field_id, field_data_info);
        } else {
            MapFieldData(field_id, field_data_info);
        }
        LOG_SEGCORE_INFO_ << "finish loading segment field, "
                          << "segmentID:" << this->id_
                          << ", fieldID:" << info.field_id;
    }
}
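The hunk above sizes a bounded channel from a per-field memory budget, hands the remote fetch to a mid-priority thread pool, and consumes slices into memory (LoadFieldData) or an mmap file (MapFieldData). A minimal Go sketch of that bounded producer/consumer shape; the names and budget constants are hypothetical stand-ins, not milvus APIs:

    package main

    import "fmt"

    const (
        fieldMaxMemory = 64 << 20 // assumed per-field budget, like DEFAULT_FIELD_MAX_MEMORY_LIMIT
        fileSliceSize  = 16 << 20 // assumed slice size, like FILE_SLICE_SIZE
    )

    // fetchRemote stands in for LoadFieldDatasFromRemote; hypothetical.
    func fetchRemote(path string) []byte { return make([]byte, 8) }

    func main() {
        files := []string{"binlog-1", "binlog-2", "binlog-3"}
        parallelDegree := fieldMaxMemory / fileSliceSize

        // Capacity 2x the parallel degree bounds how far the producer can
        // run ahead of the consumer, capping peak memory in flight.
        ch := make(chan []byte, parallelDegree*2)

        // Producer: fetch slices in the background, as the thread-pool
        // submission does in the C++ hunk.
        go func() {
            defer close(ch)
            for _, f := range files {
                ch <- fetchRemote(f)
            }
        }()

        // Consumer: drain slices as they arrive.
        for slice := range ch {
            fmt.Println("consumed slice of", len(slice), "bytes")
        }
    }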
@@ -706,7 +706,10 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
        zap.Int64("collectionID", s.Collection()),
        zap.Int64("partitionID", s.Partition()),
        zap.Int64("segmentID", s.ID()),
        zap.Int64("fieldID", fieldID),
        zap.Int64("rowCount", rowCount),
    )
    log.Info("start loading field data for field")

    loadFieldDataInfo, err := newLoadFieldDataInfo()
    defer deleteFieldDataInfo(loadFieldDataInfo)
@@ -729,6 +732,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap

     var status C.CStatus
     GetDynamicPool().Submit(func() (any, error) {
+        log.Info("submitted loadFieldData task to dy pool")
         status = C.LoadFieldData(s.ptr, loadFieldDataInfo.cLoadFieldDataInfo)
         return nil, nil
     }).Await()
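GetDynamicPool().Submit(...).Await() comes from milvus's internal pool utilities; purely as an illustration of the calling shape, here is a hand-rolled future (the types and names are mine, not the library's):

    package main

    import "fmt"

    type future struct {
        done chan struct{}
        val  any
        err  error
    }

    // Await blocks until the submitted function has finished.
    func (f *future) Await() (any, error) {
        <-f.done
        return f.val, f.err
    }

    type pool struct{}

    // Submit runs fn on another goroutine and returns a future for its result.
    func (pool) Submit(fn func() (any, error)) *future {
        f := &future{done: make(chan struct{})}
        go func() {
            defer close(f.done)
            f.val, f.err = fn()
        }()
        return f
    }

    func main() {
        var p pool
        res, err := p.Submit(func() (any, error) {
            return "loaded", nil
        }).Await()
        fmt.Println(res, err)
    }

Because Await blocks until the closure returns, reading the captured status variable after the call, as the hunk does, is race-free.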
@@ -736,10 +740,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap
         return err
     }

-    log.Info("load field done",
-        zap.Int64("fieldID", fieldID),
-        zap.Int64("row count", rowCount),
-        zap.Int64("segmentID", s.ID()))
+    log.Info("load field done")

     return nil
 }
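The removed zap fields are not lost: they were attached once via the logger's With(...) at the top of the function (see the first Go hunk in this file), so the bare message still carries them. A small self-contained zap example of that behavior:

    package main

    import "go.uber.org/zap"

    func main() {
        base, _ := zap.NewDevelopment()
        defer base.Sync()

        // Attach the identifying fields once...
        log := base.With(
            zap.Int64("segmentID", 42),
            zap.Int64("fieldID", 7),
        )

        // ...and every message emitted through this logger includes them,
        // so repeating them per call is redundant.
        log.Info("start loading field data for field")
        log.Info("load field done")
    }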
@@ -584,7 +584,7 @@ func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segmen
         return err
     }

-    log.Info("load field binlogs done for sealed segment",
+    log.Ctx(ctx).Info("load field binlogs done for sealed segment",
         zap.Int64("collection", segment.collectionID),
         zap.Int64("segment", segment.segmentID),
         zap.Int("len(field)", len(fields)),
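log.Ctx(ctx) pulls a request-scoped logger out of the context, which is how these lines pick up trace information. A minimal sketch of that idea, assuming only a logger stored under a context key; milvus's own pkg/log also folds in trace IDs, which is omitted here:

    package main

    import (
        "context"

        "go.uber.org/zap"
    )

    type ctxLogKey struct{}

    // WithLogger stashes a logger in the context.
    func WithLogger(ctx context.Context, l *zap.Logger) context.Context {
        return context.WithValue(ctx, ctxLogKey{}, l)
    }

    // Ctx recovers the request-scoped logger, falling back to a no-op.
    func Ctx(ctx context.Context) *zap.Logger {
        if l, ok := ctx.Value(ctxLogKey{}).(*zap.Logger); ok {
            return l
        }
        return zap.NewNop()
    }

    func main() {
        base, _ := zap.NewDevelopment()
        ctx := WithLogger(context.Background(),
            base.With(zap.String("traceID", "abc123")))

        // Downstream code logs through the context and inherits traceID.
        Ctx(ctx).Info("load field binlogs done for sealed segment")
    }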
@@ -216,6 +216,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
     log := log.Ctx(ctx).With(
         zap.Int64("collectionID", req.GetCollectionID()),
         zap.String("channel", channel.GetChannelName()),
+        zap.Int64("currentNodeID", paramtable.GetNodeID()),
     )

     log.Info("received watch channel request",
@@ -360,6 +361,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
     log := log.Ctx(ctx).With(
         zap.Int64("collectionID", req.GetCollectionID()),
         zap.String("channel", req.GetChannelName()),
+        zap.Int64("currentNodeID", paramtable.GetNodeID()),
     )

     log.Info("received unsubscribe channel request")
@@ -433,6 +435,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
         zap.Int64("partitionID", segment.GetPartitionID()),
         zap.String("shard", segment.GetInsertChannel()),
         zap.Int64("segmentID", segment.GetSegmentID()),
+        zap.Int64("currentNodeID", paramtable.GetNodeID()),
     )

     log.Info("received load segments request",
@@ -552,6 +555,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release
         zap.Int64("collectionID", req.GetCollectionID()),
         zap.String("shard", req.GetShard()),
         zap.Int64s("segmentIDs", req.GetSegmentIDs()),
+        zap.Int64("currentNodeID", paramtable.GetNodeID()),
     )

     log.Info("received release segment request",
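The four hunks above make the same change: each RPC handler's request-scoped logger now also tags the serving node via currentNodeID. A compact sketch of that handler pattern; the request type, its fields, and getNodeID are placeholders, not milvus signatures:

    package main

    import "go.uber.org/zap"

    // watchReq is a placeholder for the querypb request types above.
    type watchReq struct {
        collectionID int64
        channel      string
    }

    // getNodeID stands in for paramtable.GetNodeID; hypothetical.
    func getNodeID() int64 { return 1 }

    func watchDmChannels(base *zap.Logger, req *watchReq) {
        // Build one request-scoped logger so every log line in the handler
        // carries the same identifying fields, including the serving node.
        log := base.With(
            zap.Int64("collectionID", req.collectionID),
            zap.String("channel", req.channel),
            zap.Int64("currentNodeID", getNodeID()),
        )

        log.Info("received watch channel request")
    }

    func main() {
        base, _ := zap.NewDevelopment()
        watchDmChannels(base, &watchReq{collectionID: 100, channel: "dml-ch-0"})
    }

Tagging the node ID in the With(...) call rather than per message matters once a deployment runs many query nodes: every line the handler emits becomes attributable to one node without further effort.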
@@ -1244,7 +1248,8 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
 }

 func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
-    log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel()))
+    log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()),
+        zap.String("channel", req.GetChannel()), zap.Int64("currentNodeID", paramtable.GetNodeID()))
     // check node healthy
     if !node.lifetime.Add(commonpbutil.IsHealthy) {
         msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
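node.lifetime.Add(commonpbutil.IsHealthy) gates the RPC on node state; milvus's actual lifetime type lives in its internal util packages, but the gist is a predicate-checked admission counter, sketched here under that assumption:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    const (
        stateHealthy int32 = iota
        stateStopping
    )

    // lifetime gates new work on a state predicate and counts in-flight
    // work, so shutdown can wait for outstanding requests to drain.
    type lifetime struct {
        state int32
        wg    sync.WaitGroup
    }

    // Add admits a caller only if the predicate accepts the current state.
    func (l *lifetime) Add(ok func(state int32) bool) bool {
        if !ok(atomic.LoadInt32(&l.state)) {
            return false
        }
        l.wg.Add(1)
        return true
    }

    func (l *lifetime) Done() { l.wg.Done() }

    func isHealthy(s int32) bool { return s == stateHealthy }

    func main() {
        var l lifetime
        if !l.Add(isHealthy) {
            fmt.Println("query node is not ready")
            return
        }
        defer l.Done()
        fmt.Println("serving request")
    }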
@@ -1266,7 +1271,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
     // get shard delegator
     shardDelegator, ok := node.delegators.Get(req.GetChannel())
     if !ok {
-        log.Warn("failed to find shard cluster when sync ", zap.String("channel", req.GetChannel()))
+        log.Warn("failed to find shard cluster when sync")
         return &commonpb.Status{
             ErrorCode: commonpb.ErrorCode_UnexpectedError,
             Reason:    "shard not exist",