// milvus/internal/datanode/importv2/task.go
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importv2

import (
	"context"

	"github.com/golang/protobuf/proto"
	"github.com/samber/lo"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/datanode/metacache"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/util/importutilv2"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// TaskType distinguishes the two kinds of import tasks handled by this package.
type TaskType int

const (
	PreImportTaskType TaskType = 0
	ImportTaskType    TaskType = 1
)

var ImportTaskTypeName = map[TaskType]string{
	PreImportTaskType: "PreImportTask",
	ImportTaskType:    "ImportTask",
}

func (t TaskType) String() string {
	return ImportTaskTypeName[t]
}

// TaskFilter is a predicate over tasks, used to select tasks from the manager.
type TaskFilter func(task Task) bool

// WithStates matches tasks whose state equals any of the given states.
func WithStates(states ...datapb.ImportTaskStateV2) TaskFilter {
	return func(task Task) bool {
		for _, state := range states {
			if task.GetState() == state {
				return true
			}
		}
		return false
	}
}

// WithType matches tasks of the given type.
func WithType(taskType TaskType) TaskFilter {
	return func(task Task) bool {
		return task.GetType() == taskType
	}
}

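// Illustrative sketch (not part of the original file): filters are plain
// predicates, so callers can AND them together. selectPendingByType is a
// hypothetical helper showing how a caller might combine WithType and
// WithStates over a slice of tasks.
func selectPendingByType(tasks []Task, taskType TaskType) []Task {
	byType := WithType(taskType)
	byState := WithStates(datapb.ImportTaskStateV2_Pending)
	return lo.Filter(tasks, func(task Task, _ int) bool {
		return byType(task) && byState(task)
	})
}
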
// UpdateAction mutates a task in place.
type UpdateAction func(task Task)

// UpdateState sets the state on either task variant.
func UpdateState(state datapb.ImportTaskStateV2) UpdateAction {
	return func(t Task) {
		switch t.GetType() {
		case PreImportTaskType:
			t.(*PreImportTask).PreImportTask.State = state
		case ImportTaskType:
			t.(*ImportTask).ImportTaskV2.State = state
		}
	}
}

// UpdateReason sets the failure reason on either task variant.
func UpdateReason(reason string) UpdateAction {
	return func(t Task) {
		switch t.GetType() {
		case PreImportTaskType:
			t.(*PreImportTask).PreImportTask.Reason = reason
		case ImportTaskType:
			t.(*ImportTask).ImportTaskV2.Reason = reason
		}
	}
}

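// Illustrative sketch: UpdateActions are plain functions, so several can be
// batched and applied in order. markFailed is a hypothetical helper, not
// part of the original file.
func markFailed(task Task, reason string) {
	for _, action := range []UpdateAction{
		UpdateState(datapb.ImportTaskStateV2_Failed),
		UpdateReason(reason),
	} {
		action(task)
	}
}
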
// UpdateFileStat overwrites the stats of the idx-th file of a PreImportTask
// with freshly collected values.
func UpdateFileStat(idx int, fileStat *datapb.ImportFileStats) UpdateAction {
	return func(task Task) {
		if it, ok := task.(*PreImportTask); ok {
			it.PreImportTask.FileStats[idx].FileSize = fileStat.GetFileSize()
			it.PreImportTask.FileStats[idx].TotalRows = fileStat.GetTotalRows()
			it.PreImportTask.FileStats[idx].TotalMemorySize = fileStat.GetTotalMemorySize()
			it.PreImportTask.FileStats[idx].HashedStats = fileStat.GetHashedStats()
		}
	}
}

// UpdateSegmentInfo records progress for one imported segment. Repeated
// updates for the same segment overwrite the row count but accumulate the
// binlog and statslog lists, merged per field.
func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction {
	mergeFn := func(current []*datapb.FieldBinlog, new []*datapb.FieldBinlog) []*datapb.FieldBinlog {
		for _, binlog := range new {
			fieldBinlogs, ok := lo.Find(current, func(log *datapb.FieldBinlog) bool {
				return log.GetFieldID() == binlog.GetFieldID()
			})
			if !ok || fieldBinlogs == nil {
				current = append(current, binlog)
			} else {
				fieldBinlogs.Binlogs = append(fieldBinlogs.Binlogs, binlog.Binlogs...)
			}
		}
		return current
	}
	return func(task Task) {
		if it, ok := task.(*ImportTask); ok {
			segment := info.GetSegmentID()
			if _, ok = it.segmentsInfo[segment]; ok {
				it.segmentsInfo[segment].ImportedRows = info.GetImportedRows()
				it.segmentsInfo[segment].Binlogs = mergeFn(it.segmentsInfo[segment].Binlogs, info.GetBinlogs())
				it.segmentsInfo[segment].Statslogs = mergeFn(it.segmentsInfo[segment].Statslogs, info.GetStatslogs())
				return
			}
			it.segmentsInfo[segment] = info
		}
	}
}

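// Illustrative sketch: two successive updates for the same segment. The
// second call replaces ImportedRows but appends its binlogs to those already
// recorded. Segment ID 100 and the row counts are made up for the example,
// and the updates only take effect when task is an *ImportTask.
func exampleSegmentUpdates(task Task) {
	UpdateSegmentInfo(&datapb.ImportSegmentInfo{SegmentID: 100, ImportedRows: 50})(task)
	UpdateSegmentInfo(&datapb.ImportSegmentInfo{SegmentID: 100, ImportedRows: 100})(task)
	// The task now reports ImportedRows=100 for segment 100, with the
	// merged binlog/statslog lists from both calls.
}
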
// Task is the common interface of PreImportTask and ImportTask.
type Task interface {
	GetJobID() int64
	GetTaskID() int64
	GetCollectionID() int64
	GetPartitionIDs() []int64
	GetVchannels() []string
	GetType() TaskType
	GetState() datapb.ImportTaskStateV2
	GetReason() string
	GetSchema() *schemapb.CollectionSchema
	GetCtx() context.Context
	GetOptions() []*commonpb.KeyValuePair
	Cancel()
	Clone() Task
}

// PreImportTask wraps datapb.PreImportTask with the runtime state needed to
// execute it on the datanode.
type PreImportTask struct {
	*datapb.PreImportTask
	ctx          context.Context
	cancel       context.CancelFunc
	partitionIDs []int64
	vchannels    []string
	schema       *schemapb.CollectionSchema
	options      []*commonpb.KeyValuePair
}

// NewPreImportTask builds a PreImportTask in the Pending state from a
// PreImportRequest, with one empty ImportFileStats entry per input file.
func NewPreImportTask(req *datapb.PreImportRequest) Task {
	fileStats := lo.Map(req.GetImportFiles(), func(file *internalpb.ImportFile, _ int) *datapb.ImportFileStats {
		return &datapb.ImportFileStats{
			ImportFile: file,
		}
	})
	ctx, cancel := context.WithCancel(context.Background())
	// During binlog import, even if the primary key's autoID is set to true,
	// the primary key from the binlog should be used instead of being reassigned.
	if importutilv2.IsBackup(req.GetOptions()) {
		UnsetAutoID(req.GetSchema())
	}
	return &PreImportTask{
		PreImportTask: &datapb.PreImportTask{
			JobID:        req.GetJobID(),
			TaskID:       req.GetTaskID(),
			CollectionID: req.GetCollectionID(),
			State:        datapb.ImportTaskStateV2_Pending,
			FileStats:    fileStats,
		},
		ctx:          ctx,
		cancel:       cancel,
		partitionIDs: req.GetPartitionIDs(),
		vchannels:    req.GetVchannels(),
		schema:       req.GetSchema(),
		options:      req.GetOptions(),
	}
}

func (p *PreImportTask) GetPartitionIDs() []int64 {
	return p.partitionIDs
}

func (p *PreImportTask) GetVchannels() []string {
	return p.vchannels
}

func (p *PreImportTask) GetType() TaskType {
	return PreImportTaskType
}

func (p *PreImportTask) GetSchema() *schemapb.CollectionSchema {
	return p.schema
}

func (p *PreImportTask) GetOptions() []*commonpb.KeyValuePair {
	return p.options
}

func (p *PreImportTask) GetCtx() context.Context {
	return p.ctx
}

func (p *PreImportTask) Cancel() {
	p.cancel()
}

// Clone deep-copies the embedded proto; the clone's context is derived from
// the original's, so cancelling the original also cancels the clone.
func (p *PreImportTask) Clone() Task {
	ctx, cancel := context.WithCancel(p.GetCtx())
	return &PreImportTask{
		PreImportTask: proto.Clone(p.PreImportTask).(*datapb.PreImportTask),
		ctx:           ctx,
		cancel:        cancel,
		partitionIDs:  p.GetPartitionIDs(),
		vchannels:     p.GetVchannels(),
		schema:        p.GetSchema(),
		options:       p.GetOptions(),
	}
}

// ImportTask wraps datapb.ImportTaskV2 with the runtime state needed to
// execute it on the datanode.
type ImportTask struct {
	*datapb.ImportTaskV2
	ctx          context.Context
	cancel       context.CancelFunc
	segmentsInfo map[int64]*datapb.ImportSegmentInfo
	req          *datapb.ImportRequest
	metaCaches   map[string]metacache.MetaCache
}

// NewImportTask builds an ImportTask in the Pending state from an
// ImportRequest and initializes one meta cache per target vchannel.
func NewImportTask(req *datapb.ImportRequest) Task {
	ctx, cancel := context.WithCancel(context.Background())
	// During binlog import, even if the primary key's autoID is set to true,
	// the primary key from the binlog should be used instead of being reassigned.
	if importutilv2.IsBackup(req.GetOptions()) {
		UnsetAutoID(req.GetSchema())
	}
	task := &ImportTask{
		ImportTaskV2: &datapb.ImportTaskV2{
			JobID:        req.GetJobID(),
			TaskID:       req.GetTaskID(),
			CollectionID: req.GetCollectionID(),
			State:        datapb.ImportTaskStateV2_Pending,
		},
		ctx:          ctx,
		cancel:       cancel,
		segmentsInfo: make(map[int64]*datapb.ImportSegmentInfo),
		req:          req,
	}
	task.initMetaCaches(req)
	return task
}

// initMetaCaches builds one MetaCache per target vchannel, with system
// fields appended to the schema; segments created during the import are
// tracked through these caches.
func (t *ImportTask) initMetaCaches(req *datapb.ImportRequest) {
	metaCaches := make(map[string]metacache.MetaCache)
	schema := typeutil.AppendSystemFields(req.GetSchema())
	for _, channel := range req.GetVchannels() {
		info := &datapb.ChannelWatchInfo{
			Vchan: &datapb.VchannelInfo{
				CollectionID: req.GetCollectionID(),
				ChannelName:  channel,
			},
			Schema: schema,
		}
		metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) *metacache.BloomFilterSet {
			return metacache.NewBloomFilterSet()
		})
		metaCaches[channel] = metaCache
	}
	t.metaCaches = metaCaches
}

func (t *ImportTask) GetType() TaskType {
	return ImportTaskType
}

func (t *ImportTask) GetPartitionIDs() []int64 {
	return t.req.GetPartitionIDs()
}

func (t *ImportTask) GetVchannels() []string {
	return t.req.GetVchannels()
}

func (t *ImportTask) GetSchema() *schemapb.CollectionSchema {
	return t.req.GetSchema()
}

func (t *ImportTask) GetOptions() []*commonpb.KeyValuePair {
	return t.req.GetOptions()
}

func (t *ImportTask) GetCtx() context.Context {
	return t.ctx
}

func (t *ImportTask) Cancel() {
	t.cancel()
}

func (t *ImportTask) GetSegmentsInfo() []*datapb.ImportSegmentInfo {
	return lo.Values(t.segmentsInfo)
}

// Clone deep-copies the embedded proto but shares segmentsInfo, req, and
// metaCaches with the original; the context is derived from the original's.
func (t *ImportTask) Clone() Task {
	ctx, cancel := context.WithCancel(t.GetCtx())
	return &ImportTask{
		ImportTaskV2: proto.Clone(t.ImportTaskV2).(*datapb.ImportTaskV2),
		ctx:          ctx,
		cancel:       cancel,
		segmentsInfo: t.segmentsInfo,
		req:          t.req,
		metaCaches:   t.metaCaches,
	}
}

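// Illustrative sketch: a minimal end-to-end use of the types above. The
// request literal (IDs, vchannel name, empty schema) is hypothetical; a real
// request would carry a populated schema, partition IDs, and import files.
func exampleLifecycle() {
	task := NewImportTask(&datapb.ImportRequest{
		JobID:        1,
		TaskID:       2,
		CollectionID: 3,
		Vchannels:    []string{"by-dev-rootcoord-dml_0_v0"},
		Schema:       &schemapb.CollectionSchema{},
	})
	UpdateState(datapb.ImportTaskStateV2_InProgress)(task)
	snapshot := task.Clone() // copy for progress reporting
	_ = snapshot
}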