// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importv2

import (
    "fmt"
    "io"
    "sync"
    "time"

    "github.com/cockroachdb/errors"
    "github.com/samber/lo"
    "go.uber.org/zap"

    "github.com/milvus-io/milvus/internal/datanode/syncmgr"
    "github.com/milvus-io/milvus/internal/proto/datapb"
    "github.com/milvus-io/milvus/internal/proto/internalpb"
    "github.com/milvus-io/milvus/internal/storage"
    "github.com/milvus-io/milvus/internal/util/importutilv2"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/conc"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

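// Scheduler drives preimport and import tasks on the datanode side: it polls
// pending tasks, executes them on a bounded worker pool, and reports the
// remaining task slots back to the caller.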
type Scheduler interface {
    Start()
    Slots() int64
    Close()
}

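// scheduler is the default Scheduler implementation. It reads import files
// through the ChunkManager, hands data to the SyncManager for flushing, and
// records task progress in the TaskManager.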
type scheduler struct {
    manager TaskManager
    syncMgr syncmgr.SyncManager
    cm      storage.ChunkManager

    pool *conc.Pool[any]

    closeOnce sync.Once
    closeChan chan struct{}
}

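// NewScheduler builds a scheduler backed by a pre-allocated worker pool whose
// size is taken from DataNodeCfg.MaxConcurrentImportTaskNum.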
func NewScheduler(manager TaskManager, syncMgr syncmgr.SyncManager, cm storage.ChunkManager) Scheduler {
    pool := conc.NewPool[any](
        paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt(),
        conc.WithPreAlloc(true),
    )
    return &scheduler{
        manager:   manager,
        syncMgr:   syncMgr,
        cm:        cm,
        pool:      pool,
        closeChan: make(chan struct{}),
    }
}

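// Start runs the scheduling loop until Close is called: every second it picks
// up pending tasks, submits their files to the worker pool, and marks a task
// Completed once all of its futures succeed; every ten minutes it logs the
// task statistics via LogStats.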
func (s *scheduler) Start() {
    log.Info("start import scheduler")
    var (
        exeTicker = time.NewTicker(1 * time.Second)
        logTicker = time.NewTicker(10 * time.Minute)
    )
    defer exeTicker.Stop()
    defer logTicker.Stop()
    for {
        select {
        case <-s.closeChan:
            log.Info("import scheduler exited")
            return
        case <-exeTicker.C:
            tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending))
            futures := make(map[int64][]*conc.Future[any])
            for _, task := range tasks {
                switch task.GetType() {
                case PreImportTaskType:
                    fs := s.PreImport(task)
                    futures[task.GetTaskID()] = fs
                    tryFreeFutures(futures)
                case ImportTaskType:
                    fs := s.Import(task)
                    futures[task.GetTaskID()] = fs
                    tryFreeFutures(futures)
                }
            }
            for taskID, fs := range futures {
                err := conc.AwaitAll(fs...)
                if err != nil {
                    continue
                }
                s.manager.Update(taskID, UpdateState(datapb.ImportTaskStateV2_Completed))
                log.Info("preimport/import done", zap.Int64("taskID", taskID))
            }
        case <-logTicker.C:
            LogStats(s.manager)
        }
    }
}

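// Slots returns how many more import tasks this node can accept: the
// configured maximum minus the number of tasks still pending or in progress.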
func (s *scheduler) Slots() int64 {
    tasks := s.manager.GetBy(WithStates(datapb.ImportTaskStateV2_Pending, datapb.ImportTaskStateV2_InProgress))
    return paramtable.Get().DataNodeCfg.MaxConcurrentImportTaskNum.GetAsInt64() - int64(len(tasks))
}

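// Close signals the scheduling loop to exit. It is safe to call multiple times.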
func (s *scheduler) Close() {
    s.closeOnce.Do(func() {
        close(s.closeChan)
    })
}

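// WrapLogFields prefixes the given zap fields with the task's identity
// (taskID, jobID, collectionID, task type) so log lines stay correlated.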
func WrapLogFields(task Task, fields ...zap.Field) []zap.Field {
    res := []zap.Field{
        zap.Int64("taskID", task.GetTaskID()),
        zap.Int64("jobID", task.GetJobID()),
        zap.Int64("collectionID", task.GetCollectionID()),
        zap.String("type", task.GetType().String()),
    }
    res = append(res, fields...)
    return res
}

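// tryFreeFutures drops futures that have already completed successfully so
// their buffered results can be released early; failed or still-running
// futures are kept for the final AwaitAll in the scheduling loop.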
func tryFreeFutures(futures map[int64][]*conc.Future[any]) {
    for k, fs := range futures {
        fs = lo.Filter(fs, func(f *conc.Future[any], _ int) bool {
            if f.Done() {
                _, err := f.Await()
                return err != nil
            }
            return true
        })
        futures[k] = fs
    }
}

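// handleErr logs the failure and moves the task into the Failed state with the
// error message as the reason.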
func (s *scheduler) handleErr(task Task, err error, msg string) {
    log.Warn(msg, WrapLogFields(task, zap.Error(err))...)
    s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed), UpdateReason(err.Error()))
}

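// PreImport submits one read job per file of the preimport task. Each job
// scans its file with a size-limited reader and records per-file row and
// partition statistics via readFileStat; the returned futures complete when
// the scans finish.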
func (s *scheduler) PreImport(task Task) []*conc.Future[any] {
    bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
    log.Info("start to preimport", WrapLogFields(task,
        zap.Int("bufferSize", bufferSize),
        zap.Any("schema", task.GetSchema()))...)
    s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))
    files := lo.Map(task.(*PreImportTask).GetFileStats(),
        func(fileStat *datapb.ImportFileStats, _ int) *internalpb.ImportFile {
            return fileStat.GetImportFile()
        })

    fn := func(i int, file *internalpb.ImportFile) error {
        reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize)
        if err != nil {
            s.handleErr(task, err, "new reader failed")
            return err
        }
        defer reader.Close()
        start := time.Now()
        err = s.readFileStat(reader, task, i)
        if err != nil {
            s.handleErr(task, err, "preimport failed")
            return err
        }
        log.Info("read file stat done", WrapLogFields(task, zap.Strings("files", file.GetPaths()),
            zap.Duration("dur", time.Since(start)))...)
        return nil
    }

    futures := make([]*conc.Future[any], 0, len(files))
    for i, file := range files {
        i := i
        file := file
        f := s.pool.Submit(func() (any, error) {
            err := fn(i, file)
            return err, err
        })
        futures = append(futures, f)
    }
    return futures
}

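// readFileStat reads the whole file once (rejecting files larger than
// DataNodeCfg.MaxImportFileSizeInGB), validates that all fields carry the same
// number of rows, accumulates per-partition hashed statistics, and stores the
// resulting ImportFileStats on the task.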
func (s *scheduler) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error {
    fileSize, err := reader.Size()
    if err != nil {
        return err
    }
    maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024
    if fileSize > int64(maxSize) {
        return errors.New(fmt.Sprintf(
            "The import file size has reached the maximum limit allowed for importing, "+
                "fileSize=%d, maxSize=%d", fileSize, int64(maxSize)))
    }

    totalRows := 0
    totalSize := 0
    hashedStats := make(map[string]*datapb.PartitionImportStats)
    for {
        data, err := reader.Read()
        if err != nil {
            if errors.Is(err, io.EOF) {
                break
            }
            return err
        }
        err = CheckRowsEqual(task.GetSchema(), data)
        if err != nil {
            return err
        }
        rowsCount, err := GetRowsStats(task, data)
        if err != nil {
            return err
        }
        MergeHashedStats(rowsCount, hashedStats)
        rows := data.GetRowNum()
        size := data.GetMemorySize()
        totalRows += rows
        totalSize += size
        log.Info("reading file stat...", WrapLogFields(task, zap.Int("readRows", rows), zap.Int("readSize", size))...)
    }

    stat := &datapb.ImportFileStats{
        FileSize:        fileSize,
        TotalRows:       int64(totalRows),
        TotalMemorySize: int64(totalSize),
        HashedStats:     hashedStats,
    }
    s.manager.Update(task.GetTaskID(), UpdateFileStat(fileIdx, stat))
    return nil
}

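// Import submits one job per file of the import request. Each job streams the
// file through importFile, which hashes rows into segments and syncs them to
// storage; the returned futures complete when the files are fully imported.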
func (s *scheduler) Import(task Task) []*conc.Future[any] {
    bufferSize := paramtable.Get().DataNodeCfg.ReadBufferSizeInMB.GetAsInt() * 1024 * 1024
    log.Info("start to import", WrapLogFields(task,
        zap.Int("bufferSize", bufferSize),
        zap.Any("schema", task.GetSchema()))...)
    s.manager.Update(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_InProgress))

    req := task.(*ImportTask).req

    fn := func(file *internalpb.ImportFile) error {
        reader, err := importutilv2.NewReader(task.GetCtx(), s.cm, task.GetSchema(), file, task.GetOptions(), bufferSize)
        if err != nil {
            s.handleErr(task, err, fmt.Sprintf("new reader failed, file: %s", file.String()))
            return err
        }
        defer reader.Close()
        start := time.Now()
        err = s.importFile(reader, task)
        if err != nil {
            s.handleErr(task, err, fmt.Sprintf("do import failed, file: %s", file.String()))
            return err
        }
        log.Info("import file done", WrapLogFields(task, zap.Strings("files", file.GetPaths()),
            zap.Duration("dur", time.Since(start)))...)
        return nil
    }

    futures := make([]*conc.Future[any], 0, len(req.GetFiles()))
    for _, file := range req.GetFiles() {
        file := file
        f := s.pool.Submit(func() (any, error) {
            err := fn(file)
            return err, err
        })
        futures = append(futures, f)
    }
    return futures
}

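// importFile reads the file batch by batch: it appends the system fields,
// hashes rows by vchannel and partition, and fans the hashed data out to sync
// tasks. After all sync futures finish, it reports the resulting segment info
// back to the task manager.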
func (s *scheduler) importFile(reader importutilv2.Reader, task Task) error {
    iTask := task.(*ImportTask)
    syncFutures := make([]*conc.Future[struct{}], 0)
    syncTasks := make([]syncmgr.Task, 0)
    for {
        data, err := reader.Read()
        if err != nil {
            if errors.Is(err, io.EOF) {
                break
            }
            return err
        }
        err = AppendSystemFieldsData(iTask, data)
        if err != nil {
            return err
        }
        hashedData, err := HashData(iTask, data)
        if err != nil {
            return err
        }
        fs, sts, err := s.Sync(iTask, hashedData)
        if err != nil {
            return err
        }
        syncFutures = append(syncFutures, fs...)
        syncTasks = append(syncTasks, sts...)
    }
    err := conc.AwaitAll(syncFutures...)
    if err != nil {
        return err
    }
    for _, syncTask := range syncTasks {
        segmentInfo, err := NewImportSegmentInfo(syncTask, iTask)
        if err != nil {
            return err
        }
        s.manager.Update(task.GetTaskID(), UpdateSegmentInfo(segmentInfo))
        log.Info("sync import data done", WrapLogFields(task, zap.Any("segmentInfo", segmentInfo))...)
    }
    return nil
}

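// Sync turns one batch of hashed data into sync tasks: for every non-empty
// (vchannel, partition) bucket it picks a target segment (tracking how much
// has been written to each segment so far), builds a sync task, and submits it
// to the SyncManager. It returns the submitted futures and tasks so the caller
// can await them and collect segment info.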
func (s *scheduler) Sync(task *ImportTask, hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
    log.Info("start to sync import data", WrapLogFields(task)...)
    futures := make([]*conc.Future[struct{}], 0)
    syncTasks := make([]syncmgr.Task, 0)
    segmentImportedSizes := make(map[int64]int)
    for channelIdx, datas := range hashedData {
        channel := task.GetVchannels()[channelIdx]
        for partitionIdx, data := range datas {
            if data.GetRowNum() == 0 {
                continue
            }
            partitionID := task.GetPartitionIDs()[partitionIdx]
            size := data.GetMemorySize()
            segmentID := PickSegment(task, segmentImportedSizes, channel, partitionID, size)
            syncTask, err := NewSyncTask(task.GetCtx(), task, segmentID, partitionID, channel, data)
            if err != nil {
                return nil, nil, err
            }
            segmentImportedSizes[segmentID] += size
            future := s.syncMgr.SyncData(task.GetCtx(), syncTask)
            futures = append(futures, future)
            syncTasks = append(syncTasks, syncTask)
        }
    }
    return futures, syncTasks, nil
}